Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.EOFException;28import java.io.IOException;29import java.io.UncheckedIOException;30import java.lang.invoke.MethodHandles;31import java.lang.invoke.VarHandle;32import java.net.URI;33import java.nio.ByteBuffer;34import java.util.ArrayList;35import java.util.Collections;36import java.util.List;37import java.util.concurrent.CompletableFuture;38import java.util.concurrent.ConcurrentLinkedDeque;39import java.util.concurrent.ConcurrentLinkedQueue;40import java.util.concurrent.Executor;41import java.util.concurrent.Flow;42import java.util.concurrent.Flow.Subscription;43import java.util.concurrent.atomic.AtomicReference;44import java.util.function.BiPredicate;45import java.net.http.HttpClient;46import java.net.http.HttpHeaders;47import 
java.net.http.HttpRequest;48import java.net.http.HttpResponse;49import java.net.http.HttpResponse.BodySubscriber;50import jdk.internal.net.http.common.*;51import jdk.internal.net.http.frame.*;52import jdk.internal.net.http.hpack.DecodingCallback;5354/**55* Http/2 Stream handling.56*57* REQUESTS58*59* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q60*61* sendRequest() -- sendHeadersOnly() + sendBody()62*63* sendBodyAsync() -- calls sendBody() in an executor thread.64*65* sendHeadersAsync() -- calls sendHeadersOnly() which does not block66*67* sendRequestAsync() -- calls sendRequest() in an executor thread68*69* RESPONSES70*71* Multiple responses can be received per request. Responses are queued up on72* a LinkedList of CF<HttpResponse> and the first one on the list is completed73* with the next response74*75* getResponseAsync() -- queries list of response CFs and returns first one76* if one exists. Otherwise, creates one and adds it to list77* and returns it. Completion is achieved through the78* incoming() upcall from connection reader thread.79*80* getResponse() -- calls getResponseAsync() and waits for CF to complete81*82* responseBodyAsync() -- calls responseBody() in an executor thread.83*84* incoming() -- entry point called from connection reader thread. Frames are85* either handled immediately without blocking or for data frames86* placed on the stream's inputQ which is consumed by the stream's87* reader thread.88*89* PushedStream sub class90* ======================91* Sending side methods are not used because the request comes from a PUSH_PROMISE92* frame sent by the server. When a PUSH_PROMISE is received the PushedStream93* is created. PushedStream does not use responseCF list as there can be only94* one response. 
The CF is created when the object is created and when the response
 * HEADERS frame is received the object is completed.
 */
class Stream<T> extends ExchangeImpl<T> {

    // Debug logger obtained from Utils, parameterized with this stream's dbgString.
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Frames received for this stream that have not yet been delivered to the
    // response subscriber. Filled by receiveDataFrame()/receiveResetFrame()
    // and emptied by schedule()/drainInputQueue().
    final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
    // Scheduler whose task is schedule(): it runs the inputQ delivery loop.
    final SequentialScheduler sched =
            SequentialScheduler.lockingScheduler(this::schedule);
    // The subscription passed to the response BodySubscriber in schedule();
    // built on sched, with this::cancel and this::onSubscriptionError as callbacks.
    final SubscriptionBase userSubscription =
            new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);

    /**
     * This stream's identifier. Assigned lazily by the HTTP2Connection before
     * the stream's first frame is sent.
     */
    protected volatile int streamid;

    // Request body length as reported by the request publisher; set in
    // sendHeadersAsync() (0 when there is no publisher).
    long requestContentLen;

    final Http2Connection connection;
    final HttpRequestImpl request;
    final HeadersConsumer rspHeadersConsumer;
    final HttpHeadersBuilder responseHeadersBuilder;
    final HttpHeaders requestPseudoHeaders;
    // The subscriber currently receiving response body data; installed by
    // schedule() from pendingResponseSubscriber.
    volatile HttpResponse.BodySubscriber<T> responseSubscriber;
    final HttpRequest.BodyPublisher requestPublisher; // may be null
    volatile RequestSubscriber requestSubscriber;
    volatile int responseCode;
    volatile Response response;
    // The exception with which this stream was canceled.
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    // Completed when the request body has been fully sent, or exceptionally
    // when sending fails or the stream is reset.
    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
    // Created by receiveData(); null until the response body is requested.
    volatile CompletableFuture<T> responseBodyCF;
    // Subscriber waiting to be picked up (and subscribed) by schedule().
    volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
    // When true, schedule() falls through to its error handling instead of
    // returning when the subscriber has no outstanding demand.
    volatile boolean stopRequested;

    /** True if END_STREAM has been seen in a frame received on this stream. */
    private volatile boolean remotelyClosed;
    private volatile boolean closed;
    private volatile boolean endStreamSent;
    // Indicates the first reason that was invoked when sending a ResetFrame
    // to the server.
