// Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java
// 41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.IOException;28import java.lang.ref.WeakReference;29import java.net.ConnectException;30import java.net.http.HttpConnectTimeoutException;31import java.time.Duration;32import java.util.Iterator;33import java.util.LinkedList;34import java.security.AccessControlContext;35import java.util.Objects;36import java.util.Optional;37import java.util.concurrent.CancellationException;38import java.util.concurrent.CompletableFuture;39import java.util.concurrent.CompletionStage;40import java.util.concurrent.CompletionException;41import java.util.concurrent.ExecutionException;42import java.util.concurrent.Executor;43import java.util.concurrent.Flow;44import java.util.concurrent.atomic.AtomicInteger;45import 
java.util.concurrent.atomic.AtomicLong;46import java.util.concurrent.atomic.AtomicReference;47import java.util.function.Function;4849import java.net.http.HttpClient;50import java.net.http.HttpHeaders;51import java.net.http.HttpRequest;52import java.net.http.HttpResponse;53import java.net.http.HttpResponse.BodySubscriber;54import java.net.http.HttpResponse.PushPromiseHandler;55import java.net.http.HttpTimeoutException;56import jdk.internal.net.http.common.Cancelable;57import jdk.internal.net.http.common.Log;58import jdk.internal.net.http.common.Logger;59import jdk.internal.net.http.common.MinimalFuture;60import jdk.internal.net.http.common.ConnectionExpiredException;61import jdk.internal.net.http.common.Utils;62import static jdk.internal.net.http.common.MinimalFuture.completedFuture;63import static jdk.internal.net.http.common.MinimalFuture.failedFuture;6465/**66* Encapsulates multiple Exchanges belonging to one HttpRequestImpl.67* - manages filters68* - retries due to filters.69* - I/O errors and most other exceptions get returned directly to user70*71* Creates a new Exchange for each request/response interaction72*/73class MultiExchange<T> implements Cancelable {7475static final Logger debug =76Utils.getDebugLogger("MultiExchange"::toString, Utils.DEBUG);7778private final HttpRequest userRequest; // the user request79private final HttpRequestImpl request; // a copy of the user request80private final ConnectTimeoutTracker connectTimeout; // null if no timeout81@SuppressWarnings("removal")82final AccessControlContext acc;83final HttpClientImpl client;84final HttpResponse.BodyHandler<T> responseHandler;85final HttpClientImpl.DelegatingExecutor executor;86final AtomicInteger attempts = new AtomicInteger();87HttpRequestImpl currentreq; // used for retries & redirect88HttpRequestImpl previousreq; // used for retries & redirect89Exchange<T> exchange; // the current exchange90Exchange<T> previous;91volatile Throwable retryCause;92volatile boolean expiredOnce;93volatile 
HttpResponse<T> response = null;9495// Maximum number of times a request will be retried/redirected96// for any reason97static final int DEFAULT_MAX_ATTEMPTS = 5;98static final int max_attempts = Utils.getIntegerNetProperty(99"jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS100);101102private final LinkedList<HeaderFilter> filters;103ResponseTimerEvent responseTimerEvent;104volatile boolean cancelled;105AtomicReference<CancellationException> interrupted = new AtomicReference<>();106final PushGroup<T> pushGroup;107108/**109* Filter fields. These are attached as required by filters110* and only used by the filter implementations. This could be111* generalised into Objects that are passed explicitly to the filters112* (one per MultiExchange object, and one per Exchange object possibly)113*/114volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;115// RedirectHandler116volatile int numberOfRedirects = 0;117118// This class is used to keep track of the connection timeout119// across retries, when a ConnectException causes a retry.120// In that case - we will retry the connect, but we don't121// want to double the timeout by starting a new timer with122// the full connectTimeout again.123// Instead we use the ConnectTimeoutTracker to return a new124// duration that takes into account the time spent in the125// first connect attempt.126// If however, the connection gets connected, but we later127// retry the whole operation, then we reset the timer before128// retrying (since the connection used for the second request129// will not necessarily be the same: it could be a new130// unconnected connection) - see getExceptionalCF().131private static final class ConnectTimeoutTracker {132final Duration max;133final AtomicLong startTime = new AtomicLong();134ConnectTimeoutTracker(Duration connectTimeout) {135this.max = Objects.requireNonNull(connectTimeout);136}137138Duration getRemaining() {139long now = System.nanoTime();140long previous = 
startTime.compareAndExchange(0, now);141if (previous == 0 || max.isZero()) return max;142Duration remaining = max.minus(Duration.ofNanos(now - previous));143assert remaining.compareTo(max) <= 0;144return remaining.isNegative() ? Duration.ZERO : remaining;145}146147void reset() { startTime.set(0); }148}149150/**151* MultiExchange with one final response.152*/153MultiExchange(HttpRequest userRequest,154HttpRequestImpl requestImpl,155HttpClientImpl client,156HttpResponse.BodyHandler<T> responseHandler,157PushPromiseHandler<T> pushPromiseHandler,158@SuppressWarnings("removal") AccessControlContext acc) {159this.previous = null;160this.userRequest = userRequest;161this.request = requestImpl;162this.currentreq = request;163this.previousreq = null;164this.client = client;165this.filters = client.filterChain();166this.acc = acc;167this.executor = client.theExecutor();168this.responseHandler = responseHandler;169170if (pushPromiseHandler != null) {171Executor executor = acc == null172? this.executor.delegate()173: new PrivilegedExecutor(this.executor.delegate(), acc);174this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);175} else {176pushGroup = null;177}178this.connectTimeout = client.connectTimeout()179.map(ConnectTimeoutTracker::new).orElse(null);180this.exchange = new Exchange<>(request, this);181}182183static final class CancelableRef implements Cancelable {184private final WeakReference<Cancelable> cancelableRef;185CancelableRef(Cancelable cancelable) {186cancelableRef = new WeakReference<>(cancelable);187}188@Override189public boolean cancel(boolean mayInterruptIfRunning) {190Cancelable cancelable = cancelableRef.get();191if (cancelable != null) {192return cancelable.cancel(mayInterruptIfRunning);193} else return false;194}195}196197synchronized Exchange<T> getExchange() {198return exchange;199}200201HttpClientImpl client() {202return client;203}204205HttpClient.Version version() {206HttpClient.Version vers = 
request.version().orElse(client.version());207if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)208vers = HttpClient.Version.HTTP_1_1;209return vers;210}211212private synchronized void setExchange(Exchange<T> exchange) {213if (this.exchange != null && exchange != this.exchange) {214this.exchange.released();215if (cancelled) exchange.cancel();216}217this.exchange = exchange;218}219220public Optional<Duration> remainingConnectTimeout() {221return Optional.ofNullable(connectTimeout)222.map(ConnectTimeoutTracker::getRemaining);223}224225private void cancelTimer() {226if (responseTimerEvent != null) {227client.cancelTimer(responseTimerEvent);228}229}230231private void requestFilters(HttpRequestImpl r) throws IOException {232Log.logTrace("Applying request filters");233for (HeaderFilter filter : filters) {234Log.logTrace("Applying {0}", filter);235filter.request(r, this);236}237Log.logTrace("All filters applied");238}239240private HttpRequestImpl responseFilters(Response response) throws IOException241{242Log.logTrace("Applying response filters");243Iterator<HeaderFilter> reverseItr = filters.descendingIterator();244while (reverseItr.hasNext()) {245HeaderFilter filter = reverseItr.next();246Log.logTrace("Applying {0}", filter);247HttpRequestImpl newreq = filter.response(response);248if (newreq != null) {249Log.logTrace("New request: stopping filters");250return newreq;251}252}253Log.logTrace("All filters applied");254return null;255}256257public void cancel(IOException cause) {258cancelled = true;259getExchange().cancel(cause);260}261262/**263* Used to relay a call from {@link CompletableFuture#cancel(boolean)}264* to this multi exchange for the purpose of cancelling the265* HTTP exchange.266* @param mayInterruptIfRunning if true, and this exchange is not already267* cancelled, this method will attempt to interrupt and cancel the268* exchange. 
Otherwise, the exchange is allowed to proceed and this269* method does nothing.270* @return true if the exchange was cancelled, false otherwise.271*/272@Override273public boolean cancel(boolean mayInterruptIfRunning) {274boolean cancelled = this.cancelled;275if (!cancelled && mayInterruptIfRunning) {276if (interrupted.get() == null) {277interrupted.compareAndSet(null,278new CancellationException("Request cancelled"));279}280this.cancelled = true;281var exchange = getExchange();282if (exchange != null) {283exchange.cancel();284}285return true;286}287return false;288}289290public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {291CompletableFuture<Void> start = new MinimalFuture<>(new CancelableRef(this));292CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);293start.completeAsync( () -> null, executor); // trigger execution294return cf;295}296297// return true if the response is a type where a response body is never possible298// and therefore doesn't have to include header information which indicates no299// body is present. This is distinct from responses that also do not contain300// response bodies (possibly ever) but which are required to have content length301// info in the header (eg 205). 
Those cases do not have to be handled specially302303private static boolean bodyNotPermitted(Response r) {304return r.statusCode == 204;305}306307private boolean bodyIsPresent(Response r) {308HttpHeaders headers = r.headers();309if (headers.firstValueAsLong("Content-length").orElse(0L) != 0L)310return true;311if (headers.firstValue("Transfer-encoding").isPresent())312return true;313return false;314}315316// Call the user's body handler to get an empty body object317318private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T> exch) {319BodySubscriber<T> bs = responseHandler.apply(new ResponseInfoImpl(r.statusCode(),320r.headers(), r.version()));321bs.onSubscribe(new NullSubscription());322bs.onComplete();323CompletionStage<T> cs = ResponseSubscribers.getBodyAsync(executor, bs);324MinimalFuture<HttpResponse<T>> result = new MinimalFuture<>();325cs.whenComplete((nullBody, exception) -> {326if (exception != null)327result.completeExceptionally(exception);328else {329this.response =330new HttpResponseImpl<>(r.request(), r, this.response, nullBody, exch);331result.complete(this.response);332}333});334// ensure that the connection is closed or returned to the pool.335return result.whenComplete(exch::nullBody);336}337338private CompletableFuture<HttpResponse<T>>339responseAsync0(CompletableFuture<Void> start) {340return start.thenCompose( v -> responseAsyncImpl())341.thenCompose((Response r) -> {342Exchange<T> exch = getExchange();343if (bodyNotPermitted(r)) {344if (bodyIsPresent(r)) {345IOException ioe = new IOException(346"unexpected content length header with 204 response");347exch.cancel(ioe);348return MinimalFuture.failedFuture(ioe);349} else350return handleNoBody(r, exch);351}352return exch.readBodyAsync(responseHandler)353.thenApply((T body) -> {354this.response =355new HttpResponseImpl<>(r.request(), r, this.response, body, exch);356return this.response;357});358}).exceptionallyCompose(this::whenCancelled);359}360361private 
CompletableFuture<HttpResponse<T>> whenCancelled(Throwable t) {362CancellationException x = interrupted.get();363if (x != null) {364// make sure to fail with CancellationException if cancel(true)365// was called.366t = x.initCause(Utils.getCancelCause(t));367if (debug.on()) {368debug.log("MultiExchange interrupted with: " + t.getCause());369}370}371return MinimalFuture.failedFuture(t);372}373374static class NullSubscription implements Flow.Subscription {375@Override376public void request(long n) {377}378379@Override380public void cancel() {381}382}383384private CompletableFuture<Response> responseAsyncImpl() {385CompletableFuture<Response> cf;386if (attempts.incrementAndGet() > max_attempts) {387cf = failedFuture(new IOException("Too many retries", retryCause));388} else {389if (currentreq.timeout().isPresent()) {390responseTimerEvent = ResponseTimerEvent.of(this);391client.registerTimer(responseTimerEvent);392}393try {394// 1. apply request filters395// if currentreq == previousreq the filters have already396// been applied once. Applying them a second time might397// cause some headers values to be added twice: for398// instance, the same cookie might be added again.399if (currentreq != previousreq) {400requestFilters(currentreq);401}402} catch (IOException e) {403return failedFuture(e);404}405Exchange<T> exch = getExchange();406// 2. get response407cf = exch.responseAsync()408.thenCompose((Response response) -> {409HttpRequestImpl newrequest;410try {411// 3. apply response filters412newrequest = responseFilters(response);413} catch (IOException e) {414return failedFuture(e);415}416// 4. 
check filter result and repeat or continue417if (newrequest == null) {418if (attempts.get() > 1) {419Log.logError("Succeeded on attempt: " + attempts);420}421return completedFuture(response);422} else {423this.response =424new HttpResponseImpl<>(currentreq, response, this.response, null, exch);425Exchange<T> oldExch = exch;426if (currentreq.isWebSocket()) {427// need to close the connection and open a new one.428exch.exchImpl.connection().close();429}430return exch.ignoreBody().handle((r,t) -> {431previousreq = currentreq;432currentreq = newrequest;433expiredOnce = false;434setExchange(new Exchange<>(currentreq, this, acc));435return responseAsyncImpl();436}).thenCompose(Function.identity());437} })438.handle((response, ex) -> {439// 5. handle errors and cancel any timer set440cancelTimer();441if (ex == null) {442assert response != null;443return completedFuture(response);444}445// all exceptions thrown are handled here446CompletableFuture<Response> errorCF = getExceptionalCF(ex);447if (errorCF == null) {448return responseAsyncImpl();449} else {450return errorCF;451} })452.thenCompose(Function.identity());453}454return cf;455}456457private static boolean retryPostValue() {458String s = Utils.getNetProperty("jdk.httpclient.enableAllMethodRetry");459if (s == null)460return false;461return s.isEmpty() ? true : Boolean.parseBoolean(s);462}463464private static boolean disableRetryConnect() {465String s = Utils.getNetProperty("jdk.httpclient.disableRetryConnect");466if (s == null)467return false;468return s.isEmpty() ? true : Boolean.parseBoolean(s);469}470471/** True if ALL ( even non-idempotent ) requests can be automatic retried. */472private static final boolean RETRY_ALWAYS = retryPostValue();473/** True if ConnectException should cause a retry. Enabled by default */474static final boolean RETRY_CONNECT = !disableRetryConnect();475476/** Returns true is given request has an idempotent method. 
*/477private static boolean isIdempotentRequest(HttpRequest request) {478String method = request.method();479return switch (method) {480case "GET", "HEAD" -> true;481default -> false;482};483}484485/** Returns true if the given request can be automatically retried. */486private static boolean canRetryRequest(HttpRequest request) {487if (RETRY_ALWAYS)488return true;489if (isIdempotentRequest(request))490return true;491return false;492}493494// Returns true if cancel(true) was called.495// This is an important distinction in several scenarios:496// for instance, if cancel(true) was called 1. we don't want497// to retry, 2. we don't want to wrap the exception in498// a timeout exception.499boolean requestCancelled() {500return interrupted.get() != null;501}502503private boolean retryOnFailure(Throwable t) {504if (requestCancelled()) return false;505return t instanceof ConnectionExpiredException506|| (RETRY_CONNECT && (t instanceof ConnectException));507}508509private Throwable retryCause(Throwable t) {510Throwable cause = t instanceof ConnectionExpiredException ? t.getCause() : t;511return cause == null ? 
t : cause;512}513514/**515* Takes a Throwable and returns a suitable CompletableFuture that is516* completed exceptionally, or null.517*/518private CompletableFuture<Response> getExceptionalCF(Throwable t) {519if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {520if (t.getCause() != null) {521t = t.getCause();522}523}524if (cancelled && !requestCancelled() && t instanceof IOException) {525if (!(t instanceof HttpTimeoutException)) {526t = toTimeoutException((IOException)t);527}528} else if (retryOnFailure(t)) {529Throwable cause = retryCause(t);530531if (!(t instanceof ConnectException)) {532// we may need to start a new connection, and if so533// we want to start with a fresh connect timeout again.534if (connectTimeout != null) connectTimeout.reset();535if (!canRetryRequest(currentreq)) {536return failedFuture(cause); // fails with original cause537}538} // ConnectException: retry, but don't reset the connectTimeout.539540// allow the retry mechanism to do its work541retryCause = cause;542if (!expiredOnce) {543if (debug.on())544debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);545expiredOnce = true;546// The connection was abruptly closed.547// We return null to retry the same request a second time.548// The request filters have already been applied to the549// currentreq, so we set previousreq = currentreq to550// prevent them from being applied again.551previousreq = currentreq;552return null;553} else {554if (debug.on()) {555debug.log(t.getClass().getSimpleName()556+ " (async): already retried once.", t);557}558t = cause;559}560}561return failedFuture(t);562}563564private HttpTimeoutException toTimeoutException(IOException ioe) {565HttpTimeoutException t = null;566567// more specific, "request timed out", when connected568Exchange<?> exchange = getExchange();569if (exchange != null) {570ExchangeImpl<?> exchangeImpl = exchange.exchImpl;571if (exchangeImpl != null) {572if (exchangeImpl.connection().connected()) {573t = 
new HttpTimeoutException("request timed out");574t.initCause(ioe);575}576}577}578if (t == null) {579t = new HttpConnectTimeoutException("HTTP connect timed out");580t.initCause(new ConnectException("HTTP connect timed out"));581}582return t;583}584}585586587