Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
41171 views
/*
 * Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.ConnectionExpiredException;
import jdk.internal.net.http.common.Utils;


/**
 * A helper class that will queue up incoming data until the receiving
 * side is ready to handle it.
 */
class Http1AsyncReceiver {

    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    /**
     * A delegate that can asynchronously receive data from an upstream flow,
     * parse it, then possibly transform it and either store it (response
     * headers) or possibly pass it to a downstream subscriber (response body).
     * Usually, there will be one Http1AsyncDelegate in charge of receiving
     * and parsing headers, and another one in charge of receiving, parsing,
     * and forwarding body. Each will sequentially subscribe with the
     * Http1AsyncReceiver in turn. There may be additional delegates which
     * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling
     * errors while the connection is busy transmitting the request body and the
     * Http1Exchange::readBody method hasn't been called yet, and response
     * delegates haven't subscribed yet.
     */
    static interface Http1AsyncDelegate {
        /**
         * Receives and handles a byte buffer reference.
         * @param ref A byte buffer reference coming from upstream.
         * @return false, if the byte buffer reference should be kept in the queue.
         *            Usually, this means that either the byte buffer reference
         *            was handled and parsing is finished, or that the receiver
         *            didn't handle the byte reference at all.
         *            There may or may not be any remaining data in the
         *            byte buffer, and the byte buffer reference must not have
         *            been cleared.
         *         true, if the byte buffer reference was fully read and
         *            more data can be received.
         */
        public boolean tryAsyncReceive(ByteBuffer ref);

        /**
         * Called when an exception is raised.
         * @param ex The raised Throwable.
         */
        public void onReadError(Throwable ex);

        /**
         * Must be called before any other method on the delegate.
         * The subscription can be either used directly by the delegate
         * to request more data (e.g. if the delegate is a header parser),
         * or can be forwarded to a downstream subscriber (if the delegate
         * is a body parser that wraps a response BodySubscriber).
         * In all cases, it is the responsibility of the delegate to ensure
         * that request(n) and demand.tryDecrement() are called appropriately.
         * No data will be sent to {@code tryAsyncReceive} unless
         * the subscription has some demand.
         *
         * @param s A subscription that allows the delegate to control the
         *          data flow.
         */
        public void onSubscribe(AbstractSubscription s);

        /**
         * Returns the subscription that was passed to {@code onSubscribe}
         * @return the subscription that was passed to {@code onSubscribe}.
         */
        public AbstractSubscription subscription();

        /**
         * Called to make sure resources are released when the
         * Http1AsyncReceiver is stopped.
         * @param error The Http1AsyncReceiver pending error ref,
         *              if any.
         */
        public void close(Throwable error);

    }