A streamState of 0 indicates that no reset was sent.138// (see markStream(int code)139private volatile int streamState; // assigned using STREAM_STATE varhandle.140private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.141142// state flags143private boolean requestSent, responseReceived;144145// send lock: prevent sending DataFrames after reset occurred.146private final Object sendLock = new Object();147148/**149* A reference to this Stream's connection Send Window controller. The150* stream MUST acquire the appropriate amount of Send Window before151* sending any data. Will be null for PushStreams, as they cannot send data.152*/153private final WindowController windowController;154private final WindowUpdateSender windowUpdater;155156@Override157HttpConnection connection() {158return connection.connection;159}160161/**162* Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }163* of after user subscription window has re-opened, from SubscriptionBase.request()164*/165private void schedule() {166boolean onCompleteCalled = false;167HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;168try {169if (subscriber == null) {170subscriber = responseSubscriber = pendingResponseSubscriber;171if (subscriber == null) {172// can't process anything yet173return;174} else {175if (debug.on()) debug.log("subscribing user subscriber");176subscriber.onSubscribe(userSubscription);177}178}179while (!inputQ.isEmpty()) {180Http2Frame frame = inputQ.peek();181if (frame instanceof ResetFrame) {182inputQ.remove();183handleReset((ResetFrame)frame, subscriber);184return;185}186DataFrame df = (DataFrame)frame;187boolean finished = df.getFlag(DataFrame.END_STREAM);188189List<ByteBuffer> buffers = df.getData();190List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);191int size = Utils.remaining(dsts, Integer.MAX_VALUE);192if (size == 0 && finished) {193inputQ.remove();194connection.ensureWindowUpdated(df); // must update 
connection window195Log.logTrace("responseSubscriber.onComplete");196if (debug.on()) debug.log("incoming: onComplete");197sched.stop();198connection.decrementStreamsCount(streamid);199subscriber.onComplete();200onCompleteCalled = true;201setEndStreamReceived();202return;203} else if (userSubscription.tryDecrement()) {204inputQ.remove();205Log.logTrace("responseSubscriber.onNext {0}", size);206if (debug.on()) debug.log("incoming: onNext(%d)", size);207try {208subscriber.onNext(dsts);209} catch (Throwable t) {210connection.dropDataFrame(df); // must update connection window211throw t;212}213if (consumed(df)) {214Log.logTrace("responseSubscriber.onComplete");215if (debug.on()) debug.log("incoming: onComplete");216sched.stop();217connection.decrementStreamsCount(streamid);218subscriber.onComplete();219onCompleteCalled = true;220setEndStreamReceived();221return;222}223} else {224if (stopRequested) break;225return;226}227}228} catch (Throwable throwable) {229errorRef.compareAndSet(null, throwable);230} finally {231if (sched.isStopped()) drainInputQueue();232}233234Throwable t = errorRef.get();235if (t != null) {236sched.stop();237try {238if (!onCompleteCalled) {239if (debug.on())240debug.log("calling subscriber.onError: %s", (Object) t);241subscriber.onError(t);242} else {243if (debug.on())244debug.log("already completed: dropping error %s", (Object) t);245}246} catch (Throwable x) {247Log.logError("Subscriber::onError threw exception: {0}", (Object) t);248} finally {249cancelImpl(t);250drainInputQueue();251}252}253}254255// must only be called from the scheduler schedule() loop.256// ensure that all received data frames are accounted for257// in the connection window flow control if the scheduler258// is stopped before all the data is consumed.259private void drainInputQueue() {260Http2Frame frame;261while ((frame = inputQ.poll()) != null) {262if (frame instanceof DataFrame) {263connection.dropDataFrame((DataFrame)frame);264}265}266}267268@Override269void 
nullBody(HttpResponse<T> resp, Throwable t) {270if (debug.on()) debug.log("nullBody: streamid=%d", streamid);271// We should have an END_STREAM data frame waiting in the inputQ.272// We need a subscriber to force the scheduler to process it.273pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null);274sched.runOrSchedule();275}276277// Callback invoked after the Response BodySubscriber has consumed the278// buffers contained in a DataFrame.279// Returns true if END_STREAM is reached, false otherwise.280private boolean consumed(DataFrame df) {281// RFC 7540 6.1:282// The entire DATA frame payload is included in flow control,283// including the Pad Length and Padding fields if present284int len = df.payloadLength();285boolean endStream = df.getFlag(DataFrame.END_STREAM);286if (len == 0) return endStream;287288connection.windowUpdater.update(len);289290if (!endStream) {291// Don't send window update on a stream which is292// closed or half closed.293windowUpdater.update(len);294}295296// true: end of stream; false: more data coming297return endStream;298}299300boolean deRegister() {301return DEREGISTERED.compareAndSet(this, false, true);302}303304@Override305CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,306boolean returnConnectionToPool,307Executor executor)308{309try {310Log.logTrace("Reading body on stream {0}", streamid);311debug.log("Getting BodySubscriber for: " + response);312BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));313CompletableFuture<T> cf = receiveData(bodySubscriber, executor);314315PushGroup<?> pg = exchange.getPushGroup();316if (pg != null) {317// if an error occurs make sure it is recorded in the PushGroup318cf = cf.whenComplete((t, e) -> pg.pushError(e));319}320return cf;321} catch (Throwable t) {322// may be thrown by handler.apply323cancelImpl(t);324return MinimalFuture.failedFuture(t);325}326}327328@Override329public String toString() {330StringBuilder sb = new 
StringBuilder();331sb.append("streamid: ")332.append(streamid);333return sb.toString();334}335336private void receiveDataFrame(DataFrame df) {337inputQ.add(df);338sched.runOrSchedule();339}340341/** Handles a RESET frame. RESET is always handled inline in the queue. */342private void receiveResetFrame(ResetFrame frame) {343inputQ.add(frame);344sched.runOrSchedule();345}346347/**348* Records the first reason which was invoked when sending a ResetFrame349* to the server in the streamState, and return the previous value350* of the streamState. This is an atomic operation.351* A possible use of this method would be to send a ResetFrame only352* if no previous reset frame has been sent.353* For instance: <pre>{@code354* if (markStream(ResetFrame.CANCEL) == 0) {355* connection.sendResetFrame(streamId, ResetFrame.CANCEL);356* }357* }</pre>358* @param code the reason code as per HTTP/2 protocol359* @return the previous value of the stream state.360*/361int markStream(int code) {362if (code == 0) return streamState;363synchronized (sendLock) {364return (int) STREAM_STATE.compareAndExchange(this, 0, code);365}366}367368private void sendDataFrame(DataFrame frame) {369synchronized (sendLock) {370// must not send DataFrame after reset.371if (streamState == 0) {372connection.sendDataFrame(frame);373}374}375}376377// pushes entire response body into response subscriber378// blocking when required by local or remote flow control379CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {380// We want to allow the subscriber's getBody() method to block so it381// can work with InputStreams. 
So, we offload execution.382responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,383new MinimalFuture<>(), this::cancelImpl);384385if (isCanceled()) {386Throwable t = getCancelCause();387responseBodyCF.completeExceptionally(t);388} else {389pendingResponseSubscriber = bodySubscriber;390sched.runOrSchedule(); // in case data waiting already to be processed391}392return responseBodyCF;393}394395@Override396CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {397return sendBodyImpl().thenApply( v -> this);398}399400@SuppressWarnings("unchecked")401Stream(Http2Connection connection,402Exchange<T> e,403WindowController windowController)404{405super(e);406this.connection = connection;407this.windowController = windowController;408this.request = e.request();409this.requestPublisher = request.requestPublisher; // may be null410this.responseHeadersBuilder = new HttpHeadersBuilder();411this.rspHeadersConsumer = new HeadersConsumer();412this.requestPseudoHeaders = createPseudoHeaders(request);413this.windowUpdater = new StreamWindowUpdateSender(connection);414}415416private boolean checkRequestCancelled() {417if (exchange.multi.requestCancelled()) {418if (errorRef.get() == null) cancel();419else sendCancelStreamFrame();420return true;421}422return false;423}424425/**426* Entry point from Http2Connection reader thread.427*428* Data frames will be removed by response body thread.429*/430void incoming(Http2Frame frame) throws IOException {431if (debug.on()) debug.log("incoming: %s", frame);432var cancelled = checkRequestCancelled() || closed;433if ((frame instanceof HeaderFrame)) {434HeaderFrame hframe = (HeaderFrame) frame;435if (hframe.endHeaders()) {436Log.logTrace("handling response (streamid={0})", streamid);437handleResponse();438}439if (hframe.getFlag(HeaderFrame.END_STREAM)) {440if (debug.on()) debug.log("handling END_STREAM: %d", streamid);441receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));442}443} else if (frame 
instanceof DataFrame) {444if (cancelled) connection.dropDataFrame((DataFrame) frame);445else receiveDataFrame((DataFrame) frame);446} else {447if (!cancelled) otherFrame(frame);448}449}450451void otherFrame(Http2Frame frame) throws IOException {452switch (frame.type()) {453case WindowUpdateFrame.TYPE -> incoming_windowUpdate((WindowUpdateFrame) frame);454case ResetFrame.TYPE -> incoming_reset((ResetFrame) frame);455case PriorityFrame.TYPE -> incoming_priority((PriorityFrame) frame);456457default -> throw new IOException("Unexpected frame: " + frame.toString());458}459}460461// The Hpack decoder decodes into one of these consumers of name,value pairs462463DecodingCallback rspHeadersConsumer() {464return rspHeadersConsumer;465}466467protected void handleResponse() throws IOException {468HttpHeaders responseHeaders = responseHeadersBuilder.build();469responseCode = (int)responseHeaders470.firstValueAsLong(":status")471.orElseThrow(() -> new IOException("no statuscode in response"));472473response = new Response(474request, exchange, responseHeaders, connection(),475responseCode, HttpClient.Version.HTTP_2);476477/* TODO: review if needs to be removed478the value is not used, but in case `content-length` doesn't parse as479long, there will be NumberFormatException. If left as is, make sure480code up the stack handles NFE correctly. 
*/481responseHeaders.firstValueAsLong("content-length");482483if (Log.headers()) {484StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");485Log.dumpHeaders(sb, " ", responseHeaders);486Log.logHeaders(sb.toString());487}488489// this will clear the response headers490rspHeadersConsumer.reset();491492completeResponse(response);493}494495void incoming_reset(ResetFrame frame) {496Log.logTrace("Received RST_STREAM on stream {0}", streamid);497if (endStreamReceived()) {498Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);499} else if (closed) {500Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);501} else {502Flow.Subscriber<?> subscriber =503responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;504if (response == null && subscriber == null) {505// we haven't receive the headers yet, and won't receive any!506// handle reset now.507handleReset(frame, subscriber);508} else {509// put it in the input queue in order to read all510// pending data frames first. Indeed, a server may send511// RST_STREAM after sending END_STREAM, in which case we should512// ignore it. However, we won't know if we have received END_STREAM513// or not until all pending data frames are read.514receiveResetFrame(frame);515// RST_STREAM was pushed to the queue. 
It will be handled by516// asyncReceive after all pending data frames have been517// processed.518Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);519}520}521}522523void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {524Log.logTrace("Handling RST_STREAM on stream {0}", streamid);525if (!closed) {526synchronized (this) {527if (closed) {528if (debug.on()) debug.log("Stream already closed: ignoring RESET");529return;530}531closed = true;532}533try {534int error = frame.getErrorCode();535IOException e = new IOException("Received RST_STREAM: "536+ ErrorFrame.stringForCode(error));537if (errorRef.compareAndSet(null, e)) {538if (subscriber != null) {539subscriber.onError(e);540}541}542completeResponseExceptionally(e);543if (!requestBodyCF.isDone()) {544requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..545}546if (responseBodyCF != null) {547responseBodyCF.completeExceptionally(errorRef.get());548}549} finally {550connection.decrementStreamsCount(streamid);551connection.closeStream(streamid);552}553} else {554Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);555}556}557558void incoming_priority(PriorityFrame frame) {559// TODO: implement priority560throw new UnsupportedOperationException("Not implemented");561}562563private void incoming_windowUpdate(WindowUpdateFrame frame)564throws IOException565{566int amount = frame.getUpdate();567if (amount <= 0) {568Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",569streamid, amount);570connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);571} else {572assert streamid != 0;573boolean success = windowController.increaseStreamWindow(amount, streamid);574if (!success) { // overflow575connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);576}577}578}579580void incoming_pushPromise(HttpRequestImpl pushRequest,581PushedStream<T> pushStream)582throws IOException583{584if (Log.requests()) 
{585Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());586}587PushGroup<T> pushGroup = exchange.getPushGroup();588if (pushGroup == null || exchange.multi.requestCancelled()) {589Log.logTrace("Rejecting push promise stream " + streamid);590connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);591pushStream.close();592return;593}594595PushGroup.Acceptor<T> acceptor = null;596boolean accepted = false;597try {598acceptor = pushGroup.acceptPushRequest(pushRequest);599accepted = acceptor.accepted();600} catch (Throwable t) {601if (debug.on())602debug.log("PushPromiseHandler::applyPushPromise threw exception %s",603(Object)t);604}605if (!accepted) {606// cancel / reject607IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");608if (Log.trace()) {609Log.logTrace("No body subscriber for {0}: {1}", pushRequest,610ex.getMessage());611}612pushStream.cancelImpl(ex);613return;614}615616assert accepted && acceptor != null;617CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();618HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();619assert pushHandler != null;620621pushStream.requestSent();622pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?623// setup housekeeping for when the push is received624// TODO: deal with ignoring of CF anti-pattern625CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();626cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {627t = Utils.getCompletionCause(t);628if (Log.trace()) {629Log.logTrace("Push completed on stream {0} for {1}{2}",630pushStream.streamid, resp,631((t==null) ? 
"": " with exception " + t));632}633if (t != null) {634pushGroup.pushError(t);635pushResponseCF.completeExceptionally(t);636} else {637pushResponseCF.complete(resp);638}639pushGroup.pushCompleted();640});641642}643644private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {645HttpHeadersBuilder h = request.getSystemHeadersBuilder();646if (contentLength > 0) {647h.setHeader("content-length", Long.toString(contentLength));648}649HttpHeaders sysh = filterHeaders(h.build());650HttpHeaders userh = filterHeaders(request.getUserHeaders());651// Filter context restricted from userHeaders652userh = HttpHeaders.of(userh.map(), Utils.CONTEXT_RESTRICTED(client()));653654final HttpHeaders uh = userh;655656// Filter any headers from systemHeaders that are set in userHeaders657sysh = HttpHeaders.of(sysh.map(), (k,v) -> uh.firstValue(k).isEmpty());658659OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);660if (contentLength == 0) {661f.setFlag(HeadersFrame.END_STREAM);662endStreamSent = true;663}664return f;665}666667private boolean hasProxyAuthorization(HttpHeaders headers) {668return headers.firstValue("proxy-authorization")669.isPresent();670}671672// Determines whether we need to build a new HttpHeader object.673//674// Ideally we should pass the filter to OutgoingHeaders refactor the675// code that creates the HeaderFrame to honor the filter.676// We're not there yet - so depending on the filter we need to677// apply and the content of the header we will try to determine678// whether anything might need to be filtered.679// If nothing needs filtering then we can just use the680// original headers.681private boolean needsFiltering(HttpHeaders headers,682BiPredicate<String, String> filter) {683if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {684// we're either connecting or proxying685// slight optimization: we only need to filter out686// disabled schemes, so if there are none just687// pass through.688return 
Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)689&& hasProxyAuthorization(headers);690} else {691// we're talking to a server, either directly or through692// a tunnel.693// Slight optimization: we only need to filter out694// proxy authorization headers, so if there are none just695// pass through.696return hasProxyAuthorization(headers);697}698}699700private HttpHeaders filterHeaders(HttpHeaders headers) {701HttpConnection conn = connection();702BiPredicate<String, String> filter = conn.headerFilter(request);703if (needsFiltering(headers, filter)) {704return HttpHeaders.of(headers.map(), filter);705}706return headers;707}708709private static HttpHeaders createPseudoHeaders(HttpRequest request) {710HttpHeadersBuilder hdrs = new HttpHeadersBuilder();711String method = request.method();712hdrs.setHeader(":method", method);713URI uri = request.uri();714hdrs.setHeader(":scheme", uri.getScheme());715// TODO: userinfo deprecated. Needs to be removed716hdrs.setHeader(":authority", uri.getAuthority());717// TODO: ensure header names beginning with : not in user headers718String query = uri.getRawQuery();719String path = uri.getRawPath();720if (path == null || path.isEmpty()) {721if (method.equalsIgnoreCase("OPTIONS")) {722path = "*";723} else {724path = "/";725}726}727if (query != null) {728path += "?" + query;729}730hdrs.setHeader(":path", Utils.encode(path));731return hdrs.build();732}733734HttpHeaders getRequestPseudoHeaders() {735return requestPseudoHeaders;736}737738/** Sets endStreamReceived. Should be called only once. */739void setEndStreamReceived() {740if (debug.on()) debug.log("setEndStreamReceived: streamid=%d", streamid);741assert remotelyClosed == false: "Unexpected endStream already set";742remotelyClosed = true;743responseReceived();744}745746/** Tells whether, or not, the END_STREAM Flag has been seen in any frame747* received on this stream. 
*/748private boolean endStreamReceived() {749return remotelyClosed;750}751752@Override753CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {754if (debug.on()) debug.log("sendHeadersOnly()");755if (Log.requests() && request != null) {756Log.logRequest(request.toString());757}758if (requestPublisher != null) {759requestContentLen = requestPublisher.contentLength();760} else {761requestContentLen = 0;762}763764// At this point the stream doesn't have a streamid yet.765// It will be allocated if we send the request headers.766Throwable t = errorRef.get();767if (t != null) {768if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);769return MinimalFuture.failedFuture(t);770}771772// sending the headers will cause the allocation of the stream id773OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);774connection.sendFrame(f);775CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();776cf.complete(this); // #### good enough for now777return cf;778}779780@Override781void released() {782if (streamid > 0) {783if (debug.on()) debug.log("Released stream %d", streamid);784// remove this stream from the Http2Connection map.785connection.decrementStreamsCount(streamid);786connection.closeStream(streamid);787} else {788if (debug.on()) debug.log("Can't release stream %d", streamid);789}790}791792@Override793void completed() {794// There should be nothing to do here: the stream should have795// been already closed (or will be closed shortly after).796}797798boolean registerStream(int id, boolean registerIfCancelled) {799boolean cancelled = closed || exchange.multi.requestCancelled();800if (!cancelled || registerIfCancelled) {801this.streamid = id;802connection.putStream(this, streamid);803if (debug.on()) {804debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",805streamid, cancelled, registerIfCancelled);806}807}808return !cancelled;809}810811void signalWindowUpdate() {812RequestSubscriber subscriber = 
requestSubscriber;813assert subscriber != null;814if (debug.on()) debug.log("Signalling window update");815subscriber.sendScheduler.runOrSchedule();816}817818static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);819class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {820// can be < 0 if the actual length is not known.821private final long contentLength;822private volatile long remainingContentLength;823private volatile Subscription subscription;824825// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.826// 1) The data that was published by the request body Publisher, and827// 2) the COMPLETED sentinel, since onComplete can be invoked without demand.828final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();829830private final AtomicReference<Throwable> errorRef = new AtomicReference<>();831// A scheduler used to honor window updates. Writing must be paused832// when the window is exhausted, and resumed when the window acquires833// some space. 
The sendScheduler makes it possible to implement this834// behaviour in an asynchronous non-blocking way.835// See RequestSubscriber::trySend below.836final SequentialScheduler sendScheduler;837838RequestSubscriber(long contentLen) {839this.contentLength = contentLen;840this.remainingContentLength = contentLen;841this.sendScheduler =842SequentialScheduler.lockingScheduler(this::trySend);843}844845@Override846public void onSubscribe(Flow.Subscription subscription) {847if (this.subscription != null) {848throw new IllegalStateException("already subscribed");849}850this.subscription = subscription;851if (debug.on())852debug.log("RequestSubscriber: onSubscribe, request 1");853subscription.request(1);854}855856@Override857public void onNext(ByteBuffer item) {858if (debug.on())859debug.log("RequestSubscriber: onNext(%d)", item.remaining());860int size = outgoing.size();861assert size == 0 : "non-zero size: " + size;862onNextImpl(item);863}864865private void onNextImpl(ByteBuffer item) {866// Got some more request body bytes to send.867if (requestBodyCF.isDone()) {868// stream already cancelled, probably in timeout869sendScheduler.stop();870subscription.cancel();871return;872}873outgoing.add(item);874sendScheduler.runOrSchedule();875}876877@Override878public void onError(Throwable throwable) {879if (debug.on())880debug.log(() -> "RequestSubscriber: onError: " + throwable);881// ensure that errors are handled within the flow.882if (errorRef.compareAndSet(null, throwable)) {883sendScheduler.runOrSchedule();884}885}886887@Override888public void onComplete() {889if (debug.on()) debug.log("RequestSubscriber: onComplete");890int size = outgoing.size();891assert size == 0 || size == 1 : "non-zero or one size: " + size;892// last byte of request body has been obtained.893// ensure that everything is completed within the flow.894onNextImpl(COMPLETED);895}896897// Attempts to send the data, if any.898// Handles errors and completion state.899// Pause writing if the send window is 
exhausted, resume it if the900// send window has some bytes that can be acquired.901void trySend() {902try {903// handle errors raised by onError;904Throwable t = errorRef.get();905if (t != null) {906sendScheduler.stop();907if (requestBodyCF.isDone()) return;908subscription.cancel();909requestBodyCF.completeExceptionally(t);910cancelImpl(t);911return;912}913int state = streamState;914915do {916// handle COMPLETED;917ByteBuffer item = outgoing.peekFirst();918if (item == null) return;919else if (item == COMPLETED) {920sendScheduler.stop();921complete();922return;923}924925// handle bytes to send downstream926while (item.hasRemaining() && state == 0) {927if (debug.on()) debug.log("trySend: %d", item.remaining());928DataFrame df = getDataFrame(item);929if (df == null) {930if (debug.on())931debug.log("trySend: can't send yet: %d", item.remaining());932return; // the send window is exhausted: come back later933}934935if (contentLength > 0) {936remainingContentLength -= df.getDataLength();937if (remainingContentLength < 0) {938String msg = connection().getConnectionFlow()939+ " stream=" + streamid + " "940+ "[" + Thread.currentThread().getName() + "] "941+ "Too many bytes in request body. 
Expected: "942+ contentLength + ", got: "943+ (contentLength - remainingContentLength);944assert streamid > 0;945connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);946throw new IOException(msg);947} else if (remainingContentLength == 0) {948assert !endStreamSent : "internal error, send data after END_STREAM flag";949df.setFlag(DataFrame.END_STREAM);950endStreamSent = true;951}952} else {953assert !endStreamSent : "internal error, send data after END_STREAM flag";954}955if ((state = streamState) != 0) {956if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));957break;958}959if (debug.on())960debug.log("trySend: sending: %d", df.getDataLength());961sendDataFrame(df);962}963if (state != 0) break;964assert !item.hasRemaining();965ByteBuffer b = outgoing.removeFirst();966assert b == item;967} while (outgoing.peekFirst() != null);968969if (state != 0) {970t = errorRef.get();971if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));972throw t;973}974975if (debug.on()) debug.log("trySend: request 1");976subscription.request(1);977} catch (Throwable ex) {978if (debug.on()) debug.log("trySend: ", ex);979sendScheduler.stop();980subscription.cancel();981requestBodyCF.completeExceptionally(ex);982// need to cancel the stream to 1. tell the server983// we don't want to receive any more data and984// 2. 
ensure that the operation ref count will be985// decremented on the HttpClient.986cancelImpl(ex);987}988}989990private void complete() throws IOException {991long remaining = remainingContentLength;992long written = contentLength - remaining;993if (remaining > 0) {994connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);995// let trySend() handle the exception996throw new IOException(connection().getConnectionFlow()997+ " stream=" + streamid + " "998+ "[" + Thread.currentThread().getName() +"] "999+ "Too few bytes returned by the publisher ("1000+ written + "/"1001+ contentLength + ")");1002}1003if (!endStreamSent) {1004endStreamSent = true;1005connection.sendDataFrame(getEmptyEndStreamDataFrame());1006}1007requestBodyCF.complete(null);1008}1009}10101011/**1012* Send a RESET frame to tell server to stop sending data on this stream1013*/1014@Override1015public CompletableFuture<Void> ignoreBody() {1016try {1017connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);1018return MinimalFuture.completedFuture(null);1019} catch (Throwable e) {1020Log.logTrace("Error resetting stream {0}", e.toString());1021return MinimalFuture.failedFuture(e);1022}1023}10241025DataFrame getDataFrame(ByteBuffer buffer) {1026int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());1027// blocks waiting for stream send window, if exhausted1028int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);1029if (actualAmount <= 0) return null;1030ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);1031DataFrame df = new DataFrame(streamid, 0 , outBuf);1032return df;1033}10341035private DataFrame getEmptyEndStreamDataFrame() {1036return new DataFrame(streamid, DataFrame.END_STREAM, List.of());1037}10381039/**1040* A List of responses relating to this stream. Normally there is only1041* one response, but intermediate responses like 100 are allowed1042* and must be passed up to higher level before continuing. 
Deals with races1043* such as if responses are returned before the CFs get created by1044* getResponseAsync()1045*/10461047final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);10481049@Override1050CompletableFuture<Response> getResponseAsync(Executor executor) {1051CompletableFuture<Response> cf;1052// The code below deals with race condition that can be caused when1053// completeResponse() is being called before getResponseAsync()1054synchronized (response_cfs) {1055if (!response_cfs.isEmpty()) {1056// This CompletableFuture was created by completeResponse().1057// it will be already completed.1058cf = response_cfs.remove(0);1059// if we find a cf here it should be already completed.1060// finding a non completed cf should not happen. just assert it.1061assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";1062} else {1063// getResponseAsync() is called first. Create a CompletableFuture1064// that will be completed by completeResponse() when1065// completeResponse() is called.1066cf = new MinimalFuture<>();1067response_cfs.add(cf);1068}1069}1070if (executor != null && !cf.isDone()) {1071// protect from executing later chain of CompletableFuture operations from SelectorManager thread1072cf = cf.thenApplyAsync(r -> r, executor);1073}1074Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);1075PushGroup<?> pg = exchange.getPushGroup();1076if (pg != null) {1077// if an error occurs make sure it is recorded in the PushGroup1078cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));1079}1080return cf;1081}10821083/**1084* Completes the first uncompleted CF on list, and removes it. 
If there is no1085* uncompleted CF then creates one (completes it) and adds to list1086*/1087void completeResponse(Response resp) {1088synchronized (response_cfs) {1089CompletableFuture<Response> cf;1090int cfs_len = response_cfs.size();1091for (int i=0; i<cfs_len; i++) {1092cf = response_cfs.get(i);1093if (!cf.isDone()) {1094Log.logTrace("Completing response (streamid={0}): {1}",1095streamid, cf);1096if (debug.on())1097debug.log("Completing responseCF(%d) with response headers", i);1098response_cfs.remove(cf);1099cf.complete(resp);1100return;1101} // else we found the previous response: just leave it alone.1102}1103cf = MinimalFuture.completedFuture(resp);1104Log.logTrace("Created completed future (streamid={0}): {1}",1105streamid, cf);1106if (debug.on())1107debug.log("Adding completed responseCF(0) with response headers");1108response_cfs.add(cf);1109}1110}11111112// methods to update state and remove stream when finished11131114synchronized void requestSent() {1115requestSent = true;1116if (responseReceived) {1117if (debug.on()) debug.log("requestSent: streamid=%d", streamid);1118close();1119} else {1120if (debug.on()) {1121debug.log("requestSent: streamid=%d but response not received", streamid);1122}1123}1124}11251126synchronized void responseReceived() {1127responseReceived = true;1128if (requestSent) {1129if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);1130close();1131} else {1132if (debug.on()) {1133debug.log("responseReceived: streamid=%d but request not sent", streamid);1134}1135}1136}11371138/**1139* same as above but for errors1140*/1141void completeResponseExceptionally(Throwable t) {1142synchronized (response_cfs) {1143// use index to avoid ConcurrentModificationException1144// caused by removing the CF from within the loop.1145for (int i = 0; i < response_cfs.size(); i++) {1146CompletableFuture<Response> cf = response_cfs.get(i);1147if (!cf.isDone()) 
{1148response_cfs.remove(i);1149cf.completeExceptionally(t);1150return;1151}1152}1153response_cfs.add(MinimalFuture.failedFuture(t));1154}1155}11561157CompletableFuture<Void> sendBodyImpl() {1158requestBodyCF.whenComplete((v, t) -> requestSent());1159try {1160if (requestPublisher != null) {1161final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);1162requestPublisher.subscribe(requestSubscriber = subscriber);1163} else {1164// there is no request body, therefore the request is complete,1165// END_STREAM has already sent with outgoing headers1166requestBodyCF.complete(null);1167}1168} catch (Throwable t) {1169cancelImpl(t);1170requestBodyCF.completeExceptionally(t);1171}1172return requestBodyCF;1173}11741175@Override1176void cancel() {1177if ((streamid == 0)) {1178cancel(new IOException("Stream cancelled before streamid assigned"));1179} else {1180cancel(new IOException("Stream " + streamid + " cancelled"));1181}1182}11831184void onSubscriptionError(Throwable t) {1185errorRef.compareAndSet(null, t);1186if (debug.on()) debug.log("Got subscription error: %s", (Object)t);1187// This is the special case where the subscriber1188// has requested an illegal number of items.1189// In this case, the error doesn't come from1190// upstream, but from downstream, and we need to1191// handle the error without waiting for the inputQ1192// to be exhausted.1193stopRequested = true;1194sched.runOrSchedule();1195}11961197@Override1198void cancel(IOException cause) {1199cancelImpl(cause);1200}12011202void connectionClosing(Throwable cause) {1203Flow.Subscriber<?> subscriber =1204responseSubscriber == null ? 
pendingResponseSubscriber : responseSubscriber;1205errorRef.compareAndSet(null, cause);1206if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {1207sched.runOrSchedule();1208} else cancelImpl(cause);1209}12101211// This method sends a RST_STREAM frame1212void cancelImpl(Throwable e) {1213errorRef.compareAndSet(null, e);1214if (debug.on()) {1215if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);1216else debug.log("cancelling stream %d: %s", streamid, e);1217}1218if (Log.trace()) {1219if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);1220else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);1221}1222boolean closing;1223if (closing = !closed) { // assigning closing to !closed1224synchronized (this) {1225if (closing = !closed) { // assigning closing to !closed1226closed=true;1227}1228}1229}1230if (closing) { // true if the stream has not been closed yet1231if (responseSubscriber != null || pendingResponseSubscriber != null)1232sched.runOrSchedule();1233}1234completeResponseExceptionally(e);1235if (!requestBodyCF.isDone()) {1236requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..1237}1238if (responseBodyCF != null) {1239responseBodyCF.completeExceptionally(errorRef.get());1240}1241try {1242// will send a RST_STREAM frame1243if (streamid != 0 && streamState == 0) {1244e = Utils.getCompletionCause(e);1245if (e instanceof EOFException) {1246// read EOF: no need to try & send reset1247connection.decrementStreamsCount(streamid);1248connection.closeStream(streamid);1249} else {1250// no use to send CANCEL if already closed.1251sendCancelStreamFrame();1252}1253}1254} catch (Throwable ex) {1255Log.logError(ex);1256}1257}12581259void sendCancelStreamFrame() {1260// do not reset a stream until it has a streamid.1261if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {1262connection.resetStream(streamid, ResetFrame.CANCEL);1263}1264close();1265}12661267// This method doesn't send any 
frame1268void close() {1269if (closed) return;1270synchronized(this) {1271if (closed) return;1272closed = true;1273}1274if (debug.on()) debug.log("close stream %d", streamid);1275Log.logTrace("Closing stream {0}", streamid);1276connection.closeStream(streamid);1277Log.logTrace("Stream {0} closed", streamid);1278}12791280static class PushedStream<T> extends Stream<T> {1281final PushGroup<T> pushGroup;1282// push streams need the response CF allocated up front as it is1283// given directly to user via the multi handler callback function.1284final CompletableFuture<Response> pushCF;1285CompletableFuture<HttpResponse<T>> responseCF;1286final HttpRequestImpl pushReq;1287HttpResponse.BodyHandler<T> pushHandler;12881289PushedStream(PushGroup<T> pushGroup,1290Http2Connection connection,1291Exchange<T> pushReq) {1292// ## no request body possible, null window controller1293super(connection, pushReq, null);1294this.pushGroup = pushGroup;1295this.pushReq = pushReq.request();1296this.pushCF = new MinimalFuture<>();1297this.responseCF = new MinimalFuture<>();12981299}13001301CompletableFuture<HttpResponse<T>> responseCF() {1302return responseCF;1303}13041305synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {1306this.pushHandler = pushHandler;1307}13081309synchronized HttpResponse.BodyHandler<T> getPushHandler() {1310// ignored parameters to function can be used as BodyHandler1311return this.pushHandler;1312}13131314// Following methods call the super class but in case of1315// error record it in the PushGroup. 
The error method is called1316// with a null value when no error occurred (is a no-op)1317@Override1318CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {1319return super.sendBodyAsync()1320.whenComplete((ExchangeImpl<T> v, Throwable t)1321-> pushGroup.pushError(Utils.getCompletionCause(t)));1322}13231324@Override1325CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {1326return super.sendHeadersAsync()1327.whenComplete((ExchangeImpl<T> ex, Throwable t)1328-> pushGroup.pushError(Utils.getCompletionCause(t)));1329}13301331@Override1332CompletableFuture<Response> getResponseAsync(Executor executor) {1333CompletableFuture<Response> cf = pushCF.whenComplete(1334(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));1335if(executor!=null && !cf.isDone()) {1336cf = cf.thenApplyAsync( r -> r, executor);1337}1338return cf;1339}13401341@Override1342CompletableFuture<T> readBodyAsync(1343HttpResponse.BodyHandler<T> handler,1344boolean returnConnectionToPool,1345Executor executor)1346{1347return super.readBodyAsync(handler, returnConnectionToPool, executor)1348.whenComplete((v, t) -> pushGroup.pushError(t));1349}13501351@Override1352void completeResponse(Response r) {1353Log.logResponse(r::toString);1354pushCF.complete(r); // not strictly required for push API1355// start reading the body using the obtained BodySubscriber1356CompletableFuture<Void> start = new MinimalFuture<>();1357start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))1358.whenComplete((T body, Throwable t) -> {1359if (t != null) {1360responseCF.completeExceptionally(t);1361} else {1362HttpResponseImpl<T> resp =1363new HttpResponseImpl<>(r.request, r, null, body, getExchange());1364responseCF.complete(resp);1365}1366});1367start.completeAsync(() -> null, getExchange().executor());1368}13691370@Override1371void completeResponseExceptionally(Throwable t) {1372pushCF.completeExceptionally(t);1373}13741375// @Override1376// synchronized void responseReceived() {1377// 
super.responseReceived();1378// }13791380// create and return the PushResponseImpl1381@Override1382protected void handleResponse() {1383HttpHeaders responseHeaders = responseHeadersBuilder.build();1384responseCode = (int)responseHeaders1385.firstValueAsLong(":status")1386.orElse(-1);13871388if (responseCode == -1) {1389completeResponseExceptionally(new IOException("No status code"));1390}13911392this.response = new Response(1393pushReq, exchange, responseHeaders, connection(),1394responseCode, HttpClient.Version.HTTP_2);13951396/* TODO: review if needs to be removed1397the value is not used, but in case `content-length` doesn't parse1398as long, there will be NumberFormatException. If left as is, make1399sure code up the stack handles NFE correctly. */1400responseHeaders.firstValueAsLong("content-length");14011402if (Log.headers()) {1403StringBuilder sb = new StringBuilder("RESPONSE HEADERS");1404sb.append(" (streamid=").append(streamid).append("):\n");1405Log.dumpHeaders(sb, " ", responseHeaders);1406Log.logHeaders(sb.toString());1407}14081409rspHeadersConsumer.reset();14101411// different implementations for normal streams and pushed streams1412completeResponse(response);1413}1414}14151416final class StreamWindowUpdateSender extends WindowUpdateSender {14171418StreamWindowUpdateSender(Http2Connection connection) {1419super(connection);1420}14211422@Override1423int getStreamId() {1424return streamid;1425}14261427@Override1428String dbgString() {1429String dbg = dbgString;1430if (dbg != null) return dbg;1431if (streamid == 0) {1432return connection.dbgString() + ":WindowUpdateSender(stream: ?)";1433} else {1434dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";1435return dbgString = dbg;1436}1437}1438}14391440/**1441* Returns true if this exchange was canceled.1442* @return true if this exchange was canceled.1443*/1444synchronized boolean isCanceled() {1445return errorRef.get() != null;1446}14471448/**1449* Returns the cause for which 
this exchange was canceled, if available.1450* @return the cause for which this exchange was canceled, if available.1451*/1452synchronized Throwable getCancelCause() {1453return errorRef.get();1454}14551456final String dbgString() {1457return connection.dbgString() + "/Stream("+streamid+")";1458}14591460private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {14611462void reset() {1463super.reset();1464responseHeadersBuilder.clear();1465debug.log("Response builder cleared, ready to receive new headers.");1466}14671468@Override1469public void onDecoded(CharSequence name, CharSequence value)1470throws UncheckedIOException1471{1472String n = name.toString();1473String v = value.toString();1474super.onDecoded(n, v);1475responseHeadersBuilder.addHeader(n, v);1476if (Log.headers() && Log.trace()) {1477Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",1478streamid, n, v);1479}1480}1481}14821483private static final VarHandle STREAM_STATE;1484private static final VarHandle DEREGISTERED;1485static {1486try {1487STREAM_STATE = MethodHandles.lookup()1488.findVarHandle(Stream.class, "streamState", int.class);1489DEREGISTERED = MethodHandles.lookup()1490.findVarHandle(Stream.class, "deRegistered", boolean.class);1491} catch (Exception x) {1492throw new ExceptionInInitializerError(x);1493}1494}1495}149614971498