Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java
41171 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.Closeable;
29
import java.io.IOException;
30
import java.net.InetSocketAddress;
31
import java.nio.ByteBuffer;
32
import java.nio.channels.SocketChannel;
33
import java.util.Arrays;
34
import java.util.IdentityHashMap;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.TreeMap;
38
import java.util.concurrent.CompletableFuture;
39
import java.util.concurrent.CompletionStage;
40
import java.util.concurrent.ConcurrentLinkedDeque;
41
import java.util.concurrent.Flow;
42
import java.util.function.BiPredicate;
43
import java.util.function.Predicate;
44
import java.net.http.HttpClient;
45
import java.net.http.HttpClient.Version;
46
import java.net.http.HttpHeaders;
47
import jdk.internal.net.http.common.Demand;
48
import jdk.internal.net.http.common.FlowTube;
49
import jdk.internal.net.http.common.Logger;
50
import jdk.internal.net.http.common.SequentialScheduler;
51
import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
52
import jdk.internal.net.http.common.Log;
53
import jdk.internal.net.http.common.Utils;
54
import static java.net.http.HttpClient.Version.HTTP_2;
55
import static jdk.internal.net.http.common.Utils.ProxyHeaders;
56
57
/**
58
* Wraps socket channel layer and takes care of SSL also.
59
*
60
* Subtypes are:
61
* PlainHttpConnection: regular direct TCP connection to server
62
* PlainProxyConnection: plain text proxy connection
63
* PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
64
* AsyncSSLConnection: TLS channel direct to server
65
* AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
66
*/
67
abstract class HttpConnection implements Closeable {
68
69
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
70
final static Logger DEBUG_LOGGER = Utils.getDebugLogger(
71
() -> "HttpConnection(SocketTube(?))", Utils.DEBUG);
72
73
/** The address this connection is connected to. Could be a server or a proxy. */
74
final InetSocketAddress address;
75
private final HttpClientImpl client;
76
private final TrailingOperations trailingOperations;
77
78
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
79
this.address = address;
80
this.client = client;
81
trailingOperations = new TrailingOperations();
82
}
83
84
private static final class TrailingOperations {
85
private final Map<CompletionStage<?>, Boolean> operations =
86
new IdentityHashMap<>();
87
void add(CompletionStage<?> cf) {
88
synchronized(operations) {
89
operations.put(cf, Boolean.TRUE);
90
cf.whenComplete((r,t)-> remove(cf));
91
}
92
}
93
boolean remove(CompletionStage<?> cf) {
94
synchronized(operations) {
95
return operations.remove(cf);
96
}
97
}
98
}
99
100
final void addTrailingOperation(CompletionStage<?> cf) {
101
trailingOperations.add(cf);
102
}
103
104
// final void removeTrailingOperation(CompletableFuture<?> cf) {
105
// trailingOperations.remove(cf);
106
// }
107
108
final HttpClientImpl client() {
109
return client;
110
}
111
112
/**
113
* Initiates the connect phase.
114
*
115
* Returns a CompletableFuture that completes when the underlying
116
* TCP connection has been established or an error occurs.
117
*/
118
public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
119
120
/**
121
* Finishes the connection phase.
122
*
123
* Returns a CompletableFuture that completes when any additional,
124
* type specific, setup has been done. Must be called after connectAsync. */
125
public abstract CompletableFuture<Void> finishConnect();
126
127
/** Tells whether, or not, this connection is connected to its destination. */
128
abstract boolean connected();
129
130
/** Tells whether, or not, this connection is secure ( over SSL ) */
131
abstract boolean isSecure();
132
133
/**
134
* Tells whether, or not, this connection is proxied.
135
* Returns true for tunnel connections, or clear connection to
136
* any host through proxy.
137
*/
138
abstract boolean isProxied();
139
140
/**
141
* Returns the address of the proxy used by this connection.
142
* Returns the proxy address for tunnel connections, or
143
* clear connection to any host through proxy.
144
* Returns {@code null} otherwise.
145
*/
146
abstract InetSocketAddress proxy();
147
148
/** Tells whether, or not, this connection is open. */
149
final boolean isOpen() {
150
return channel().isOpen() &&
151
(connected() ? !getConnectionFlow().isFinished() : true);
152
}
153
154
/**
155
* Forces a call to the native implementation of the
156
* connection's channel to verify that this channel is still
157
* open.
158
* <p>
159
* This method should only be called just after an HTTP/1.1
160
* connection is retrieved from the HTTP/1.1 connection pool.
161
* It is used to trigger an early detection of the channel state,
162
* before handling the connection over to the HTTP stack.
163
* It helps minimizing race conditions where the selector manager
164
* thread hasn't woken up - or hasn't raised the event, before
165
* the connection was retrieved from the pool. It helps reduce
166
* the occurrence of "HTTP/1.1 parser received no bytes"
167
* exception, when the server closes the connection while
168
* it's being taken out of the pool.
169
* <p>
170
* This method attempts to read one byte from the underlying
171
* channel. Because the connection was in the pool - there
172
* should be nothing to read.
173
* <p>
174
* If {@code read} manages to read a byte off the connection, this is a
175
* protocol error: the method closes the connection and returns false.
176
* If {@code read} returns EOF, the method closes the connection and
177
* returns false.
178
* If {@code read} throws an exception, the method returns false.
179
* Otherwise, {@code read} returns 0, the channel appears to be
180
* still open, and the method returns true.
181
* @return true if the channel appears to be still open.
182
*/
183
final boolean checkOpen() {
184
if (isOpen()) {
185
try {
186
// channel is non blocking
187
int read = channel().read(ByteBuffer.allocate(1));
188
if (read == 0) return true;
189
close();
190
} catch (IOException x) {
191
debug.log("Pooled connection is no longer operational: %s",
192
x.toString());
193
return false;
194
}
195
}
196
return false;
197
}
198
199
interface HttpPublisher extends FlowTube.TubePublisher {
200
void enqueue(List<ByteBuffer> buffers) throws IOException;
201
void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
202
void signalEnqueued() throws IOException;
203
}
204
205
/**
206
* Returns the HTTP publisher associated with this connection. May be null
207
* if invoked before connecting.
208
*/
209
abstract HttpPublisher publisher();
210
211
// HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
212
private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
213
proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
214
215
/**
216
* Returns true if the given client's SSL parameter protocols contains at
217
* least one TLS version that HTTP/2 requires.
218
*/
219
private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
220
String[] protos = client.sslParameters().getProtocols();
221
if (protos != null) {
222
return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
223
} else {
224
return false;
225
}
226
}
227
228
/**
229
* Factory for retrieving HttpConnections. A connection can be retrieved
230
* from the connection pool, or a new one created if none available.
231
*
232
* The given {@code addr} is the ultimate destination. Any proxies,
233
* etc, are determined from the request. Returns a concrete instance which
234
* is one of the following:
235
* {@link PlainHttpConnection}
236
* {@link PlainTunnelingConnection}
237
*
238
* The returned connection, if not from the connection pool, must have its,
239
* connect() or connectAsync() method invoked, which ( when it completes
240
* successfully ) renders the connection usable for requests.
241
*/
242
public static HttpConnection getConnection(InetSocketAddress addr,
243
HttpClientImpl client,
244
HttpRequestImpl request,
245
Version version) {
246
// The default proxy selector may select a proxy whose address is
247
// unresolved. We must resolve the address before connecting to it.
248
InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
249
HttpConnection c = null;
250
boolean secure = request.secure();
251
ConnectionPool pool = client.connectionPool();
252
253
if (!secure) {
254
c = pool.getConnection(false, addr, proxy);
255
if (c != null && c.checkOpen() /* may have been eof/closed when in the pool */) {
256
final HttpConnection conn = c;
257
if (DEBUG_LOGGER.on())
258
DEBUG_LOGGER.log(conn.getConnectionFlow()
259
+ ": plain connection retrieved from HTTP/1.1 pool");
260
return c;
261
} else {
262
return getPlainConnection(addr, proxy, request, client);
263
}
264
} else { // secure
265
if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
266
c = pool.getConnection(true, addr, proxy);
267
}
268
if (c != null && c.isOpen()) {
269
final HttpConnection conn = c;
270
if (DEBUG_LOGGER.on())
271
DEBUG_LOGGER.log(conn.getConnectionFlow()
272
+ ": SSL connection retrieved from HTTP/1.1 pool");
273
return c;
274
} else {
275
String[] alpn = null;
276
if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
277
alpn = new String[] { "h2", "http/1.1" };
278
}
279
return getSSLConnection(addr, proxy, alpn, request, client);
280
}
281
}
282
}
283
284
private static HttpConnection getSSLConnection(InetSocketAddress addr,
285
InetSocketAddress proxy,
286
String[] alpn,
287
HttpRequestImpl request,
288
HttpClientImpl client) {
289
if (proxy != null)
290
return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
291
proxyTunnelHeaders(request));
292
else
293
return new AsyncSSLConnection(addr, client, alpn);
294
}
295
296
/**
297
* This method is used to build a filter that will accept or
298
* veto (header-name, value) tuple for transmission on the
299
* wire.
300
* The filter is applied to the headers when sending the headers
301
* to the remote party.
302
* Which tuple is accepted/vetoed depends on:
303
* <pre>
304
* - whether the connection is a tunnel connection
305
* [talking to a server through a proxy tunnel]
306
* - whether the method is CONNECT
307
* [establishing a CONNECT tunnel through a proxy]
308
* - whether the request is using a proxy
309
* (and the connection is not a tunnel)
310
* [talking to a server through a proxy]
311
* - whether the request is a direct connection to
312
* a server (no tunnel, no proxy).
313
* </pre>
314
* @param request
315
* @return
316
*/
317
BiPredicate<String,String> headerFilter(HttpRequestImpl request) {
318
if (isTunnel()) {
319
// talking to a server through a proxy tunnel
320
// don't send proxy-* headers to a plain server
321
assert !request.isConnect();
322
return Utils.NO_PROXY_HEADERS_FILTER;
323
} else if (request.isConnect()) {
324
// establishing a proxy tunnel
325
// check for proxy tunnel disabled schemes
326
// assert !this.isTunnel();
327
assert request.proxy() == null;
328
return Utils.PROXY_TUNNEL_FILTER;
329
} else if (request.proxy() != null) {
330
// talking to a server through a proxy (no tunnel)
331
// check for proxy disabled schemes
332
// assert !isTunnel() && !request.isConnect();
333
return Utils.PROXY_FILTER;
334
} else {
335
// talking to a server directly (no tunnel, no proxy)
336
// don't send proxy-* headers to a plain server
337
// assert request.proxy() == null && !request.isConnect();
338
return Utils.NO_PROXY_HEADERS_FILTER;
339
}
340
}
341
342
BiPredicate<String,String> contextRestricted(HttpRequestImpl request, HttpClient client) {
343
if (!isTunnel() && request.isConnect()) {
344
// establishing a proxy tunnel
345
assert request.proxy() == null;
346
return Utils.PROXY_TUNNEL_RESTRICTED(client);
347
} else {
348
return Utils.CONTEXT_RESTRICTED(client);
349
}
350
}
351
352
// Composes a new immutable HttpHeaders that combines the
353
// user and system header but only keeps those headers that
354
// start with "proxy-"
355
private static ProxyHeaders proxyTunnelHeaders(HttpRequestImpl request) {
356
HttpHeaders userHeaders = HttpHeaders.of(request.headers().map(), Utils.PROXY_TUNNEL_FILTER);
357
HttpHeaders systemHeaders = HttpHeaders.of(request.getSystemHeadersBuilder().map(), Utils.PROXY_TUNNEL_FILTER);
358
return new ProxyHeaders(userHeaders, systemHeaders);
359
}
360
361
/* Returns either a plain HTTP connection or a plain tunnelling connection
362
* for proxied WebSocket */
363
private static HttpConnection getPlainConnection(InetSocketAddress addr,
364
InetSocketAddress proxy,
365
HttpRequestImpl request,
366
HttpClientImpl client) {
367
if (request.isWebSocket() && proxy != null)
368
return new PlainTunnelingConnection(addr, proxy, client,
369
proxyTunnelHeaders(request));
370
371
if (proxy == null)
372
return new PlainHttpConnection(addr, client);
373
else
374
return new PlainProxyConnection(proxy, client);
375
}
376
377
void closeOrReturnToCache(HttpHeaders hdrs) {
378
if (hdrs == null) {
379
// the connection was closed by server, eof
380
Log.logTrace("Cannot return connection to pool: closing {0}", this);
381
close();
382
return;
383
}
384
HttpClientImpl client = client();
385
if (client == null) {
386
Log.logTrace("Client released: closing {0}", this);
387
close();
388
return;
389
}
390
ConnectionPool pool = client.connectionPool();
391
boolean keepAlive = hdrs.firstValue("Connection")
392
.map((s) -> !s.equalsIgnoreCase("close"))
393
.orElse(true);
394
395
if (keepAlive && checkOpen()) {
396
Log.logTrace("Returning connection to the pool: {0}", this);
397
pool.returnToPool(this);
398
} else {
399
Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}",
400
keepAlive, isOpen(), this);
401
close();
402
}
403
}
404
405
/* Tells whether or not this connection is a tunnel through a proxy */
406
boolean isTunnel() { return false; }
407
408
abstract SocketChannel channel();
409
410
final InetSocketAddress address() {
411
return address;
412
}
413
414
abstract ConnectionPool.CacheKey cacheKey();
415
416
/**
417
* Closes this connection, by returning the socket to its connection pool.
418
*/
419
@Override
420
public abstract void close();
421
422
abstract FlowTube getConnectionFlow();
423
424
/**
425
* A publisher that makes it possible to publish (write) ordered (normal
426
* priority) and unordered (high priority) buffers downstream.
427
*/
428
final class PlainHttpPublisher implements HttpPublisher {
429
final Object reading;
430
PlainHttpPublisher() {
431
this(new Object());
432
}
433
PlainHttpPublisher(Object readingLock) {
434
this.reading = readingLock;
435
}
436
final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
437
final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
438
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
439
volatile HttpWriteSubscription subscription;
440
final SequentialScheduler writeScheduler =
441
new SequentialScheduler(this::flushTask);
442
@Override
443
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
444
synchronized (reading) {
445
//assert this.subscription == null;
446
//assert this.subscriber == null;
447
if (subscription == null) {
448
subscription = new HttpWriteSubscription();
449
}
450
this.subscriber = subscriber;
451
}
452
// TODO: should we do this in the flow?
453
subscriber.onSubscribe(subscription);
454
signal();
455
}
456
457
void flushTask(DeferredCompleter completer) {
458
try {
459
HttpWriteSubscription sub = subscription;
460
if (sub != null) sub.flush();
461
} finally {
462
completer.complete();
463
}
464
}
465
466
void signal() {
467
writeScheduler.runOrSchedule();
468
}
469
470
final class HttpWriteSubscription implements Flow.Subscription {
471
final Demand demand = new Demand();
472
473
@Override
474
public void request(long n) {
475
if (n <= 0) throw new IllegalArgumentException("non-positive request");
476
demand.increase(n);
477
if (debug.on())
478
debug.log("HttpPublisher: got request of " + n + " from "
479
+ getConnectionFlow());
480
writeScheduler.runOrSchedule();
481
}
482
483
@Override
484
public void cancel() {
485
if (debug.on())
486
debug.log("HttpPublisher: cancelled by " + getConnectionFlow());
487
}
488
489
private boolean isEmpty() {
490
return queue.isEmpty() && priority.isEmpty();
491
}
492
493
private List<ByteBuffer> poll() {
494
List<ByteBuffer> elem = priority.poll();
495
return elem == null ? queue.poll() : elem;
496
}
497
498
void flush() {
499
while (!isEmpty() && demand.tryDecrement()) {
500
List<ByteBuffer> elem = poll();
501
if (debug.on())
502
debug.log("HttpPublisher: sending "
503
+ Utils.remaining(elem) + " bytes ("
504
+ elem.size() + " buffers) to "
505
+ getConnectionFlow());
506
subscriber.onNext(elem);
507
}
508
}
509
}
510
511
@Override
512
public void enqueue(List<ByteBuffer> buffers) throws IOException {
513
queue.add(buffers);
514
int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
515
debug.log("added %d bytes to the write queue", bytes);
516
}
517
518
@Override
519
public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
520
// Unordered frames are sent before existing frames.
521
int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
522
priority.add(buffers);
523
debug.log("added %d bytes in the priority write queue", bytes);
524
}
525
526
@Override
527
public void signalEnqueued() throws IOException {
528
debug.log("signalling the publisher of the write queue");
529
signal();
530
}
531
}
532
533
String dbgTag;
534
final String dbgString() {
535
FlowTube flow = getConnectionFlow();
536
String tag = dbgTag;
537
if (tag == null && flow != null) {
538
dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
539
} else if (tag == null) {
540
tag = this.getClass().getSimpleName() + "(?)";
541
}
542
return tag;
543
}
544
545
@Override
546
public String toString() {
547
return "HttpConnection: " + channel().toString();
548
}
549
}
550
551