// Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
// 41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.IOException;28import java.net.InetSocketAddress;29import java.net.http.HttpClient;30import java.net.http.HttpResponse;31import java.net.http.HttpResponse.BodyHandler;32import java.net.http.HttpResponse.BodySubscriber;33import java.nio.ByteBuffer;34import java.util.Objects;35import java.util.concurrent.CompletableFuture;36import java.util.LinkedList;37import java.util.List;38import java.util.concurrent.ConcurrentLinkedDeque;39import java.util.concurrent.Executor;40import java.util.concurrent.Flow;41import jdk.internal.net.http.common.Demand;42import jdk.internal.net.http.common.Log;43import jdk.internal.net.http.common.FlowTube;44import jdk.internal.net.http.common.Logger;45import 
jdk.internal.net.http.common.SequentialScheduler;46import jdk.internal.net.http.common.MinimalFuture;47import jdk.internal.net.http.common.Utils;48import static java.net.http.HttpClient.Version.HTTP_1_1;49import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;5051/**52* Encapsulates one HTTP/1.1 request/response exchange.53*/54class Http1Exchange<T> extends ExchangeImpl<T> {5556final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);57final HttpRequestImpl request; // main request58final Http1Request requestAction;59private volatile Http1Response<T> response;60final HttpConnection connection;61final HttpClientImpl client;62final Executor executor;63private final Http1AsyncReceiver asyncReceiver;64private volatile boolean upgraded;6566/** Records a possible cancellation raised before any operation67* has been initiated, or an error received while sending the request. */68private Throwable failed;69private final List<CompletableFuture<?>> operations; // used for cancel7071/** Must be held when operating on any internal state or data. */72private final Object lock = new Object();7374/** Holds the outgoing data, either the headers or a request body part. Or75* an error from the request body publisher. At most there can be ~2 pieces76* of outgoing data ( onComplete|onError can be invoked without demand ).*/77final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();7879/** The write publisher, responsible for writing the complete request ( both80* headers and body ( if any ). */81private final Http1Publisher writePublisher = new Http1Publisher();8283/** Completed when the header have been published, or there is an error */84private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>();85/** Completed when the body has been published, or there is an error */86private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();8788/** The subscriber to the request's body published. 
Maybe null. */89private volatile Http1BodySubscriber bodySubscriber;9091enum State { INITIAL,92HEADERS,93BODY,94ERROR, // terminal state95COMPLETING,96COMPLETED } // terminal state9798private State state = State.INITIAL;99100/** A carrier for either data or an error. Used to carry data, and communicate101* errors from the request ( both headers and body ) to the exchange. */102static class DataPair {103Throwable throwable;104List<ByteBuffer> data;105DataPair(List<ByteBuffer> data, Throwable throwable){106this.data = data;107this.throwable = throwable;108}109@Override110public String toString() {111return "DataPair [data=" + data + ", throwable=" + throwable + "]";112}113}114115/** An abstract supertype for HTTP/1.1 body subscribers. There are two116* concrete implementations: {@link Http1Request.StreamSubscriber}, and117* {@link Http1Request.FixedContentSubscriber}, for receiving chunked and118* fixed length bodies, respectively. */119static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {120final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();121private volatile Flow.Subscription subscription;122volatile boolean complete;123private final Logger debug;124Http1BodySubscriber(Logger debug) {125assert debug != null;126this.debug = debug;127}128129/** Final sentinel in the stream of request body. */130static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));131132final void request(long n) {133if (debug.on())134debug.log("Http1BodySubscriber requesting %d, from %s",135n, subscription);136subscription.request(n);137}138139/** A current-state message suitable for inclusion in an exception detail message. 
*/140abstract String currentStateMessage();141142final boolean isSubscribed() {143return subscription != null;144}145146final void setSubscription(Flow.Subscription subscription) {147this.subscription = subscription;148whenSubscribed.complete(subscription);149}150151final void cancelSubscription() {152try {153subscription.cancel();154} catch(Throwable t) {155String msg = "Ignoring exception raised when canceling BodyPublisher subscription";156if (debug.on()) debug.log("%s: %s", msg, t);157Log.logError("{0}: {1}", msg, (Object)t);158}159}160161static Http1BodySubscriber completeSubscriber(Logger debug) {162return new Http1BodySubscriber(debug) {163@Override public void onSubscribe(Flow.Subscription subscription) { error(); }164@Override public void onNext(ByteBuffer item) { error(); }165@Override public void onError(Throwable throwable) { error(); }166@Override public void onComplete() { error(); }167@Override String currentStateMessage() { return null; }168private void error() {169throw new InternalError("should not reach here");170}171};172}173}174175@Override176public String toString() {177return "HTTP/1.1 " + request.toString();178}179180HttpRequestImpl request() {181return request;182}183184Http1Exchange(Exchange<T> exchange, HttpConnection connection)185throws IOException186{187super(exchange);188this.request = exchange.request();189this.client = exchange.client();190this.executor = exchange.executor();191this.operations = new LinkedList<>();192operations.add(headersSentCF);193operations.add(bodySentCF);194if (connection != null) {195this.connection = connection;196} else {197InetSocketAddress addr = request.getAddress();198this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);199}200this.requestAction = new Http1Request(request, this);201this.asyncReceiver = new Http1AsyncReceiver(executor, this);202}203204@Override205HttpConnection connection() {206return connection;207}208209private void connectFlows(HttpConnection connection) 
{210FlowTube tube = connection.getConnectionFlow();211if (debug.on()) debug.log("%s connecting flows", tube);212213// Connect the flow to our Http1TubeSubscriber:214// asyncReceiver.subscriber().215tube.connectFlows(writePublisher,216asyncReceiver.subscriber());217}218219@Override220CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {221// create the response before sending the request headers, so that222// the response can set the appropriate receivers.223if (debug.on()) debug.log("Sending headers only");224// If the first attempt to read something triggers EOF, or225// IOException("channel reset by peer"), we're going to retry.226// Instruct the asyncReceiver to throw ConnectionExpiredException227// to force a retry.228asyncReceiver.setRetryOnError(true);229if (response == null) {230response = new Http1Response<>(connection, this, asyncReceiver);231}232233if (debug.on()) debug.log("response created in advance");234235CompletableFuture<Void> connectCF;236if (!connection.connected()) {237if (debug.on()) debug.log("initiating connect async");238connectCF = connection.connectAsync(exchange)239.thenCompose(unused -> connection.finishConnect());240Throwable cancelled;241synchronized (lock) {242if ((cancelled = failed) == null) {243operations.add(connectCF);244}245}246if (cancelled != null) {247if (client.isSelectorThread()) {248executor.execute(() ->249connectCF.completeExceptionally(cancelled));250} else {251connectCF.completeExceptionally(cancelled);252}253}254} else {255connectCF = new MinimalFuture<>();256connectCF.complete(null);257}258259return connectCF260.thenCompose(unused -> {261CompletableFuture<Void> cf = new MinimalFuture<>();262try {263asyncReceiver.whenFinished.whenComplete((r,t) -> {264if (t != null) {265if (debug.on())266debug.log("asyncReceiver finished (failed=%s)", (Object)t);267if (!headersSentCF.isDone())268headersSentCF.completeAsync(() -> this, executor);269}270});271connectFlows(connection);272273if (debug.on()) 
debug.log("requestAction.headers");274List<ByteBuffer> data = requestAction.headers();275synchronized (lock) {276state = State.HEADERS;277}278if (debug.on()) debug.log("setting outgoing with headers");279assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;280appendToOutgoing(data);281cf.complete(null);282return cf;283} catch (Throwable t) {284if (debug.on()) debug.log("Failed to send headers: %s", t);285headersSentCF.completeExceptionally(t);286bodySentCF.completeExceptionally(t);287connection.close();288cf.completeExceptionally(t);289return cf;290} })291.thenCompose(unused -> headersSentCF);292}293294private void cancelIfFailed(Flow.Subscription s) {295asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {296if (debug.on())297debug.log("asyncReceiver finished (failed=%s)", (Object)t);298if (t != null) {299s.cancel();300// Don't complete exceptionally here as 't'301// might not be the right exception: it will302// not have been decorated yet.303// t is an exception raised by the read side,304// an EOFException or Broken Pipe...305// We are cancelling the BodyPublisher subscription306// and completing bodySentCF to allow the next step307// to flow and call readHeaderAsync, which will308// get the right exception from the asyncReceiver.309bodySentCF.complete(this);310}311}, executor);312}313314@Override315CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {316assert headersSentCF.isDone();317if (debug.on()) debug.log("sendBodyAsync");318try {319bodySubscriber = requestAction.continueRequest();320if (debug.on()) debug.log("bodySubscriber is %s",321bodySubscriber == null ? 
null : bodySubscriber.getClass());322if (bodySubscriber == null) {323bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);324appendToOutgoing(Http1BodySubscriber.COMPLETED);325} else {326// start327bodySubscriber.whenSubscribed328.thenAccept((s) -> cancelIfFailed(s))329.thenAccept((s) -> requestMoreBody());330}331} catch (Throwable t) {332cancelImpl(t);333bodySentCF.completeExceptionally(t);334}335return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);336}337338@Override339CompletableFuture<Response> getResponseAsync(Executor executor) {340if (debug.on()) debug.log("reading headers");341CompletableFuture<Response> cf = response.readHeadersAsync(executor);342Throwable cause;343synchronized (lock) {344operations.add(cf);345cause = failed;346failed = null;347}348349if (cause != null) {350Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"351+ "\n\tCompleting exceptionally with {2}\n",352request.uri(),353request.timeout().isPresent() ?354// calling duration.toMillis() can throw an exception.355// this is just debugging, we don't care if it overflows.356(request.timeout().get().getSeconds() * 1000357+ request.timeout().get().getNano() / 1000000) : -1,358cause);359boolean acknowledged = cf.completeExceptionally(cause);360if (debug.on())361debug.log(acknowledged ? 
("completed response with " + cause)362: ("response already completed, ignoring " + cause));363}364return Utils.wrapForDebug(debug, "getResponseAsync", cf);365}366367@Override368CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,369boolean returnConnectionToPool,370Executor executor)371{372BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),373response.responseHeaders(),374HTTP_1_1));375CompletableFuture<T> bodyCF = response.readBody(bs,376returnConnectionToPool,377executor);378return bodyCF;379}380381@Override382CompletableFuture<Void> ignoreBody() {383return response.ignoreBody(executor);384}385386// Used for those response codes that have no body associated387@Override388public void nullBody(HttpResponse<T> resp, Throwable t) {389response.nullBody(resp, t);390}391392393ByteBuffer drainLeftOverBytes() {394synchronized (lock) {395asyncReceiver.stop();396return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);397}398}399400void released() {401Http1Response<T> resp = this.response;402if (resp != null) resp.completed();403asyncReceiver.clear();404}405406void completed() {407Http1Response<T> resp = this.response;408if (resp != null) resp.completed();409}410411/**412* Cancel checks to see if request and responseAsync finished already.413* If not it closes the connection and completes all pending operations414*/415@Override416void cancel() {417cancelImpl(new IOException("Request cancelled"));418}419420/**421* Cancel checks to see if request and responseAsync finished already.422* If not it closes the connection and completes all pending operations423*/424@Override425void cancel(IOException cause) {426cancelImpl(cause);427}428429private void cancelImpl(Throwable cause) {430LinkedList<CompletableFuture<?>> toComplete = null;431int count = 0;432Throwable error;433synchronized (lock) {434if ((error = failed) == null) {435failed = error = cause;436}437if (debug.on()) {438debug.log(request.uri() + ": " + error);439}440if (requestAction != null 
&& requestAction.finished()441&& response != null && response.finished()) {442return;443}444writePublisher.writeScheduler.stop();445if (operations.isEmpty()) {446Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."447+ "\n\tCan''t cancel yet with {2}",448request.uri(),449request.timeout().isPresent() ?450// calling duration.toMillis() can throw an exception.451// this is just debugging, we don't care if it overflows.452(request.timeout().get().getSeconds() * 1000453+ request.timeout().get().getNano() / 1000000) : -1,454cause);455} else {456for (CompletableFuture<?> cf : operations) {457if (!cf.isDone()) {458if (toComplete == null) toComplete = new LinkedList<>();459toComplete.add(cf);460count++;461}462}463operations.clear();464}465}466try {467Log.logError("Http1Exchange.cancel: count=" + count);468if (toComplete != null) {469// We might be in the selector thread in case of timeout, when470// the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()471// There may or may not be other places that reach here472// from the SelectorManager thread, so just make sure we473// don't complete any CF from within the selector manager474// thread.475Executor exec = client.isSelectorThread()476? executor477: this::runInline;478Throwable x = error;479while (!toComplete.isEmpty()) {480CompletableFuture<?> cf = toComplete.poll();481exec.execute(() -> {482if (cf.completeExceptionally(x)) {483if (debug.on())484debug.log("%s: completed cf with %s", request.uri(), x);485}486});487}488}489} finally {490if (!upgraded)491connection.close();492}493}494495void upgraded() {496upgraded = true;497}498499private void runInline(Runnable run) {500assert !client.isSelectorThread();501run.run();502}503504/** Returns true if this exchange was canceled. */505boolean isCanceled() {506synchronized (lock) {507return failed != null;508}509}510511/** Returns the cause for which this exchange was canceled, if available. 
*/512Throwable getCancelCause() {513synchronized (lock) {514return failed;515}516}517518/** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */519void appendToOutgoing(Throwable throwable) {520appendToOutgoing(new DataPair(null, throwable));521}522523/** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */524void appendToOutgoing(List<ByteBuffer> item) {525appendToOutgoing(new DataPair(item, null));526}527528private void appendToOutgoing(DataPair dp) {529if (debug.on()) debug.log("appending to outgoing " + dp);530outgoing.add(dp);531writePublisher.writeScheduler.runOrSchedule();532}533534/** Tells whether, or not, there is any outgoing data that can be published,535* or if there is an error. */536private boolean hasOutgoing() {537return !outgoing.isEmpty();538}539540private void requestMoreBody() {541try {542if (debug.on()) debug.log("requesting more request body from the subscriber");543bodySubscriber.request(1);544} catch (Throwable t) {545if (debug.on()) debug.log("Subscription::request failed", t);546cancelImpl(t);547bodySentCF.completeExceptionally(t);548}549}550551private void cancelUpstreamSubscription() {552final Executor exec = client.theExecutor();553if (debug.on()) debug.log("cancelling upstream publisher");554if (bodySubscriber != null) {555exec.execute(bodySubscriber::cancelSubscription);556} else if (debug.on()) {557debug.log("bodySubscriber is null");558}559}560561// Invoked only by the publisher562// ALL tasks should execute off the Selector-Manager thread563/** Returns the next portion of the HTTP request, or the error. 
*/564private DataPair getOutgoing() {565final Executor exec = client.theExecutor();566final DataPair dp = outgoing.pollFirst();567568if (writePublisher.cancelled) {569cancelUpstreamSubscription();570headersSentCF.completeAsync(() -> this, exec);571bodySentCF.completeAsync(() -> this, exec);572return null;573}574575if (dp == null) // publisher has not published anything yet576return null;577578if (dp.throwable != null) {579synchronized (lock) {580state = State.ERROR;581}582exec.execute(() -> {583headersSentCF.completeExceptionally(dp.throwable);584bodySentCF.completeExceptionally(dp.throwable);585connection.close();586});587return dp;588}589590switch (state) {591case HEADERS:592synchronized (lock) {593state = State.BODY;594}595// completeAsync, since dependent tasks should run in another thread596if (debug.on()) debug.log("initiating completion of headersSentCF");597headersSentCF.completeAsync(() -> this, exec);598break;599case BODY:600if (dp.data == Http1BodySubscriber.COMPLETED) {601synchronized (lock) {602state = State.COMPLETING;603}604if (debug.on()) debug.log("initiating completion of bodySentCF");605bodySentCF.completeAsync(() -> this, exec);606} else {607exec.execute(this::requestMoreBody);608}609break;610case INITIAL:611case ERROR:612case COMPLETING:613case COMPLETED:614default:615assert false : "Unexpected state:" + state;616}617618return dp;619}620621/** A Publisher of HTTP/1.1 headers and request body. */622final class Http1Publisher implements FlowTube.TubePublisher {623624final Logger debug = Utils.getDebugLogger(this::dbgString);625volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;626volatile boolean cancelled;627final Http1WriteSubscription subscription = new Http1WriteSubscription();628final Demand demand = new Demand();629final SequentialScheduler writeScheduler =630SequentialScheduler.lockingScheduler(new WriteTask());631632@Override633public void subscribe(Flow.Subscriber<? 
super List<ByteBuffer>> s) {634assert state == State.INITIAL;635Objects.requireNonNull(s);636assert subscriber == null;637638subscriber = s;639if (debug.on()) debug.log("got subscriber: %s", s);640s.onSubscribe(subscription);641}642643volatile String dbgTag;644String dbgString() {645String tag = dbgTag;646Object flow = connection.getConnectionFlow();647if (tag == null && flow != null) {648dbgTag = tag = "Http1Publisher(" + flow + ")";649} else if (tag == null) {650tag = "Http1Publisher(?)";651}652return tag;653}654655@SuppressWarnings("fallthrough")656private boolean checkRequestCancelled() {657if (exchange.multi.requestCancelled()) {658if (debug.on()) debug.log("request cancelled");659if (subscriber == null) {660if (debug.on()) debug.log("no subscriber yet");661return true;662}663switch (state) {664case BODY:665cancelUpstreamSubscription();666// fall trough to HEADERS667case HEADERS:668Throwable cause = getCancelCause();669if (cause == null) cause = new IOException("Request cancelled");670subscriber.onError(cause);671writeScheduler.stop();672return true;673}674}675return false;676}677678679final class WriteTask implements Runnable {680@Override681public void run() {682assert state != State.COMPLETED : "Unexpected state:" + state;683if (debug.on()) debug.log("WriteTask");684685if (cancelled) {686if (debug.on()) debug.log("handling cancellation");687writeScheduler.stop();688getOutgoing();689return;690}691692if (checkRequestCancelled()) return;693694if (subscriber == null) {695if (debug.on()) debug.log("no subscriber yet");696return;697}698699if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());700while (hasOutgoing() && demand.tryDecrement()) {701DataPair dp = getOutgoing();702if (dp == null)703break;704705if (dp.throwable != null) {706if (debug.on()) debug.log("onError");707// Do not call the subscriber's onError, it is not required.708writeScheduler.stop();709} else {710List<ByteBuffer> data = dp.data;711if (data == Http1BodySubscriber.COMPLETED) 
{712synchronized (lock) {713assert state == State.COMPLETING : "Unexpected state:" + state;714state = State.COMPLETED;715}716if (debug.on())717debug.log("completed, stopping %s", writeScheduler);718writeScheduler.stop();719// Do nothing more. Just do not publish anything further.720// The next Subscriber will eventually take over.721722} else {723if (checkRequestCancelled()) return;724if (debug.on())725debug.log("onNext with " + Utils.remaining(data) + " bytes");726subscriber.onNext(data);727}728}729}730}731}732733final class Http1WriteSubscription implements Flow.Subscription {734735@Override736public void request(long n) {737if (cancelled)738return; //no-op739demand.increase(n);740if (debug.on())741debug.log("subscription request(%d), demand=%s", n, demand);742writeScheduler.runOrSchedule(client.theExecutor());743}744745@Override746public void cancel() {747if (debug.on()) debug.log("subscription cancelled");748if (cancelled)749return; //no-op750cancelled = true;751writeScheduler.runOrSchedule(client.theExecutor());752}753}754}755756HttpClient client() {757return client;758}759760String dbgString() {761return "Http1Exchange";762}763}764765766