Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
41171 views
/*1* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.EOFException;28import java.lang.System.Logger.Level;29import java.nio.ByteBuffer;30import java.util.List;31import java.util.concurrent.CompletableFuture;32import java.util.concurrent.CompletionStage;33import java.util.concurrent.Executor;34import java.util.concurrent.Flow;35import java.util.concurrent.atomic.AtomicBoolean;36import java.util.concurrent.atomic.AtomicLong;37import java.util.function.Consumer;38import java.util.function.Function;39import java.net.http.HttpHeaders;40import java.net.http.HttpResponse;41import jdk.internal.net.http.ResponseContent.BodyParser;42import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser;43import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;44import jdk.internal.net.http.common.Log;45import jdk.internal.net.http.common.Logger;46import jdk.internal.net.http.common.MinimalFuture;47import jdk.internal.net.http.common.Utils;48import static java.net.http.HttpClient.Version.HTTP_1_1;49import static java.net.http.HttpResponse.BodySubscribers.discarding;50import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;51import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED;5253/**54* Handles a HTTP/1.1 response (headers + body).55* There can be more than one of these per Http exchange.56*/57class Http1Response<T> {5859private volatile ResponseContent content;60private final HttpRequestImpl request;61private Response response;62private final HttpConnection connection;63private HttpHeaders headers;64private int responseCode;65private final Http1Exchange<T> exchange;66private boolean return2Cache; // return connection to cache when finished67private final HeadersReader headersReader; // used to read the headers68private final BodyReader bodyReader; // used to read the body69private final Http1AsyncReceiver asyncReceiver;70private volatile EOFException eof;71private volatile BodyParser bodyParser;72// max number of bytes of (fixed length) body to ignore on redirect73private final static int MAX_IGNORE = 1024;7475// Revisit: can we get rid of this?76static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}77private volatile State readProgress = State.INITIAL;7879final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);80final static AtomicLong responseCount = new AtomicLong();81final long id = responseCount.incrementAndGet();82private Http1HeaderParser hd;8384Http1Response(HttpConnection conn,85Http1Exchange<T> exchange,86Http1AsyncReceiver asyncReceiver) {87this.readProgress = State.INITIAL;88this.request = exchange.request();89this.exchange = exchange;90this.connection = conn;91this.asyncReceiver = asyncReceiver;92headersReader = new HeadersReader(this::advance);93bodyReader = new BodyReader(this::advance);9495hd = new Http1HeaderParser();96readProgress = State.READING_HEADERS;97headersReader.start(hd);98asyncReceiver.subscribe(headersReader);99}100101String dbgTag;102private String dbgString() {103String dbg = dbgTag;104if (dbg == null) {105String cdbg = connection.dbgTag;106if (cdbg != null) {107dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";108} else {109dbg = "Http1Response(id=" + id + ")";110}111}112return dbg;113}114115// The ClientRefCountTracker is used to track the state116// of a pending operation. Altough there usually is a single117// point where the operation starts, it may terminate at118// different places.119private final class ClientRefCountTracker {120final HttpClientImpl client = connection.client();121// state & 0x01 != 0 => acquire called122// state & 0x02 != 0 => tryRelease called123byte state;124125public synchronized void acquire() {126if (state == 0) {127// increment the reference count on the HttpClientImpl128// to prevent the SelectorManager thread from exiting129// until our operation is complete.130if (debug.on())131debug.log("Operation started: incrementing ref count for %s", client);132client.reference();133state = 0x01;134} else {135if (debug.on())136debug.log("Operation ref count for %s is already %s",137client, ((state & 0x2) == 0x2) ? "released." : "incremented!" );138assert (state & 0x01) == 0 : "reference count already incremented";139}140}141142public synchronized void tryRelease() {143if (state == 0x01) {144// decrement the reference count on the HttpClientImpl145// to allow the SelectorManager thread to exit if no146// other operation is pending and the facade is no147// longer referenced.148if (debug.on())149debug.log("Operation finished: decrementing ref count for %s", client);150client.unreference();151} else if (state == 0) {152if (debug.on())153debug.log("Operation finished: releasing ref count for %s", client);154} else if ((state & 0x02) == 0x02) {155if (debug.on())156debug.log("ref count for %s already released", client);157}158state |= 0x02;159}160}161162private volatile boolean firstTimeAround = true;163164public CompletableFuture<Response> readHeadersAsync(Executor executor) {165if (debug.on())166debug.log("Reading Headers: (remaining: "167+ asyncReceiver.remaining() +") " + readProgress);168169if (firstTimeAround) {170if (debug.on()) debug.log("First time around");171firstTimeAround = false;172} else {173// with expect continue we will resume reading headers + body.174asyncReceiver.unsubscribe(bodyReader);175bodyReader.reset();176177hd = new Http1HeaderParser();178readProgress = State.READING_HEADERS;179headersReader.reset();180headersReader.start(hd);181asyncReceiver.subscribe(headersReader);182}183184CompletableFuture<State> cf = headersReader.completion();185assert cf != null : "parsing not started";186if (debug.on()) {187debug.log("headersReader is %s",188cf == null ? "not yet started"189: cf.isDone() ? "already completed"190: "not yet completed");191}192193Function<State, Response> lambda = (State completed) -> {194assert completed == State.READING_HEADERS;195if (debug.on())196debug.log("Reading Headers: creating Response object;"197+ " state is now " + readProgress);198asyncReceiver.unsubscribe(headersReader);199responseCode = hd.responseCode();200headers = hd.headers();201202response = new Response(request,203exchange.getExchange(),204headers,205connection,206responseCode,207HTTP_1_1);208209if (Log.headers()) {210StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");211Log.dumpHeaders(sb, " ", headers);212Log.logHeaders(sb.toString());213}214215return response;216};217218if (executor != null) {219return cf.thenApplyAsync(lambda, executor);220} else {221return cf.thenApply(lambda);222}223}224225private boolean finished;226227synchronized void completed() {228finished = true;229}230231synchronized boolean finished() {232return finished;233}234235/**236* Return known fixed content length or -1 if chunked, or -2 if no content-length237* information in which case, connection termination delimits the response body238*/239long fixupContentLen(long clen) {240if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) {241return 0L;242}243if (clen == -1L) {244if (headers.firstValue("Transfer-encoding").orElse("")245.equalsIgnoreCase("chunked")) {246return -1L;247}248if (responseCode == 101) {249// this is a h2c or websocket upgrade, contentlength must be zero250return 0L;251}252return -2L;253}254return clen;255}256257/**258* Read up to MAX_IGNORE bytes discarding259*/260public CompletableFuture<Void> ignoreBody(Executor executor) {261int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);262if (clen == -1 || clen > MAX_IGNORE) {263connection.close();264return MinimalFuture.completedFuture(null); // not treating as error265} else {266return readBody(discarding(), !request.isWebSocket(), executor);267}268}269270// Used for those response codes that have no body associated271public void nullBody(HttpResponse<T> resp, Throwable t) {272if (t != null) connection.close();273else {274return2Cache = !request.isWebSocket();275onFinished();276}277}278279static final Flow.Subscription NOP = new Flow.Subscription() {280@Override281public void request(long n) { }282public void cancel() { }283};284285/**286* The Http1AsyncReceiver ensures that all calls to287* the subscriber, including onSubscribe, occur sequentially.288* There could however be some race conditions that could happen289* in case of unexpected errors thrown at unexpected places, which290* may cause onError to be called multiple times.291* The Http1BodySubscriber will ensure that the user subscriber292* is actually completed only once - and only after it is293* subscribed.294* @param <U> The type of response.295*/296final static class Http1BodySubscriber<U> implements TrustedSubscriber<U> {297final HttpResponse.BodySubscriber<U> userSubscriber;298final AtomicBoolean completed = new AtomicBoolean();299volatile Throwable withError;300volatile boolean subscribed;301Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {302this.userSubscriber = userSubscriber;303}304305@Override306public boolean needsExecutor() {307return TrustedSubscriber.needsExecutor(userSubscriber);308}309310// propagate the error to the user subscriber, even if not311// subscribed yet.312private void propagateError(Throwable t) {313assert t != null;314try {315// if unsubscribed at this point, it will not316// get subscribed later - so do it now and317// propagate the error318if (subscribed == false) {319subscribed = true;320userSubscriber.onSubscribe(NOP);321}322} finally {323// if onError throws then there is nothing to do324// here: let the caller deal with it by logging325// and closing the connection.326userSubscriber.onError(t);327}328}329330// complete the subscriber, either normally or exceptionally331// ensure that the subscriber is completed only once.332private void complete(Throwable t) {333if (completed.compareAndSet(false, true)) {334t = withError = Utils.getCompletionCause(t);335if (t == null) {336assert subscribed;337try {338userSubscriber.onComplete();339} catch (Throwable x) {340// Simply propagate the error by calling341// onError on the user subscriber, and let the342// connection be reused since we should have received343// and parsed all the bytes when we reach here.344// If onError throws in turn, then we will simply345// let that new exception flow up to the caller346// and let it deal with it.347// (i.e: log and close the connection)348// Note that rethrowing here could introduce a349// race that might cause the next send() operation to350// fail as the connection has already been put back351// into the cache when we reach here.352propagateError(t = withError = Utils.getCompletionCause(x));353}354} else {355propagateError(t);356}357}358}359360@Override361public CompletionStage<U> getBody() {362return userSubscriber.getBody();363}364365@Override366public void onSubscribe(Flow.Subscription subscription) {367if (!subscribed) {368subscribed = true;369userSubscriber.onSubscribe(subscription);370} else {371// could be already subscribed and completed372// if an unexpected error occurred before the actual373// subscription - though that's not supposed374// happen.375assert completed.get();376}377}378@Override379public void onNext(List<ByteBuffer> item) {380assert !completed.get();381userSubscriber.onNext(item);382}383@Override384public void onError(Throwable throwable) {385complete(throwable);386}387@Override388public void onComplete() {389complete(null);390}391}392393public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,394boolean return2Cache,395Executor executor) {396if (debug.on()) {397debug.log("readBody: return2Cache: " + return2Cache);398if (request.isWebSocket() && return2Cache && connection != null) {399debug.log("websocket connection will be returned to cache: "400+ connection.getClass() + "/" + connection );401}402}403assert !return2Cache || !request.isWebSocket();404this.return2Cache = return2Cache;405final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);406407final CompletableFuture<U> cf = new MinimalFuture<>();408409long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);410final long clen = fixupContentLen(clen0);411412// expect-continue reads headers and body twice.413// if we reach here, we must reset the headersReader state.414asyncReceiver.unsubscribe(headersReader);415headersReader.reset();416ClientRefCountTracker refCountTracker = new ClientRefCountTracker();417418// We need to keep hold on the client facade until the419// tracker has been incremented.420connection.client().reference();421executor.execute(() -> {422try {423content = new ResponseContent(424connection, clen, headers, subscriber,425this::onFinished426);427if (cf.isCompletedExceptionally()) {428// if an error occurs during subscription429connection.close();430return;431}432// increment the reference count on the HttpClientImpl433// to prevent the SelectorManager thread from exiting until434// the body is fully read.435refCountTracker.acquire();436bodyParser = content.getBodyParser(437(t) -> {438try {439if (t != null) {440try {441subscriber.onError(t);442} finally {443cf.completeExceptionally(t);444}445}446} finally {447bodyReader.onComplete(t);448if (t != null) {449connection.close();450}451}452});453bodyReader.start(bodyParser);454CompletableFuture<State> bodyReaderCF = bodyReader.completion();455asyncReceiver.subscribe(bodyReader);456assert bodyReaderCF != null : "parsing not started";457// Make sure to keep a reference to asyncReceiver from458// within this459CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> {460t = Utils.getCompletionCause(t);461try {462if (t == null) {463if (debug.on()) debug.log("Finished reading body: " + s);464assert s == State.READING_BODY;465}466if (t != null) {467subscriber.onError(t);468cf.completeExceptionally(t);469}470} catch (Throwable x) {471// not supposed to happen472asyncReceiver.onReadError(x);473} finally {474// we're done: release the ref count for475// the current operation.476refCountTracker.tryRelease();477}478});479connection.addTrailingOperation(trailingOp);480} catch (Throwable t) {481if (debug.on()) debug.log("Failed reading body: " + t);482try {483subscriber.onError(t);484cf.completeExceptionally(t);485} finally {486asyncReceiver.onReadError(t);487}488} finally {489connection.client().unreference();490}491});492493ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> {494cf.completeExceptionally(t);495asyncReceiver.setRetryOnError(false);496asyncReceiver.onReadError(t);497});498499return cf.whenComplete((s,t) -> {500if (t != null) {501// If an exception occurred, release the502// ref count for the current operation, as503// it may never be triggered otherwise504// (BodySubscriber ofInputStream)505// If there was no exception then the506// ref count will be/have been released when507// the last byte of the response is/was received508refCountTracker.tryRelease();509}510});511}512513514private void onFinished() {515asyncReceiver.clear();516if (return2Cache) {517Log.logTrace("Attempting to return connection to the pool: {0}", connection);518// TODO: need to do something here?519// connection.setAsyncCallbacks(null, null, null);520521// don't return the connection to the cache if EOF happened.522if (debug.on())523debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool");524connection.closeOrReturnToCache(eof == null ? headers : null);525}526}527528HttpHeaders responseHeaders() {529return headers;530}531532int responseCode() {533return responseCode;534}535536// ================ Support for plugging into Http1Receiver =================537// ============================================================================538539// Callback: Error receiver: Consumer of Throwable.540void onReadError(Throwable t) {541Log.logError(t);542Receiver<?> receiver = receiver(readProgress);543if (t instanceof EOFException) {544debug.log(Level.DEBUG, "onReadError: received EOF");545eof = (EOFException) t;546}547CompletableFuture<?> cf = receiver == null ? null : receiver.completion();548debug.log(Level.DEBUG, () -> "onReadError: cf is "549+ (cf == null ? "null"550: (cf.isDone() ? "already completed"551: "not yet completed")));552if (cf != null) {553cf.completeExceptionally(t);554} else {555debug.log(Level.DEBUG, "onReadError", t);556}557debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);558connection.close();559}560561// ========================================================================562563private State advance(State previous) {564assert readProgress == previous;565switch(previous) {566case READING_HEADERS:567asyncReceiver.unsubscribe(headersReader);568return readProgress = State.READING_BODY;569case READING_BODY:570asyncReceiver.unsubscribe(bodyReader);571return readProgress = State.DONE;572default:573throw new InternalError("can't advance from " + previous);574}575}576577Receiver<?> receiver(State state) {578return switch (state) {579case READING_HEADERS -> headersReader;580case READING_BODY -> bodyReader;581582default -> null;583};584585}586587static abstract class Receiver<T>588implements Http1AsyncReceiver.Http1AsyncDelegate {589abstract void start(T parser);590abstract CompletableFuture<State> completion();591// accepts a buffer from upstream.592// this should be implemented as a simple call to593// accept(ref, parser, cf)594public abstract boolean tryAsyncReceive(ByteBuffer buffer);595public abstract void onReadError(Throwable t);596// handle a byte buffer received from upstream.597// this method should set the value of Http1Response.buffer598// to ref.get() before beginning parsing.599abstract void handle(ByteBuffer buf, T parser,600CompletableFuture<State> cf);601// resets this objects state so that it can be reused later on602// typically puts the reference to parser and completion to null603abstract void reset();604605// accepts a byte buffer received from upstream606// returns true if the buffer is fully parsed and more data can607// be accepted, false otherwise.608final boolean accept(ByteBuffer buf, T parser,609CompletableFuture<State> cf) {610if (cf == null || parser == null || cf.isDone()) return false;611handle(buf, parser, cf);612return !cf.isDone();613}614public abstract void onSubscribe(AbstractSubscription s);615public abstract AbstractSubscription subscription();616617}618619// Invoked with each new ByteBuffer when reading headers...620final class HeadersReader extends Receiver<Http1HeaderParser> {621final Consumer<State> onComplete;622volatile Http1HeaderParser parser;623volatile CompletableFuture<State> cf;624volatile long count; // bytes parsed (for debug)625volatile AbstractSubscription subscription;626627HeadersReader(Consumer<State> onComplete) {628this.onComplete = onComplete;629}630631@Override632public AbstractSubscription subscription() {633return subscription;634}635636@Override637public void onSubscribe(AbstractSubscription s) {638this.subscription = s;639s.request(1);640}641642@Override643void reset() {644cf = null;645parser = null;646count = 0;647subscription = null;648}649650// Revisit: do we need to support restarting?651@Override652final void start(Http1HeaderParser hp) {653count = 0;654cf = new MinimalFuture<>();655parser = hp;656}657658@Override659CompletableFuture<State> completion() {660return cf;661}662663@Override664public final boolean tryAsyncReceive(ByteBuffer ref) {665boolean hasDemand = subscription.demand().tryDecrement();666assert hasDemand;667boolean needsMore = accept(ref, parser, cf);668if (needsMore) subscription.request(1);669return needsMore;670}671672@Override673public final void onReadError(Throwable t) {674t = wrapWithExtraDetail(t, parser::currentStateMessage);675Http1Response.this.onReadError(t);676}677678@Override679final void handle(ByteBuffer b,680Http1HeaderParser parser,681CompletableFuture<State> cf) {682assert cf != null : "parsing not started";683assert parser != null : "no parser";684try {685count += b.remaining();686if (debug.on())687debug.log("Sending " + b.remaining() + "/" + b.capacity()688+ " bytes to header parser");689if (parser.parse(b)) {690count -= b.remaining();691if (debug.on())692debug.log("Parsing headers completed. bytes=" + count);693onComplete.accept(State.READING_HEADERS);694cf.complete(State.READING_HEADERS);695}696} catch (Throwable t) {697if (debug.on())698debug.log("Header parser failed to handle buffer: " + t);699cf.completeExceptionally(t);700}701}702703@Override704public void close(Throwable error) {705// if there's no error nothing to do: the cf should/will706// be completed.707if (error != null) {708CompletableFuture<State> cf = this.cf;709if (cf != null) {710if (debug.on())711debug.log("close: completing header parser CF with " + error);712cf.completeExceptionally(error);713}714}715}716}717718// Invoked with each new ByteBuffer when reading bodies...719final class BodyReader extends Receiver<BodyParser> {720final Consumer<State> onComplete;721volatile BodyParser parser;722volatile CompletableFuture<State> cf;723volatile AbstractSubscription subscription;724BodyReader(Consumer<State> onComplete) {725this.onComplete = onComplete;726}727728@Override729void reset() {730parser = null;731cf = null;732subscription = null;733}734735// Revisit: do we need to support restarting?736@Override737final void start(BodyParser parser) {738cf = new MinimalFuture<>();739this.parser = parser;740}741742@Override743CompletableFuture<State> completion() {744return cf;745}746747@Override748public final boolean tryAsyncReceive(ByteBuffer b) {749return accept(b, parser, cf);750}751752@Override753public final void onReadError(Throwable t) {754if (t instanceof EOFException && bodyParser != null &&755bodyParser instanceof UnknownLengthBodyParser) {756((UnknownLengthBodyParser)bodyParser).complete();757return;758}759t = wrapWithExtraDetail(t, parser::currentStateMessage);760Http1Response.this.onReadError(t);761}762763@Override764public AbstractSubscription subscription() {765return subscription;766}767768@Override769public void onSubscribe(AbstractSubscription s) {770this.subscription = s;771try {772parser.onSubscribe(s);773} catch (Throwable t) {774cf.completeExceptionally(t);775throw t;776}777}778779@Override780final void handle(ByteBuffer b,781BodyParser parser,782CompletableFuture<State> cf) {783assert cf != null : "parsing not started";784assert parser != null : "no parser";785try {786if (debug.on())787debug.log("Sending " + b.remaining() + "/" + b.capacity()788+ " bytes to body parser");789parser.accept(b);790} catch (Throwable t) {791if (debug.on())792debug.log("Body parser failed to handle buffer: " + t);793if (!cf.isDone()) {794cf.completeExceptionally(t);795}796}797}798799final void onComplete(Throwable closedExceptionally) {800if (cf.isDone()) return;801if (closedExceptionally != null) {802cf.completeExceptionally(closedExceptionally);803} else {804onComplete.accept(State.READING_BODY);805cf.complete(State.READING_BODY);806}807}808809@Override810public final void close(Throwable error) {811CompletableFuture<State> cf = this.cf;812if (cf != null && !cf.isDone()) {813// we want to make sure dependent actions are triggered814// in order to make sure the client reference count815// is decremented816if (error != null) {817if (debug.on())818debug.log("close: completing body parser CF with " + error);819cf.completeExceptionally(error);820} else {821if (debug.on())822debug.log("close: completing body parser CF");823cf.complete(State.READING_BODY);824}825}826}827828@Override829public String toString() {830return super.toString() + "/parser=" + String.valueOf(parser);831}832}833}834835836