Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
41171 views
1
/*
2
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.BufferedReader;
29
import java.io.FilePermission;
30
import java.io.IOException;
31
import java.io.InputStream;
32
import java.io.InputStreamReader;
33
import java.nio.ByteBuffer;
34
import java.nio.channels.FileChannel;
35
import java.nio.charset.Charset;
36
import java.nio.file.OpenOption;
37
import java.nio.file.Path;
38
import java.security.AccessControlContext;
39
import java.security.AccessController;
40
import java.security.PrivilegedAction;
41
import java.security.PrivilegedActionException;
42
import java.security.PrivilegedExceptionAction;
43
import java.util.ArrayList;
44
import java.util.Iterator;
45
import java.util.List;
46
import java.util.Objects;
47
import java.util.Optional;
48
import java.util.concurrent.ArrayBlockingQueue;
49
import java.util.concurrent.BlockingQueue;
50
import java.util.concurrent.CompletableFuture;
51
import java.util.concurrent.CompletionStage;
52
import java.util.concurrent.Executor;
53
import java.util.concurrent.Flow;
54
import java.util.concurrent.Flow.Subscriber;
55
import java.util.concurrent.Flow.Subscription;
56
import java.util.concurrent.atomic.AtomicBoolean;
57
import java.util.concurrent.atomic.AtomicReference;
58
import java.util.function.Consumer;
59
import java.util.function.Function;
60
import java.util.stream.Stream;
61
import java.net.http.HttpResponse.BodySubscriber;
62
import jdk.internal.net.http.common.Log;
63
import jdk.internal.net.http.common.Logger;
64
import jdk.internal.net.http.common.MinimalFuture;
65
import jdk.internal.net.http.common.Utils;
66
import static java.nio.charset.StandardCharsets.UTF_8;
67
68
public class ResponseSubscribers {
69
70
/**
71
* This interface is used by our BodySubscriber implementations to
72
* declare whether calling getBody() inline is safe, or whether
73
* it needs to be called asynchronously in an executor thread.
74
* Calling getBody() inline is usually safe except when it
75
* might block - which can be the case if the BodySubscriber
76
* is provided by custom code, or if it uses a finisher that
77
* might be called and might block before the last bit is
78
* received (for instance, if a mapping subscriber is used with
79
* a mapper function that maps an InputStream to a GZIPInputStream,
80
* as the constructor of GZIPInputStream calls read()).
81
* @param <T> The response type.
82
*/
83
public interface TrustedSubscriber<T> extends BodySubscriber<T> {

    /**
     * Tells whether {@code getBody()} should be called asynchronously.
     *
     * @implSpec The default implementation of this method returns
     *           {@code false}.
     * @return {@code true} if {@code getBody()} should be called
     *         asynchronously.
     */
    default boolean needsExecutor() {
        return false;
    }

