Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
41171 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import javax.net.ssl.SSLContext;
29
import javax.net.ssl.SSLException;
30
import javax.net.ssl.SSLHandshakeException;
31
import javax.net.ssl.SSLParameters;
32
import java.io.IOException;
33
import java.io.UncheckedIOException;
34
import java.lang.ref.Reference;
35
import java.lang.ref.WeakReference;
36
import java.net.Authenticator;
37
import java.net.ConnectException;
38
import java.net.CookieHandler;
39
import java.net.ProxySelector;
40
import java.net.http.HttpConnectTimeoutException;
41
import java.net.http.HttpTimeoutException;
42
import java.nio.ByteBuffer;
43
import java.nio.channels.CancelledKeyException;
44
import java.nio.channels.ClosedChannelException;
45
import java.nio.channels.SelectableChannel;
46
import java.nio.channels.SelectionKey;
47
import java.nio.channels.Selector;
48
import java.nio.channels.SocketChannel;
49
import java.security.AccessControlContext;
50
import java.security.AccessController;
51
import java.security.NoSuchAlgorithmException;
52
import java.security.PrivilegedAction;
53
import java.time.Duration;
54
import java.time.Instant;
55
import java.time.temporal.ChronoUnit;
56
import java.util.ArrayList;
57
import java.util.HashSet;
58
import java.util.Iterator;
59
import java.util.LinkedList;
60
import java.util.List;
61
import java.util.Objects;
62
import java.util.Optional;
63
import java.util.Set;
64
import java.util.TreeSet;
65
import java.util.concurrent.CompletableFuture;
66
import java.util.concurrent.CompletionException;
67
import java.util.concurrent.ExecutionException;
68
import java.util.concurrent.Executor;
69
import java.util.concurrent.ExecutorService;
70
import java.util.concurrent.Executors;
71
import java.util.concurrent.ThreadFactory;
72
import java.util.concurrent.atomic.AtomicInteger;
73
import java.util.concurrent.atomic.AtomicLong;
74
import java.util.function.BooleanSupplier;
75
import java.util.stream.Stream;
76
import java.net.http.HttpClient;
77
import java.net.http.HttpRequest;
78
import java.net.http.HttpResponse;
79
import java.net.http.HttpResponse.BodyHandler;
80
import java.net.http.HttpResponse.PushPromiseHandler;
81
import java.net.http.WebSocket;
82
import jdk.internal.net.http.common.BufferSupplier;
83
import jdk.internal.net.http.common.Log;
84
import jdk.internal.net.http.common.Logger;
85
import jdk.internal.net.http.common.Pair;
86
import jdk.internal.net.http.common.Utils;
87
import jdk.internal.net.http.common.OperationTrackers.Trackable;
88
import jdk.internal.net.http.common.OperationTrackers.Tracker;
89
import jdk.internal.net.http.websocket.BuilderImpl;
90
import jdk.internal.misc.InnocuousThread;
91
92
/**
93
* Client implementation. Contains all configuration information and also
94
* the selector manager thread which allows async events to be registered
95
* and delivered when they occur. See AsyncEvent.
96
*/
97
final class HttpClientImpl extends HttpClient implements Trackable {
98
99
static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag
100
static final boolean DEBUGTIMEOUT = false; // dev flag
101
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
102
final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
103
final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
104
static final AtomicLong CLIENT_IDS = new AtomicLong();
105
106
// Define the default factory as a static inner class
107
// that embeds all the necessary logic to avoid
108
// the risk of using a lambda that might keep a reference on the
109
// HttpClient instance from which it was created (helps with
110
// heapdump analysis).
111
private static final class DefaultThreadFactory implements ThreadFactory {
112
private final String namePrefix;
113
private final AtomicInteger nextId = new AtomicInteger();
114
115
DefaultThreadFactory(long clientID) {
116
namePrefix = "HttpClient-" + clientID + "-Worker-";
117
}
118
119
@SuppressWarnings("removal")
120
@Override
121
public Thread newThread(Runnable r) {
122
String name = namePrefix + nextId.getAndIncrement();
123
Thread t;
124
if (System.getSecurityManager() == null) {
125
t = new Thread(null, r, name, 0, false);
126
} else {
127
t = InnocuousThread.newThread(name, r);
128
}
129
t.setDaemon(true);
130
return t;
131
}
132
}
133
134
/**
135
* A DelegatingExecutor is an executor that delegates tasks to
136
* a wrapped executor when it detects that the current thread
137
* is the SelectorManager thread. If the current thread is not
138
* the selector manager thread the given task is executed inline.
139
*/
140
final static class DelegatingExecutor implements Executor {
141
private final BooleanSupplier isInSelectorThread;
142
private final Executor delegate;
143
DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
144
this.isInSelectorThread = isInSelectorThread;
145
this.delegate = delegate;
146
}
147
148
Executor delegate() {
149
return delegate;
150
}
151
152
@Override
153
public void execute(Runnable command) {
154
if (isInSelectorThread.getAsBoolean()) {
155
delegate.execute(command);
156
} else {
157
command.run();
158
}
159
}
160
161
@SuppressWarnings("removal")
162
private void shutdown() {
163
if (delegate instanceof ExecutorService service) {
164
PrivilegedAction<?> action = () -> {
165
service.shutdown();
166
return null;
167
};
168
AccessController.doPrivileged(action, null,
169
new RuntimePermission("modifyThread"));
170
}
171
}
172
173
}
174
175
private final CookieHandler cookieHandler;
176
private final Duration connectTimeout;
177
private final Redirect followRedirects;
178
private final ProxySelector userProxySelector;
179
private final ProxySelector proxySelector;
180
private final Authenticator authenticator;
181
private final Version version;
182
private final ConnectionPool connections;
183
private final DelegatingExecutor delegatingExecutor;
184
private final boolean isDefaultExecutor;
185
// Security parameters
186
private final SSLContext sslContext;
187
private final SSLParameters sslParams;
188
private final SelectorManager selmgr;
189
private final FilterFactory filters;
190
private final Http2ClientImpl client2;
191
private final long id;
192
private final String dbgTag;
193
194
// The SSL DirectBuffer Supplier provides the ability to recycle
195
// buffers used between the socket reader and the SSLEngine, or
196
// more precisely between the SocketTube publisher and the
197
// SSLFlowDelegate reader.
198
private final SSLDirectBufferSupplier sslBufferSupplier
199
= new SSLDirectBufferSupplier(this);
200
201
// This reference is used to keep track of the facade HttpClient
202
// that was returned to the application code.
203
// It makes it possible to know when the application no longer
204
// holds any reference to the HttpClient.
205
// Unfortunately, this information is not enough to know when
206
// to exit the SelectorManager thread. Because of the asynchronous
207
// nature of the API, we also need to wait until all pending operations
208
// have completed.
209
private final WeakReference<HttpClientFacade> facadeRef;
210
211
// This counter keeps track of the number of operations pending
212
// on the HttpClient. The SelectorManager thread will wait
213
// until there are no longer any pending operations and the
214
// facadeRef is cleared before exiting.
215
//
216
// The pendingOperationCount is incremented every time a send/sendAsync
217
// operation is invoked on the HttpClient, and is decremented when
218
// the HttpResponse<T> object is returned to the user.
219
// However, at this point, the body may not have been fully read yet.
220
// This is the case when the response T is implemented as a streaming
221
// subscriber (such as an InputStream).
222
//
223
// To take care of this issue the pendingOperationCount will additionally
224
// be incremented/decremented in the following cases:
225
//
226
// 1. For HTTP/2 it is incremented when a stream is added to the
227
// Http2Connection streams map, and decreased when the stream is removed
228
// from the map. This should also take care of push promises.
229
// 2. For WebSocket the count is increased when creating a
230
// DetachedConnectionChannel for the socket, and decreased
231
// when the channel is closed.
232
// In addition, the HttpClient facade is passed to the WebSocket builder,
233
// (instead of the client implementation delegate).
234
// 3. For HTTP/1.1 the count is incremented before starting to parse the body
235
// response, and decremented when the parser has reached the end of the
236
// response body flow.
237
//
238
// This should ensure that the selector manager thread remains alive until
239
// the response has been fully received or the web socket is closed.
240
private final AtomicLong pendingOperationCount = new AtomicLong();
241
private final AtomicLong pendingWebSocketCount = new AtomicLong();
242
private final AtomicLong pendingHttpRequestCount = new AtomicLong();
243
private final AtomicLong pendingHttp2StreamCount = new AtomicLong();
244
245
/** A Set of, deadline first, ordered timeout events. */
246
private final TreeSet<TimeoutEvent> timeouts;
247
248
/**
249
* This is a bit tricky:
250
* 1. an HttpClientFacade has a final HttpClientImpl field.
251
* 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
252
* where the referent is the facade created for that instance.
253
* 3. We cannot just create the HttpClientFacade in the HttpClientImpl
254
* constructor, because it would be only weakly referenced and could
255
* be GC'ed before we can return it.
256
* The solution is to use an instance of SingleFacadeFactory which will
257
* allow the caller of new HttpClientImpl(...) to retrieve the facade
258
* after the HttpClientImpl has been created.
259
*/
260
private static final class SingleFacadeFactory {
261
HttpClientFacade facade;
262
HttpClientFacade createFacade(HttpClientImpl impl) {
263
assert facade == null;
264
return (facade = new HttpClientFacade(impl));
265
}
266
}
267
268
static HttpClientFacade create(HttpClientBuilderImpl builder) {
269
SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
270
HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
271
impl.start();
272
assert facadeFactory.facade != null;
273
assert impl.facadeRef.get() == facadeFactory.facade;
274
return facadeFactory.facade;
275
}
276
277
private HttpClientImpl(HttpClientBuilderImpl builder,
278
SingleFacadeFactory facadeFactory) {
279
id = CLIENT_IDS.incrementAndGet();
280
dbgTag = "HttpClientImpl(" + id +")";
281
if (builder.sslContext == null) {
282
try {
283
sslContext = SSLContext.getDefault();
284
} catch (NoSuchAlgorithmException ex) {
285
throw new UncheckedIOException(new IOException(ex));
286
}
287
} else {
288
sslContext = builder.sslContext;
289
}
290
Executor ex = builder.executor;
291
if (ex == null) {
292
ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
293
isDefaultExecutor = true;
294
} else {
295
isDefaultExecutor = false;
296
}
297
delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
298
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
299
client2 = new Http2ClientImpl(this);
300
cookieHandler = builder.cookieHandler;
301
connectTimeout = builder.connectTimeout;
302
followRedirects = builder.followRedirects == null ?
303
Redirect.NEVER : builder.followRedirects;
304
this.userProxySelector = builder.proxy;
305
this.proxySelector = Optional.ofNullable(userProxySelector)
306
.orElseGet(HttpClientImpl::getDefaultProxySelector);
307
if (debug.on())
308
debug.log("proxySelector is %s (user-supplied=%s)",
309
this.proxySelector, userProxySelector != null);
310
authenticator = builder.authenticator;
311
if (builder.version == null) {
312
version = HttpClient.Version.HTTP_2;
313
} else {
314
version = builder.version;
315
}
316
if (builder.sslParams == null) {
317
sslParams = getDefaultParams(sslContext);
318
} else {
319
sslParams = builder.sslParams;
320
}
321
connections = new ConnectionPool(id);
322
connections.start();
323
timeouts = new TreeSet<>();
324
try {
325
selmgr = new SelectorManager(this);
326
} catch (IOException e) {
327
// unlikely
328
throw new UncheckedIOException(e);
329
}
330
selmgr.setDaemon(true);
331
filters = new FilterFactory();
332
initFilters();
333
assert facadeRef.get() != null;
334
}
335
336
private void start() {
337
selmgr.start();
338
}
339
340
// Called from the SelectorManager thread, just before exiting.
341
// Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
342
// that may be still lingering there are properly closed (and their
343
// possibly still opened SocketChannel released).
344
private void stop() {
345
// Clears HTTP/1.1 cache and close its connections
346
connections.stop();
347
// Clears HTTP/2 cache and close its connections.
348
client2.stop();
349
// shutdown the executor if needed
350
if (isDefaultExecutor) delegatingExecutor.shutdown();
351
}
352
353
private static SSLParameters getDefaultParams(SSLContext ctx) {
354
SSLParameters params = ctx.getDefaultSSLParameters();
355
return params;
356
}
357
358
@SuppressWarnings("removal")
359
private static ProxySelector getDefaultProxySelector() {
360
PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
361
return AccessController.doPrivileged(action);
362
}
363
364
// Returns the facade that was returned to the application code.
365
// May be null if that facade is no longer referenced.
366
final HttpClientFacade facade() {
367
return facadeRef.get();
368
}
369
370
// Increments the pendingOperationCount.
371
final long reference() {
372
pendingHttpRequestCount.incrementAndGet();
373
return pendingOperationCount.incrementAndGet();
374
}
375
376
// Decrements the pendingOperationCount.
377
final long unreference() {
378
final long count = pendingOperationCount.decrementAndGet();
379
final long httpCount = pendingHttpRequestCount.decrementAndGet();
380
final long http2Count = pendingHttp2StreamCount.get();
381
final long webSocketCount = pendingWebSocketCount.get();
382
if (count == 0 && facade() == null) {
383
selmgr.wakeupSelector();
384
}
385
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
386
assert http2Count >= 0 : "count of HTTP/2 operations < 0";
387
assert webSocketCount >= 0 : "count of WS operations < 0";
388
assert count >= 0 : "count of pending operations < 0";
389
return count;
390
}
391
392
// Increments the pendingOperationCount.
393
final long streamReference() {
394
pendingHttp2StreamCount.incrementAndGet();
395
return pendingOperationCount.incrementAndGet();
396
}
397
398
// Decrements the pendingOperationCount.
399
final long streamUnreference() {
400
final long count = pendingOperationCount.decrementAndGet();
401
final long http2Count = pendingHttp2StreamCount.decrementAndGet();
402
final long httpCount = pendingHttpRequestCount.get();
403
final long webSocketCount = pendingWebSocketCount.get();
404
if (count == 0 && facade() == null) {
405
selmgr.wakeupSelector();
406
}
407
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
408
assert http2Count >= 0 : "count of HTTP/2 operations < 0";
409
assert webSocketCount >= 0 : "count of WS operations < 0";
410
assert count >= 0 : "count of pending operations < 0";
411
return count;
412
}
413
414
// Increments the pendingOperationCount.
415
final long webSocketOpen() {
416
pendingWebSocketCount.incrementAndGet();
417
return pendingOperationCount.incrementAndGet();
418
}
419
420
// Decrements the pendingOperationCount.
421
final long webSocketClose() {
422
final long count = pendingOperationCount.decrementAndGet();
423
final long webSocketCount = pendingWebSocketCount.decrementAndGet();
424
final long httpCount = pendingHttpRequestCount.get();
425
final long http2Count = pendingHttp2StreamCount.get();
426
if (count == 0 && facade() == null) {
427
selmgr.wakeupSelector();
428
}
429
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
430
assert http2Count >= 0 : "count of HTTP/2 operations < 0";
431
assert webSocketCount >= 0 : "count of WS operations < 0";
432
assert count >= 0 : "count of pending operations < 0";
433
return count;
434
}
435
436
// Returns the pendingOperationCount.
437
final long referenceCount() {
438
return pendingOperationCount.get();
439
}
440
441
final static class HttpClientTracker implements Tracker {
442
final AtomicLong httpCount;
443
final AtomicLong http2Count;
444
final AtomicLong websocketCount;
445
final AtomicLong operationsCount;
446
final Reference<?> reference;
447
final String name;
448
HttpClientTracker(AtomicLong http,
449
AtomicLong http2,
450
AtomicLong ws,
451
AtomicLong ops,
452
Reference<?> ref,
453
String name) {
454
this.httpCount = http;
455
this.http2Count = http2;
456
this.websocketCount = ws;
457
this.operationsCount = ops;
458
this.reference = ref;
459
this.name = name;
460
}
461
@Override
462
public long getOutstandingOperations() {
463
return operationsCount.get();
464
}
465
@Override
466
public long getOutstandingHttpOperations() {
467
return httpCount.get();
468
}
469
@Override
470
public long getOutstandingHttp2Streams() { return http2Count.get(); }
471
@Override
472
public long getOutstandingWebSocketOperations() {
473
return websocketCount.get();
474
}
475
@Override
476
public boolean isFacadeReferenced() {
477
return reference.get() != null;
478
}
479
@Override
480
public String getName() {
481
return name;
482
}
483
}
484
485
public Tracker getOperationsTracker() {
486
return new HttpClientTracker(pendingHttpRequestCount,
487
pendingHttp2StreamCount,
488
pendingWebSocketCount,
489
pendingOperationCount,
490
facadeRef,
491
dbgTag);
492
}
493
494
// Called by the SelectorManager thread to figure out whether it's time
495
// to terminate.
496
final boolean isReferenced() {
497
HttpClient facade = facade();
498
return facade != null || referenceCount() > 0;
499
}
500
501
/**
502
* Wait for activity on given exchange.
503
* The following occurs in the SelectorManager thread.
504
*
505
* 1) add to selector
506
* 2) If selector fires for this exchange then
507
* call AsyncEvent.handle()
508
*
509
* If exchange needs to change interest ops, then call registerEvent() again.
510
*/
511
void registerEvent(AsyncEvent exchange) throws IOException {
512
selmgr.register(exchange);
513
}
514
515
/**
516
* Allows an AsyncEvent to modify its interestOps.
517
* @param event The modified event.
518
*/
519
void eventUpdated(AsyncEvent event) throws ClosedChannelException {
520
assert !(event instanceof AsyncTriggerEvent);
521
selmgr.eventUpdated(event);
522
}
523
524
boolean isSelectorThread() {
525
return Thread.currentThread() == selmgr;
526
}
527
528
Http2ClientImpl client2() {
529
return client2;
530
}
531
532
private void debugCompleted(String tag, long startNanos, HttpRequest req) {
533
if (debugelapsed.on()) {
534
debugelapsed.log(tag + " elapsed "
535
+ (System.nanoTime() - startNanos)/1000_000L
536
+ " millis for " + req.method()
537
+ " to " + req.uri());
538
}
539
}
540
541
@Override
542
public <T> HttpResponse<T>
543
send(HttpRequest req, BodyHandler<T> responseHandler)
544
throws IOException, InterruptedException
545
{
546
CompletableFuture<HttpResponse<T>> cf = null;
547
548
// if the thread is already interrupted no need to go further.
549
// cf.get() would throw anyway.
550
if (Thread.interrupted()) throw new InterruptedException();
551
try {
552
cf = sendAsync(req, responseHandler, null, null);
553
return cf.get();
554
} catch (InterruptedException ie) {
555
if (cf != null )
556
cf.cancel(true);
557
throw ie;
558
} catch (ExecutionException e) {
559
final Throwable throwable = e.getCause();
560
final String msg = throwable.getMessage();
561
562
if (throwable instanceof IllegalArgumentException) {
563
throw new IllegalArgumentException(msg, throwable);
564
} else if (throwable instanceof SecurityException) {
565
throw new SecurityException(msg, throwable);
566
} else if (throwable instanceof HttpConnectTimeoutException) {
567
HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
568
hcte.initCause(throwable);
569
throw hcte;
570
} else if (throwable instanceof HttpTimeoutException) {
571
throw new HttpTimeoutException(msg);
572
} else if (throwable instanceof ConnectException) {
573
ConnectException ce = new ConnectException(msg);
574
ce.initCause(throwable);
575
throw ce;
576
} else if (throwable instanceof SSLHandshakeException) {
577
// special case for SSLHandshakeException
578
SSLHandshakeException he = new SSLHandshakeException(msg);
579
he.initCause(throwable);
580
throw he;
581
} else if (throwable instanceof SSLException) {
582
// any other SSLException is wrapped in a plain
583
// SSLException
584
throw new SSLException(msg, throwable);
585
} else if (throwable instanceof IOException) {
586
throw new IOException(msg, throwable);
587
} else {
588
throw new IOException(msg, throwable);
589
}
590
}
591
}
592
593
private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();
594
595
@Override
596
public <T> CompletableFuture<HttpResponse<T>>
597
sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
598
{
599
return sendAsync(userRequest, responseHandler, null);
600
}
601
602
@Override
603
public <T> CompletableFuture<HttpResponse<T>>
604
sendAsync(HttpRequest userRequest,
605
BodyHandler<T> responseHandler,
606
PushPromiseHandler<T> pushPromiseHandler) {
607
return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
608
}
609
610
@SuppressWarnings("removal")
611
private <T> CompletableFuture<HttpResponse<T>>
612
sendAsync(HttpRequest userRequest,
613
BodyHandler<T> responseHandler,
614
PushPromiseHandler<T> pushPromiseHandler,
615
Executor exchangeExecutor) {
616
617
Objects.requireNonNull(userRequest);
618
Objects.requireNonNull(responseHandler);
619
620
AccessControlContext acc = null;
621
if (System.getSecurityManager() != null)
622
acc = AccessController.getContext();
623
624
// Clone the, possibly untrusted, HttpRequest
625
HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
626
if (requestImpl.method().equals("CONNECT"))
627
throw new IllegalArgumentException("Unsupported method CONNECT");
628
629
long start = DEBUGELAPSED ? System.nanoTime() : 0;
630
reference();
631
try {
632
if (debugelapsed.on())
633
debugelapsed.log("ClientImpl (async) send %s", userRequest);
634
635
// When using sendAsync(...) we explicitly pass the
636
// executor's delegate as exchange executor to force
637
// asynchronous scheduling of the exchange.
638
// When using send(...) we don't specify any executor
639
// and default to using the client's delegating executor
640
// which only spawns asynchronous tasks if it detects
641
// that the current thread is the selector manager
642
// thread. This will cause everything to execute inline
643
// until we need to schedule some event with the selector.
644
Executor executor = exchangeExecutor == null
645
? this.delegatingExecutor : exchangeExecutor;
646
647
MultiExchange<T> mex = new MultiExchange<>(userRequest,
648
requestImpl,
649
this,
650
responseHandler,
651
pushPromiseHandler,
652
acc);
653
CompletableFuture<HttpResponse<T>> res =
654
mex.responseAsync(executor).whenComplete((b,t) -> unreference());
655
if (DEBUGELAPSED) {
656
res = res.whenComplete(
657
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
658
}
659
660
// makes sure that any dependent actions happen in the CF default
661
// executor. This is only needed for sendAsync(...), when
662
// exchangeExecutor is non-null.
663
if (exchangeExecutor != null) {
664
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
665
}
666
return res;
667
} catch(Throwable t) {
668
unreference();
669
debugCompleted("ClientImpl (async)", start, userRequest);
670
throw t;
671
}
672
}
673
674
// Main loop for this client's selector
675
private final static class SelectorManager extends Thread {
676
677
// For testing purposes we have an internal System property that
678
// can control the frequency at which the selector manager will wake
679
// up when there are no pending operations.
680
// Increasing the frequency (shorter delays) might allow the selector
681
// to observe that the facade is no longer referenced and might allow
682
// the selector thread to terminate more timely - for when nothing is
683
// ongoing it will only check for that condition every NODEADLINE ms.
684
// To avoid misuse of the property, the delay that can be specified
685
// is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
686
// value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
687
// The property is -Djdk.internal.httpclient.selectorTimeout=<millis>
688
private static final int MIN_NODEADLINE = 1000; // ms
689
private static final int MAX_NODEADLINE = 1000 * 1200; // ms
690
private static final int DEF_NODEADLINE = 3000; // ms
691
private static final long NODEADLINE; // default is DEF_NODEADLINE ms
692
static {
693
// ensure NODEADLINE is initialized with some valid value.
694
long deadline = Utils.getIntegerProperty(
695
"jdk.internal.httpclient.selectorTimeout",
696
DEF_NODEADLINE); // millis
697
if (deadline <= 0) deadline = DEF_NODEADLINE;
698
deadline = Math.max(deadline, MIN_NODEADLINE);
699
NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
700
}
701
702
private final Selector selector;
703
private volatile boolean closed;
704
private final List<AsyncEvent> registrations;
705
private final List<AsyncTriggerEvent> deregistrations;
706
private final Logger debug;
707
private final Logger debugtimeout;
708
HttpClientImpl owner;
709
ConnectionPool pool;
710
711
SelectorManager(HttpClientImpl ref) throws IOException {
712
super(null, null,
713
"HttpClient-" + ref.id + "-SelectorManager",
714
0, false);
715
owner = ref;
716
debug = ref.debug;
717
debugtimeout = ref.debugtimeout;
718
pool = ref.connectionPool();
719
registrations = new ArrayList<>();
720
deregistrations = new ArrayList<>();
721
selector = Selector.open();
722
}
723
724
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
725
if (Thread.currentThread() == this) {
726
SelectionKey key = e.channel().keyFor(selector);
727
if (key != null && key.isValid()) {
728
SelectorAttachment sa = (SelectorAttachment) key.attachment();
729
sa.register(e);
730
} else if (e.interestOps() != 0){
731
// We don't care about paused events.
732
// These are actually handled by
733
// SelectorAttachment::resetInterestOps later on.
734
// But if we reach here when trying to resume an
735
// event then it's better to fail fast.
736
if (debug.on()) debug.log("No key for channel");
737
e.abort(new IOException("No key for channel"));
738
}
739
} else {
740
register(e);
741
}
742
}
743
744
// This returns immediately. So caller not allowed to send/receive
745
// on connection.
746
synchronized void register(AsyncEvent e) {
747
registrations.add(e);
748
selector.wakeup();
749
}
750
751
synchronized void cancel(SocketChannel e) {
752
SelectionKey key = e.keyFor(selector);
753
if (key != null) {
754
key.cancel();
755
}
756
selector.wakeup();
757
}
758
759
void wakeupSelector() {
760
selector.wakeup();
761
}
762
763
synchronized void shutdown() {
764
Log.logTrace("{0}: shutting down", getName());
765
if (debug.on()) debug.log("SelectorManager shutting down");
766
closed = true;
767
try {
768
selector.close();
769
} catch (IOException ignored) {
770
} finally {
771
owner.stop();
772
}
773
}
774
775
@Override
776
public void run() {
777
List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
778
List<AsyncEvent> readyList = new ArrayList<>();
779
List<Runnable> resetList = new ArrayList<>();
780
try {
781
if (Log.channel()) Log.logChannel(getName() + ": starting");
782
while (!Thread.currentThread().isInterrupted()) {
783
synchronized (this) {
784
assert errorList.isEmpty();
785
assert readyList.isEmpty();
786
assert resetList.isEmpty();
787
for (AsyncTriggerEvent event : deregistrations) {
788
event.handle();
789
}
790
deregistrations.clear();
791
for (AsyncEvent event : registrations) {
792
if (event instanceof AsyncTriggerEvent) {
793
readyList.add(event);
794
continue;
795
}
796
SelectableChannel chan = event.channel();
797
SelectionKey key = null;
798
try {
799
key = chan.keyFor(selector);
800
SelectorAttachment sa;
801
if (key == null || !key.isValid()) {
802
if (key != null) {
803
// key is canceled.
804
// invoke selectNow() to purge it
805
// before registering the new event.
806
selector.selectNow();
807
}
808
sa = new SelectorAttachment(chan, selector);
809
} else {
810
sa = (SelectorAttachment) key.attachment();
811
}
812
// may throw IOE if channel closed: that's OK
813
sa.register(event);
814
if (!chan.isOpen()) {
815
throw new IOException("Channel closed");
816
}
817
} catch (IOException e) {
818
Log.logTrace("{0}: {1}", getName(), e);
819
if (debug.on())
820
debug.log("Got " + e.getClass().getName()
821
+ " while handling registration events");
822
chan.close();
823
// let the event abort deal with it
824
errorList.add(new Pair<>(event, e));
825
if (key != null) {
826
key.cancel();
827
selector.selectNow();
828
}
829
}
830
}
831
registrations.clear();
832
selector.selectedKeys().clear();
833
}
834
835
for (AsyncEvent event : readyList) {
836
assert event instanceof AsyncTriggerEvent;
837
event.handle();
838
}
839
readyList.clear();
840
841
for (Pair<AsyncEvent,IOException> error : errorList) {
842
// an IOException was raised and the channel closed.
843
handleEvent(error.first, error.second);
844
}
845
errorList.clear();
846
847
// Check whether client is still alive, and if not,
848
// gracefully stop this thread
849
if (!owner.isReferenced()) {
850
Log.logTrace("{0}: {1}",
851
getName(),
852
"HttpClient no longer referenced. Exiting...");
853
return;
854
}
855
856
// Timeouts will have milliseconds granularity. It is important
857
// to handle them in a timely fashion.
858
long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
859
if (debugtimeout.on())
860
debugtimeout.log("next timeout: %d", nextTimeout);
861
862
// Keep-alive have seconds granularity. It's not really an
863
// issue if we keep connections linger a bit more in the keep
864
// alive cache.
865
long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
866
if (debugtimeout.on())
867
debugtimeout.log("next expired: %d", nextExpiry);
868
869
assert nextTimeout >= 0;
870
assert nextExpiry >= 0;
871
872
// Don't wait for ever as it might prevent the thread to
873
// stop gracefully. millis will be 0 if no deadline was found.
874
if (nextTimeout <= 0) nextTimeout = NODEADLINE;
875
876
// Clip nextExpiry at NODEADLINE limit. The default
877
// keep alive is 1200 seconds (half an hour) - we don't
878
// want to wait that long.
879
if (nextExpiry <= 0) nextExpiry = NODEADLINE;
880
else nextExpiry = Math.min(NODEADLINE, nextExpiry);
881
882
// takes the least of the two.
883
long millis = Math.min(nextExpiry, nextTimeout);
884
885
if (debugtimeout.on())
886
debugtimeout.log("Next deadline is %d",
887
(millis == 0 ? NODEADLINE : millis));
888
//debugPrint(selector);
889
int n = selector.select(millis == 0 ? NODEADLINE : millis);
890
if (n == 0) {
891
// Check whether client is still alive, and if not,
892
// gracefully stop this thread
893
if (!owner.isReferenced()) {
894
Log.logTrace("{0}: {1}",
895
getName(),
896
"HttpClient no longer referenced. Exiting...");
897
return;
898
}
899
owner.purgeTimeoutsAndReturnNextDeadline();
900
continue;
901
}
902
903
Set<SelectionKey> keys = selector.selectedKeys();
904
assert errorList.isEmpty();
905
906
for (SelectionKey key : keys) {
907
SelectorAttachment sa = (SelectorAttachment) key.attachment();
908
if (!key.isValid()) {
909
IOException ex = sa.chan.isOpen()
910
? new IOException("Invalid key")
911
: new ClosedChannelException();
912
sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
913
sa.pending.clear();
914
continue;
915
}
916
917
int eventsOccurred;
918
try {
919
eventsOccurred = key.readyOps();
920
} catch (CancelledKeyException ex) {
921
IOException io = Utils.getIOException(ex);
922
sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
923
sa.pending.clear();
924
continue;
925
}
926
sa.events(eventsOccurred).forEach(readyList::add);
927
resetList.add(() -> sa.resetInterestOps(eventsOccurred));
928
}
929
930
selector.selectNow(); // complete cancellation
931
selector.selectedKeys().clear();
932
933
// handle selected events
934
readyList.forEach((e) -> handleEvent(e, null));
935
readyList.clear();
936
937
// handle errors (closed channels etc...)
938
errorList.forEach((p) -> handleEvent(p.first, p.second));
939
errorList.clear();
940
941
// reset interest ops for selected channels
942
resetList.forEach(r -> r.run());
943
resetList.clear();
944
945
}
946
} catch (Throwable e) {
947
if (!closed) {
948
// This terminates thread. So, better just print stack trace
949
String err = Utils.stackTrace(e);
950
Log.logError("{0}: {1}: {2}", getName(),
951
"HttpClientImpl shutting down due to fatal error", err);
952
}
953
if (debug.on()) debug.log("shutting down", e);
954
if (Utils.ASSERTIONSENABLED && !debug.on()) {
955
e.printStackTrace(System.err); // always print the stack
956
}
957
} finally {
958
if (Log.channel()) Log.logChannel(getName() + ": stopping");
959
shutdown();
960
}
961
}
962
963
// void debugPrint(Selector selector) {
964
// System.err.println("Selector: debugprint start");
965
// Set<SelectionKey> keys = selector.keys();
966
// for (SelectionKey key : keys) {
967
// SelectableChannel c = key.channel();
968
// int ops = key.interestOps();
969
// System.err.printf("selector chan:%s ops:%d\n", c, ops);
970
// }
971
// System.err.println("Selector: debugprint end");
972
// }
973
974
/** Handles the given event. The given ioe may be null. */
975
void handleEvent(AsyncEvent event, IOException ioe) {
976
if (closed || ioe != null) {
977
event.abort(ioe);
978
} else {
979
event.handle();
980
}
981
}
982
}
983
984
final String debugInterestOps(SelectableChannel channel) {
985
try {
986
SelectionKey key = channel.keyFor(selmgr.selector);
987
if (key == null) return "channel not registered with selector";
988
String keyInterestOps = key.isValid()
989
? "key.interestOps=" + key.interestOps() : "invalid key";
990
return String.format("channel registered with selector, %s, sa.interestOps=%s",
991
keyInterestOps,
992
((SelectorAttachment)key.attachment()).interestOps);
993
} catch (Throwable t) {
994
return String.valueOf(t);
995
}
996
}
997
998
/**
999
* Tracks multiple user level registrations associated with one NIO
1000
* registration (SelectionKey). In this implementation, registrations
1001
* are one-off and when an event is posted the registration is cancelled
1002
* until explicitly registered again.
1003
*
1004
* <p> No external synchronization required as this class is only used
1005
* by the SelectorManager thread. One of these objects required per
1006
* connection.
1007
*/
1008
private static class SelectorAttachment {
1009
private final SelectableChannel chan;
1010
private final Selector selector;
1011
private final Set<AsyncEvent> pending;
1012
private final static Logger debug =
1013
Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
1014
private int interestOps;
1015
1016
SelectorAttachment(SelectableChannel chan, Selector selector) {
1017
this.pending = new HashSet<>();
1018
this.chan = chan;
1019
this.selector = selector;
1020
}
1021
1022
void register(AsyncEvent e) throws ClosedChannelException {
1023
int newOps = e.interestOps();
1024
// re register interest if we are not already interested
1025
// in the event. If the event is paused, then the pause will
1026
// be taken into account later when resetInterestOps is called.
1027
boolean reRegister = (interestOps & newOps) != newOps;
1028
interestOps |= newOps;
1029
pending.add(e);
1030
if (debug.on())
1031
debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
1032
if (reRegister) {
1033
// first time registration happens here also
1034
try {
1035
chan.register(selector, interestOps, this);
1036
} catch (Throwable x) {
1037
abortPending(x);
1038
}
1039
} else if (!chan.isOpen()) {
1040
abortPending(new ClosedChannelException());
1041
}
1042
}
1043
1044
/**
1045
* Returns a Stream<AsyncEvents> containing only events that are
1046
* registered with the given {@code interestOps}.
1047
*/
1048
Stream<AsyncEvent> events(int interestOps) {
1049
return pending.stream()
1050
.filter(ev -> (ev.interestOps() & interestOps) != 0);
1051
}
1052
1053
/**
1054
* Removes any events with the given {@code interestOps}, and if no
1055
* events remaining, cancels the associated SelectionKey.
1056
*/
1057
void resetInterestOps(int interestOps) {
1058
int newOps = 0;
1059
1060
Iterator<AsyncEvent> itr = pending.iterator();
1061
while (itr.hasNext()) {
1062
AsyncEvent event = itr.next();
1063
int evops = event.interestOps();
1064
if (event.repeating()) {
1065
newOps |= evops;
1066
continue;
1067
}
1068
if ((evops & interestOps) != 0) {
1069
itr.remove();
1070
} else {
1071
newOps |= evops;
1072
}
1073
}
1074
1075
this.interestOps = newOps;
1076
SelectionKey key = chan.keyFor(selector);
1077
if (newOps == 0 && key != null && pending.isEmpty()) {
1078
key.cancel();
1079
} else {
1080
try {
1081
if (key == null || !key.isValid()) {
1082
throw new CancelledKeyException();
1083
}
1084
key.interestOps(newOps);
1085
// double check after
1086
if (!chan.isOpen()) {
1087
abortPending(new ClosedChannelException());
1088
return;
1089
}
1090
assert key.interestOps() == newOps;
1091
} catch (CancelledKeyException x) {
1092
// channel may have been closed
1093
if (debug.on()) debug.log("key cancelled for " + chan);
1094
abortPending(x);
1095
}
1096
}
1097
}
1098
1099
void abortPending(Throwable x) {
1100
if (!pending.isEmpty()) {
1101
AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
1102
pending.clear();
1103
IOException io = Utils.getIOException(x);
1104
for (AsyncEvent event : evts) {
1105
event.abort(io);
1106
}
1107
}
1108
}
1109
}
1110
1111
/*package-private*/ SSLContext theSSLContext() {
1112
return sslContext;
1113
}
1114
1115
@Override
1116
public SSLContext sslContext() {
1117
return sslContext;
1118
}
1119
1120
@Override
1121
public SSLParameters sslParameters() {
1122
return Utils.copySSLParameters(sslParams);
1123
}
1124
1125
@Override
1126
public Optional<Authenticator> authenticator() {
1127
return Optional.ofNullable(authenticator);
1128
}
1129
1130
/*package-private*/ final DelegatingExecutor theExecutor() {
1131
return delegatingExecutor;
1132
}
1133
1134
@Override
1135
public final Optional<Executor> executor() {
1136
return isDefaultExecutor
1137
? Optional.empty()
1138
: Optional.of(delegatingExecutor.delegate());
1139
}
1140
1141
ConnectionPool connectionPool() {
1142
return connections;
1143
}
1144
1145
@Override
1146
public Redirect followRedirects() {
1147
return followRedirects;
1148
}
1149
1150
1151
@Override
1152
public Optional<CookieHandler> cookieHandler() {
1153
return Optional.ofNullable(cookieHandler);
1154
}
1155
1156
@Override
1157
public Optional<Duration> connectTimeout() {
1158
return Optional.ofNullable(connectTimeout);
1159
}
1160
1161
@Override
1162
public Optional<ProxySelector> proxy() {
1163
return Optional.ofNullable(userProxySelector);
1164
}
1165
1166
// Return the effective proxy that this client uses.
1167
ProxySelector proxySelector() {
1168
return proxySelector;
1169
}
1170
1171
@Override
1172
public WebSocket.Builder newWebSocketBuilder() {
1173
// Make sure to pass the HttpClientFacade to the WebSocket builder.
1174
// This will ensure that the facade is not released before the
1175
// WebSocket has been created, at which point the pendingOperationCount
1176
// will have been incremented by the RawChannelTube.
1177
// See RawChannelTube.
1178
return new BuilderImpl(this.facade(), proxySelector);
1179
}
1180
1181
@Override
1182
public Version version() {
1183
return version;
1184
}
1185
1186
String dbgString() {
1187
return dbgTag;
1188
}
1189
1190
@Override
1191
public String toString() {
1192
// Used by tests to get the client's id and compute the
1193
// name of the SelectorManager thread.
1194
return super.toString() + ("(" + id + ")");
1195
}
1196
1197
private void initFilters() {
1198
addFilter(AuthenticationFilter.class);
1199
addFilter(RedirectFilter.class);
1200
if (this.cookieHandler != null) {
1201
addFilter(CookieFilter.class);
1202
}
1203
}
1204
1205
private void addFilter(Class<? extends HeaderFilter> f) {
1206
filters.addFilter(f);
1207
}
1208
1209
final LinkedList<HeaderFilter> filterChain() {
1210
return filters.getFilterChain();
1211
}
1212
1213
// Timer controls.
1214
// Timers are implemented through timed Selector.select() calls.
1215
1216
synchronized void registerTimer(TimeoutEvent event) {
1217
Log.logTrace("Registering timer {0}", event);
1218
timeouts.add(event);
1219
selmgr.wakeupSelector();
1220
}
1221
1222
synchronized void cancelTimer(TimeoutEvent event) {
1223
Log.logTrace("Canceling timer {0}", event);
1224
timeouts.remove(event);
1225
}
1226
1227
/**
1228
* Purges ( handles ) timer events that have passed their deadline, and
1229
* returns the amount of time, in milliseconds, until the next earliest
1230
* event. A return value of 0 means that there are no events.
1231
*/
1232
private long purgeTimeoutsAndReturnNextDeadline() {
1233
long diff = 0L;
1234
List<TimeoutEvent> toHandle = null;
1235
int remaining = 0;
1236
// enter critical section to retrieve the timeout event to handle
1237
synchronized(this) {
1238
if (timeouts.isEmpty()) return 0L;
1239
1240
Instant now = Instant.now();
1241
Iterator<TimeoutEvent> itr = timeouts.iterator();
1242
while (itr.hasNext()) {
1243
TimeoutEvent event = itr.next();
1244
diff = now.until(event.deadline(), ChronoUnit.MILLIS);
1245
if (diff <= 0) {
1246
itr.remove();
1247
toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
1248
toHandle.add(event);
1249
} else {
1250
break;
1251
}
1252
}
1253
remaining = timeouts.size();
1254
}
1255
1256
// can be useful for debugging
1257
if (toHandle != null && Log.trace()) {
1258
Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
1259
+ toHandle.size() + " events, "
1260
+ "remaining " + remaining
1261
+ ", next deadline: " + (diff < 0 ? 0L : diff));
1262
}
1263
1264
// handle timeout events out of critical section
1265
if (toHandle != null) {
1266
Throwable failed = null;
1267
for (TimeoutEvent event : toHandle) {
1268
try {
1269
Log.logTrace("Firing timer {0}", event);
1270
event.handle();
1271
} catch (Error | RuntimeException e) {
1272
// Not expected. Handle remaining events then throw...
1273
// If e is an OOME or SOE it might simply trigger a new
1274
// error from here - but in this case there's not much we
1275
// could do anyway. Just let it flow...
1276
if (failed == null) failed = e;
1277
else failed.addSuppressed(e);
1278
Log.logTrace("Failed to handle event {0}: {1}", event, e);
1279
}
1280
}
1281
if (failed instanceof Error) throw (Error) failed;
1282
if (failed instanceof RuntimeException) throw (RuntimeException) failed;
1283
}
1284
1285
// return time to wait until next event. 0L if there's no more events.
1286
return diff < 0 ? 0L : diff;
1287
}
1288
1289
// used for the connection window
1290
int getReceiveBufferSize() {
1291
return Utils.getIntegerNetProperty(
1292
"jdk.httpclient.receiveBufferSize",
1293
0 // only set the size if > 0
1294
);
1295
}
1296
1297
// used for testing
1298
int getSendBufferSize() {
1299
return Utils.getIntegerNetProperty(
1300
"jdk.httpclient.sendBufferSize",
1301
0 // only set the size if > 0
1302
);
1303
}
1304
1305
// Optimization for reading SSL encrypted data
1306
// --------------------------------------------
1307
1308
// Returns a BufferSupplier that can be used for reading
1309
// encrypted bytes of the channel. These buffers can then
1310
// be recycled by the SSLFlowDelegate::Reader after their
1311
// content has been copied in the SSLFlowDelegate::Reader
1312
// readBuf.
1313
// Because allocating, reading, copying, and recycling
1314
// all happen in the SelectorManager thread,
1315
// then this BufferSupplier can be shared between all
1316
// the SSL connections managed by this client.
1317
BufferSupplier getSSLBufferSupplier() {
1318
return sslBufferSupplier;
1319
}
1320
1321
// An implementation of BufferSupplier that manages a pool of
1322
// maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
1323
// are used for reading encrypted bytes off the channel before
1324
// copying and subsequent unwrapping.
1325
private static final class SSLDirectBufferSupplier implements BufferSupplier {
1326
private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;
1327
private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
1328
private final HttpClientImpl client;
1329
private final Logger debug;
1330
private int tail, count; // no need for volatile: only accessed in SM thread.
1331
1332
SSLDirectBufferSupplier(HttpClientImpl client) {
1333
this.client = Objects.requireNonNull(client);
1334
this.debug = client.debug;
1335
}
1336
1337
// Gets a buffer from the pool, or allocates a new one if needed.
1338
@Override
1339
public ByteBuffer get() {
1340
assert client.isSelectorThread();
1341
assert tail <= POOL_SIZE : "allocate tail is " + tail;
1342
ByteBuffer buf;
1343
if (tail == 0) {
1344
if (debug.on()) {
1345
// should not appear more than SocketTube.MAX_BUFFERS
1346
debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
1347
}
1348
assert count++ < POOL_SIZE : "trying to allocate more than "
1349
+ POOL_SIZE + " buffers";
1350
buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
1351
} else {
1352
assert tail > 0 : "non positive tail value: " + tail;
1353
tail--;
1354
buf = pool[tail];
1355
pool[tail] = null;
1356
}
1357
assert buf.isDirect();
1358
assert buf.position() == 0;
1359
assert buf.hasRemaining();
1360
assert buf.limit() == Utils.BUFSIZE;
1361
assert tail < POOL_SIZE;
1362
assert tail >= 0;
1363
return buf;
1364
}
1365
1366
// Returns the given buffer to the pool.
1367
@Override
1368
public void recycle(ByteBuffer buffer) {
1369
assert client.isSelectorThread();
1370
assert buffer.isDirect();
1371
assert !buffer.hasRemaining();
1372
assert tail < POOL_SIZE : "recycle tail is " + tail;
1373
assert tail >= 0;
1374
buffer.position(0);
1375
buffer.limit(buffer.capacity());
1376
// don't fail if assertions are off. we have asserted above.
1377
if (tail < POOL_SIZE) {
1378
pool[tail] = buffer;
1379
tail++;
1380
}
1381
assert tail <= POOL_SIZE;
1382
assert tail > 0;
1383
}
1384
}
1385
1386
}
1387
1388