Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java
41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.IOException;28import java.lang.System.Logger.Level;29import java.net.InetSocketAddress;30import java.net.ProxySelector;31import java.net.URI;32import java.net.URISyntaxException;33import java.net.URLPermission;34import java.security.AccessControlContext;35import java.time.Duration;36import java.util.List;37import java.util.Map;38import java.util.Optional;39import java.util.concurrent.CompletableFuture;40import java.util.concurrent.Executor;41import java.util.function.Function;42import java.net.http.HttpClient;43import java.net.http.HttpHeaders;44import java.net.http.HttpResponse;45import java.net.http.HttpTimeoutException;4647import jdk.internal.net.http.common.Logger;48import jdk.internal.net.http.common.MinimalFuture;49import jdk.internal.net.http.common.Utils;50import jdk.internal.net.http.common.Log;5152import static jdk.internal.net.http.common.Utils.permissionForProxy;5354/**55* One request/response exchange (handles 100/101 intermediate response also).56* depth field used to track number of times a new request is being sent57* for a given API request. If limit exceeded exception is thrown.58*59* Security check is performed here:60* - uses AccessControlContext captured at API level61* - checks for appropriate URLPermission for request62* - if permission allowed, grants equivalent SocketPermission to call63* - in case of direct HTTP proxy, checks additionally for access to proxy64* (CONNECT proxying uses its own Exchange, so check done there)65*66*/67final class Exchange<T> {6869final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);7071final HttpRequestImpl request;72final HttpClientImpl client;73volatile ExchangeImpl<T> exchImpl;74volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;75volatile CompletableFuture<Void> bodyIgnored;7677// used to record possible cancellation raised before the exchImpl78// has been established.79private volatile IOException failed;80@SuppressWarnings("removal")81final AccessControlContext acc;82final MultiExchange<T> multi;83final Executor parentExecutor;84volatile boolean upgrading; // to HTTP/285volatile boolean upgraded; // to HTTP/286final PushGroup<T> pushGroup;87final String dbgTag;8889// Keeps track of the underlying connection when establishing an HTTP/290// exchange so that it can be aborted/timed out mid setup.91final ConnectionAborter connectionAborter = new ConnectionAborter();9293Exchange(HttpRequestImpl request, MultiExchange<T> multi) {94this.request = request;95this.upgrading = false;96this.client = multi.client();97this.multi = multi;98this.acc = multi.acc;99this.parentExecutor = multi.executor;100this.pushGroup = multi.pushGroup;101this.dbgTag = "Exchange";102}103104/* If different AccessControlContext to be used */105Exchange(HttpRequestImpl request,106MultiExchange<T> multi,107@SuppressWarnings("removal") AccessControlContext acc)108{109this.request = request;110this.acc = acc;111this.upgrading = false;112this.client = multi.client();113this.multi = multi;114this.parentExecutor = multi.executor;115this.pushGroup = multi.pushGroup;116this.dbgTag = "Exchange";117}118119PushGroup<T> getPushGroup() {120return pushGroup;121}122123Executor executor() {124return parentExecutor;125}126127public HttpRequestImpl request() {128return request;129}130131public Optional<Duration> remainingConnectTimeout() {132return multi.remainingConnectTimeout();133}134135HttpClientImpl client() {136return client;137}138139// Keeps track of the underlying connection when establishing an HTTP/2140// exchange so that it can be aborted/timed out mid setup.141static final class ConnectionAborter {142private volatile HttpConnection connection;143private volatile boolean closeRequested;144145void connection(HttpConnection connection) {146this.connection = connection;147if (closeRequested) closeConnection();148}149150void closeConnection() {151closeRequested = true;152HttpConnection connection = this.connection;153this.connection = null;154if (connection != null) {155try {156connection.close();157} catch (Throwable t) {158// ignore159}160}161}162163void disable() {164connection = null;165closeRequested = false;166}167}168169// Called for 204 response - when no body is permitted170// This is actually only needed for HTTP/1.1 in order171// to return the connection to the pool (or close it)172void nullBody(HttpResponse<T> resp, Throwable t) {173exchImpl.nullBody(resp, t);174}175176public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {177// If we received a 407 while establishing the exchange178// there will be no body to read: bodyIgnored will be true,179// and exchImpl will be null (if we were trying to establish180// an HTTP/2 tunnel through an HTTP/1.1 proxy)181if (bodyIgnored != null) return MinimalFuture.completedFuture(null);182183// The connection will not be returned to the pool in the case of WebSocket184return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)185.whenComplete((r,t) -> exchImpl.completed());186}187188/**189* Called after a redirect or similar kind of retry where a body might190* be sent but we don't want it. Should send a RESET in h2. For http/1.1191* we can consume small quantity of data, or close the connection in192* other cases.193*/194public CompletableFuture<Void> ignoreBody() {195if (bodyIgnored != null) return bodyIgnored;196return exchImpl.ignoreBody();197}198199/**200* Called when a new exchange is created to replace this exchange.201* At this point it is guaranteed that readBody/readBodyAsync will202* not be called.203*/204public void released() {205ExchangeImpl<?> impl = exchImpl;206if (impl != null) impl.released();207// Don't set exchImpl to null here. We need to keep208// it alive until it's replaced by a Stream in wrapForUpgrade.209// Setting it to null here might get it GC'ed too early, because210// the Http1Response is now only weakly referenced by the Selector.211}212213public void cancel() {214// cancel can be called concurrently before or at the same time215// that the exchange impl is being established.216// In that case we won't be able to propagate the cancellation217// right away218if (exchImpl != null) {219exchImpl.cancel();220} else {221// no impl - can't cancel impl yet.222// call cancel(IOException) instead which takes care223// of race conditions between impl/cancel.224cancel(new IOException("Request cancelled"));225}226}227228public void cancel(IOException cause) {229if (debug.on()) debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause);230// If the impl is non null, propagate the exception right away.231// Otherwise record it so that it can be propagated once the232// exchange impl has been established.233ExchangeImpl<?> impl = exchImpl;234if (impl != null) {235// propagate the exception to the impl236if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl);237impl.cancel(cause);238} else {239// no impl yet. record the exception240failed = cause;241242// abort/close the connection if setting up the exchange. This can243// be important when setting up HTTP/2244connectionAborter.closeConnection();245246// now call checkCancelled to recheck the impl.247// if the failed state is set and the impl is not null, reset248// the failed state and propagate the exception to the impl.249checkCancelled();250}251}252253// This method will raise an exception if one was reported and if254// it is possible to do so. If the exception can be raised, then255// the failed state will be reset. Otherwise, the failed state256// will persist until the exception can be raised and the failed state257// can be cleared.258// Takes care of possible race conditions.259private void checkCancelled() {260ExchangeImpl<?> impl = null;261IOException cause = null;262CompletableFuture<? extends ExchangeImpl<T>> cf = null;263if (failed != null) {264synchronized(this) {265cause = failed;266impl = exchImpl;267cf = exchangeCF;268}269}270if (cause == null) return;271if (impl != null) {272// The exception is raised by propagating it to the impl.273if (debug.on()) debug.log("Cancelling exchImpl: %s", impl);274impl.cancel(cause);275failed = null;276} else {277Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."278+ "\n\tCan''t cancel yet with {2}",279request.uri(),280request.timeout().isPresent() ?281// calling duration.toMillis() can throw an exception.282// this is just debugging, we don't care if it overflows.283(request.timeout().get().getSeconds() * 1000284+ request.timeout().get().getNano() / 1000000) : -1,285cause);286if (cf != null) cf.completeExceptionally(cause);287}288}289290<T> CompletableFuture<T> checkCancelled(CompletableFuture<T> cf, HttpConnection connection) {291return cf.handle((r,t) -> {292if (t == null) {293if (multi.requestCancelled()) {294// if upgraded, we don't close the connection.295// cancelling will be handled by the HTTP/2 exchange296// in its own time.297if (!upgraded) {298t = getCancelCause();299if (t == null) t = new IOException("Request cancelled");300if (debug.on()) debug.log("exchange cancelled during connect: " + t);301try {302connection.close();303} catch (Throwable x) {304if (debug.on()) debug.log("Failed to close connection", x);305}306return MinimalFuture.<T>failedFuture(t);307}308}309}310return cf;311}).thenCompose(Function.identity());312}313314public void h2Upgrade() {315upgrading = true;316request.setH2Upgrade(client.client2());317}318319synchronized IOException getCancelCause() {320return failed;321}322323// get/set the exchange impl, solving race condition issues with324// potential concurrent calls to cancel() or cancel(IOException)325private CompletableFuture<? extends ExchangeImpl<T>>326establishExchange(HttpConnection connection) {327if (debug.on()) {328debug.log("establishing exchange for %s,%n\t proxy=%s",329request, request.proxy());330}331// check if we have been cancelled first.332Throwable t = getCancelCause();333checkCancelled();334if (t != null) {335if (debug.on()) {336debug.log("exchange was cancelled: returned failed cf (%s)", String.valueOf(t));337}338return exchangeCF = MinimalFuture.failedFuture(t);339}340341CompletableFuture<? extends ExchangeImpl<T>> cf, res;342cf = ExchangeImpl.get(this, connection);343// We should probably use a VarHandle to get/set exchangeCF344// instead - as we need CAS semantics.345synchronized (this) { exchangeCF = cf; };346res = cf.whenComplete((r,x) -> {347synchronized(Exchange.this) {348if (exchangeCF == cf) exchangeCF = null;349}350});351checkCancelled();352return res.thenCompose((eimpl) -> {353// recheck for cancelled, in case of race conditions354exchImpl = eimpl;355IOException tt = getCancelCause();356checkCancelled();357if (tt != null) {358return MinimalFuture.failedFuture(tt);359} else {360// Now we're good to go. Because exchImpl is no longer361// null cancel() will be able to propagate directly to362// the impl after this point ( if needed ).363return MinimalFuture.completedFuture(eimpl);364} });365}366367// Completed HttpResponse will be null if response succeeded368// will be a non null responseAsync if expect continue returns an error369370public CompletableFuture<Response> responseAsync() {371return responseAsyncImpl(null);372}373374CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {375SecurityException e = checkPermissions();376if (e != null) {377return MinimalFuture.failedFuture(e);378} else {379return responseAsyncImpl0(connection);380}381}382383// check whether the headersSentCF was completed exceptionally with384// ProxyAuthorizationRequired. If so the Response embedded in the385// exception is returned. Otherwise we proceed.386private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,387Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {388t = Utils.getCompletionCause(t);389if (t instanceof ProxyAuthenticationRequired) {390if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");391bodyIgnored = MinimalFuture.completedFuture(null);392Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;393HttpConnection c = ex == null ? null : ex.connection();394Response syntheticResponse = new Response(request, this,395proxyResponse.headers, c, proxyResponse.statusCode,396proxyResponse.version, true);397return MinimalFuture.completedFuture(syntheticResponse);398} else if (t != null) {399if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);400return MinimalFuture.failedFuture(t);401} else {402if (debug.on()) debug.log("checkFor407: all clear");403return andThen.apply(ex);404}405}406407// After sending the request headers, if no ProxyAuthorizationRequired408// was raised and the expectContinue flag is on, we need to wait409// for the 100-Continue response410private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {411assert request.expectContinue();412return ex.getResponseAsync(parentExecutor)413.thenCompose((Response r1) -> {414Log.logResponse(r1::toString);415int rcode = r1.statusCode();416if (rcode == 100) {417Log.logTrace("Received 100-Continue: sending body");418if (debug.on()) debug.log("Received 100-Continue for %s", r1);419CompletableFuture<Response> cf =420exchImpl.sendBodyAsync()421.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));422cf = wrapForUpgrade(cf);423cf = wrapForLog(cf);424return cf;425} else {426Log.logTrace("Expectation failed: Received {0}",427rcode);428if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);429if (upgrading && rcode == 101) {430IOException failed = new IOException(431"Unable to handle 101 while waiting for 100");432return MinimalFuture.failedFuture(failed);433}434return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)435.thenApply(v -> r1);436}437});438}439440// After sending the request headers, if no ProxyAuthorizationRequired441// was raised and the expectContinue flag is off, we can immediately442// send the request body and proceed.443private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {444assert !request.expectContinue();445if (debug.on()) debug.log("sendRequestBody");446CompletableFuture<Response> cf = ex.sendBodyAsync()447.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));448cf = wrapForUpgrade(cf);449cf = wrapForLog(cf);450return cf;451}452453CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {454Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;455bodyIgnored = null;456if (request.expectContinue()) {457request.addSystemHeader("Expect", "100-Continue");458Log.logTrace("Sending Expect: 100-Continue");459// wait for 100-Continue before sending body460after407Check = this::expectContinue;461} else {462// send request body and proceed.463after407Check = this::sendRequestBody;464}465// The ProxyAuthorizationRequired can be triggered either by466// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy467// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy468// Therefore we handle it with a call to this checkFor407(...) after these469// two places.470Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =471(ex) -> ex.sendHeadersAsync()472.handle((r,t) -> this.checkFor407(r, t, after407Check))473.thenCompose(Function.identity());474return establishExchange(connection)475.handle((r,t) -> this.checkFor407(r,t, afterExch407Check))476.thenCompose(Function.identity());477}478479private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {480if (upgrading) {481return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));482}483return cf;484}485486private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {487if (Log.requests()) {488return cf.thenApply(response -> {489Log.logResponse(response::toString);490return response;491});492}493return cf;494}495496HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {497return HttpResponse.BodySubscribers.replacing(null);498}499500// if this response was received in reply to an upgrade501// then create the Http2Connection from the HttpConnection502// initialize it and wait for the real response on a newly created Stream503504private CompletableFuture<Response>505checkForUpgradeAsync(Response resp,506ExchangeImpl<T> ex) {507508int rcode = resp.statusCode();509if (upgrading && (rcode == 101)) {510Http1Exchange<T> e = (Http1Exchange<T>)ex;511// check for 101 switching protocols512// 101 responses are not supposed to contain a body.513// => should we fail if there is one?514if (debug.on()) debug.log("Upgrading async %s", e.connection());515return e.readBodyAsync(this::ignoreBody, false, parentExecutor)516.thenCompose((T v) -> {// v is null517debug.log("Ignored body");518// we pass e::getBuffer to allow the ByteBuffers to accumulate519// while we build the Http2Connection520ex.upgraded();521upgraded = true;522return Http2Connection.createAsync(e.connection(),523client.client2(),524this, e::drainLeftOverBytes)525.thenCompose((Http2Connection c) -> {526boolean cached = c.offerConnection();527if (cached) connectionAborter.disable();528Stream<T> s = c.getStream(1);529530if (s == null) {531// s can be null if an exception occurred532// asynchronously while sending the preface.533Throwable t = c.getRecordedCause();534IOException ioe;535if (t != null) {536if (!cached)537c.close();538ioe = new IOException("Can't get stream 1: " + t, t);539} else {540ioe = new IOException("Can't get stream 1");541}542return MinimalFuture.failedFuture(ioe);543}544exchImpl.released();545Throwable t;546// There's a race condition window where an external547// thread (SelectorManager) might complete the548// exchange in timeout at the same time where we're549// trying to switch the exchange impl.550// 'failed' will be reset to null after551// exchImpl.cancel() has completed, so either we552// will observe failed != null here, or we will553// observe e.getCancelCause() != null, or the554// timeout exception will be routed to 's'.555// Either way, we need to relay it to s.556synchronized (this) {557exchImpl = s;558t = failed;559}560// Check whether the HTTP/1.1 was cancelled.561if (t == null) t = e.getCancelCause();562// if HTTP/1.1 exchange was timed out, or the request563// was cancelled don't try to go further.564if (t instanceof HttpTimeoutException || multi.requestCancelled()) {565if (t == null) t = new IOException("Request cancelled");566s.cancelImpl(t);567return MinimalFuture.failedFuture(t);568}569if (debug.on())570debug.log("Getting response async %s", s);571return s.getResponseAsync(null);572});}573);574}575return MinimalFuture.completedFuture(resp);576}577578private URI getURIForSecurityCheck() {579URI u;580String method = request.method();581InetSocketAddress authority = request.authority();582URI uri = request.uri();583584// CONNECT should be restricted at API level585if (method.equalsIgnoreCase("CONNECT")) {586try {587u = new URI("socket",588null,589authority.getHostString(),590authority.getPort(),591null,592null,593null);594} catch (URISyntaxException e) {595throw new InternalError(e); // shouldn't happen596}597} else {598u = uri;599}600return u;601}602603/**604* Returns the security permission required for the given details.605* If method is CONNECT, then uri must be of form "scheme://host:port"606*/607private static URLPermission permissionForServer(URI uri,608String method,609Map<String, List<String>> headers) {610if (method.equals("CONNECT")) {611return new URLPermission(uri.toString(), "CONNECT");612} else {613return Utils.permissionForServer(uri, method, headers.keySet().stream());614}615}616617/**618* Performs the necessary security permission checks required to retrieve619* the response. Returns a security exception representing the denied620* permission, or null if all checks pass or there is no security manager.621*/622private SecurityException checkPermissions() {623String method = request.method();624@SuppressWarnings("removal")625SecurityManager sm = System.getSecurityManager();626if (sm == null || method.equals("CONNECT")) {627// tunneling will have a null acc, which is fine. The proxy628// permission check will have already been preformed.629return null;630}631632HttpHeaders userHeaders = request.getUserHeaders();633URI u = getURIForSecurityCheck();634URLPermission p = permissionForServer(u, method, userHeaders.map());635636try {637assert acc != null;638sm.checkPermission(p, acc);639} catch (SecurityException e) {640return e;641}642String hostHeader = userHeaders.firstValue("Host").orElse(null);643if (hostHeader != null && !hostHeader.equalsIgnoreCase(u.getHost())) {644// user has set a Host header different to request URI645// must check that for URLPermission also646URI u1 = replaceHostInURI(u, hostHeader);647URLPermission p1 = permissionForServer(u1, method, userHeaders.map());648try {649assert acc != null;650sm.checkPermission(p1, acc);651} catch (SecurityException e) {652return e;653}654}655ProxySelector ps = client.proxySelector();656if (ps != null) {657if (!method.equals("CONNECT")) {658// a non-tunneling HTTP proxy. Need to check access659URLPermission proxyPerm = permissionForProxy(request.proxy());660if (proxyPerm != null) {661try {662sm.checkPermission(proxyPerm, acc);663} catch (SecurityException e) {664return e;665}666}667}668}669return null;670}671672private static URI replaceHostInURI(URI u, String hostPort) {673StringBuilder sb = new StringBuilder();674sb.append(u.getScheme())675.append("://")676.append(hostPort)677.append(u.getRawPath());678return URI.create(sb.toString());679}680681HttpClient.Version version() {682return multi.version();683}684685String dbgString() {686return dbgTag;687}688}689690691