    /**
     * Tells whether calling {@code bs::getBody} might block and therefore
     * requires an executor.
     *
     * @implNote
     * This method returns {@code true} if {@code bs} is not a
     * {@code TrustedSubscriber}. If it is a {@code TrustedSubscriber},
     * the answer is delegated to its instance-level
     * {@link #needsExecutor()}.
     *
     * @param bs A BodySubscriber.
     * @return {@code true} if calling {@code bs::getBody} requires using
     *         an executor.
     */
    static boolean needsExecutor(BodySubscriber<?> bs) {
        if (bs instanceof TrustedSubscriber) {
            // Wildcard cast rather than a raw type: the body type is
            // irrelevant for this query and no unchecked warning is raised.
            return ((TrustedSubscriber<?>) bs).needsExecutor();
        }
        // Anything that is not one of our own subscribers is untrusted.
        return true;
    }
}
112
113
public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
114
private final Consumer<Optional<byte[]>> consumer;
115
private Flow.Subscription subscription;
116
private final CompletableFuture<Void> result = new MinimalFuture<>();
117
private final AtomicBoolean subscribed = new AtomicBoolean();
118
119
public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
120
this.consumer = Objects.requireNonNull(consumer);
121
}
122
123
@Override
124
public CompletionStage<Void> getBody() {
125
return result;
126
}
127
128
@Override
129
public void onSubscribe(Flow.Subscription subscription) {
130
Objects.requireNonNull(subscription);
131
if (!subscribed.compareAndSet(false, true)) {
132
subscription.cancel();
133
} else {
134
this.subscription = subscription;
135
subscription.request(1);
136
}
137
}
138
139
@Override
140
public void onNext(List<ByteBuffer> items) {
141
Objects.requireNonNull(items);
142
for (ByteBuffer item : items) {
143
byte[] buf = new byte[item.remaining()];
144
item.get(buf);
145
consumer.accept(Optional.of(buf));
146
}
147
subscription.request(1);
148
}
149
150
@Override
151
public void onError(Throwable throwable) {
152
Objects.requireNonNull(throwable);
153
result.completeExceptionally(throwable);
154
}
155
156
@Override
157
public void onComplete() {
158
consumer.accept(Optional.empty());
159
result.complete(null);
160
}
161
162
}
163
164
/**
165
* A Subscriber that writes the flow of data to a given file.
166
*
167
* Privileged actions are performed within a limited doPrivileged that only
168
* asserts the specific, write, file permissions that were checked during
169
* the construction of this PathSubscriber.
170
*/
171
public static class PathSubscriber implements TrustedSubscriber<Path> {
172
173
private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
174
175
private final Path file;
176
private final OpenOption[] options;
177
@SuppressWarnings("removal")
178
private final AccessControlContext acc;
179
private final FilePermission[] filePermissions;
180
private final boolean isDefaultFS;
181
private final CompletableFuture<Path> result = new MinimalFuture<>();
182
183
private final AtomicBoolean subscribed = new AtomicBoolean();
184
private volatile Flow.Subscription subscription;
185
private volatile FileChannel out;
186
187
private static final String pathForSecurityCheck(Path path) {
188
return path.toFile().getPath();
189
}
190
191
/**
192
* Factory for creating PathSubscriber.
193
*
194
* Permission checks are performed here before construction of the
195
* PathSubscriber. Permission checking and construction are deliberately
196
* and tightly co-located.
197
*/
198
public static PathSubscriber create(Path file,
199
List<OpenOption> options) {
200
@SuppressWarnings("removal")
201
SecurityManager sm = System.getSecurityManager();
202
FilePermission filePermission = null;
203
if (sm != null) {
204
try {
205
String fn = pathForSecurityCheck(file);
206
FilePermission writePermission = new FilePermission(fn, "write");
207
sm.checkPermission(writePermission);
208
filePermission = writePermission;
209
} catch (UnsupportedOperationException ignored) {
210
// path not associated with the default file system provider
211
}
212
}
213
214
assert filePermission == null || filePermission.getActions().equals("write");
215
@SuppressWarnings("removal")
216
AccessControlContext acc = sm != null ? AccessController.getContext() : null;
217
return new PathSubscriber(file, options, acc, filePermission);
218
}
219
220
// pp so handler implementations in the same package can construct
221
/*package-private*/ PathSubscriber(Path file,
222
List<OpenOption> options,
223
@SuppressWarnings("removal") AccessControlContext acc,
224
FilePermission... filePermissions) {
225
this.file = file;
226
this.options = options.stream().toArray(OpenOption[]::new);
227
this.acc = acc;
228
this.filePermissions = filePermissions == null || filePermissions[0] == null
229
? EMPTY_FILE_PERMISSIONS : filePermissions;
230
this.isDefaultFS = isDefaultFS(file);
231
}
232
233
private static boolean isDefaultFS(Path file) {
234
try {
235
file.toFile();
236
return true;
237
} catch (UnsupportedOperationException uoe) {
238
return false;
239
}
240
}
241
242
@SuppressWarnings("removal")
243
@Override
244
public void onSubscribe(Flow.Subscription subscription) {
245
Objects.requireNonNull(subscription);
246
if (!subscribed.compareAndSet(false, true)) {
247
subscription.cancel();
248
return;
249
}
250
251
this.subscription = subscription;
252
if (acc == null) {
253
try {
254
out = FileChannel.open(file, options);
255
} catch (IOException ioe) {
256
result.completeExceptionally(ioe);
257
subscription.cancel();
258
return;
259
}
260
} else {
261
try {
262
PrivilegedExceptionAction<FileChannel> pa =
263
() -> FileChannel.open(file, options);
264
out = isDefaultFS
265
? AccessController.doPrivileged(pa, acc, filePermissions)
266
: AccessController.doPrivileged(pa, acc);
267
} catch (PrivilegedActionException pae) {
268
Throwable t = pae.getCause() != null ? pae.getCause() : pae;
269
result.completeExceptionally(t);
270
subscription.cancel();
271
return;
272
} catch (Exception e) {
273
result.completeExceptionally(e);
274
subscription.cancel();
275
return;
276
}
277
}
278
subscription.request(1);
279
}
280
281
@Override
282
public void onNext(List<ByteBuffer> items) {
283
try {
284
out.write(items.toArray(Utils.EMPTY_BB_ARRAY));
285
} catch (IOException ex) {
286
close();
287
subscription.cancel();
288
result.completeExceptionally(ex);
289
}
290
subscription.request(1);
291
}
292
293
@Override
294
public void onError(Throwable e) {
295
result.completeExceptionally(e);
296
close();
297
}
298
299
@Override
300
public void onComplete() {
301
close();
302
result.complete(file);
303
}
304
305
@Override
306
public CompletionStage<Path> getBody() {
307
return result;
308
}
309
310
@SuppressWarnings("removal")
311
private void close() {
312
if (acc == null) {
313
Utils.close(out);
314
} else {
315
PrivilegedAction<Void> pa = () -> {
316
Utils.close(out);
317
return null;
318
};
319
if (isDefaultFS) {
320
AccessController.doPrivileged(pa, acc, filePermissions);
321
} else {
322
AccessController.doPrivileged(pa, acc);
323
}
324
}
325
}
326
}
327
328
public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
329
private final Function<byte[], T> finisher;
330
private final CompletableFuture<T> result = new MinimalFuture<>();
331
private final List<ByteBuffer> received = new ArrayList<>();
332
333
private volatile Flow.Subscription subscription;
334
335
public ByteArraySubscriber(Function<byte[],T> finisher) {
336
this.finisher = finisher;
337
}
338
339
@Override
340
public void onSubscribe(Flow.Subscription subscription) {
341
if (this.subscription != null) {
342
subscription.cancel();
343
return;
344
}
345
this.subscription = subscription;
346
// We can handle whatever you've got
347
subscription.request(Long.MAX_VALUE);
348
}
349
350
@Override
351
public void onNext(List<ByteBuffer> items) {
352
// incoming buffers are allocated by http client internally,
353
// and won't be used anywhere except this place.
354
// So it's free simply to store them for further processing.
355
assert Utils.hasRemaining(items);
356
received.addAll(items);
357
}
358
359
@Override
360
public void onError(Throwable throwable) {
361
received.clear();
362
result.completeExceptionally(throwable);
363
}
364
365
static private byte[] join(List<ByteBuffer> bytes) {
366
int size = Utils.remaining(bytes, Integer.MAX_VALUE);
367
byte[] res = new byte[size];
368
int from = 0;
369
for (ByteBuffer b : bytes) {
370
int l = b.remaining();
371
b.get(res, from, l);
372
from += l;
373
}
374
return res;
375
}
376
377
@Override
378
public void onComplete() {
379
try {
380
result.complete(finisher.apply(join(received)));
381
received.clear();
382
} catch (IllegalArgumentException e) {
383
result.completeExceptionally(e);
384
}
385
}
386
387
@Override
388
public CompletionStage<T> getBody() {
389
return result;
390
}
391
}
392
393
/**
394
* An InputStream built on top of the Flow API.
395
*/
396
public static class HttpResponseInputStream extends InputStream
397
implements TrustedSubscriber<InputStream>
398
{
399
final static int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer
400
401
// An immutable ByteBuffer sentinel to mark that the last byte was received.
402
private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
403
private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
404
private static final Logger debug =
405
Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);
406
407
// A queue of yet unprocessed ByteBuffers received from the flow API.
408
private final BlockingQueue<List<ByteBuffer>> buffers;
409
private volatile Flow.Subscription subscription;
410
private volatile boolean closed;
411
private volatile Throwable failed;
412
private volatile Iterator<ByteBuffer> currentListItr;
413
private volatile ByteBuffer currentBuffer;
414
private final AtomicBoolean subscribed = new AtomicBoolean();
415
416
public HttpResponseInputStream() {
417
this(MAX_BUFFERS_IN_QUEUE);
418
}
419
420
HttpResponseInputStream(int maxBuffers) {
421
int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
422
// 1 additional slot needed for LAST_LIST added by onComplete
423
this.buffers = new ArrayBlockingQueue<>(capacity + 1);
424
}
425
426
@Override
427
public CompletionStage<InputStream> getBody() {
428
// Returns the stream immediately, before the
429
// response body is received.
430
// This makes it possible for sendAsync().get().body()
431
// to complete before the response body is received.
432
return CompletableFuture.completedStage(this);
433
}
434
435
// Returns the current byte buffer to read from.
436
// If the current buffer has no remaining data, this method will take the
437
// next buffer from the buffers queue, possibly blocking until
438
// a new buffer is made available through the Flow API, or the
439
// end of the flow has been reached.
440
private ByteBuffer current() throws IOException {
441
while (currentBuffer == null || !currentBuffer.hasRemaining()) {
442
// Check whether the stream is closed or exhausted
443
if (closed || failed != null) {
444
throw new IOException("closed", failed);
445
}
446
if (currentBuffer == LAST_BUFFER) break;
447
448
try {
449
if (currentListItr == null || !currentListItr.hasNext()) {
450
// Take a new list of buffers from the queue, blocking
451
// if none is available yet...
452
453
if (debug.on()) debug.log("Taking list of Buffers");
454
List<ByteBuffer> lb = buffers.take();
455
currentListItr = lb.iterator();
456
if (debug.on()) debug.log("List of Buffers Taken");
457
458
// Check whether an exception was encountered upstream
459
if (closed || failed != null)
460
throw new IOException("closed", failed);
461
462
// Check whether we're done.
463
if (lb == LAST_LIST) {
464
currentListItr = null;
465
currentBuffer = LAST_BUFFER;
466
break;
467
}
468
469
// Request another upstream item ( list of buffers )
470
Flow.Subscription s = subscription;
471
if (s != null) {
472
if (debug.on()) debug.log("Increased demand by 1");
473
s.request(1);
474
}
475
assert currentListItr != null;
476
if (lb.isEmpty()) continue;
477
}
478
assert currentListItr != null;
479
assert currentListItr.hasNext();
480
if (debug.on()) debug.log("Next Buffer");
481
currentBuffer = currentListItr.next();
482
} catch (InterruptedException ex) {
483
// continue
484
}
485
}
486
assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
487
return currentBuffer;
488
}
489
490
@Override
491
public int read(byte[] bytes, int off, int len) throws IOException {
492
Objects.checkFromIndexSize(off, len, bytes.length);
493
if (len == 0) {
494
return 0;
495
}
496
// get the buffer to read from, possibly blocking if
497
// none is available
498
ByteBuffer buffer;
499
if ((buffer = current()) == LAST_BUFFER) return -1;
500
501
// don't attempt to read more than what is available
502
// in the current buffer.
503
int read = Math.min(buffer.remaining(), len);
504
assert read > 0 && read <= buffer.remaining();
505
506
// buffer.get() will do the boundary check for us.
507
buffer.get(bytes, off, read);
508
return read;
509
}
510
511
@Override
512
public int read() throws IOException {
513
ByteBuffer buffer;
514
if ((buffer = current()) == LAST_BUFFER) return -1;
515
return buffer.get() & 0xFF;
516
}
517
518
@Override
519
public int available() throws IOException {
520
// best effort: returns the number of remaining bytes in
521
// the current buffer if any, or 1 if the current buffer
522
// is null or empty but the queue or current buffer list
523
// are not empty. Returns 0 otherwise.
524
if (closed) return 0;
525
int available = 0;
526
ByteBuffer current = currentBuffer;
527
if (current == LAST_BUFFER) return 0;
528
if (current != null) available = current.remaining();
529
if (available != 0) return available;
530
Iterator<?> iterator = currentListItr;
531
if (iterator != null && iterator.hasNext()) return 1;
532
if (buffers.isEmpty()) return 0;
533
return 1;
534
}
535
536
@Override
537
public void onSubscribe(Flow.Subscription s) {
538
Objects.requireNonNull(s);
539
try {
540
if (!subscribed.compareAndSet(false, true)) {
541
s.cancel();
542
} else {
543
// check whether the stream is already closed.
544
// if so, we should cancel the subscription
545
// immediately.
546
boolean closed;
547
synchronized (this) {
548
closed = this.closed;
549
if (!closed) {
550
this.subscription = s;
551
}
552
}
553
if (closed) {
554
s.cancel();
555
return;
556
}
557
assert buffers.remainingCapacity() > 1; // should contain at least 2
558
if (debug.on())
559
debug.log("onSubscribe: requesting "
560
+ Math.max(1, buffers.remainingCapacity() - 1));
561
s.request(Math.max(1, buffers.remainingCapacity() - 1));
562
}
563
} catch (Throwable t) {
564
failed = t;
565
try {
566
close();
567
} catch (IOException x) {
568
// OK
569
} finally {
570
onError(t);
571
}
572
}
573
}
574
575
@Override
576
public void onNext(List<ByteBuffer> t) {
577
Objects.requireNonNull(t);
578
try {
579
if (debug.on()) debug.log("next item received");
580
if (!buffers.offer(t)) {
581
throw new IllegalStateException("queue is full");
582
}
583
if (debug.on()) debug.log("item offered");
584
} catch (Throwable ex) {
585
failed = ex;
586
try {
587
close();
588
} catch (IOException ex1) {
589
// OK
590
} finally {
591
onError(ex);
592
}
593
}
594
}
595
596
@Override
597
public void onError(Throwable thrwbl) {
598
subscription = null;
599
failed = Objects.requireNonNull(thrwbl);
600
// The client process that reads the input stream might
601
// be blocked in queue.take().
602
// Tries to offer LAST_LIST to the queue. If the queue is
603
// full we don't care if we can't insert this buffer, as
604
// the client can't be blocked in queue.take() in that case.
605
// Adding LAST_LIST to the queue is harmless, as the client
606
// should find failed != null before handling LAST_LIST.
607
buffers.offer(LAST_LIST);
608
}
609
610
@Override
611
public void onComplete() {
612
subscription = null;
613
onNext(LAST_LIST);
614
}
615
616
@Override
617
public void close() throws IOException {
618
Flow.Subscription s;
619
synchronized (this) {
620
if (closed) return;
621
closed = true;
622
s = subscription;
623
subscription = null;
624
}
625
// s will be null if already completed
626
try {
627
if (s != null) {
628
s.cancel();
629
}
630
} finally {
631
buffers.offer(LAST_LIST);
632
super.close();
633
}
634
}
635
636
}
637
638
public static BodySubscriber<Stream<String>> createLineStream() {
639
return createLineStream(UTF_8);
640
}
641
642
public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
643
Objects.requireNonNull(charset);
644
BodySubscriber<InputStream> s = new HttpResponseInputStream();
645
// Creates a MappingSubscriber with a trusted finisher that is
646
// trusted not to block.
647
return new MappingSubscriber<InputStream,Stream<String>>(s,
648
(InputStream stream) -> {
649
return new BufferedReader(new InputStreamReader(stream, charset))
650
.lines().onClose(() -> Utils.close(stream));
651
}, true);
652
}
653
654
/**
655
* Currently this consumes all of the data and ignores it
656
*/
657
public static class NullSubscriber<T> implements TrustedSubscriber<T> {
658
659
private final CompletableFuture<T> cf = new MinimalFuture<>();
660
private final Optional<T> result;
661
private final AtomicBoolean subscribed = new AtomicBoolean();
662
663
public NullSubscriber(Optional<T> result) {
664
this.result = result;
665
}
666
667
@Override
668
public void onSubscribe(Flow.Subscription subscription) {
669
Objects.requireNonNull(subscription);
670
if (!subscribed.compareAndSet(false, true)) {
671
subscription.cancel();
672
} else {
673
subscription.request(Long.MAX_VALUE);
674
}
675
}
676
677
@Override
678
public void onNext(List<ByteBuffer> items) {
679
Objects.requireNonNull(items);
680
}
681
682
@Override
683
public void onError(Throwable throwable) {
684
Objects.requireNonNull(throwable);
685
cf.completeExceptionally(throwable);
686
}
687
688
@Override
689
public void onComplete() {
690
if (result.isPresent()) {
691
cf.complete(result.get());
692
} else {
693
cf.complete(null);
694
}
695
}
696
697
@Override
698
public CompletionStage<T> getBody() {
699
return cf;
700
}
701
}
702
703
/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
704
public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
705
implements TrustedSubscriber<R>
706
{
707
private final CompletableFuture<R> cf = new MinimalFuture<>();
708
private final S subscriber;
709
private final Function<? super S,? extends R> finisher;
710
private volatile Subscription subscription;
711
712
// The finisher isn't called until all bytes have been received,
713
// and so shouldn't need an executor. No need to override
714
// TrustedSubscriber::needsExecutor
715
public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {
716
this.subscriber = Objects.requireNonNull(subscriber);
717
this.finisher = Objects.requireNonNull(finisher);
718
}
719
720
@Override
721
public void onSubscribe(Subscription subscription) {
722
Objects.requireNonNull(subscription);
723
if (this.subscription != null) {
724
subscription.cancel();
725
} else {
726
this.subscription = subscription;
727
subscriber.onSubscribe(subscription);
728
}
729
}
730
731
@Override
732
public void onNext(List<ByteBuffer> item) {
733
Objects.requireNonNull(item);
734
try {
735
subscriber.onNext(item);
736
} catch (Throwable throwable) {
737
subscription.cancel();
738
onError(throwable);
739
}
740
}
741
742
@Override
743
public void onError(Throwable throwable) {
744
Objects.requireNonNull(throwable);
745
try {
746
subscriber.onError(throwable);
747
} finally {
748
cf.completeExceptionally(throwable);
749
}
750
}
751
752
@Override
753
public void onComplete() {
754
try {
755
subscriber.onComplete();
756
} finally {
757
try {
758
cf.complete(finisher.apply(subscriber));
759
} catch (Throwable throwable) {
760
cf.completeExceptionally(throwable);
761
}
762
}
763
}
764
765
@Override
766
public CompletionStage<R> getBody() {
767
return cf;
768
}
769
}
770
771
/**
772
* A body subscriber which receives input from an upstream subscriber
773
* and maps that subscriber's body type to a new type. The upstream subscriber
774
* delegates all flow operations directly to this object. The
775
* {@link CompletionStage} returned by {@link #getBody()} takes the output
776
* of the upstream {@code getBody()} and applies the mapper function to
777
* obtain the new {@code CompletionStage} type.
778
*
779
* @param <T> the upstream body type
780
* @param <U> this subscriber's body type
781
*/
782
public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {
783
private final BodySubscriber<T> upstream;
784
private final Function<? super T,? extends U> mapper;
785
private final boolean trusted;
786
787
public MappingSubscriber(BodySubscriber<T> upstream,
788
Function<? super T,? extends U> mapper) {
789
this(upstream, mapper, false);
790
}
791
792
// creates a MappingSubscriber with a mapper that is trusted
793
// to not block when called.
794
MappingSubscriber(BodySubscriber<T> upstream,
795
Function<? super T,? extends U> mapper,
796
boolean trusted) {
797
this.upstream = Objects.requireNonNull(upstream);
798
this.mapper = Objects.requireNonNull(mapper);
799
this.trusted = trusted;
800
}
801
802
// There is no way to know whether a custom mapper function
803
// might block or not - so we should return true unless the
804
// mapper is implemented and trusted by our own code not to
805
// block.
806
@Override
807
public boolean needsExecutor() {
808
return !trusted || TrustedSubscriber.needsExecutor(upstream);
809
}
810
811
// If upstream.getBody() is already completed (case of InputStream),
812
// then calling upstream.getBody().thenApply(mapper) might block
813
// if the mapper blocks. We should probably add a variant of
814
// MappingSubscriber that calls thenApplyAsync instead, but this
815
// needs a new public API point. See needsExecutor() above.
816
@Override
817
public CompletionStage<U> getBody() {
818
return upstream.getBody().thenApply(mapper);
819
}
820
821
@Override
822
public void onSubscribe(Flow.Subscription subscription) {
823
upstream.onSubscribe(subscription);
824
}
825
826
@Override
827
public void onNext(List<ByteBuffer> item) {
828
upstream.onNext(item);
829
}
830
831
@Override
832
public void onError(Throwable throwable) {
833
upstream.onError(throwable);
834
}
835
836
@Override
837
public void onComplete() {
838
upstream.onComplete();
839
}
840
}
841
842
// A BodySubscriber that returns a Publisher<List<ByteBuffer>>
843
static class PublishingBodySubscriber
844
implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {
845
private final MinimalFuture<Flow.Subscription>
846
subscriptionCF = new MinimalFuture<>();
847
private final MinimalFuture<SubscriberRef>
848
subscribedCF = new MinimalFuture<>();
849
private AtomicReference<SubscriberRef>
850
subscriberRef = new AtomicReference<>();
851
private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =
852
subscriptionCF.thenCompose(
853
(s) -> MinimalFuture.completedFuture(this::subscribe));
854
855
// We use the completionCF to ensure that only one of
856
// onError or onComplete is ever called.
857
private final MinimalFuture<Void> completionCF;
858
private PublishingBodySubscriber() {
859
completionCF = new MinimalFuture<>();
860
completionCF.whenComplete(
861
(r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));
862
}
863
864
// An object that holds a reference to a Flow.Subscriber.
865
// The reference is cleared when the subscriber is completed - either
866
// normally or exceptionally, or when the subscription is cancelled.
867
static final class SubscriberRef {
868
volatile Flow.Subscriber<? super List<ByteBuffer>> ref;
869
SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
870
ref = subscriber;
871
}
872
Flow.Subscriber<? super List<ByteBuffer>> get() {
873
return ref;
874
}
875
Flow.Subscriber<? super List<ByteBuffer>> clear() {
876
Flow.Subscriber<? super List<ByteBuffer>> res = ref;
877
ref = null;
878
return res;
879
}
880
}
881
882
// A subscription that wraps an upstream subscription and
883
// holds a reference to a subscriber. The subscriber reference
884
// is cleared when the subscription is cancelled
885
final static class SubscriptionRef implements Flow.Subscription {
886
final Flow.Subscription subscription;
887
final SubscriberRef subscriberRef;
888
SubscriptionRef(Flow.Subscription subscription,
889
SubscriberRef subscriberRef) {
890
this.subscription = subscription;
891
this.subscriberRef = subscriberRef;
892
}
893
@Override
894
public void request(long n) {
895
if (subscriberRef.get() != null) {
896
subscription.request(n);
897
}
898
}
899
@Override
900
public void cancel() {
901
subscription.cancel();
902
subscriberRef.clear();
903
}
904
905
void subscribe() {
906
Subscriber<?> subscriber = subscriberRef.get();
907
if (subscriber != null) {
908
subscriber.onSubscribe(this);
909
}
910
}
911
912
@Override
913
public String toString() {
914
return "SubscriptionRef/"
915
+ subscription.getClass().getName()
916
+ "@"
917
+ System.identityHashCode(subscription);
918
}
919
}
920
921
// This is a callback for the subscribedCF.
922
// Do not call directly!
923
private void complete(SubscriberRef ref, Throwable t) {
924
assert ref != null;
925
Subscriber<?> s = ref.clear();
926
// maybe null if subscription was cancelled
927
if (s == null) return;
928
if (t == null) {
929
try {
930
s.onComplete();
931
} catch (Throwable x) {
932
s.onError(x);
933
}
934
} else {
935
s.onError(t);
936
}
937
}
938
939
private void signalError(Throwable err) {
940
if (err == null) {
941
err = new NullPointerException("null throwable");
942
}
943
completionCF.completeExceptionally(err);
944
}
945
946
private void signalComplete() {
947
completionCF.complete(null);
948
}
949
950
private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
951
Objects.requireNonNull(subscriber, "subscriber must not be null");
952
SubscriberRef ref = new SubscriberRef(subscriber);
953
if (subscriberRef.compareAndSet(null, ref)) {
954
subscriptionCF.thenAccept((s) -> {
955
SubscriptionRef subscription = new SubscriptionRef(s,ref);
956
try {
957
subscription.subscribe();
958
subscribedCF.complete(ref);
959
} catch (Throwable t) {
960
if (Log.errors()) {
961
Log.logError("Failed to call onSubscribe: " +
962
"cancelling subscription: " + t);
963
Log.logError(t);
964
}
965
subscription.cancel();
966
}
967
});
968
} else {
969
subscriber.onSubscribe(new Flow.Subscription() {
970
@Override public void request(long n) { }
971
@Override public void cancel() { }
972
});
973
subscriber.onError(new IllegalStateException(
974
"This publisher has already one subscriber"));
975
}
976
}
977
978
private final AtomicBoolean subscribed = new AtomicBoolean();
979
980
@Override
981
public void onSubscribe(Flow.Subscription subscription) {
982
Objects.requireNonNull(subscription);
983
if (!subscribed.compareAndSet(false, true)) {
984
subscription.cancel();
985
} else {
986
subscriptionCF.complete(subscription);
987
}
988
}
989
990
@Override
991
public void onNext(List<ByteBuffer> item) {
992
Objects.requireNonNull(item);
993
try {
994
// cannot be called before onSubscribe()
995
assert subscriptionCF.isDone();
996
SubscriberRef ref = subscriberRef.get();
997
// cannot be called before subscriber calls request(1)
998
assert ref != null;
999
Flow.Subscriber<? super List<ByteBuffer>>
1000
subscriber = ref.get();
1001
if (subscriber != null) {
1002
// may be null if subscription was cancelled.
1003
subscriber.onNext(item);
1004
}
1005
} catch (Throwable err) {
1006
signalError(err);
1007
subscriptionCF.thenAccept(s -> s.cancel());
1008
}
1009
}
1010
1011
@Override
1012
public void onError(Throwable throwable) {
1013
// cannot be called before onSubscribe();
1014
assert suppress(subscriptionCF.isDone(),
1015
"onError called before onSubscribe",
1016
throwable);
1017
// onError can be called before request(1), and therefore can
1018
// be called before subscriberRef is set.
1019
signalError(throwable);
1020
Objects.requireNonNull(throwable);
1021
}
1022
1023
@Override
1024
public void onComplete() {
1025
// cannot be called before onSubscribe()
1026
if (!subscriptionCF.isDone()) {
1027
signalError(new InternalError(
1028
"onComplete called before onSubscribed"));
1029
} else {
1030
// onComplete can be called before request(1),
1031
// and therefore can be called before subscriberRef
1032
// is set.
1033
signalComplete();
1034
}
1035
}
1036
1037
@Override
1038
public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
1039
return body;
1040
}
1041
1042
private boolean suppress(boolean condition,
1043
String assertion,
1044
Throwable carrier) {
1045
if (!condition) {
1046
if (carrier != null) {
1047
carrier.addSuppressed(new AssertionError(assertion));
1048
} else if (Log.errors()) {
1049
Log.logError(new AssertionError(assertion));
1050
}
1051
}
1052
return true;
1053
}
1054
1055
}
1056
1057
public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>
1058
createPublisher() {
1059
return new PublishingBodySubscriber();
1060
}
1061
1062
1063
/**
1064
* Tries to determine whether bs::getBody must be invoked asynchronously,
1065
* and if so, uses the provided executor to do it.
1066
* If the executor is a {@link HttpClientImpl.DelegatingExecutor},
1067
* uses the executor's delegate.
1068
* @param e The executor to use if an executor is required.
1069
* @param bs The BodySubscriber (trusted or not)
1070
* @param <T> The type of the response.
1071
* @return A completion stage that completes when the completion
1072
* stage returned by bs::getBody completes. This may, or
1073
* may not, be the same completion stage.
1074
*/
1075
public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {
1076
if (TrustedSubscriber.needsExecutor(bs)) {
1077
// getBody must be called in the executor
1078
return getBodyAsync(e, bs, new MinimalFuture<>());
1079
} else {
1080
// No executor needed
1081
return bs.getBody();
1082
}
1083
}
1084
1085
/**
1086
* Invokes bs::getBody using the provided executor.
1087
* If invoking bs::getBody requires an executor, and the given executor
1088
* is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1089
* delegate is used. If an error occurs anywhere then the given {code cf}
1090
* is completed exceptionally (this method does not throw).
1091
* @param e The executor that should be used to call bs::getBody
1092
* @param bs The BodySubscriber
1093
* @param cf A completable future that this function will set up
1094
* to complete when the completion stage returned by
1095
* bs::getBody completes.
1096
* In case of any error while trying to set up the
1097
* completion chain, {@code cf} will be completed
1098
* exceptionally with that error.
1099
* @param <T> The response type.
1100
* @return The provided {@code cf}.
1101
*/
1102
public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1103
BodySubscriber<T> bs,
1104
CompletableFuture<T> cf) {
1105
return getBodyAsync(e, bs, cf, cf::completeExceptionally);
1106
}
1107
1108
/**
1109
* Invokes bs::getBody using the provided executor.
1110
* If invoking bs::getBody requires an executor, and the given executor
1111
* is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1112
* delegate is used.
1113
* The provided {@code cf} is completed with the result (exceptional
1114
* or not) of the completion stage returned by bs::getBody.
1115
* If an error occurs when trying to set up the
1116
* completion chain, the provided {@code errorHandler} is invoked,
1117
* but {@code cf} is not necessarily affected.
1118
* This method does not throw.
1119
* @param e The executor that should be used to call bs::getBody
1120
* @param bs The BodySubscriber
1121
* @param cf A completable future that this function will set up
1122
* to complete when the completion stage returned by
1123
* bs::getBody completes.
1124
* In case of any error while trying to set up the
1125
* completion chain, {@code cf} will be completed
1126
* exceptionally with that error.
1127
* @param errorHandler The handler to invoke if an error is raised
1128
* while trying to set up the completion chain.
1129
* @param <T> The response type.
1130
* @return The provide {@code cf}. If the {@code errorHandler} is
1131
* invoked, it is the responsibility of the {@code errorHandler} to
1132
* complete the {@code cf}, if needed.
1133
*/
1134
public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1135
BodySubscriber<T> bs,
1136
CompletableFuture<T> cf,
1137
Consumer<Throwable> errorHandler) {
1138
assert errorHandler != null;
1139
try {
1140
assert e != null;
1141
assert cf != null;
1142
1143
if (TrustedSubscriber.needsExecutor(bs)) {
1144
e = (e instanceof HttpClientImpl.DelegatingExecutor)
1145
? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;
1146
}
1147
1148
e.execute(() -> {
1149
try {
1150
bs.getBody().whenComplete((r, t) -> {
1151
if (t != null) {
1152
cf.completeExceptionally(t);
1153
} else {
1154
cf.complete(r);
1155
}
1156
});
1157
} catch (Throwable t) {
1158
errorHandler.accept(t);
1159
}
1160
});
1161
return cf;
1162
1163
} catch (Throwable t) {
1164
errorHandler.accept(t);
1165
}
1166
return cf;
1167
}
1168
}
1169
1170