Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
41171 views
/*1* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.BufferedReader;28import java.io.FilePermission;29import java.io.IOException;30import java.io.InputStream;31import java.io.InputStreamReader;32import java.nio.ByteBuffer;33import java.nio.channels.FileChannel;34import java.nio.charset.Charset;35import java.nio.file.OpenOption;36import java.nio.file.Path;37import java.security.AccessControlContext;38import java.security.AccessController;39import java.security.PrivilegedAction;40import java.security.PrivilegedActionException;41import java.security.PrivilegedExceptionAction;42import java.util.ArrayList;43import java.util.Iterator;44import java.util.List;45import java.util.Objects;46import java.util.Optional;47import java.util.concurrent.ArrayBlockingQueue;48import java.util.concurrent.BlockingQueue;49import java.util.concurrent.CompletableFuture;50import java.util.concurrent.CompletionStage;51import java.util.concurrent.Executor;52import java.util.concurrent.Flow;53import java.util.concurrent.Flow.Subscriber;54import java.util.concurrent.Flow.Subscription;55import java.util.concurrent.atomic.AtomicBoolean;56import java.util.concurrent.atomic.AtomicReference;57import java.util.function.Consumer;58import java.util.function.Function;59import java.util.stream.Stream;60import java.net.http.HttpResponse.BodySubscriber;61import jdk.internal.net.http.common.Log;62import jdk.internal.net.http.common.Logger;63import jdk.internal.net.http.common.MinimalFuture;64import jdk.internal.net.http.common.Utils;65import static java.nio.charset.StandardCharsets.UTF_8;6667public class ResponseSubscribers {6869/**70* This interface is used by our BodySubscriber implementations to71* declare whether calling getBody() inline is safe, or whether72* it needs to be called asynchronously in an executor thread.73* Calling getBody() inline is usually safe except when it74* might block - which can be the case if the BodySubscriber75* is provided by custom code, or if it uses a finisher that76* might be called and might block before the last bit is77* received (for instance, if a mapping subscriber is used with78* a mapper function that maps an InputStream to a GZIPInputStream,79* as the the constructor of GZIPInputStream calls read()).80* @param <T> The response type.81*/82public interface TrustedSubscriber<T> extends BodySubscriber<T> {83/**84* Returns true if getBody() should be called asynchronously.85* @implSpec The default implementation of this method returns86* false.87* @return true if getBody() should be called asynchronously.88*/89default boolean needsExecutor() { return false;}9091/**92* Returns true if calling {@code bs::getBody} might block93* and requires an executor.94*95* @implNote96* In particular this method returns97* true if {@code bs} is not a {@code TrustedSubscriber}.98* If it is a {@code TrustedSubscriber}, it returns99* {@code ((TrustedSubscriber) bs).needsExecutor()}.100*101* @param bs A BodySubscriber.102* @return true if calling {@code bs::getBody} requires using103* an executor.104*/105static boolean needsExecutor(BodySubscriber<?> bs) {106if (bs instanceof TrustedSubscriber) {107return ((TrustedSubscriber) bs).needsExecutor();108} else return true;109}110}111112public static class ConsumerSubscriber implements TrustedSubscriber<Void> {113private final Consumer<Optional<byte[]>> consumer;114private Flow.Subscription subscription;115private final CompletableFuture<Void> result = new MinimalFuture<>();116private final AtomicBoolean subscribed = new AtomicBoolean();117118public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {119this.consumer = Objects.requireNonNull(consumer);120}121122@Override123public CompletionStage<Void> getBody() {124return result;125}126127@Override128public void onSubscribe(Flow.Subscription subscription) {129Objects.requireNonNull(subscription);130if (!subscribed.compareAndSet(false, true)) {131subscription.cancel();132} else {133this.subscription = subscription;134subscription.request(1);135}136}137138@Override139public void onNext(List<ByteBuffer> items) {140Objects.requireNonNull(items);141for (ByteBuffer item : items) {142byte[] buf = new byte[item.remaining()];143item.get(buf);144consumer.accept(Optional.of(buf));145}146subscription.request(1);147}148149@Override150public void onError(Throwable throwable) {151Objects.requireNonNull(throwable);152result.completeExceptionally(throwable);153}154155@Override156public void onComplete() {157consumer.accept(Optional.empty());158result.complete(null);159}160161}162163/**164* A Subscriber that writes the flow of data to a given file.165*166* Privileged actions are performed within a limited doPrivileged that only167* asserts the specific, write, file permissions that were checked during168* the construction of this PathSubscriber.169*/170public static class PathSubscriber implements TrustedSubscriber<Path> {171172private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];173174private final Path file;175private final OpenOption[] options;176@SuppressWarnings("removal")177private final AccessControlContext acc;178private final FilePermission[] filePermissions;179private final boolean isDefaultFS;180private final CompletableFuture<Path> result = new MinimalFuture<>();181182private final AtomicBoolean subscribed = new AtomicBoolean();183private volatile Flow.Subscription subscription;184private volatile FileChannel out;185186private static final String pathForSecurityCheck(Path path) {187return path.toFile().getPath();188}189190/**191* Factory for creating PathSubscriber.192*193* Permission checks are performed here before construction of the194* PathSubscriber. Permission checking and construction are deliberately195* and tightly co-located.196*/197public static PathSubscriber create(Path file,198List<OpenOption> options) {199@SuppressWarnings("removal")200SecurityManager sm = System.getSecurityManager();201FilePermission filePermission = null;202if (sm != null) {203try {204String fn = pathForSecurityCheck(file);205FilePermission writePermission = new FilePermission(fn, "write");206sm.checkPermission(writePermission);207filePermission = writePermission;208} catch (UnsupportedOperationException ignored) {209// path not associated with the default file system provider210}211}212213assert filePermission == null || filePermission.getActions().equals("write");214@SuppressWarnings("removal")215AccessControlContext acc = sm != null ? AccessController.getContext() : null;216return new PathSubscriber(file, options, acc, filePermission);217}218219// pp so handler implementations in the same package can construct220/*package-private*/ PathSubscriber(Path file,221List<OpenOption> options,222@SuppressWarnings("removal") AccessControlContext acc,223FilePermission... filePermissions) {224this.file = file;225this.options = options.stream().toArray(OpenOption[]::new);226this.acc = acc;227this.filePermissions = filePermissions == null || filePermissions[0] == null228? EMPTY_FILE_PERMISSIONS : filePermissions;229this.isDefaultFS = isDefaultFS(file);230}231232private static boolean isDefaultFS(Path file) {233try {234file.toFile();235return true;236} catch (UnsupportedOperationException uoe) {237return false;238}239}240241@SuppressWarnings("removal")242@Override243public void onSubscribe(Flow.Subscription subscription) {244Objects.requireNonNull(subscription);245if (!subscribed.compareAndSet(false, true)) {246subscription.cancel();247return;248}249250this.subscription = subscription;251if (acc == null) {252try {253out = FileChannel.open(file, options);254} catch (IOException ioe) {255result.completeExceptionally(ioe);256subscription.cancel();257return;258}259} else {260try {261PrivilegedExceptionAction<FileChannel> pa =262() -> FileChannel.open(file, options);263out = isDefaultFS264? AccessController.doPrivileged(pa, acc, filePermissions)265: AccessController.doPrivileged(pa, acc);266} catch (PrivilegedActionException pae) {267Throwable t = pae.getCause() != null ? pae.getCause() : pae;268result.completeExceptionally(t);269subscription.cancel();270return;271} catch (Exception e) {272result.completeExceptionally(e);273subscription.cancel();274return;275}276}277subscription.request(1);278}279280@Override281public void onNext(List<ByteBuffer> items) {282try {283out.write(items.toArray(Utils.EMPTY_BB_ARRAY));284} catch (IOException ex) {285close();286subscription.cancel();287result.completeExceptionally(ex);288}289subscription.request(1);290}291292@Override293public void onError(Throwable e) {294result.completeExceptionally(e);295close();296}297298@Override299public void onComplete() {300close();301result.complete(file);302}303304@Override305public CompletionStage<Path> getBody() {306return result;307}308309@SuppressWarnings("removal")310private void close() {311if (acc == null) {312Utils.close(out);313} else {314PrivilegedAction<Void> pa = () -> {315Utils.close(out);316return null;317};318if (isDefaultFS) {319AccessController.doPrivileged(pa, acc, filePermissions);320} else {321AccessController.doPrivileged(pa, acc);322}323}324}325}326327public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {328private final Function<byte[], T> finisher;329private final CompletableFuture<T> result = new MinimalFuture<>();330private final List<ByteBuffer> received = new ArrayList<>();331332private volatile Flow.Subscription subscription;333334public ByteArraySubscriber(Function<byte[],T> finisher) {335this.finisher = finisher;336}337338@Override339public void onSubscribe(Flow.Subscription subscription) {340if (this.subscription != null) {341subscription.cancel();342return;343}344this.subscription = subscription;345// We can handle whatever you've got346subscription.request(Long.MAX_VALUE);347}348349@Override350public void onNext(List<ByteBuffer> items) {351// incoming buffers are allocated by http client internally,352// and won't be used anywhere except this place.353// So it's free simply to store them for further processing.354assert Utils.hasRemaining(items);355received.addAll(items);356}357358@Override359public void onError(Throwable throwable) {360received.clear();361result.completeExceptionally(throwable);362}363364static private byte[] join(List<ByteBuffer> bytes) {365int size = Utils.remaining(bytes, Integer.MAX_VALUE);366byte[] res = new byte[size];367int from = 0;368for (ByteBuffer b : bytes) {369int l = b.remaining();370b.get(res, from, l);371from += l;372}373return res;374}375376@Override377public void onComplete() {378try {379result.complete(finisher.apply(join(received)));380received.clear();381} catch (IllegalArgumentException e) {382result.completeExceptionally(e);383}384}385386@Override387public CompletionStage<T> getBody() {388return result;389}390}391392/**393* An InputStream built on top of the Flow API.394*/395public static class HttpResponseInputStream extends InputStream396implements TrustedSubscriber<InputStream>397{398final static int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer399400// An immutable ByteBuffer sentinel to mark that the last byte was received.401private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);402private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);403private static final Logger debug =404Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);405406// A queue of yet unprocessed ByteBuffers received from the flow API.407private final BlockingQueue<List<ByteBuffer>> buffers;408private volatile Flow.Subscription subscription;409private volatile boolean closed;410private volatile Throwable failed;411private volatile Iterator<ByteBuffer> currentListItr;412private volatile ByteBuffer currentBuffer;413private final AtomicBoolean subscribed = new AtomicBoolean();414415public HttpResponseInputStream() {416this(MAX_BUFFERS_IN_QUEUE);417}418419HttpResponseInputStream(int maxBuffers) {420int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);421// 1 additional slot needed for LAST_LIST added by onComplete422this.buffers = new ArrayBlockingQueue<>(capacity + 1);423}424425@Override426public CompletionStage<InputStream> getBody() {427// Returns the stream immediately, before the428// response body is received.429// This makes it possible for sendAsync().get().body()430// to complete before the response body is received.431return CompletableFuture.completedStage(this);432}433434// Returns the current byte buffer to read from.435// If the current buffer has no remaining data, this method will take the436// next buffer from the buffers queue, possibly blocking until437// a new buffer is made available through the Flow API, or the438// end of the flow has been reached.439private ByteBuffer current() throws IOException {440while (currentBuffer == null || !currentBuffer.hasRemaining()) {441// Check whether the stream is closed or exhausted442if (closed || failed != null) {443throw new IOException("closed", failed);444}445if (currentBuffer == LAST_BUFFER) break;446447try {448if (currentListItr == null || !currentListItr.hasNext()) {449// Take a new list of buffers from the queue, blocking450// if none is available yet...451452if (debug.on()) debug.log("Taking list of Buffers");453List<ByteBuffer> lb = buffers.take();454currentListItr = lb.iterator();455if (debug.on()) debug.log("List of Buffers Taken");456457// Check whether an exception was encountered upstream458if (closed || failed != null)459throw new IOException("closed", failed);460461// Check whether we're done.462if (lb == LAST_LIST) {463currentListItr = null;464currentBuffer = LAST_BUFFER;465break;466}467468// Request another upstream item ( list of buffers )469Flow.Subscription s = subscription;470if (s != null) {471if (debug.on()) debug.log("Increased demand by 1");472s.request(1);473}474assert currentListItr != null;475if (lb.isEmpty()) continue;476}477assert currentListItr != null;478assert currentListItr.hasNext();479if (debug.on()) debug.log("Next Buffer");480currentBuffer = currentListItr.next();481} catch (InterruptedException ex) {482// continue483}484}485assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();486return currentBuffer;487}488489@Override490public int read(byte[] bytes, int off, int len) throws IOException {491Objects.checkFromIndexSize(off, len, bytes.length);492if (len == 0) {493return 0;494}495// get the buffer to read from, possibly blocking if496// none is available497ByteBuffer buffer;498if ((buffer = current()) == LAST_BUFFER) return -1;499500// don't attempt to read more than what is available501// in the current buffer.502int read = Math.min(buffer.remaining(), len);503assert read > 0 && read <= buffer.remaining();504505// buffer.get() will do the boundary check for us.506buffer.get(bytes, off, read);507return read;508}509510@Override511public int read() throws IOException {512ByteBuffer buffer;513if ((buffer = current()) == LAST_BUFFER) return -1;514return buffer.get() & 0xFF;515}516517@Override518public int available() throws IOException {519// best effort: returns the number of remaining bytes in520// the current buffer if any, or 1 if the current buffer521// is null or empty but the queue or current buffer list522// are not empty. Returns 0 otherwise.523if (closed) return 0;524int available = 0;525ByteBuffer current = currentBuffer;526if (current == LAST_BUFFER) return 0;527if (current != null) available = current.remaining();528if (available != 0) return available;529Iterator<?> iterator = currentListItr;530if (iterator != null && iterator.hasNext()) return 1;531if (buffers.isEmpty()) return 0;532return 1;533}534535@Override536public void onSubscribe(Flow.Subscription s) {537Objects.requireNonNull(s);538try {539if (!subscribed.compareAndSet(false, true)) {540s.cancel();541} else {542// check whether the stream is already closed.543// if so, we should cancel the subscription544// immediately.545boolean closed;546synchronized (this) {547closed = this.closed;548if (!closed) {549this.subscription = s;550}551}552if (closed) {553s.cancel();554return;555}556assert buffers.remainingCapacity() > 1; // should contain at least 2557if (debug.on())558debug.log("onSubscribe: requesting "559+ Math.max(1, buffers.remainingCapacity() - 1));560s.request(Math.max(1, buffers.remainingCapacity() - 1));561}562} catch (Throwable t) {563failed = t;564try {565close();566} catch (IOException x) {567// OK568} finally {569onError(t);570}571}572}573574@Override575public void onNext(List<ByteBuffer> t) {576Objects.requireNonNull(t);577try {578if (debug.on()) debug.log("next item received");579if (!buffers.offer(t)) {580throw new IllegalStateException("queue is full");581}582if (debug.on()) debug.log("item offered");583} catch (Throwable ex) {584failed = ex;585try {586close();587} catch (IOException ex1) {588// OK589} finally {590onError(ex);591}592}593}594595@Override596public void onError(Throwable thrwbl) {597subscription = null;598failed = Objects.requireNonNull(thrwbl);599// The client process that reads the input stream might600// be blocked in queue.take().601// Tries to offer LAST_LIST to the queue. If the queue is602// full we don't care if we can't insert this buffer, as603// the client can't be blocked in queue.take() in that case.604// Adding LAST_LIST to the queue is harmless, as the client605// should find failed != null before handling LAST_LIST.606buffers.offer(LAST_LIST);607}608609@Override610public void onComplete() {611subscription = null;612onNext(LAST_LIST);613}614615@Override616public void close() throws IOException {617Flow.Subscription s;618synchronized (this) {619if (closed) return;620closed = true;621s = subscription;622subscription = null;623}624// s will be null if already completed625try {626if (s != null) {627s.cancel();628}629} finally {630buffers.offer(LAST_LIST);631super.close();632}633}634635}636637public static BodySubscriber<Stream<String>> createLineStream() {638return createLineStream(UTF_8);639}640641public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {642Objects.requireNonNull(charset);643BodySubscriber<InputStream> s = new HttpResponseInputStream();644// Creates a MappingSubscriber with a trusted finisher that is645// trusted not to block.646return new MappingSubscriber<InputStream,Stream<String>>(s,647(InputStream stream) -> {648return new BufferedReader(new InputStreamReader(stream, charset))649.lines().onClose(() -> Utils.close(stream));650}, true);651}652653/**654* Currently this consumes all of the data and ignores it655*/656public static class NullSubscriber<T> implements TrustedSubscriber<T> {657658private final CompletableFuture<T> cf = new MinimalFuture<>();659private final Optional<T> result;660private final AtomicBoolean subscribed = new AtomicBoolean();661662public NullSubscriber(Optional<T> result) {663this.result = result;664}665666@Override667public void onSubscribe(Flow.Subscription subscription) {668Objects.requireNonNull(subscription);669if (!subscribed.compareAndSet(false, true)) {670subscription.cancel();671} else {672subscription.request(Long.MAX_VALUE);673}674}675676@Override677public void onNext(List<ByteBuffer> items) {678Objects.requireNonNull(items);679}680681@Override682public void onError(Throwable throwable) {683Objects.requireNonNull(throwable);684cf.completeExceptionally(throwable);685}686687@Override688public void onComplete() {689if (result.isPresent()) {690cf.complete(result.get());691} else {692cf.complete(null);693}694}695696@Override697public CompletionStage<T> getBody() {698return cf;699}700}701702/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */703public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>704implements TrustedSubscriber<R>705{706private final CompletableFuture<R> cf = new MinimalFuture<>();707private final S subscriber;708private final Function<? super S,? extends R> finisher;709private volatile Subscription subscription;710711// The finisher isn't called until all bytes have been received,712// and so shouldn't need an executor. No need to override713// TrustedSubscriber::needsExecutor714public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {715this.subscriber = Objects.requireNonNull(subscriber);716this.finisher = Objects.requireNonNull(finisher);717}718719@Override720public void onSubscribe(Subscription subscription) {721Objects.requireNonNull(subscription);722if (this.subscription != null) {723subscription.cancel();724} else {725this.subscription = subscription;726subscriber.onSubscribe(subscription);727}728}729730@Override731public void onNext(List<ByteBuffer> item) {732Objects.requireNonNull(item);733try {734subscriber.onNext(item);735} catch (Throwable throwable) {736subscription.cancel();737onError(throwable);738}739}740741@Override742public void onError(Throwable throwable) {743Objects.requireNonNull(throwable);744try {745subscriber.onError(throwable);746} finally {747cf.completeExceptionally(throwable);748}749}750751@Override752public void onComplete() {753try {754subscriber.onComplete();755} finally {756try {757cf.complete(finisher.apply(subscriber));758} catch (Throwable throwable) {759cf.completeExceptionally(throwable);760}761}762}763764@Override765public CompletionStage<R> getBody() {766return cf;767}768}769770/**771* A body subscriber which receives input from an upstream subscriber772* and maps that subscriber's body type to a new type. The upstream subscriber773* delegates all flow operations directly to this object. The774* {@link CompletionStage} returned by {@link #getBody()}} takes the output775* of the upstream {@code getBody()} and applies the mapper function to776* obtain the new {@code CompletionStage} type.777*778* @param <T> the upstream body type779* @param <U> this subscriber's body type780*/781public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {782private final BodySubscriber<T> upstream;783private final Function<? super T,? extends U> mapper;784private final boolean trusted;785786public MappingSubscriber(BodySubscriber<T> upstream,787Function<? super T,? extends U> mapper) {788this(upstream, mapper, false);789}790791// creates a MappingSubscriber with a mapper that is trusted792// to not block when called.793MappingSubscriber(BodySubscriber<T> upstream,794Function<? super T,? extends U> mapper,795boolean trusted) {796this.upstream = Objects.requireNonNull(upstream);797this.mapper = Objects.requireNonNull(mapper);798this.trusted = trusted;799}800801// There is no way to know whether a custom mapper function802// might block or not - so we should return true unless the803// mapper is implemented and trusted by our own code not to804// block.805@Override806public boolean needsExecutor() {807return !trusted || TrustedSubscriber.needsExecutor(upstream);808}809810// If upstream.getBody() is already completed (case of InputStream),811// then calling upstream.getBody().thenApply(mapper) might block812// if the mapper blocks. We should probably add a variant of813// MappingSubscriber that calls thenApplyAsync instead, but this814// needs a new public API point. See needsExecutor() above.815@Override816public CompletionStage<U> getBody() {817return upstream.getBody().thenApply(mapper);818}819820@Override821public void onSubscribe(Flow.Subscription subscription) {822upstream.onSubscribe(subscription);823}824825@Override826public void onNext(List<ByteBuffer> item) {827upstream.onNext(item);828}829830@Override831public void onError(Throwable throwable) {832upstream.onError(throwable);833}834835@Override836public void onComplete() {837upstream.onComplete();838}839}840841// A BodySubscriber that returns a Publisher<List<ByteBuffer>>842static class PublishingBodySubscriber843implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {844private final MinimalFuture<Flow.Subscription>845subscriptionCF = new MinimalFuture<>();846private final MinimalFuture<SubscriberRef>847subscribedCF = new MinimalFuture<>();848private AtomicReference<SubscriberRef>849subscriberRef = new AtomicReference<>();850private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =851subscriptionCF.thenCompose(852(s) -> MinimalFuture.completedFuture(this::subscribe));853854// We use the completionCF to ensure that only one of855// onError or onComplete is ever called.856private final MinimalFuture<Void> completionCF;857private PublishingBodySubscriber() {858completionCF = new MinimalFuture<>();859completionCF.whenComplete(860(r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));861}862863// An object that holds a reference to a Flow.Subscriber.864// The reference is cleared when the subscriber is completed - either865// normally or exceptionally, or when the subscription is cancelled.866static final class SubscriberRef {867volatile Flow.Subscriber<? super List<ByteBuffer>> ref;868SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {869ref = subscriber;870}871Flow.Subscriber<? super List<ByteBuffer>> get() {872return ref;873}874Flow.Subscriber<? super List<ByteBuffer>> clear() {875Flow.Subscriber<? super List<ByteBuffer>> res = ref;876ref = null;877return res;878}879}880881// A subscription that wraps an upstream subscription and882// holds a reference to a subscriber. The subscriber reference883// is cleared when the subscription is cancelled884final static class SubscriptionRef implements Flow.Subscription {885final Flow.Subscription subscription;886final SubscriberRef subscriberRef;887SubscriptionRef(Flow.Subscription subscription,888SubscriberRef subscriberRef) {889this.subscription = subscription;890this.subscriberRef = subscriberRef;891}892@Override893public void request(long n) {894if (subscriberRef.get() != null) {895subscription.request(n);896}897}898@Override899public void cancel() {900subscription.cancel();901subscriberRef.clear();902}903904void subscribe() {905Subscriber<?> subscriber = subscriberRef.get();906if (subscriber != null) {907subscriber.onSubscribe(this);908}909}910911@Override912public String toString() {913return "SubscriptionRef/"914+ subscription.getClass().getName()915+ "@"916+ System.identityHashCode(subscription);917}918}919920// This is a callback for the subscribedCF.921// Do not call directly!922private void complete(SubscriberRef ref, Throwable t) {923assert ref != null;924Subscriber<?> s = ref.clear();925// maybe null if subscription was cancelled926if (s == null) return;927if (t == null) {928try {929s.onComplete();930} catch (Throwable x) {931s.onError(x);932}933} else {934s.onError(t);935}936}937938private void signalError(Throwable err) {939if (err == null) {940err = new NullPointerException("null throwable");941}942completionCF.completeExceptionally(err);943}944945private void signalComplete() {946completionCF.complete(null);947}948949private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {950Objects.requireNonNull(subscriber, "subscriber must not be null");951SubscriberRef ref = new SubscriberRef(subscriber);952if (subscriberRef.compareAndSet(null, ref)) {953subscriptionCF.thenAccept((s) -> {954SubscriptionRef subscription = new SubscriptionRef(s,ref);955try {956subscription.subscribe();957subscribedCF.complete(ref);958} catch (Throwable t) {959if (Log.errors()) {960Log.logError("Failed to call onSubscribe: " +961"cancelling subscription: " + t);962Log.logError(t);963}964subscription.cancel();965}966});967} else {968subscriber.onSubscribe(new Flow.Subscription() {969@Override public void request(long n) { }970@Override public void cancel() { }971});972subscriber.onError(new IllegalStateException(973"This publisher has already one subscriber"));974}975}976977private final AtomicBoolean subscribed = new AtomicBoolean();978979@Override980public void onSubscribe(Flow.Subscription subscription) {981Objects.requireNonNull(subscription);982if (!subscribed.compareAndSet(false, true)) {983subscription.cancel();984} else {985subscriptionCF.complete(subscription);986}987}988989@Override990public void onNext(List<ByteBuffer> item) {991Objects.requireNonNull(item);992try {993// cannot be called before onSubscribe()994assert subscriptionCF.isDone();995SubscriberRef ref = subscriberRef.get();996// cannot be called before subscriber calls request(1)997assert ref != null;998Flow.Subscriber<? super List<ByteBuffer>>999subscriber = ref.get();1000if (subscriber != null) {1001// may be null if subscription was cancelled.1002subscriber.onNext(item);1003}1004} catch (Throwable err) {1005signalError(err);1006subscriptionCF.thenAccept(s -> s.cancel());1007}1008}10091010@Override1011public void onError(Throwable throwable) {1012// cannot be called before onSubscribe();1013assert suppress(subscriptionCF.isDone(),1014"onError called before onSubscribe",1015throwable);1016// onError can be called before request(1), and therefore can1017// be called before subscriberRef is set.1018signalError(throwable);1019Objects.requireNonNull(throwable);1020}10211022@Override1023public void onComplete() {1024// cannot be called before onSubscribe()1025if (!subscriptionCF.isDone()) {1026signalError(new InternalError(1027"onComplete called before onSubscribed"));1028} else {1029// onComplete can be called before request(1),1030// and therefore can be called before subscriberRef1031// is set.1032signalComplete();1033}1034}10351036@Override1037public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {1038return body;1039}10401041private boolean suppress(boolean condition,1042String assertion,1043Throwable carrier) {1044if (!condition) {1045if (carrier != null) {1046carrier.addSuppressed(new AssertionError(assertion));1047} else if (Log.errors()) {1048Log.logError(new AssertionError(assertion));1049}1050}1051return true;1052}10531054}10551056public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>1057createPublisher() {1058return new PublishingBodySubscriber();1059}106010611062/**1063* Tries to determine whether bs::getBody must be invoked asynchronously,1064* and if so, uses the provided executor to do it.1065* If the executor is a {@link HttpClientImpl.DelegatingExecutor},1066* uses the executor's delegate.1067* @param e The executor to use if an executor is required.1068* @param bs The BodySubscriber (trusted or not)1069* @param <T> The type of the response.1070* @return A completion stage that completes when the completion1071* stage returned by bs::getBody completes. This may, or1072* may not, be the same completion stage.1073*/1074public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {1075if (TrustedSubscriber.needsExecutor(bs)) {1076// getBody must be called in the executor1077return getBodyAsync(e, bs, new MinimalFuture<>());1078} else {1079// No executor needed1080return bs.getBody();1081}1082}10831084/**1085* Invokes bs::getBody using the provided executor.1086* If invoking bs::getBody requires an executor, and the given executor1087* is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's1088* delegate is used. If an error occurs anywhere then the given {code cf}1089* is completed exceptionally (this method does not throw).1090* @param e The executor that should be used to call bs::getBody1091* @param bs The BodySubscriber1092* @param cf A completable future that this function will set up1093* to complete when the completion stage returned by1094* bs::getBody completes.1095* In case of any error while trying to set up the1096* completion chain, {@code cf} will be completed1097* exceptionally with that error.1098* @param <T> The response type.1099* @return The provided {@code cf}.1100*/1101public static <T> CompletableFuture<T> getBodyAsync(Executor e,1102BodySubscriber<T> bs,1103CompletableFuture<T> cf) {1104return getBodyAsync(e, bs, cf, cf::completeExceptionally);1105}11061107/**1108* Invokes bs::getBody using the provided executor.1109* If invoking bs::getBody requires an executor, and the given executor1110* is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's1111* delegate is used.1112* The provided {@code cf} is completed with the result (exceptional1113* or not) of the completion stage returned by bs::getBody.1114* If an error occurs when trying to set up the1115* completion chain, the provided {@code errorHandler} is invoked,1116* but {@code cf} is not necessarily affected.1117* This method does not throw.1118* @param e The executor that should be used to call bs::getBody1119* @param bs The BodySubscriber1120* @param cf A completable future that this function will set up1121* to complete when the completion stage returned by1122* bs::getBody completes.1123* In case of any error while trying to set up the1124* completion chain, {@code cf} will be completed1125* exceptionally with that error.1126* @param errorHandler The handler to invoke if an error is raised1127* while trying to set up the completion chain.1128* @param <T> The response type.1129* @return The provide {@code cf}. If the {@code errorHandler} is1130* invoked, it is the responsibility of the {@code errorHandler} to1131* complete the {@code cf}, if needed.1132*/1133public static <T> CompletableFuture<T> getBodyAsync(Executor e,1134BodySubscriber<T> bs,1135CompletableFuture<T> cf,1136Consumer<Throwable> errorHandler) {1137assert errorHandler != null;1138try {1139assert e != null;1140assert cf != null;11411142if (TrustedSubscriber.needsExecutor(bs)) {1143e = (e instanceof HttpClientImpl.DelegatingExecutor)1144? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;1145}11461147e.execute(() -> {1148try {1149bs.getBody().whenComplete((r, t) -> {1150if (t != null) {1151cf.completeExceptionally(t);1152} else {1153cf.complete(r);1154}1155});1156} catch (Throwable t) {1157errorHandler.accept(t);1158}1159});1160return cf;11611162} catch (Throwable t) {1163errorHandler.accept(t);1164}1165return cf;1166}1167}116811691170