    /**
     * A simple subclass of AbstractSubscription that ensures the
     * SequentialScheduler will be run when request() is called and demand
     * becomes positive again.
     */
    private static final class Http1AsyncDelegateSubscription
            extends AbstractSubscription
    {
        private final Runnable onCancel;
        private final Consumer<Throwable> onError;
        private final SequentialScheduler scheduler;
        private volatile boolean cancelled;
        Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
                                       Runnable onCancel,
                                       Consumer<Throwable> onError) {
            this.scheduler = scheduler;
            this.onCancel = onCancel;
            this.onError = onError;
        }
        @Override
        public void request(long n) {
            if (cancelled) return;
            try {
                final Demand demand = demand();
                if (demand.increase(n)) {
                    scheduler.runOrSchedule();
                }
            } catch (IllegalArgumentException x) {
                // An illegal request cancels the subscription and is
                // reported through the onError callback.
                cancelled = true;
                onError.accept(x);
            }
        }
        @Override
        public void cancel() {
            cancelled = true;
            onCancel.run();
        }
    }

    // Buffers received from upstream that have not been fully consumed
    // by a delegate yet.
    private final ConcurrentLinkedDeque<ByteBuffer> queue
            = new ConcurrentLinkedDeque<>();
    // Runs flush() each time it is scheduled.
    private final SequentialScheduler scheduler =
            SequentialScheduler.lockingScheduler(this::flush);
    // Completed by stop(), or completed exceptionally by checkForErrors().
    final MinimalFuture<Void> whenFinished;
    private final Executor executor;
    private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
    // A delegate that has subscribed but has not been installed by
    // handlePendingDelegate() yet.
    private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
    // Total number of bytes put into the queue so far.
    private final AtomicLong received = new AtomicLong();
    final AtomicBoolean canRequestMore = new AtomicBoolean();

    private volatile Throwable error;
    private volatile Http1AsyncDelegate delegate;
    // This reference is only used to prevent early GC of the exchange.
    private volatile Http1Exchange<?> owner;
    // Only used for checking whether we run on the selector manager thread.
    private final HttpClientImpl client;
    private boolean retry;
    private volatile boolean stopRequested;

    public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
        this.pendingDelegateRef = new AtomicReference<>();
        this.executor = executor;
        this.whenFinished = new MinimalFuture<>();
        this.owner = owner;
        this.client = owner.client;
    }

    // This is the main loop called by the SequentialScheduler.
    // It attempts to empty the queue until the scheduler is stopped,
    // or the delegate is unregistered, or the delegate is unable to
    // process the data (because it's not ready or already done), which
    // it signals by returning 'false';
    private void flush() {
        ByteBuffer buf;
        try {
            // we should not be running in the selector here,
            // except if the custom Executor supplied to the client is
            // something like (r) -> r.run();
            assert !client.isSelectorThread()
                    || !(client.theExecutor().delegate() instanceof ExecutorService) :
                    "Http1AsyncReceiver::flush should not run in the selector: "
                    + Thread.currentThread().getName();

            // First check whether we have a pending delegate that has
            // just subscribed, and if so, create a Subscription for it
            // and call onSubscribe.
            handlePendingDelegate();

            // Then start emptying the queue, if possible.
            while ((buf = queue.peek()) != null && !stopRequested) {
                Http1AsyncDelegate delegate = this.delegate;
                if (debug.on())
                    debug.log("Got %s bytes for delegate %s",
                              buf.remaining(), delegate);
                if (!hasDemand(delegate)) {
                    // The scheduler will be invoked again later when the demand
                    // becomes positive.
                    return;
                }

                assert delegate != null;
                if (debug.on())
                    debug.log("Forwarding %s bytes to delegate %s",
                              buf.remaining(), delegate);
                // The delegate has demand: feed it the next buffer.
                if (!delegate.tryAsyncReceive(buf)) {
                    final long remaining = buf.remaining();
                    if (debug.on()) debug.log(() -> {
                        // If the scheduler is stopped, the queue may already
                        // be empty and the reference may already be released.
                        String remstr = scheduler.isStopped() ? "" :
                                " remaining in ref: "
                                + remaining;
                        remstr += " total remaining: " + remaining();
                        // Return the message that was just built.
                        return "Delegate done:" + remstr;
                    });
                    canRequestMore.set(false);
                    // The last buffer parsed may have remaining unparsed bytes.
                    // Don't take it out of the queue.
                    return; // done.
                }

                // removed parsed buffer from queue, and continue with next
                // if available
                ByteBuffer parsed = queue.remove();
                canRequestMore.set(queue.isEmpty() && !stopRequested);
                assert parsed == buf;
            }

            // queue is empty: let's see if we should request more
            checkRequestMore();

        } catch (Throwable t) {
            Throwable x = error;
            if (x == null) error = t; // will be handled in the finally block
            if (debug.on()) debug.log("Unexpected error caught in flush()", t);
        } finally {
            // Handles any pending error.
            // The most recently subscribed delegate will get the error.
            checkForErrors();
        }
    }

    // Describes the owning exchange's request for log messages, or returns
    // a placeholder if the owner reference has been cleared.
    private String describe() {
        Http1Exchange<?> exchange = owner;
        if (exchange != null) {
            return String.valueOf(exchange.request());
        }
        return "<uri unavailable>";
    }

    /**
     * Must be called from within the scheduler main loop.
     * Handles any pending errors by calling delegate.onReadError().
     * If the error can be forwarded to the delegate, stops the scheduler.
     */
    private void checkForErrors() {
        // Handles any pending error.
        // The most recently subscribed delegate will get the error.
        // If the delegate is null, the error will be handled by the next
        // delegate that subscribes.
        // If the queue is not empty, wait until it is empty before
        // handling the error.
        Http1AsyncDelegate delegate = pendingDelegateRef.get();
        if (delegate == null) delegate = this.delegate;
        Throwable x = error;
        if (delegate != null && x != null && (stopRequested || queue.isEmpty())) {
            // forward error only after emptying the queue.
            final Object captured = delegate;
            if (debug.on())
                debug.log(() -> "flushing " + x + "\n\t delegate: " + captured
                          + "\t\t queue.isEmpty: " + queue.isEmpty());
            scheduler.stop();
            delegate.onReadError(x);
            whenFinished.completeExceptionally(x);
            if (Log.channel()) {
                Log.logChannel("HTTP/1 read subscriber stopped for: {0}", describe());
            }
            if (stopRequested) {
                // This is the special case where the subscriber
                // has requested an illegal number of items.
                // In this case, the error doesn't come from
                // upstream, but from downstream, and we need to
                // close the upstream connection.
                Http1Exchange<?> exchg = owner;
                stop();
                if (exchg != null) exchg.connection().close();
            }
        }
    }

    /**
     * Must be called from within the scheduler main loop.
     * Figure out whether more data should be requested from the
     * Http1TubeSubscriber.
     */
    private void checkRequestMore() {
        Http1AsyncDelegate delegate = this.delegate;
        boolean more = this.canRequestMore.get();
        boolean hasDemand = hasDemand(delegate);
        if (debug.on())
            debug.log("checkRequestMore: " + "canRequestMore=" + more
                      + ", hasDemand=" + hasDemand
                      + (delegate == null ? ", delegate=null" : ""));
        if (hasDemand) {
            subscriber.requestMore();
        }
    }

    /**
     * Must be called from within the scheduler main loop.
     * Return true if the delegate is not null and has some demand.
     * @param delegate The Http1AsyncDelegate delegate
     * @return true if the delegate is not null and has some demand
     */
    private boolean hasDemand(Http1AsyncDelegate delegate) {
        if (delegate == null) return false;
        AbstractSubscription subscription = delegate.subscription();
        long demand = subscription.demand().get();
        if (debug.on())
            debug.log("downstream subscription demand is %s", demand);
        return demand > 0;
    }

    /**
     * Must be called from within the scheduler main loop.
     * Handles pending delegate subscription.
     * Return true if there was some pending delegate subscription and a new
     * delegate was subscribed, false otherwise.
     *
     * @return true if there was some pending delegate subscription and a new
     *         delegate was subscribed, false otherwise.
     */
    private boolean handlePendingDelegate() {
        Http1AsyncDelegate pending = pendingDelegateRef.get();
        if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
            Http1AsyncDelegate delegate = this.delegate;
            if (delegate != null) unsubscribe(delegate);
            Consumer<Throwable> onSubscriptionError = (x) -> {
                setRetryOnError(false);
                stopRequested = true;
                onReadError(x);
            };
            Runnable cancel = () -> {
                if (debug.on())
                    debug.log("Downstream subscription cancelled by %s", pending);
                // The connection should be closed, as some data may
                // be left over in the stream.
                try {
                    setRetryOnError(false);
                    pending.close(null);
                    onReadError(new IOException("subscription cancelled"));
                    unsubscribe(pending);
                } finally {
                    Http1Exchange<?> exchg = owner;
                    stop();
                    if (exchg != null) exchg.connection().close();
                }
            };
            // The subscription created by a delegate is only loosely
            // coupled with the upstream subscription. This is partly because
            // the header/body parser work with a flow of ByteBuffer, whereas
            // we have a flow List<ByteBuffer> upstream.
            Http1AsyncDelegateSubscription subscription =
                    new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
            try {
                pending.onSubscribe(subscription);
            } finally {
                // The pending delegate is installed even if onSubscribe throws.
                this.delegate = delegate = pending;
            }
            final Object captured = delegate;
            if (debug.on())
                debug.log("delegate is now " + captured
                          + ", demand=" + subscription.demand().get()
                          + ", canRequestMore=" + canRequestMore.get()
                          + ", queue.isEmpty=" + queue.isEmpty());
            return true;
        }
        return false;
    }

    /**
     * Sets the flag consulted by {@code onReadError} to decide whether an
     * IOException received before any byte was read should be wrapped in a
     * ConnectionExpiredException.
     * @param retry the new value of the retry flag
     */
    synchronized void setRetryOnError(boolean retry) {
        this.retry = retry;
    }

    /**
     * Clears the pending delegate, the current delegate, and the owner
     * references.
     */
    void clear() {
        if (debug.on()) debug.log("cleared");
        this.pendingDelegateRef.set(null);
        this.delegate = null;
        this.owner = null;
    }

    /**
     * Registers the given delegate as the pending delegate and triggers
     * the scheduler so that it gets subscribed and fed any queued data.
     * @param delegate the delegate to subscribe
     */
    void subscribe(Http1AsyncDelegate delegate) {
        synchronized(this) {
            pendingDelegateRef.set(delegate);
        }
        if (queue.isEmpty()) {
            canRequestMore.set(true);
        }
        if (debug.on())
            debug.log("Subscribed pending " + delegate + " queue.isEmpty: "
                      + queue.isEmpty());
        // Everything may have been received already. Make sure
        // we parse it.
        if (client.isSelectorThread()) {
            scheduler.runOrSchedule(executor);
        } else {
            scheduler.runOrSchedule();
        }
    }

    // Used for debugging only!
    long remaining() {
        return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY));
    }

    /**
     * Unregisters the given delegate, if it is the current delegate.
     * @param delegate the delegate to unsubscribe
     */
    void unsubscribe(Http1AsyncDelegate delegate) {
        synchronized(this) {
            if (this.delegate == delegate) {
                if (debug.on()) debug.log("Unsubscribed %s", delegate);
                this.delegate = null;
            }
        }
    }

    // Callback: Consumer of ByteBuffer
    private void asyncReceive(ByteBuffer buf) {
        if (debug.on())
            debug.log("Putting %s bytes into the queue", buf.remaining());
        received.addAndGet(buf.remaining());
        queue.offer(buf);

        // This callback is called from within the selector thread.
        // Use an executor here to avoid doing the heavy lifting in the
        // selector.
        scheduler.runOrSchedule(executor);
    }

    // Callback: Consumer of Throwable
    // Records the first error reported; subsequent errors are logged but
    // do not replace the recorded one.
    void onReadError(Throwable ex) {
        Http1AsyncDelegate delegate;
        Throwable recorded;
        if (debug.on()) debug.log("onError: %s", (Object) ex);
        synchronized (this) {
            delegate = this.delegate;
            recorded = error;
            if (recorded == null) {
                // retry is set to true by HttpExchange when the connection is
                // already connected, which means it's been retrieved from
                // the pool.
                if (retry && (ex instanceof IOException)) {
                    // could be either EOFException, or
                    // IOException("connection reset by peer), or
                    // SSLHandshakeException resulting from the server having
                    // closed the SSL session.
                    if (received.get() == 0) {
                        // If we receive such an exception before having
                        // received any byte, then in this case, we will
                        // throw ConnectionExpiredException
                        // to try & force a retry of the request.
                        retry = false;
                        ex = new ConnectionExpiredException(ex);
                    }
                }
                error = ex;
            }
        }

        final Throwable t = (recorded == null ? ex : recorded);
        if (debug.on())
            debug.log("recorded " + t + "\n\t delegate: " + delegate
                      + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
        if (Log.errors()) {
            Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
        }
        if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) {
            // This callback is called from within the selector thread.
            // Use an executor here to avoid doing the heavy lifting in the
            // selector.
            if (Log.errors()) {
                Log.logError("HTTP/1 propagating recorded error: {0} - {1}", describe(), t);
            }
            scheduler.runOrSchedule(executor);
        }
    }

    /**
     * Stops the scheduler, closes the current delegate (passing it the
     * recorded error, if any), clears the delegate and owner references,
     * and completes {@code whenFinished}.
     */
    void stop() {
        if (debug.on()) debug.log("stopping");
        if (Log.channel() && !scheduler.isStopped()) {
            Log.logChannel("HTTP/1 read subscriber stopped for {0}", describe());
        }
        scheduler.stop();
        // make sure ref count is handled properly by
        // closing the delegate.
        Http1AsyncDelegate previous = delegate;
        if (previous != null) previous.close(error);
        delegate = null;
        owner = null;
        whenFinished.complete(null);
    }

    /**
     * Returns the TubeSubscriber for reading from the connection flow.
     * @return the TubeSubscriber for reading from the connection flow.
     */
    TubeSubscriber subscriber() {
        return subscriber;
    }

    /**
     * A simple tube subscriber for reading from the connection flow.
     */
    final class Http1TubeSubscriber implements TubeSubscriber {
        volatile Flow.Subscription subscription;
        volatile boolean completed;
        volatile boolean dropped;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // supports being called multiple times.
            // doesn't cancel the previous subscription, since that is
            // most probably the same as the new subscription.
            if (debug.on()) debug.log("Received onSubscribed from upstream");
            if (Log.channel()) {
                Log.logChannel("HTTP/1 read subscriber got subscription from {0}", describe());
            }
            assert this.subscription == null || dropped == false;
            this.subscription = subscription;
            dropped = false;
            canRequestMore.set(true);
            if (delegate != null) {
                scheduler.runOrSchedule(executor);
            } else {
                if (debug.on()) debug.log("onSubscribe: read delegate not present yet");
            }
        }

        // Requests one more item from upstream, provided that
        // canRequestMore is set and the subscription is neither
        // completed nor dropped.
        void requestMore() {
            Flow.Subscription s = subscription;
            if (s == null) return;
            if (canRequestMore.compareAndSet(true, false)) {
                if (!completed && !dropped) {
                    if (debug.on())
                        debug.log("Http1TubeSubscriber: requesting one more from upstream");
                    s.request(1);
                    return;
                }
            }
            if (debug.on())
                debug.log("Http1TubeSubscriber: no need to request more");
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            canRequestMore.set(item.isEmpty());
            for (ByteBuffer buffer : item) {
                asyncReceive(buffer);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            onReadError(throwable);
            completed = true;
        }

        @Override
        public void onComplete() {
            onReadError(new EOFException("EOF reached while reading"));
            completed = true;
        }

        public void dropSubscription() {
            if (debug.on()) debug.log("Http1TubeSubscriber: dropSubscription");
            // we could probably set subscription to null here...
            // then we might not need the 'dropped' boolean?
            dropped = true;
        }

    }

    // Drains the content of the queue into a single ByteBuffer.
    // The scheduler must be permanently stopped before calling drain().
    // Returns the initial buffer if it has enough free space to hold the
    // queued bytes (or an empty buffer if initial is null and the queue is
    // empty); otherwise returns a newly allocated buffer containing the
    // remaining bytes of the initial buffer followed by the queued bytes.
    ByteBuffer drain(ByteBuffer initial) {
        // Revisit: need to clean that up.
        //
        ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial);
        assert scheduler.isStopped();

        if (queue.isEmpty()) return b;

        // sanity check: we shouldn't have queued the same
        // buffer twice.
        ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]);

        // the assertion looks suspicious, more investigation needed
        //
        // assert java.util.stream.Stream.of(qbb)
        //                  .collect(Collectors.toSet())
        //                  .size() == qbb.length : debugQBB(qbb);

        // compute the number of bytes in the queue, the number of bytes
        // in the initial buffer
        // TODO: will need revisiting - as it is not guaranteed that all
        // data will fit in single BB!
        int size = Utils.remaining(qbb, Integer.MAX_VALUE);
        int remaining = b.remaining();
        int free = b.capacity() - b.position() - remaining;
        if (debug.on())
            debug.log("Flushing %s bytes from queue into initial buffer "
                      + "(remaining=%s, free=%s)", size, remaining, free);

        // check whether the initial buffer has enough space
        if (size > free) {
            if (debug.on())
                debug.log("Allocating new buffer for initial: %s", (size + remaining));
            // allocates a new buffer and copy initial to it
            b = ByteBuffer.allocate(size + remaining);
            Utils.copy(initial, b);
            assert b.position() == remaining;
            b.flip();
            assert b.position() == 0;
            assert b.limit() == remaining;
            assert b.remaining() == remaining;
        }

        // store position and limit
        int pos = b.position();
        int limit = b.limit();
        assert limit - pos == remaining;
        assert b.capacity() >= remaining + size
                : "capacity: " + b.capacity()
                + ", remaining: " + b.remaining()
                + ", size: " + size;

        // prepare to copy the content of the queue
        b.position(limit);
        b.limit(pos + remaining + size);
        assert b.remaining() >= size :
                "remaining: " + b.remaining() + ", size: " + size;

        // copy the content of the queue
        int count = 0;
        for (int i=0; i<qbb.length; i++) {
            ByteBuffer b2 = qbb[i];
            int r = b2.remaining();
            assert b.remaining() >= r : "need at least " + r + " only "
                    + b.remaining() + " available";
            int copied = Utils.copy(b2, b);
            assert copied == r : "copied="+copied+" available="+r;
            assert b2.remaining() == 0;
            count += copied;
        }
        assert count == size;
        assert b.position() == pos + remaining + size :
                "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size;

        // reset limit and position
        b.limit(limit+size);
        b.position(pos);

        // we can clear the refs
        queue.clear();
        final ByteBuffer bb = b;
        if (debug.on())
            debug.log("Initial buffer now has " + bb.remaining()
                      + " pos=" + bb.position() + " limit=" + bb.limit());

        return b;
    }

    // Builds a diagnostic message listing the buffers that appear more
    // than once in the given array. Only referenced from the assertion
    // that is commented out in drain().
    private String debugQBB(ByteBuffer[] qbb) {
        StringBuilder msg = new StringBuilder();
        List<ByteBuffer> lbb = Arrays.asList(qbb);
        Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb));

        int uniquebb = sbb.size();
        msg.append("qbb: ").append(lbb.size())
           .append(" (unique: ").append(uniquebb).append("), ")
           .append("duplicates: ");
        String sep = "";
        for (ByteBuffer b : lbb) {
            if (!sbb.remove(b)) {
                msg.append(sep)
                   .append(String.valueOf(b))
                   .append("[remaining=")
                   .append(b.remaining())
                   .append(", position=")
                   .append(b.position())
                   .append(", capacity=")
                   .append(b.capacity())
                   .append("]");
                sep = ", ";
            }
        }
        return msg.toString();
    }

    // Cached debug tag; only set once the connection flow is available.
    volatile String dbgTag;
    String dbgString() {
        String tag = dbgTag;
        if (tag == null) {
            String flowTag = null;
            Http1Exchange<?> exchg = owner;
            Object flow = (exchg != null)
                    ? exchg.connection().getConnectionFlow()
                    : null;
            flowTag = tag = flow == null ? null: (String.valueOf(flow));
            if (flowTag != null) {
                dbgTag = tag = "Http1AsyncReceiver("+ flowTag + ")";
            } else {
                tag = "Http1AsyncReceiver(?)";
            }
        }
        return tag;
    }
}