Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import javax.net.ssl.SSLContext;28import javax.net.ssl.SSLException;29import javax.net.ssl.SSLHandshakeException;30import javax.net.ssl.SSLParameters;31import java.io.IOException;32import java.io.UncheckedIOException;33import java.lang.ref.Reference;34import java.lang.ref.WeakReference;35import java.net.Authenticator;36import java.net.ConnectException;37import java.net.CookieHandler;38import java.net.ProxySelector;39import java.net.http.HttpConnectTimeoutException;40import java.net.http.HttpTimeoutException;41import java.nio.ByteBuffer;42import java.nio.channels.CancelledKeyException;43import java.nio.channels.ClosedChannelException;44import java.nio.channels.SelectableChannel;45import java.nio.channels.SelectionKey;46import 
java.nio.channels.Selector;47import java.nio.channels.SocketChannel;48import java.security.AccessControlContext;49import java.security.AccessController;50import java.security.NoSuchAlgorithmException;51import java.security.PrivilegedAction;52import java.time.Duration;53import java.time.Instant;54import java.time.temporal.ChronoUnit;55import java.util.ArrayList;56import java.util.HashSet;57import java.util.Iterator;58import java.util.LinkedList;59import java.util.List;60import java.util.Objects;61import java.util.Optional;62import java.util.Set;63import java.util.TreeSet;64import java.util.concurrent.CompletableFuture;65import java.util.concurrent.CompletionException;66import java.util.concurrent.ExecutionException;67import java.util.concurrent.Executor;68import java.util.concurrent.ExecutorService;69import java.util.concurrent.Executors;70import java.util.concurrent.ThreadFactory;71import java.util.concurrent.atomic.AtomicInteger;72import java.util.concurrent.atomic.AtomicLong;73import java.util.function.BooleanSupplier;74import java.util.stream.Stream;75import java.net.http.HttpClient;76import java.net.http.HttpRequest;77import java.net.http.HttpResponse;78import java.net.http.HttpResponse.BodyHandler;79import java.net.http.HttpResponse.PushPromiseHandler;80import java.net.http.WebSocket;81import jdk.internal.net.http.common.BufferSupplier;82import jdk.internal.net.http.common.Log;83import jdk.internal.net.http.common.Logger;84import jdk.internal.net.http.common.Pair;85import jdk.internal.net.http.common.Utils;86import jdk.internal.net.http.common.OperationTrackers.Trackable;87import jdk.internal.net.http.common.OperationTrackers.Tracker;88import jdk.internal.net.http.websocket.BuilderImpl;89import jdk.internal.misc.InnocuousThread;9091/**92* Client implementation. Contains all configuration information and also93* the selector manager thread which allows async events to be registered94* and delivered when they occur. 
See AsyncEvent.95*/96final class HttpClientImpl extends HttpClient implements Trackable {9798static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag99static final boolean DEBUGTIMEOUT = false; // dev flag100final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);101final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);102final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);103static final AtomicLong CLIENT_IDS = new AtomicLong();104105// Define the default factory as a static inner class106// that embeds all the necessary logic to avoid107// the risk of using a lambda that might keep a reference on the108// HttpClient instance from which it was created (helps with109// heapdump analysis).110private static final class DefaultThreadFactory implements ThreadFactory {111private final String namePrefix;112private final AtomicInteger nextId = new AtomicInteger();113114DefaultThreadFactory(long clientID) {115namePrefix = "HttpClient-" + clientID + "-Worker-";116}117118@SuppressWarnings("removal")119@Override120public Thread newThread(Runnable r) {121String name = namePrefix + nextId.getAndIncrement();122Thread t;123if (System.getSecurityManager() == null) {124t = new Thread(null, r, name, 0, false);125} else {126t = InnocuousThread.newThread(name, r);127}128t.setDaemon(true);129return t;130}131}132133/**134* A DelegatingExecutor is an executor that delegates tasks to135* a wrapped executor when it detects that the current thread136* is the SelectorManager thread. 
If the current thread is not137* the selector manager thread the given task is executed inline.138*/139final static class DelegatingExecutor implements Executor {140private final BooleanSupplier isInSelectorThread;141private final Executor delegate;142DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {143this.isInSelectorThread = isInSelectorThread;144this.delegate = delegate;145}146147Executor delegate() {148return delegate;149}150151@Override152public void execute(Runnable command) {153if (isInSelectorThread.getAsBoolean()) {154delegate.execute(command);155} else {156command.run();157}158}159160@SuppressWarnings("removal")161private void shutdown() {162if (delegate instanceof ExecutorService service) {163PrivilegedAction<?> action = () -> {164service.shutdown();165return null;166};167AccessController.doPrivileged(action, null,168new RuntimePermission("modifyThread"));169}170}171172}173174private final CookieHandler cookieHandler;175private final Duration connectTimeout;176private final Redirect followRedirects;177private final ProxySelector userProxySelector;178private final ProxySelector proxySelector;179private final Authenticator authenticator;180private final Version version;181private final ConnectionPool connections;182private final DelegatingExecutor delegatingExecutor;183private final boolean isDefaultExecutor;184// Security parameters185private final SSLContext sslContext;186private final SSLParameters sslParams;187private final SelectorManager selmgr;188private final FilterFactory filters;189private final Http2ClientImpl client2;190private final long id;191private final String dbgTag;192193// The SSL DirectBuffer Supplier provides the ability to recycle194// buffers used between the socket reader and the SSLEngine, or195// more precisely between the SocketTube publisher and the196// SSLFlowDelegate reader.197private final SSLDirectBufferSupplier sslBufferSupplier198= new SSLDirectBufferSupplier(this);199200// This reference is 
used to keep track of the facade HttpClient201// that was returned to the application code.202// It makes it possible to know when the application no longer203// holds any reference to the HttpClient.204// Unfortunately, this information is not enough to know when205// to exit the SelectorManager thread. Because of the asynchronous206// nature of the API, we also need to wait until all pending operations207// have completed.208private final WeakReference<HttpClientFacade> facadeRef;209210// This counter keeps track of the number of operations pending211// on the HttpClient. The SelectorManager thread will wait212// until there are no longer any pending operations and the213// facadeRef is cleared before exiting.214//215// The pendingOperationCount is incremented every time a send/sendAsync216// operation is invoked on the HttpClient, and is decremented when217// the HttpResponse<T> object is returned to the user.218// However, at this point, the body may not have been fully read yet.219// This is the case when the response T is implemented as a streaming220// subscriber (such as an InputStream).221//222// To take care of this issue the pendingOperationCount will additionally223// be incremented/decremented in the following cases:224//225// 1. For HTTP/2 it is incremented when a stream is added to the226// Http2Connection streams map, and decreased when the stream is removed227// from the map. This should also take care of push promises.228// 2. For WebSocket the count is increased when creating a229// DetachedConnectionChannel for the socket, and decreased230// when the channel is closed.231// In addition, the HttpClient facade is passed to the WebSocket builder,232// (instead of the client implementation delegate).233// 3. 
For HTTP/1.1 the count is incremented before starting to parse the body234// response, and decremented when the parser has reached the end of the235// response body flow.236//237// This should ensure that the selector manager thread remains alive until238// the response has been fully received or the web socket is closed.239private final AtomicLong pendingOperationCount = new AtomicLong();240private final AtomicLong pendingWebSocketCount = new AtomicLong();241private final AtomicLong pendingHttpRequestCount = new AtomicLong();242private final AtomicLong pendingHttp2StreamCount = new AtomicLong();243244/** A Set of, deadline first, ordered timeout events. */245private final TreeSet<TimeoutEvent> timeouts;246247/**248* This is a bit tricky:249* 1. an HttpClientFacade has a final HttpClientImpl field.250* 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,251* where the referent is the facade created for that instance.252* 3. We cannot just create the HttpClientFacade in the HttpClientImpl253* constructor, because it would be only weakly referenced and could254* be GC'ed before we can return it.255* The solution is to use an instance of SingleFacadeFactory which will256* allow the caller of new HttpClientImpl(...) 
/**
 * Builds the client implementation from the builder's settings, filling
 * in defaults for everything the builder left unset. The selector
 * manager thread is created here but not started; {@code create(...)}
 * calls {@code start()} once construction has completed.
 *
 * @param builder       source of the user-supplied configuration
 * @param facadeFactory creates the facade for this instance and keeps a
 *                      strong reference to it, so that the facade (only
 *                      weakly referenced from {@code facadeRef}) is still
 *                      reachable when the caller retrieves it
 * @throws UncheckedIOException if the default SSLContext is unavailable
 *                              or the SelectorManager cannot be created
 */
private HttpClientImpl(HttpClientBuilderImpl builder,
                       SingleFacadeFactory facadeFactory) {
    id = CLIENT_IDS.incrementAndGet();
    dbgTag = "HttpClientImpl(" + id +")";
    // Fall back to the platform default SSLContext when none was supplied.
    if (builder.sslContext == null) {
        try {
            sslContext = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException ex) {
            throw new UncheckedIOException(new IOException(ex));
        }
    } else {
        sslContext = builder.sslContext;
    }
    // When no executor was supplied a cached thread pool is created;
    // isDefaultExecutor records that, and stop() only shuts the executor
    // down when the flag is set.
    Executor ex = builder.executor;
    if (ex == null) {
        ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
        isDefaultExecutor = true;
    } else {
        isDefaultExecutor = false;
    }
    delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
    // The facade is only weakly referenced from here on; the factory
    // holds the strong reference until create(...) returns it.
    facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
    client2 = new Http2ClientImpl(this);
    cookieHandler = builder.cookieHandler;
    connectTimeout = builder.connectTimeout;
    // Redirects are not followed unless the builder says otherwise.
    followRedirects = builder.followRedirects == null ?
            Redirect.NEVER : builder.followRedirects;
    // Keep the user's selector (possibly null) separately from the
    // effective one, which falls back to the system default.
    this.userProxySelector = builder.proxy;
    this.proxySelector = Optional.ofNullable(userProxySelector)
            .orElseGet(HttpClientImpl::getDefaultProxySelector);
    if (debug.on())
        debug.log("proxySelector is %s (user-supplied=%s)",
                this.proxySelector, userProxySelector != null);
    authenticator = builder.authenticator;
    // HTTP/2 is the default protocol version.
    if (builder.version == null) {
        version = HttpClient.Version.HTTP_2;
    } else {
        version = builder.version;
    }
    if (builder.sslParams == null) {
        sslParams = getDefaultParams(sslContext);
    } else {
        sslParams = builder.sslParams;
    }
    connections = new ConnectionPool(id);
    connections.start();
    timeouts = new TreeSet<>();
    try {
        selmgr = new SelectorManager(this);
    } catch (IOException e) {
        // unlikely
        throw new UncheckedIOException(e);
    }
    selmgr.setDaemon(true);
    filters = new FilterFactory();
    initFilters();
    // The factory still holds the facade strongly, so the weak reference
    // must not have been cleared yet.
    assert facadeRef.get() != null;
}
= pendingHttpRequestCount.decrementAndGet();379final long http2Count = pendingHttp2StreamCount.get();380final long webSocketCount = pendingWebSocketCount.get();381if (count == 0 && facade() == null) {382selmgr.wakeupSelector();383}384assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";385assert http2Count >= 0 : "count of HTTP/2 operations < 0";386assert webSocketCount >= 0 : "count of WS operations < 0";387assert count >= 0 : "count of pending operations < 0";388return count;389}390391// Increments the pendingOperationCount.392final long streamReference() {393pendingHttp2StreamCount.incrementAndGet();394return pendingOperationCount.incrementAndGet();395}396397// Decrements the pendingOperationCount.398final long streamUnreference() {399final long count = pendingOperationCount.decrementAndGet();400final long http2Count = pendingHttp2StreamCount.decrementAndGet();401final long httpCount = pendingHttpRequestCount.get();402final long webSocketCount = pendingWebSocketCount.get();403if (count == 0 && facade() == null) {404selmgr.wakeupSelector();405}406assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";407assert http2Count >= 0 : "count of HTTP/2 operations < 0";408assert webSocketCount >= 0 : "count of WS operations < 0";409assert count >= 0 : "count of pending operations < 0";410return count;411}412413// Increments the pendingOperationCount.414final long webSocketOpen() {415pendingWebSocketCount.incrementAndGet();416return pendingOperationCount.incrementAndGet();417}418419// Decrements the pendingOperationCount.420final long webSocketClose() {421final long count = pendingOperationCount.decrementAndGet();422final long webSocketCount = pendingWebSocketCount.decrementAndGet();423final long httpCount = pendingHttpRequestCount.get();424final long http2Count = pendingHttp2StreamCount.get();425if (count == 0 && facade() == null) {426selmgr.wakeupSelector();427}428assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";429assert http2Count >= 0 : "count of 
HTTP/2 operations < 0";430assert webSocketCount >= 0 : "count of WS operations < 0";431assert count >= 0 : "count of pending operations < 0";432return count;433}434435// Returns the pendingOperationCount.436final long referenceCount() {437return pendingOperationCount.get();438}439440final static class HttpClientTracker implements Tracker {441final AtomicLong httpCount;442final AtomicLong http2Count;443final AtomicLong websocketCount;444final AtomicLong operationsCount;445final Reference<?> reference;446final String name;447HttpClientTracker(AtomicLong http,448AtomicLong http2,449AtomicLong ws,450AtomicLong ops,451Reference<?> ref,452String name) {453this.httpCount = http;454this.http2Count = http2;455this.websocketCount = ws;456this.operationsCount = ops;457this.reference = ref;458this.name = name;459}460@Override461public long getOutstandingOperations() {462return operationsCount.get();463}464@Override465public long getOutstandingHttpOperations() {466return httpCount.get();467}468@Override469public long getOutstandingHttp2Streams() { return http2Count.get(); }470@Override471public long getOutstandingWebSocketOperations() {472return websocketCount.get();473}474@Override475public boolean isFacadeReferenced() {476return reference.get() != null;477}478@Override479public String getName() {480return name;481}482}483484public Tracker getOperationsTracker() {485return new HttpClientTracker(pendingHttpRequestCount,486pendingHttp2StreamCount,487pendingWebSocketCount,488pendingOperationCount,489facadeRef,490dbgTag);491}492493// Called by the SelectorManager thread to figure out whether it's time494// to terminate.495final boolean isReferenced() {496HttpClient facade = facade();497return facade != null || referenceCount() > 0;498}499500/**501* Wait for activity on given exchange.502* The following occurs in the SelectorManager thread.503*504* 1) add to selector505* 2) If selector fires for this exchange then506* call AsyncEvent.handle()507*508* If exchange needs to change 
interest ops, then call registerEvent() again.509*/510void registerEvent(AsyncEvent exchange) throws IOException {511selmgr.register(exchange);512}513514/**515* Allows an AsyncEvent to modify its interestOps.516* @param event The modified event.517*/518void eventUpdated(AsyncEvent event) throws ClosedChannelException {519assert !(event instanceof AsyncTriggerEvent);520selmgr.eventUpdated(event);521}522523boolean isSelectorThread() {524return Thread.currentThread() == selmgr;525}526527Http2ClientImpl client2() {528return client2;529}530531private void debugCompleted(String tag, long startNanos, HttpRequest req) {532if (debugelapsed.on()) {533debugelapsed.log(tag + " elapsed "534+ (System.nanoTime() - startNanos)/1000_000L535+ " millis for " + req.method()536+ " to " + req.uri());537}538}539540@Override541public <T> HttpResponse<T>542send(HttpRequest req, BodyHandler<T> responseHandler)543throws IOException, InterruptedException544{545CompletableFuture<HttpResponse<T>> cf = null;546547// if the thread is already interrupted no need to go further.548// cf.get() would throw anyway.549if (Thread.interrupted()) throw new InterruptedException();550try {551cf = sendAsync(req, responseHandler, null, null);552return cf.get();553} catch (InterruptedException ie) {554if (cf != null )555cf.cancel(true);556throw ie;557} catch (ExecutionException e) {558final Throwable throwable = e.getCause();559final String msg = throwable.getMessage();560561if (throwable instanceof IllegalArgumentException) {562throw new IllegalArgumentException(msg, throwable);563} else if (throwable instanceof SecurityException) {564throw new SecurityException(msg, throwable);565} else if (throwable instanceof HttpConnectTimeoutException) {566HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);567hcte.initCause(throwable);568throw hcte;569} else if (throwable instanceof HttpTimeoutException) {570throw new HttpTimeoutException(msg);571} else if (throwable instanceof ConnectException) 
{572ConnectException ce = new ConnectException(msg);573ce.initCause(throwable);574throw ce;575} else if (throwable instanceof SSLHandshakeException) {576// special case for SSLHandshakeException577SSLHandshakeException he = new SSLHandshakeException(msg);578he.initCause(throwable);579throw he;580} else if (throwable instanceof SSLException) {581// any other SSLException is wrapped in a plain582// SSLException583throw new SSLException(msg, throwable);584} else if (throwable instanceof IOException) {585throw new IOException(msg, throwable);586} else {587throw new IOException(msg, throwable);588}589}590}591592private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();593594@Override595public <T> CompletableFuture<HttpResponse<T>>596sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)597{598return sendAsync(userRequest, responseHandler, null);599}600601@Override602public <T> CompletableFuture<HttpResponse<T>>603sendAsync(HttpRequest userRequest,604BodyHandler<T> responseHandler,605PushPromiseHandler<T> pushPromiseHandler) {606return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);607}608609@SuppressWarnings("removal")610private <T> CompletableFuture<HttpResponse<T>>611sendAsync(HttpRequest userRequest,612BodyHandler<T> responseHandler,613PushPromiseHandler<T> pushPromiseHandler,614Executor exchangeExecutor) {615616Objects.requireNonNull(userRequest);617Objects.requireNonNull(responseHandler);618619AccessControlContext acc = null;620if (System.getSecurityManager() != null)621acc = AccessController.getContext();622623// Clone the, possibly untrusted, HttpRequest624HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);625if (requestImpl.method().equals("CONNECT"))626throw new IllegalArgumentException("Unsupported method CONNECT");627628long start = DEBUGELAPSED ? 
System.nanoTime() : 0;629reference();630try {631if (debugelapsed.on())632debugelapsed.log("ClientImpl (async) send %s", userRequest);633634// When using sendAsync(...) we explicitly pass the635// executor's delegate as exchange executor to force636// asynchronous scheduling of the exchange.637// When using send(...) we don't specify any executor638// and default to using the client's delegating executor639// which only spawns asynchronous tasks if it detects640// that the current thread is the selector manager641// thread. This will cause everything to execute inline642// until we need to schedule some event with the selector.643Executor executor = exchangeExecutor == null644? this.delegatingExecutor : exchangeExecutor;645646MultiExchange<T> mex = new MultiExchange<>(userRequest,647requestImpl,648this,649responseHandler,650pushPromiseHandler,651acc);652CompletableFuture<HttpResponse<T>> res =653mex.responseAsync(executor).whenComplete((b,t) -> unreference());654if (DEBUGELAPSED) {655res = res.whenComplete(656(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));657}658659// makes sure that any dependent actions happen in the CF default660// executor. 
This is only needed for sendAsync(...), when661// exchangeExecutor is non-null.662if (exchangeExecutor != null) {663res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);664}665return res;666} catch(Throwable t) {667unreference();668debugCompleted("ClientImpl (async)", start, userRequest);669throw t;670}671}672673// Main loop for this client's selector674private final static class SelectorManager extends Thread {675676// For testing purposes we have an internal System property that677// can control the frequency at which the selector manager will wake678// up when there are no pending operations.679// Increasing the frequency (shorter delays) might allow the selector680// to observe that the facade is no longer referenced and might allow681// the selector thread to terminate more timely - for when nothing is682// ongoing it will only check for that condition every NODEADLINE ms.683// To avoid misuse of the property, the delay that can be specified684// is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default685// value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms686// The property is -Djdk.internal.httpclient.selectorTimeout=<millis>687private static final int MIN_NODEADLINE = 1000; // ms688private static final int MAX_NODEADLINE = 1000 * 1200; // ms689private static final int DEF_NODEADLINE = 3000; // ms690private static final long NODEADLINE; // default is DEF_NODEADLINE ms691static {692// ensure NODEADLINE is initialized with some valid value.693long deadline = Utils.getIntegerProperty(694"jdk.internal.httpclient.selectorTimeout",695DEF_NODEADLINE); // millis696if (deadline <= 0) deadline = DEF_NODEADLINE;697deadline = Math.max(deadline, MIN_NODEADLINE);698NODEADLINE = Math.min(deadline, MAX_NODEADLINE);699}700701private final Selector selector;702private volatile boolean closed;703private final List<AsyncEvent> registrations;704private final List<AsyncTriggerEvent> deregistrations;705private final Logger debug;706private 
final Logger debugtimeout;707HttpClientImpl owner;708ConnectionPool pool;709710SelectorManager(HttpClientImpl ref) throws IOException {711super(null, null,712"HttpClient-" + ref.id + "-SelectorManager",7130, false);714owner = ref;715debug = ref.debug;716debugtimeout = ref.debugtimeout;717pool = ref.connectionPool();718registrations = new ArrayList<>();719deregistrations = new ArrayList<>();720selector = Selector.open();721}722723void eventUpdated(AsyncEvent e) throws ClosedChannelException {724if (Thread.currentThread() == this) {725SelectionKey key = e.channel().keyFor(selector);726if (key != null && key.isValid()) {727SelectorAttachment sa = (SelectorAttachment) key.attachment();728sa.register(e);729} else if (e.interestOps() != 0){730// We don't care about paused events.731// These are actually handled by732// SelectorAttachment::resetInterestOps later on.733// But if we reach here when trying to resume an734// event then it's better to fail fast.735if (debug.on()) debug.log("No key for channel");736e.abort(new IOException("No key for channel"));737}738} else {739register(e);740}741}742743// This returns immediately. 
/**
 * Main loop of the selector manager thread. Each iteration:
 * <ol>
 *   <li>while holding this manager's monitor, handles queued
 *       deregistrations and turns queued registrations into selector
 *       registrations (trigger events are set aside, failures are
 *       collected);</li>
 *   <li>outside the monitor, fires the trigger events and aborts the
 *       events whose registration failed;</li>
 *   <li>returns if the owning client is no longer referenced;</li>
 *   <li>purges timeouts and expired pooled connections, and derives a
 *       select timeout that is capped at NODEADLINE;</li>
 *   <li>selects, then dispatches ready events and errors, and finally
 *       resets the interest ops of the selected channels.</li>
 * </ol>
 * Whatever the reason for leaving the loop, {@code shutdown()} is
 * called from the finally block.
 */
@Override
public void run() {
    List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
    List<AsyncEvent> readyList = new ArrayList<>();
    List<Runnable> resetList = new ArrayList<>();
    try {
        if (Log.channel()) Log.logChannel(getName() + ": starting");
        while (!Thread.currentThread().isInterrupted()) {
            // register(...) and cancel(...) are synchronized on this
            // manager too, so the queues are not modified while they
            // are being drained here.
            synchronized (this) {
                assert errorList.isEmpty();
                assert readyList.isEmpty();
                assert resetList.isEmpty();
                for (AsyncTriggerEvent event : deregistrations) {
                    event.handle();
                }
                deregistrations.clear();
                for (AsyncEvent event : registrations) {
                    // Trigger events are not registered with the
                    // selector: they are fired once the monitor has
                    // been released.
                    if (event instanceof AsyncTriggerEvent) {
                        readyList.add(event);
                        continue;
                    }
                    SelectableChannel chan = event.channel();
                    SelectionKey key = null;
                    try {
                        key = chan.keyFor(selector);
                        SelectorAttachment sa;
                        if (key == null || !key.isValid()) {
                            if (key != null) {
                                // key is canceled.
                                // invoke selectNow() to purge it
                                // before registering the new event.
                                selector.selectNow();
                            }
                            sa = new SelectorAttachment(chan, selector);
                        } else {
                            sa = (SelectorAttachment) key.attachment();
                        }
                        // may throw IOE if channel closed: that's OK
                        sa.register(event);
                        if (!chan.isOpen()) {
                            throw new IOException("Channel closed");
                        }
                    } catch (IOException e) {
                        Log.logTrace("{0}: {1}", getName(), e);
                        if (debug.on())
                            debug.log("Got " + e.getClass().getName()
                                      + " while handling registration events");
                        chan.close();
                        // let the event abort deal with it
                        errorList.add(new Pair<>(event, e));
                        if (key != null) {
                            key.cancel();
                            selector.selectNow();
                        }
                    }
                }
                registrations.clear();
                selector.selectedKeys().clear();
            }

            // Fire the trigger events collected above.
            for (AsyncEvent event : readyList) {
                assert event instanceof AsyncTriggerEvent;
                event.handle();
            }
            readyList.clear();

            for (Pair<AsyncEvent,IOException> error : errorList) {
                // an IOException was raised and the channel closed.
                handleEvent(error.first, error.second);
            }
            errorList.clear();

            // Check whether client is still alive, and if not,
            // gracefully stop this thread
            if (!owner.isReferenced()) {
                Log.logTrace("{0}: {1}",
                        getName(),
                        "HttpClient no longer referenced. Exiting...");
                return;
            }

            // Timeouts will have milliseconds granularity. It is important
            // to handle them in a timely fashion.
            long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
            if (debugtimeout.on())
                debugtimeout.log("next timeout: %d", nextTimeout);

            // Keep-alive have seconds granularity. It's not really an
            // issue if we keep connections linger a bit more in the keep
            // alive cache.
            long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
            if (debugtimeout.on())
                debugtimeout.log("next expired: %d", nextExpiry);

            assert nextTimeout >= 0;
            assert nextExpiry >= 0;

            // Don't wait for ever as it might prevent the thread to
            // stop gracefully. A value of 0 means that no deadline was
            // found; NODEADLINE is used instead.
            if (nextTimeout <= 0) nextTimeout = NODEADLINE;

            // Clip nextExpiry at NODEADLINE limit. The default
            // keep alive is 1200 seconds (20 minutes) - we don't
            // want to wait that long.
            if (nextExpiry <= 0) nextExpiry = NODEADLINE;
            else nextExpiry = Math.min(NODEADLINE, nextExpiry);

            // takes the least of the two.
            long millis = Math.min(nextExpiry, nextTimeout);

            // NOTE(review): after the two adjustments above both values
            // appear to be strictly positive, so the `millis == 0`
            // fallbacks below look purely defensive - confirm before
            // simplifying.
            if (debugtimeout.on())
                debugtimeout.log("Next deadline is %d",
                                 (millis == 0 ? NODEADLINE : millis));
            //debugPrint(selector);
            int n = selector.select(millis == 0 ? NODEADLINE : millis);
            if (n == 0) {
                // Check whether client is still alive, and if not,
                // gracefully stop this thread
                if (!owner.isReferenced()) {
                    Log.logTrace("{0}: {1}",
                            getName(),
                            "HttpClient no longer referenced. Exiting...");
                    return;
                }
                owner.purgeTimeoutsAndReturnNextDeadline();
                continue;
            }

            Set<SelectionKey> keys = selector.selectedKeys();
            assert errorList.isEmpty();

            for (SelectionKey key : keys) {
                SelectorAttachment sa = (SelectorAttachment) key.attachment();
                // An invalid key fails every event still pending on
                // its attachment.
                if (!key.isValid()) {
                    IOException ex = sa.chan.isOpen()
                            ? new IOException("Invalid key")
                            : new ClosedChannelException();
                    sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
                    sa.pending.clear();
                    continue;
                }

                int eventsOccurred;
                try {
                    eventsOccurred = key.readyOps();
                } catch (CancelledKeyException ex) {
                    IOException io = Utils.getIOException(ex);
                    sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
                    sa.pending.clear();
                    continue;
                }
                sa.events(eventsOccurred).forEach(readyList::add);
                // The interest ops are only reset after the ready
                // events have been handled (see the end of the loop).
                resetList.add(() -> sa.resetInterestOps(eventsOccurred));
            }

            selector.selectNow(); // complete cancellation
            selector.selectedKeys().clear();

            // handle selected events
            readyList.forEach((e) -> handleEvent(e, null));
            readyList.clear();

            // handle errors (closed channels etc...)
            errorList.forEach((p) -> handleEvent(p.first, p.second));
            errorList.clear();

            // reset interest ops for selected channels
            resetList.forEach(r -> r.run());
            resetList.clear();

        }
    } catch (Throwable e) {
        if (!closed) {
            // This terminates thread. So, better just print stack trace
            String err = Utils.stackTrace(e);
            Log.logError("{0}: {1}: {2}", getName(),
                    "HttpClientImpl shutting down due to fatal error", err);
        }
        if (debug.on()) debug.log("shutting down", e);
        if (Utils.ASSERTIONSENABLED && !debug.on()) {
            e.printStackTrace(System.err); // always print the stack
        }
    } finally {
        if (Log.channel()) Log.logChannel(getName() + ": stopping");
        shutdown();
    }
}
*/
        void handleEvent(AsyncEvent event, IOException ioe) {
            // The event is aborted either because an error was supplied or
            // because 'closed' is set; in the latter case ioe may still be
            // null when it is passed to abort().
            if (closed || ioe != null) {
                event.abort(ioe);
            } else {
                event.handle();
            }
        }
    }

    /**
     * Builds a diagnostic description of how {@code channel} is registered
     * with the selector held by {@code selmgr}.
     *
     * @param channel the channel to look up
     * @return {@code "channel not registered with selector"} when the channel
     *         has no key for that selector; otherwise a formatted string
     *         holding the key's interest ops (or {@code "invalid key"}) and
     *         the {@code interestOps} recorded in the key's
     *         {@code SelectorAttachment}. If anything throws, the result is
     *         {@code String.valueOf} of the caught {@code Throwable}.
     */
    final String debugInterestOps(SelectableChannel channel) {
        try {
            SelectionKey key = channel.keyFor(selmgr.selector);
            if (key == null) return "channel not registered with selector";
            // key.interestOps() is only queried on a valid key.
            String keyInterestOps = key.isValid()
                    ? "key.interestOps=" + key.interestOps() : "invalid key";
            return String.format("channel registered with selector, %s, sa.interestOps=%s",
                                 keyInterestOps,
                                 ((SelectorAttachment)key.attachment()).interestOps);
        } catch (Throwable t) {
            // Debug helper: report the failure as text instead of propagating it.
            return String.valueOf(t);
        }
    }

    /**
     * Tracks multiple user level registrations associated with one NIO
     * registration (SelectionKey). In this implementation, registrations
     * are one-off and when an event is posted the registration is cancelled
     * until explicitly registered again.
     *
     * <p> No external synchronization required as this class is only used
     * by the SelectorManager thread. One of these objects required per
     * connection.
     */
    private static class SelectorAttachment {
        // Channel whose selector registration this attachment tracks.
        private final SelectableChannel chan;
        // Selector that chan is registered with (see register()).
        private final Selector selector;
        // Events added by register() and not yet removed by
        // resetInterestOps() or abortPending().
        private final Set<AsyncEvent> pending;
        private final static Logger debug =
                Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
        // Interest ops as last computed by register() / resetInterestOps().
        private int interestOps;

        /**
         * Creates an attachment for {@code chan} on {@code selector} with an
         * empty pending set. Nothing is registered with the selector here.
         */
        SelectorAttachment(SelectableChannel chan, Selector selector) {
            this.pending = new HashSet<>();
            this.chan = chan;
            this.selector = selector;
        }

        /**
         * Adds {@code e} to the pending set and merges its interest ops into
         * {@code interestOps}. The channel is (re)registered with the selector
         * only when {@code e} asks for ops that were not already recorded.
         * Any {@code Throwable} raised by that registration is handed to
         * {@link #abortPending(Throwable)} instead of being propagated. When
         * no registration is needed but the channel is no longer open, the
         * pending events are aborted with a {@code ClosedChannelException}.
         *
         * @param e the event to register
         * @throws ClosedChannelException declared by the signature.
         *         NOTE(review): the visible body catches every Throwable from
         *         chan.register, so this looks unreachable here - confirm.
         */
        void register(AsyncEvent e) throws ClosedChannelException {
            int newOps = e.interestOps();
            // re register interest if we are not already interested
            // in the event. If the event is paused, then the pause will
            // be taken into account later when resetInterestOps is called.
            boolean reRegister = (interestOps & newOps) != newOps;
            interestOps |= newOps;
            pending.add(e);
            if (debug.on())
                debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
            if (reRegister) {
                // first time registration happens here also
                try {
                    // 'this' becomes the key's attachment.
                    chan.register(selector, interestOps, this);
                } catch (Throwable x) {
                    abortPending(x);
                }
            } else if (!chan.isOpen()) {
                abortPending(new ClosedChannelException());
            }
        }

        /**
         * Returns a {@code Stream<AsyncEvent>} containing only the pending
         * events whose own interest ops share at least one bit with the given
         * {@code interestOps}.
         */
        Stream<AsyncEvent> events(int interestOps) {
            return pending.stream()
                    .filter(ev -> (ev.interestOps() & interestOps) != 0);
        }

        /**
         * Removes any non-repeating events matching the given
         * {@code interestOps}, recomputes the interest ops from the events
         * that remain, and then either cancels the SelectionKey (nothing
         * left) or updates the key's interest ops.
         *
         * <p> If the key is missing or invalid, or the channel turns out to
         * be closed after the update, the remaining pending events are
         * aborted.
         */
        void resetInterestOps(int interestOps) {
            int newOps = 0;

            Iterator<AsyncEvent> itr = pending.iterator();
            while (itr.hasNext()) {
                AsyncEvent event = itr.next();
                int evops = event.interestOps();
                // Repeating events are never removed here; their ops always
                // stay part of the new interest set.
                if (event.repeating()) {
                    newOps |= evops;
                    continue;
                }
                if ((evops & interestOps) != 0) {
                    itr.remove();
                } else {
                    newOps |= evops;
                }
            }

            this.interestOps = newOps;
            SelectionKey key = chan.keyFor(selector);
            if (newOps == 0 && key != null && pending.isEmpty()) {
                key.cancel();
            } else {
                try {
                    // A missing or invalid key is funnelled into the same
                    // catch block as a genuinely cancelled key.
                    if (key == null || !key.isValid()) {
                        throw new CancelledKeyException();
                    }
                    key.interestOps(newOps);
                    // double check after
                    if (!chan.isOpen()) {
                        abortPending(new ClosedChannelException());
                        return;
                    }
                    assert key.interestOps() == newOps;
                } catch (CancelledKeyException x) {
                    // channel may have been closed
                    if (debug.on()) debug.log("key cancelled for " + chan);
                    abortPending(x);
                }
            }
        }

        /**
         * Aborts every pending event with an {@code IOException} obtained
         * from {@code Utils.getIOException(x)}. Does nothing when there are
         * no pending events.
         */
        void abortPending(Throwable x) {
            if (!pending.isEmpty()) {
                // Snapshot and clear before calling abort(), so the loop does
                // not iterate over the live set.
                // NOTE(review): presumably guards against abort() callbacks
                // touching 'pending' - confirm.
                AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
                pending.clear();
                IOException io = Utils.getIOException(x);
                for (AsyncEvent event : evts) {
                    event.abort(io);
                }
            }
        }
    }

    // Same value as sslContext(), exposed without going through the public API.
    /*package-private*/ SSLContext theSSLContext() {
        return sslContext;
    }

    // Returns the sslContext field as is.
    @Override
    public SSLContext sslContext() {
        return sslContext;
    }

    // Returns the result of Utils.copySSLParameters(sslParams) rather than
    // the sslParams field itself.
    @Override
    public SSLParameters sslParameters() {
        return Utils.copySSLParameters(sslParams);
    }

    // Empty when the authenticator field is null.
    @Override
    public Optional<Authenticator> authenticator() {
        return Optional.ofNullable(authenticator);
    }

    // Returns the DelegatingExecutor itself, not the executor it delegates to.
    /*package-private*/ final DelegatingExecutor theExecutor() {
        return delegatingExecutor;
    }

    // Empty when isDefaultExecutor is set; otherwise the delegate held by
    // delegatingExecutor.
    @Override
    public final Optional<Executor> executor() {
        return isDefaultExecutor
                ? Optional.empty()
                : Optional.of(delegatingExecutor.delegate());
    }

    // Returns the connections field.
    ConnectionPool connectionPool() {
        return connections;
    }

    // Returns the followRedirects field.
    @Override
    public Redirect followRedirects() {
        return followRedirects;
    }


    // Empty when the cookieHandler field is null.
    @Override
    public Optional<CookieHandler> cookieHandler() {
        return Optional.ofNullable(cookieHandler);
    }

    // Empty when the connectTimeout field is null.
    @Override
    public Optional<Duration> connectTimeout() {
        return Optional.ofNullable(connectTimeout);
    }

    // Reports userProxySelector (empty when null); compare proxySelector()
    // below, which returns a different field.
    @Override
    public Optional<ProxySelector> proxy() {
        return Optional.ofNullable(userProxySelector);
    }

    // Return the effective proxy that this client uses.
    ProxySelector proxySelector() {
        return proxySelector;
    }

    @Override
    public WebSocket.Builder newWebSocketBuilder() {
        // Make sure to pass the HttpClientFacade to the WebSocket builder.
        // This will ensure that the facade is not released before the
        // WebSocket has been created, at which point the pendingOperationCount
        // will have been incremented by the RawChannelTube.
        // See RawChannelTube.
        return new BuilderImpl(this.facade(),
proxySelector);
    }

    // Returns the version field.
    @Override
    public Version version() {
        return version;
    }

    // Returns the dbgTag field.
    String dbgString() {
        return dbgTag;
    }

    @Override
    public String toString() {
        // Used by tests to get the client's id and compute the
        // name of the SelectorManager thread.
        return super.toString() + ("(" + id + ")");
    }

    // Installs the header filters: authentication and redirect always, the
    // cookie filter only when a cookie handler is set.
    private void initFilters() {
        addFilter(AuthenticationFilter.class);
        addFilter(RedirectFilter.class);
        if (this.cookieHandler != null) {
            addFilter(CookieFilter.class);
        }
    }

    // Forwards the filter class to the 'filters' field.
    private void addFilter(Class<? extends HeaderFilter> f) {
        filters.addFilter(f);
    }

    // Returns whatever filters.getFilterChain() produces.
    final LinkedList<HeaderFilter> filterChain() {
        return filters.getFilterChain();
    }

    // Timer controls.
    // Timers are implemented through timed Selector.select() calls.

    // Adds the event to 'timeouts' under this object's monitor, then wakes
    // the selector.
    // NOTE(review): presumably so the selector recomputes its next deadline
    // with the new timer included - confirm against SelectorManager.run().
    synchronized void registerTimer(TimeoutEvent event) {
        Log.logTrace("Registering timer {0}", event);
        timeouts.add(event);
        selmgr.wakeupSelector();
    }

    // Removes the event from 'timeouts' under this object's monitor. The
    // selector is not woken up.
    synchronized void cancelTimer(TimeoutEvent event) {
        Log.logTrace("Canceling timer {0}", event);
        timeouts.remove(event);
    }

    /**
     * Purges ( handles ) timer events that have passed their deadline, and
     * returns the amount of time, in milliseconds, until the next earliest
     * event. A return value of 0 means that there are no events.
     *
     * <p> Expired events are removed from {@code timeouts} while holding this
     * object's monitor, and are handled after the monitor is released.
     * Every expired event is handled even if an earlier one fails: the first
     * {@code Error} or {@code RuntimeException} is rethrown at the end with
     * the later ones attached as suppressed exceptions.
     *
     * @return milliseconds until the first event that has not expired yet,
     *         or {@code 0} when no such event was found
     */
    private long purgeTimeoutsAndReturnNextDeadline() {
        long diff = 0L;
        List<TimeoutEvent> toHandle = null;
        int remaining = 0;
        // enter critical section to retrieve the timeout event to handle
        synchronized(this) {
            if (timeouts.isEmpty()) return 0L;

            Instant now = Instant.now();
            Iterator<TimeoutEvent> itr = timeouts.iterator();
            while (itr.hasNext()) {
                TimeoutEvent event = itr.next();
                diff = now.until(event.deadline(), ChronoUnit.MILLIS);
                if (diff <= 0) {
                    itr.remove();
                    // The list is allocated lazily, on the first expired event.
                    toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
                    toHandle.add(event);
                } else {
                    // Stops at the first event that is not due yet.
                    // NOTE(review): presumably 'timeouts' iterates in
                    // deadline order - confirm at its declaration.
                    break;
                }
            }
            remaining = timeouts.size();
        }

        // can be useful for debugging
        if (toHandle != null && Log.trace()) {
            Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
                    + toHandle.size() + " events, "
                    + "remaining " + remaining
                    + ", next deadline: " + (diff < 0 ? 0L : diff));
        }

        // handle timeout events out of critical section
        if (toHandle != null) {
            Throwable failed = null;
            for (TimeoutEvent event : toHandle) {
                try {
                    Log.logTrace("Firing timer {0}", event);
                    event.handle();
                } catch (Error | RuntimeException e) {
                    // Not expected. Handle remaining events then throw...
                    // If e is an OOME or SOE it might simply trigger a new
                    // error from here - but in this case there's not much we
                    // could do anyway. Just let it flow...
                    if (failed == null) failed = e;
                    else failed.addSuppressed(e);
                    Log.logTrace("Failed to handle event {0}: {1}", event, e);
                }
            }
            // 'failed' can only hold an Error or a RuntimeException (see the
            // catch clause above); rethrow it with its original type.
            if (failed instanceof Error) throw (Error) failed;
            if (failed instanceof RuntimeException) throw (RuntimeException) failed;
        }

        // return time to wait until next event. 0L if there's no more events.
        return diff < 0 ? 0L : diff;
    }

    // used for the connection window
    // Reads the "jdk.httpclient.receiveBufferSize" net property, using 0 as
    // the fallback argument.
    int getReceiveBufferSize() {
        return Utils.getIntegerNetProperty(
                "jdk.httpclient.receiveBufferSize",
                0 // only set the size if > 0
        );
    }

    // used for testing
    // Reads the "jdk.httpclient.sendBufferSize" net property, using 0 as
    // the fallback argument.
    int getSendBufferSize() {
        return Utils.getIntegerNetProperty(
                "jdk.httpclient.sendBufferSize",
                0 // only set the size if > 0
        );
    }

    // Optimization for reading SSL encrypted data
    // --------------------------------------------

    // Returns a BufferSupplier that can be used for reading
    // encrypted bytes of the channel.
// These buffers can then
    // be recycled by the SSLFlowDelegate::Reader after their
    // content has been copied in the SSLFlowDelegate::Reader
    // readBuf.
    // Because allocating, reading, copying, and recycling
    // all happen in the SelectorManager thread,
    // then this BufferSupplier can be shared between all
    // the SSL connections managed by this client.
    BufferSupplier getSSLBufferSupplier() {
        return sslBufferSupplier;
    }

    // An implementation of BufferSupplier that manages a pool of
    // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
    // are used for reading encrypted bytes off the channel before
    // copying and subsequent unwrapping.
    //
    // The pool is an array used as a stack: 'tail' is the number of
    // buffers currently stored, get() pops the most recently recycled
    // buffer and recycle() pushes one back.
    private static final class SSLDirectBufferSupplier implements BufferSupplier {
        private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;
        // Slots [0, tail) hold recycled buffers; the other slots are null.
        private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
        // Only used to assert that callers run on the selector thread.
        private final HttpClientImpl client;
        private final Logger debug;
        // tail: number of pooled buffers. count: number of allocations; it is
        // only updated inside an assert, so it is meaningful only when
        // assertions are enabled.
        private int tail, count; // no need for volatile: only accessed in SM thread.

        // Rejects a null client and reuses the client's debug logger.
        SSLDirectBufferSupplier(HttpClientImpl client) {
            this.client = Objects.requireNonNull(client);
            this.debug = client.debug;
        }

        // Gets a buffer from the pool, or allocates a new one if needed.
        // A new direct buffer of Utils.BUFSIZE bytes is allocated only when
        // the pool is empty (tail == 0).
        @Override
        public ByteBuffer get() {
            assert client.isSelectorThread();
            assert tail <= POOL_SIZE : "allocate tail is " + tail;
            ByteBuffer buf;
            if (tail == 0) {
                if (debug.on()) {
                    // should not appear more than SocketTube.MAX_BUFFERS
                    debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
                }
                // The increment is part of the assertion expression: with
                // assertions disabled, count is never touched and the number
                // of allocations is not checked.
                assert count++ < POOL_SIZE : "trying to allocate more than "
                            + POOL_SIZE + " buffers";
                buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
            } else {
                assert tail > 0 : "non positive tail value: " + tail;
                // Pop the top buffer and clear its slot so the pool no longer
                // references a buffer that has been handed out.
                tail--;
                buf = pool[tail];
                pool[tail] = null;
            }
            // Sanity checks on the buffer being handed out: direct, reset to
            // position 0 and spanning the full Utils.BUFSIZE.
            assert buf.isDirect();
            assert buf.position() == 0;
            assert buf.hasRemaining();
            assert buf.limit() == Utils.BUFSIZE;
            assert tail < POOL_SIZE;
            assert tail >= 0;
            return buf;
        }

        // Returns the given buffer to the pool.
        // The buffer is expected to be direct and to have no remaining bytes;
        // its position and limit are reset before it is stored.
        @Override
        public void recycle(ByteBuffer buffer) {
            assert client.isSelectorThread();
            assert buffer.isDirect();
            assert !buffer.hasRemaining();
            assert tail < POOL_SIZE : "recycle tail is " + tail;
            assert tail >= 0;
            // Reset to position 0 / limit == capacity, matching what get()
            // asserts about the buffers it returns.
            buffer.position(0);
            buffer.limit(buffer.capacity());
            // don't fail if assertions are off. we have asserted above.
            // If the pool is already full the buffer is simply not stored.
            if (tail < POOL_SIZE) {
                pool[tail] = buffer;
                tail++;
            }
            assert tail <= POOL_SIZE;
            assert tail > 0;
        }
    }

}