Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java
41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.Closeable;28import java.io.IOException;29import java.net.InetSocketAddress;30import java.nio.ByteBuffer;31import java.nio.channels.SocketChannel;32import java.util.Arrays;33import java.util.IdentityHashMap;34import java.util.List;35import java.util.Map;36import java.util.TreeMap;37import java.util.concurrent.CompletableFuture;38import java.util.concurrent.CompletionStage;39import java.util.concurrent.ConcurrentLinkedDeque;40import java.util.concurrent.Flow;41import java.util.function.BiPredicate;42import java.util.function.Predicate;43import java.net.http.HttpClient;44import java.net.http.HttpClient.Version;45import java.net.http.HttpHeaders;46import jdk.internal.net.http.common.Demand;47import jdk.internal.net.http.common.FlowTube;48import jdk.internal.net.http.common.Logger;49import jdk.internal.net.http.common.SequentialScheduler;50import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;51import jdk.internal.net.http.common.Log;52import jdk.internal.net.http.common.Utils;53import static java.net.http.HttpClient.Version.HTTP_2;54import static jdk.internal.net.http.common.Utils.ProxyHeaders;5556/**57* Wraps socket channel layer and takes care of SSL also.58*59* Subtypes are:60* PlainHttpConnection: regular direct TCP connection to server61* PlainProxyConnection: plain text proxy connection62* PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server63* AsyncSSLConnection: TLS channel direct to server64* AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel65*/66abstract class HttpConnection implements Closeable {6768final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);69final static Logger DEBUG_LOGGER = Utils.getDebugLogger(70() -> "HttpConnection(SocketTube(?))", Utils.DEBUG);7172/** The address this connection is connected to. Could be a server or a proxy. */73final InetSocketAddress address;74private final HttpClientImpl client;75private final TrailingOperations trailingOperations;7677HttpConnection(InetSocketAddress address, HttpClientImpl client) {78this.address = address;79this.client = client;80trailingOperations = new TrailingOperations();81}8283private static final class TrailingOperations {84private final Map<CompletionStage<?>, Boolean> operations =85new IdentityHashMap<>();86void add(CompletionStage<?> cf) {87synchronized(operations) {88operations.put(cf, Boolean.TRUE);89cf.whenComplete((r,t)-> remove(cf));90}91}92boolean remove(CompletionStage<?> cf) {93synchronized(operations) {94return operations.remove(cf);95}96}97}9899final void addTrailingOperation(CompletionStage<?> cf) {100trailingOperations.add(cf);101}102103// final void removeTrailingOperation(CompletableFuture<?> cf) {104// trailingOperations.remove(cf);105// }106107final HttpClientImpl client() {108return client;109}110111/**112* Initiates the connect phase.113*114* Returns a CompletableFuture that completes when the underlying115* TCP connection has been established or an error occurs.116*/117public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);118119/**120* Finishes the connection phase.121*122* Returns a CompletableFuture that completes when any additional,123* type specific, setup has been done. Must be called after connectAsync. */124public abstract CompletableFuture<Void> finishConnect();125126/** Tells whether, or not, this connection is connected to its destination. */127abstract boolean connected();128129/** Tells whether, or not, this connection is secure ( over SSL ) */130abstract boolean isSecure();131132/**133* Tells whether, or not, this connection is proxied.134* Returns true for tunnel connections, or clear connection to135* any host through proxy.136*/137abstract boolean isProxied();138139/**140* Returns the address of the proxy used by this connection.141* Returns the proxy address for tunnel connections, or142* clear connection to any host through proxy.143* Returns {@code null} otherwise.144*/145abstract InetSocketAddress proxy();146147/** Tells whether, or not, this connection is open. */148final boolean isOpen() {149return channel().isOpen() &&150(connected() ? !getConnectionFlow().isFinished() : true);151}152153/**154* Forces a call to the native implementation of the155* connection's channel to verify that this channel is still156* open.157* <p>158* This method should only be called just after an HTTP/1.1159* connection is retrieved from the HTTP/1.1 connection pool.160* It is used to trigger an early detection of the channel state,161* before handling the connection over to the HTTP stack.162* It helps minimizing race conditions where the selector manager163* thread hasn't woken up - or hasn't raised the event, before164* the connection was retrieved from the pool. It helps reduce165* the occurrence of "HTTP/1.1 parser received no bytes"166* exception, when the server closes the connection while167* it's being taken out of the pool.168* <p>169* This method attempts to read one byte from the underlying170* channel. Because the connection was in the pool - there171* should be nothing to read.172* <p>173* If {@code read} manages to read a byte off the connection, this is a174* protocol error: the method closes the connection and returns false.175* If {@code read} returns EOF, the method closes the connection and176* returns false.177* If {@code read} throws an exception, the method returns false.178* Otherwise, {@code read} returns 0, the channel appears to be179* still open, and the method returns true.180* @return true if the channel appears to be still open.181*/182final boolean checkOpen() {183if (isOpen()) {184try {185// channel is non blocking186int read = channel().read(ByteBuffer.allocate(1));187if (read == 0) return true;188close();189} catch (IOException x) {190debug.log("Pooled connection is no longer operational: %s",191x.toString());192return false;193}194}195return false;196}197198interface HttpPublisher extends FlowTube.TubePublisher {199void enqueue(List<ByteBuffer> buffers) throws IOException;200void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;201void signalEnqueued() throws IOException;202}203204/**205* Returns the HTTP publisher associated with this connection. May be null206* if invoked before connecting.207*/208abstract HttpPublisher publisher();209210// HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS211private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->212proto.equals("TLSv1.2") || proto.equals("TLSv1.3");213214/**215* Returns true if the given client's SSL parameter protocols contains at216* least one TLS version that HTTP/2 requires.217*/218private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {219String[] protos = client.sslParameters().getProtocols();220if (protos != null) {221return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();222} else {223return false;224}225}226227/**228* Factory for retrieving HttpConnections. A connection can be retrieved229* from the connection pool, or a new one created if none available.230*231* The given {@code addr} is the ultimate destination. Any proxies,232* etc, are determined from the request. Returns a concrete instance which233* is one of the following:234* {@link PlainHttpConnection}235* {@link PlainTunnelingConnection}236*237* The returned connection, if not from the connection pool, must have its,238* connect() or connectAsync() method invoked, which ( when it completes239* successfully ) renders the connection usable for requests.240*/241public static HttpConnection getConnection(InetSocketAddress addr,242HttpClientImpl client,243HttpRequestImpl request,244Version version) {245// The default proxy selector may select a proxy whose address is246// unresolved. We must resolve the address before connecting to it.247InetSocketAddress proxy = Utils.resolveAddress(request.proxy());248HttpConnection c = null;249boolean secure = request.secure();250ConnectionPool pool = client.connectionPool();251252if (!secure) {253c = pool.getConnection(false, addr, proxy);254if (c != null && c.checkOpen() /* may have been eof/closed when in the pool */) {255final HttpConnection conn = c;256if (DEBUG_LOGGER.on())257DEBUG_LOGGER.log(conn.getConnectionFlow()258+ ": plain connection retrieved from HTTP/1.1 pool");259return c;260} else {261return getPlainConnection(addr, proxy, request, client);262}263} else { // secure264if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool265c = pool.getConnection(true, addr, proxy);266}267if (c != null && c.isOpen()) {268final HttpConnection conn = c;269if (DEBUG_LOGGER.on())270DEBUG_LOGGER.log(conn.getConnectionFlow()271+ ": SSL connection retrieved from HTTP/1.1 pool");272return c;273} else {274String[] alpn = null;275if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {276alpn = new String[] { "h2", "http/1.1" };277}278return getSSLConnection(addr, proxy, alpn, request, client);279}280}281}282283private static HttpConnection getSSLConnection(InetSocketAddress addr,284InetSocketAddress proxy,285String[] alpn,286HttpRequestImpl request,287HttpClientImpl client) {288if (proxy != null)289return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,290proxyTunnelHeaders(request));291else292return new AsyncSSLConnection(addr, client, alpn);293}294295/**296* This method is used to build a filter that will accept or297* veto (header-name, value) tuple for transmission on the298* wire.299* The filter is applied to the headers when sending the headers300* to the remote party.301* Which tuple is accepted/vetoed depends on:302* <pre>303* - whether the connection is a tunnel connection304* [talking to a server through a proxy tunnel]305* - whether the method is CONNECT306* [establishing a CONNECT tunnel through a proxy]307* - whether the request is using a proxy308* (and the connection is not a tunnel)309* [talking to a server through a proxy]310* - whether the request is a direct connection to311* a server (no tunnel, no proxy).312* </pre>313* @param request314* @return315*/316BiPredicate<String,String> headerFilter(HttpRequestImpl request) {317if (isTunnel()) {318// talking to a server through a proxy tunnel319// don't send proxy-* headers to a plain server320assert !request.isConnect();321return Utils.NO_PROXY_HEADERS_FILTER;322} else if (request.isConnect()) {323// establishing a proxy tunnel324// check for proxy tunnel disabled schemes325// assert !this.isTunnel();326assert request.proxy() == null;327return Utils.PROXY_TUNNEL_FILTER;328} else if (request.proxy() != null) {329// talking to a server through a proxy (no tunnel)330// check for proxy disabled schemes331// assert !isTunnel() && !request.isConnect();332return Utils.PROXY_FILTER;333} else {334// talking to a server directly (no tunnel, no proxy)335// don't send proxy-* headers to a plain server336// assert request.proxy() == null && !request.isConnect();337return Utils.NO_PROXY_HEADERS_FILTER;338}339}340341BiPredicate<String,String> contextRestricted(HttpRequestImpl request, HttpClient client) {342if (!isTunnel() && request.isConnect()) {343// establishing a proxy tunnel344assert request.proxy() == null;345return Utils.PROXY_TUNNEL_RESTRICTED(client);346} else {347return Utils.CONTEXT_RESTRICTED(client);348}349}350351// Composes a new immutable HttpHeaders that combines the352// user and system header but only keeps those headers that353// start with "proxy-"354private static ProxyHeaders proxyTunnelHeaders(HttpRequestImpl request) {355HttpHeaders userHeaders = HttpHeaders.of(request.headers().map(), Utils.PROXY_TUNNEL_FILTER);356HttpHeaders systemHeaders = HttpHeaders.of(request.getSystemHeadersBuilder().map(), Utils.PROXY_TUNNEL_FILTER);357return new ProxyHeaders(userHeaders, systemHeaders);358}359360/* Returns either a plain HTTP connection or a plain tunnelling connection361* for proxied WebSocket */362private static HttpConnection getPlainConnection(InetSocketAddress addr,363InetSocketAddress proxy,364HttpRequestImpl request,365HttpClientImpl client) {366if (request.isWebSocket() && proxy != null)367return new PlainTunnelingConnection(addr, proxy, client,368proxyTunnelHeaders(request));369370if (proxy == null)371return new PlainHttpConnection(addr, client);372else373return new PlainProxyConnection(proxy, client);374}375376void closeOrReturnToCache(HttpHeaders hdrs) {377if (hdrs == null) {378// the connection was closed by server, eof379Log.logTrace("Cannot return connection to pool: closing {0}", this);380close();381return;382}383HttpClientImpl client = client();384if (client == null) {385Log.logTrace("Client released: closing {0}", this);386close();387return;388}389ConnectionPool pool = client.connectionPool();390boolean keepAlive = hdrs.firstValue("Connection")391.map((s) -> !s.equalsIgnoreCase("close"))392.orElse(true);393394if (keepAlive && checkOpen()) {395Log.logTrace("Returning connection to the pool: {0}", this);396pool.returnToPool(this);397} else {398Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}",399keepAlive, isOpen(), this);400close();401}402}403404/* Tells whether or not this connection is a tunnel through a proxy */405boolean isTunnel() { return false; }406407abstract SocketChannel channel();408409final InetSocketAddress address() {410return address;411}412413abstract ConnectionPool.CacheKey cacheKey();414415/**416* Closes this connection, by returning the socket to its connection pool.417*/418@Override419public abstract void close();420421abstract FlowTube getConnectionFlow();422423/**424* A publisher that makes it possible to publish (write) ordered (normal425* priority) and unordered (high priority) buffers downstream.426*/427final class PlainHttpPublisher implements HttpPublisher {428final Object reading;429PlainHttpPublisher() {430this(new Object());431}432PlainHttpPublisher(Object readingLock) {433this.reading = readingLock;434}435final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();436final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();437volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;438volatile HttpWriteSubscription subscription;439final SequentialScheduler writeScheduler =440new SequentialScheduler(this::flushTask);441@Override442public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {443synchronized (reading) {444//assert this.subscription == null;445//assert this.subscriber == null;446if (subscription == null) {447subscription = new HttpWriteSubscription();448}449this.subscriber = subscriber;450}451// TODO: should we do this in the flow?452subscriber.onSubscribe(subscription);453signal();454}455456void flushTask(DeferredCompleter completer) {457try {458HttpWriteSubscription sub = subscription;459if (sub != null) sub.flush();460} finally {461completer.complete();462}463}464465void signal() {466writeScheduler.runOrSchedule();467}468469final class HttpWriteSubscription implements Flow.Subscription {470final Demand demand = new Demand();471472@Override473public void request(long n) {474if (n <= 0) throw new IllegalArgumentException("non-positive request");475demand.increase(n);476if (debug.on())477debug.log("HttpPublisher: got request of " + n + " from "478+ getConnectionFlow());479writeScheduler.runOrSchedule();480}481482@Override483public void cancel() {484if (debug.on())485debug.log("HttpPublisher: cancelled by " + getConnectionFlow());486}487488private boolean isEmpty() {489return queue.isEmpty() && priority.isEmpty();490}491492private List<ByteBuffer> poll() {493List<ByteBuffer> elem = priority.poll();494return elem == null ? queue.poll() : elem;495}496497void flush() {498while (!isEmpty() && demand.tryDecrement()) {499List<ByteBuffer> elem = poll();500if (debug.on())501debug.log("HttpPublisher: sending "502+ Utils.remaining(elem) + " bytes ("503+ elem.size() + " buffers) to "504+ getConnectionFlow());505subscriber.onNext(elem);506}507}508}509510@Override511public void enqueue(List<ByteBuffer> buffers) throws IOException {512queue.add(buffers);513int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();514debug.log("added %d bytes to the write queue", bytes);515}516517@Override518public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {519// Unordered frames are sent before existing frames.520int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();521priority.add(buffers);522debug.log("added %d bytes in the priority write queue", bytes);523}524525@Override526public void signalEnqueued() throws IOException {527debug.log("signalling the publisher of the write queue");528signal();529}530}531532String dbgTag;533final String dbgString() {534FlowTube flow = getConnectionFlow();535String tag = dbgTag;536if (tag == null && flow != null) {537dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";538} else if (tag == null) {539tag = this.getClass().getSimpleName() + "(?)";540}541return tag;542}543544@Override545public String toString() {546return "HttpConnection: " + channel().toString();547}548}549550551