Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
41171 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.EOFException;
29
import java.io.IOException;
30
import java.io.UncheckedIOException;
31
import java.net.InetSocketAddress;
32
import java.net.URI;
33
import java.nio.ByteBuffer;
34
import java.nio.charset.StandardCharsets;
35
import java.util.Iterator;
36
import java.util.List;
37
import java.util.Locale;
38
import java.util.Map;
39
import java.util.Set;
40
import java.util.concurrent.CompletableFuture;
41
import java.util.ArrayList;
42
import java.util.Objects;
43
import java.util.concurrent.ConcurrentMap;
44
import java.util.concurrent.ConcurrentHashMap;
45
import java.util.concurrent.ConcurrentLinkedQueue;
46
import java.util.concurrent.Flow;
47
import java.util.function.Function;
48
import java.util.function.Supplier;
49
import javax.net.ssl.SSLEngine;
50
import javax.net.ssl.SSLException;
51
import java.net.http.HttpClient;
52
import java.net.http.HttpHeaders;
53
import jdk.internal.net.http.HttpConnection.HttpPublisher;
54
import jdk.internal.net.http.common.FlowTube;
55
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
56
import jdk.internal.net.http.common.HttpHeadersBuilder;
57
import jdk.internal.net.http.common.Log;
58
import jdk.internal.net.http.common.Logger;
59
import jdk.internal.net.http.common.MinimalFuture;
60
import jdk.internal.net.http.common.SequentialScheduler;
61
import jdk.internal.net.http.common.Utils;
62
import jdk.internal.net.http.frame.ContinuationFrame;
63
import jdk.internal.net.http.frame.DataFrame;
64
import jdk.internal.net.http.frame.ErrorFrame;
65
import jdk.internal.net.http.frame.FramesDecoder;
66
import jdk.internal.net.http.frame.FramesEncoder;
67
import jdk.internal.net.http.frame.GoAwayFrame;
68
import jdk.internal.net.http.frame.HeaderFrame;
69
import jdk.internal.net.http.frame.HeadersFrame;
70
import jdk.internal.net.http.frame.Http2Frame;
71
import jdk.internal.net.http.frame.MalformedFrame;
72
import jdk.internal.net.http.frame.OutgoingHeaders;
73
import jdk.internal.net.http.frame.PingFrame;
74
import jdk.internal.net.http.frame.PushPromiseFrame;
75
import jdk.internal.net.http.frame.ResetFrame;
76
import jdk.internal.net.http.frame.SettingsFrame;
77
import jdk.internal.net.http.frame.WindowUpdateFrame;
78
import jdk.internal.net.http.hpack.Encoder;
79
import jdk.internal.net.http.hpack.Decoder;
80
import jdk.internal.net.http.hpack.DecodingCallback;
81
import static java.nio.charset.StandardCharsets.UTF_8;
82
import static jdk.internal.net.http.frame.SettingsFrame.*;
83
84
/**
85
* An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
86
* over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
87
*
88
* Http2Connections belong to a Http2ClientImpl, (one of) which belongs
89
* to a HttpClientImpl.
90
*
91
* Creation cases:
92
* 1) upgraded HTTP/1.1 plain tcp connection
93
* 2) prior knowledge directly created plain tcp connection
94
* 3) directly created HTTP/2 SSL connection which uses ALPN.
95
*
96
* Sending is done by writing directly to underlying HttpConnection object which
97
* is operating in async mode. No flow control applies on output at this level
98
* and all writes are just executed as puts to an output Q belonging to HttpConnection
99
* Flow control is implemented by HTTP/2 protocol itself.
100
*
101
* Hpack header compression
102
* and outgoing stream creation is also done here, because these operations
103
* must be synchronized at the socket level. Stream objects send frames simply
104
* by placing them on the connection's output Queue. sendFrame() is called
105
* from a higher level (Stream) thread.
106
*
107
* asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
108
* incoming Http2Frames, and directs them to the appropriate Stream.incoming()
109
* or handles them directly itself. This thread performs hpack decompression
110
* and incoming stream creation (Server push). Incoming frames destined for a
111
* stream are provided by calling Stream.incoming().
112
*/
113
class Http2Connection {
114
115
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
116
final static Logger DEBUG_LOGGER =
117
Utils.getDebugLogger("Http2Connection"::toString, Utils.DEBUG);
118
private final Logger debugHpack =
119
Utils.getHpackLogger(this::dbgString, Utils.DEBUG_HPACK);
120
static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
121
122
static private final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
123
static private final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
124
static private final int BUFFER = 8; // added as an upper bound
125
126
/**
127
* Flag set when no more streams are to be opened on this connection.
128
* Two cases where it is used.
129
*
130
* 1. Two connections to the same server were opened concurrently, in which
131
* case one of them will be put in the cache, and the second will expire
132
* when all its opened streams (which usually should be a single client
133
* stream + possibly some additional push-promise server streams) complete.
134
* 2. A cached connection reaches its maximum number of streams (~ 2^31-1)
135
* either server / or client allocated, in which case it will be taken
136
* out of the cache - allowing a new connection to replace it. It will
137
* expire when all its still open streams (which could be many) eventually
138
* complete.
139
*/
140
private boolean finalStream;
141
142
/*
143
* ByteBuffer pooling strategy for HTTP/2 protocol.
144
*
145
* In general there are 4 points where ByteBuffers are used:
146
* - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing
147
* encrypted data in case of SSL connection.
148
*
149
* 1. Outgoing frames encoded to ByteBuffers.
150
*
151
* Outgoing ByteBuffers are created with required size and frequently
152
* small (except DataFrames, etc). At this place no pools at all. All
153
* outgoing buffers should eventually be collected by GC.
154
*
155
* 2. Incoming ByteBuffers (decoded to frames).
156
*
157
* Here, total elimination of BB pool is not a good idea.
158
* We don't know how many bytes we will receive through network.
159
*
160
* A possible future improvement ( currently not implemented ):
161
* Allocate buffers of reasonable size. The following life of the BB:
162
* - If all frames decoded from the BB are other than DataFrame and
163
* HeaderFrame (and HeaderFrame subclasses) BB is returned to pool,
164
* - If a DataFrame is decoded from the BB. In that case DataFrame refers
165
* to sub-buffer obtained by slice(). Such a BB is never returned to the
166
* pool and will eventually be GC'ed.
167
* - If a HeadersFrame is decoded from the BB. Then header decoding is
168
* performed inside processFrame method and the buffer could be released
169
* back to pool.
170
*
171
* 3. SSL encrypted buffers ( received ).
172
*
173
* The current implementation recycles encrypted buffers read from the
174
* channel. The pool of buffers has a maximum size of 3, SocketTube.MAX_BUFFERS,
175
* direct buffers which are shared by all connections on a given client.
176
* The pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2,
177
* but only for SSL encrypted buffers that circulate between the SocketTube
178
* Publisher and the SSLFlowDelegate Reader. Limiting the pool to this
179
* particular segment allows the use of direct buffers, thus avoiding any
180
* additional copy in the NIO socket channel implementation. See
181
* HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource,
182
* and SSLTube.recycler.
183
*/
184
185
186
// A small class that allows to control frames with respect to the state of
187
// the connection preface. Any data received before the connection
188
// preface is sent will be buffered.
189
private final class FramesController {
190
volatile boolean prefaceSent;
191
volatile List<ByteBuffer> pending;
192
193
boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
194
throws IOException
195
{
196
// if preface is not sent, buffers data in the pending list
197
if (!prefaceSent) {
198
if (debug.on())
199
debug.log("Preface not sent: buffering %d", buf.remaining());
200
synchronized (this) {
201
if (!prefaceSent) {
202
if (pending == null) pending = new ArrayList<>();
203
pending.add(buf);
204
if (debug.on())
205
debug.log("there are now %d bytes buffered waiting for preface to be sent"
206
+ Utils.remaining(pending)
207
);
208
return false;
209
}
210
}
211
}
212
213
// Preface is sent. Checks for pending data and flush it.
214
// We rely on this method being called from within the Http2TubeSubscriber
215
// scheduler, so we know that no other thread could execute this method
216
// concurrently while we're here.
217
// This ensures that later incoming buffers will not
218
// be processed before we have flushed the pending queue.
219
// No additional synchronization is therefore necessary here.
220
List<ByteBuffer> pending = this.pending;
221
this.pending = null;
222
if (pending != null) {
223
// flush pending data
224
if (debug.on()) debug.log(() -> "Processing buffered data: "
225
+ Utils.remaining(pending));
226
for (ByteBuffer b : pending) {
227
decoder.decode(b);
228
}
229
}
230
// push the received buffer to the frames decoder.
231
if (buf != EMPTY_TRIGGER) {
232
if (debug.on()) debug.log("Processing %d", buf.remaining());
233
decoder.decode(buf);
234
}
235
return true;
236
}
237
238
// Mark that the connection preface is sent
239
void markPrefaceSent() {
240
assert !prefaceSent;
241
synchronized (this) {
242
prefaceSent = true;
243
}
244
}
245
}
246
247
volatile boolean closed;
248
249
//-------------------------------------
250
final HttpConnection connection;
251
private final Http2ClientImpl client2;
252
private final ConcurrentMap<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
253
private int nextstreamid;
254
private int nextPushStream = 2;
255
// actual stream ids are not allocated until the Headers frame is ready
256
// to be sent. The following two fields are updated as soon as a stream
257
// is created and assigned to a connection. They are checked before
258
// assigning a stream to a connection.
259
private int lastReservedClientStreamid = 1;
260
private int lastReservedServerStreamid = 0;
261
private int numReservedClientStreams = 0; // count of current streams
262
private int numReservedServerStreams = 0; // count of current streams
263
private final Encoder hpackOut;
264
private final Decoder hpackIn;
265
final SettingsFrame clientSettings;
266
private volatile SettingsFrame serverSettings;
267
private final String key; // for HttpClientImpl.connections map
268
private final FramesDecoder framesDecoder;
269
private final FramesEncoder framesEncoder = new FramesEncoder();
270
271
/**
272
* Send Window controller for both connection and stream windows.
273
* Each of this connection's Streams MUST use this controller.
274
*/
275
private final WindowController windowController = new WindowController();
276
private final FramesController framesController = new FramesController();
277
private final Http2TubeSubscriber subscriber;
278
final ConnectionWindowUpdateSender windowUpdater;
279
private volatile Throwable cause;
280
private volatile Supplier<ByteBuffer> initial;
281
282
static final int DEFAULT_FRAME_SIZE = 16 * 1024;
283
284
285
// TODO: need list of control frames from other threads
286
// that need to be sent
287
288
private Http2Connection(HttpConnection connection,
289
Http2ClientImpl client2,
290
int nextstreamid,
291
String key) {
292
this.connection = connection;
293
this.client2 = client2;
294
this.subscriber = new Http2TubeSubscriber(client2.client());
295
this.nextstreamid = nextstreamid;
296
this.key = key;
297
this.clientSettings = this.client2.getClientSettings();
298
this.framesDecoder = new FramesDecoder(this::processFrame,
299
clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
300
// serverSettings will be updated by server
301
this.serverSettings = SettingsFrame.defaultRFCSettings();
302
this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
303
this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
304
if (debugHpack.on()) {
305
debugHpack.log("For the record:" + super.toString());
306
debugHpack.log("Decoder created: %s", hpackIn);
307
debugHpack.log("Encoder created: %s", hpackOut);
308
}
309
this.windowUpdater = new ConnectionWindowUpdateSender(this,
310
client2.getConnectionWindowSize(clientSettings));
311
}
312
313
/**
314
* Case 1) Create from upgraded HTTP/1.1 connection.
315
* Is ready to use. Can't be SSL. exchange is the Exchange
316
* that initiated the connection, whose response will be delivered
317
* on a Stream.
318
*/
319
private Http2Connection(HttpConnection connection,
320
Http2ClientImpl client2,
321
Exchange<?> exchange,
322
Supplier<ByteBuffer> initial)
323
throws IOException, InterruptedException
324
{
325
this(connection,
326
client2,
327
3, // stream 1 is registered during the upgrade
328
keyFor(connection));
329
reserveStream(true);
330
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
331
332
Stream<?> initialStream = createStream(exchange);
333
boolean opened = initialStream.registerStream(1, true);
334
if (debug.on() && !opened) {
335
debug.log("Initial stream was cancelled - but connection is maintained: " +
336
"reset frame will need to be sent later");
337
}
338
windowController.registerStream(1, getInitialSendWindowSize());
339
initialStream.requestSent();
340
// Upgrading:
341
// set callbacks before sending preface - makes sure anything that
342
// might be sent by the server will come our way.
343
this.initial = initial;
344
connectFlows(connection);
345
sendConnectionPreface();
346
if (!opened) {
347
debug.log("ensure reset frame is sent to cancel initial stream");
348
initialStream.sendCancelStreamFrame();
349
}
350
351
}
352
353
// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
354
// agreement from the server. Async style but completes immediately, because
355
// the connection is already connected.
356
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
357
Http2ClientImpl client2,
358
Exchange<?> exchange,
359
Supplier<ByteBuffer> initial)
360
{
361
return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
362
}
363
364
// Requires TLS handshake. So, is really async
365
static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
366
Http2ClientImpl h2client,
367
Exchange<?> exchange) {
368
assert request.secure();
369
AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
370
HttpConnection.getConnection(request.getAddress(),
371
h2client.client(),
372
request,
373
HttpClient.Version.HTTP_2);
374
375
// Expose the underlying connection to the exchange's aborter so it can
376
// be closed if a timeout occurs.
377
exchange.connectionAborter.connection(connection);
378
379
return connection.connectAsync(exchange)
380
.thenCompose(unused -> connection.finishConnect())
381
.thenCompose(unused -> checkSSLConfig(connection))
382
.thenCompose(notused-> {
383
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
384
try {
385
Http2Connection hc = new Http2Connection(request, h2client, connection);
386
cf.complete(hc);
387
} catch (IOException e) {
388
cf.completeExceptionally(e);
389
}
390
return cf; } );
391
}
392
393
/**
394
* Cases 2) 3)
395
*
396
* request is request to be sent.
397
*/
398
private Http2Connection(HttpRequestImpl request,
                        Http2ClientImpl h2client,
                        HttpConnection connection)
        throws IOException
{
    // first client-initiated stream id on a fresh connection is 1
    this(connection, h2client, 1, keyFor(request.uri(), request.proxy()));

    Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

    // safe to resume async reading now: subscribe first, then send the preface.
    connectFlows(connection);
    sendConnectionPreface();
}
414
415
private void connectFlows(HttpConnection connection) {
416
FlowTube tube = connection.getConnectionFlow();
417
// Connect the flow to our Http2TubeSubscriber:
418
tube.connectFlows(connection.publisher(), subscriber);
419
}
420
421
final HttpClientImpl client() {
422
return client2.client();
423
}
424
425
// call these before assigning a request/stream to a connection
426
// if false returned then a new Http2Connection is required
427
// if true, the stream may be assigned to this connection
428
// for server push, if false returned, then the stream should be cancelled
429
synchronized boolean reserveStream(boolean clientInitiated) throws IOException {
430
if (finalStream) {
431
return false;
432
}
433
if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
434
setFinalStream();
435
client2.deleteConnection(this);
436
return false;
437
} else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
438
setFinalStream();
439
client2.deleteConnection(this);
440
return false;
441
}
442
if (clientInitiated)
443
lastReservedClientStreamid+=2;
444
else
445
lastReservedServerStreamid+=2;
446
447
assert numReservedClientStreams >= 0;
448
assert numReservedServerStreams >= 0;
449
if (clientInitiated &&numReservedClientStreams >= maxConcurrentClientInitiatedStreams()) {
450
throw new IOException("too many concurrent streams");
451
} else if (clientInitiated) {
452
numReservedClientStreams++;
453
}
454
if (!clientInitiated && numReservedServerStreams >= maxConcurrentServerInitiatedStreams()) {
455
return false;
456
} else if (!clientInitiated) {
457
numReservedServerStreams++;
458
}
459
return true;
460
}
461
462
/**
463
* Throws an IOException if h2 was not negotiated
464
*/
465
private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
466
assert aconn.isSecure();
467
468
Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
469
CompletableFuture<Void> cf = new MinimalFuture<>();
470
SSLEngine engine = aconn.getEngine();
471
String engineAlpn = engine.getApplicationProtocol();
472
assert Objects.equals(alpn, engineAlpn)
473
: "alpn: %s, engine: %s".formatted(alpn, engineAlpn);
474
475
DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn );
476
477
if (alpn == null || !alpn.equals("h2")) {
478
String msg;
479
if (alpn == null) {
480
Log.logSSL("ALPN not supported");
481
msg = "ALPN not supported";
482
} else {
483
switch (alpn) {
484
case "":
485
Log.logSSL(msg = "No ALPN negotiated");
486
break;
487
case "http/1.1":
488
Log.logSSL( msg = "HTTP/1.1 ALPN returned");
489
break;
490
default:
491
Log.logSSL(msg = "Unexpected ALPN: " + alpn);
492
cf.completeExceptionally(new IOException(msg));
493
}
494
}
495
cf.completeExceptionally(new ALPNException(msg, aconn));
496
return cf;
497
}
498
cf.complete(null);
499
return cf;
500
};
501
502
return aconn.getALPN()
503
.whenComplete((r,t) -> {
504
if (t != null && t instanceof SSLException) {
505
// something went wrong during the initial handshake
506
// close the connection
507
aconn.close();
508
}
509
})
510
.thenCompose(checkAlpnCF);
511
}
512
513
synchronized boolean finalStream() {
514
return finalStream;
515
}
516
517
/**
518
* Mark this connection so no more streams created on it and it will close when
519
* all are complete.
520
*/
521
synchronized void setFinalStream() {
522
finalStream = true;
523
}
524
525
static String keyFor(HttpConnection connection) {
526
boolean isProxy = connection.isProxied(); // tunnel or plain clear connection through proxy
527
boolean isSecure = connection.isSecure();
528
InetSocketAddress addr = connection.address();
529
InetSocketAddress proxyAddr = connection.proxy();
530
assert isProxy == (proxyAddr != null);
531
532
return keyString(isSecure, proxyAddr, addr.getHostString(), addr.getPort());
533
}
534
535
static String keyFor(URI uri, InetSocketAddress proxy) {
536
boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
537
538
String host = uri.getHost();
539
int port = uri.getPort();
540
return keyString(isSecure, proxy, host, port);
541
}
542
543
544
// Compute the key for an HttpConnection in the Http2ClientImpl pool:
545
// The key string follows one of the three forms below:
546
// {C,S}:H:host:port
547
// C:P:proxy-host:proxy-port
548
// S:T:H:host:port;P:proxy-host:proxy-port
549
// C indicates clear text connection "http"
550
// S indicates secure "https"
551
// H indicates host (direct) connection
552
// P indicates proxy
553
// T indicates a tunnel connection through a proxy
554
//
555
// The first form indicates a direct connection to a server:
556
// - direct clear connection to an HTTP host:
557
// e.g.: "C:H:foo.com:80"
558
// - direct secure connection to an HTTPS host:
559
// e.g.: "S:H:foo.com:443"
560
// The second form indicates a clear connection to an HTTP/1.1 proxy:
561
// e.g.: "C:P:myproxy:8080"
562
// The third form indicates a secure tunnel connection to an HTTPS
563
// host through an HTTP/1.1 proxy:
564
// e.g: "S:T:H:foo.com:80;P:myproxy:8080"
565
static String keyString(boolean secure, InetSocketAddress proxy, String host, int port) {
566
if (secure && port == -1)
567
port = 443;
568
else if (!secure && port == -1)
569
port = 80;
570
var key = (secure ? "S:" : "C:");
571
if (proxy != null && !secure) {
572
// clear connection through proxy
573
key = key + "P:" + proxy.getHostString() + ":" + proxy.getPort();
574
} else if (proxy == null) {
575
// direct connection to host
576
key = key + "H:" + host + ":" + port;
577
} else {
578
// tunnel connection through proxy
579
key = key + "T:H:" + host + ":" + port + ";P:" + proxy.getHostString() + ":" + proxy.getPort();
580
}
581
return key;
582
}
583
584
String key() {
585
return this.key;
586
}
587
588
boolean offerConnection() {
589
return client2.offerConnection(this);
590
}
591
592
private HttpPublisher publisher() {
593
return connection.publisher();
594
}
595
596
private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
597
throws IOException
598
{
599
if (debugHpack.on()) debugHpack.log("decodeHeaders(%s)", decoder);
600
601
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
602
603
List<ByteBuffer> buffers = frame.getHeaderBlock();
604
int len = buffers.size();
605
for (int i = 0; i < len; i++) {
606
ByteBuffer b = buffers.get(i);
607
hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
608
}
609
}
610
611
final int getInitialSendWindowSize() {
612
return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
613
}
614
615
final int maxConcurrentClientInitiatedStreams() {
616
return serverSettings.getParameter(MAX_CONCURRENT_STREAMS);
617
}
618
619
final int maxConcurrentServerInitiatedStreams() {
620
return clientSettings.getParameter(MAX_CONCURRENT_STREAMS);
621
}
622
623
void close() {
624
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
625
GoAwayFrame f = new GoAwayFrame(0,
626
ErrorFrame.NO_ERROR,
627
"Requested by user".getBytes(UTF_8));
628
// TODO: set last stream. For now zero ok.
629
sendFrame(f);
630
}
631
632
long count;
633
final void asyncReceive(ByteBuffer buffer) {
634
// We don't need to read anything and
635
// we don't want to send anything back to the server
636
// until the connection preface has been sent.
637
// Therefore we're going to wait if needed before reading
638
// (and thus replying) to anything.
639
// Starting to reply to something (e.g send an ACK to a
640
// SettingsFrame sent by the server) before the connection
641
// preface is fully sent might result in the server
642
// sending a GOAWAY frame with 'invalid_preface'.
643
//
644
// Note: asyncReceive is only called from the Http2TubeSubscriber
645
// sequential scheduler.
646
try {
647
Supplier<ByteBuffer> bs = initial;
648
// ensure that we always handle the initial buffer first,
649
// if any.
650
if (bs != null) {
651
initial = null;
652
ByteBuffer b = bs.get();
653
if (b.hasRemaining()) {
654
long c = ++count;
655
if (debug.on())
656
debug.log(() -> "H2 Receiving Initial(" + c +"): " + b.remaining());
657
framesController.processReceivedData(framesDecoder, b);
658
}
659
}
660
ByteBuffer b = buffer;
661
// the Http2TubeSubscriber scheduler ensures that the order of incoming
662
// buffers is preserved.
663
if (b == EMPTY_TRIGGER) {
664
if (debug.on()) debug.log("H2 Received EMPTY_TRIGGER");
665
boolean prefaceSent = framesController.prefaceSent;
666
assert prefaceSent;
667
// call framesController.processReceivedData to potentially
668
// trigger the processing of all the data buffered there.
669
framesController.processReceivedData(framesDecoder, buffer);
670
if (debug.on()) debug.log("H2 processed buffered data");
671
} else {
672
long c = ++count;
673
if (debug.on())
674
debug.log("H2 Receiving(%d): %d", c, b.remaining());
675
framesController.processReceivedData(framesDecoder, buffer);
676
if (debug.on()) debug.log("H2 processed(%d)", c);
677
}
678
} catch (Throwable e) {
679
String msg = Utils.stackTrace(e);
680
Log.logTrace(msg);
681
shutdown(e);
682
}
683
}
684
685
Throwable getRecordedCause() {
686
return cause;
687
}
688
689
void shutdown(Throwable t) {
690
if (debug.on()) debug.log(() -> "Shutting down h2c (closed="+closed+"): " + t);
691
if (closed == true) return;
692
synchronized (this) {
693
if (closed == true) return;
694
closed = true;
695
}
696
if (Log.errors()) {
697
if (!(t instanceof EOFException) || isActive()) {
698
Log.logError(t);
699
} else if (t != null) {
700
Log.logError("Shutting down connection: {0}", t.getMessage());
701
}
702
}
703
Throwable initialCause = this.cause;
704
if (initialCause == null) this.cause = t;
705
client2.deleteConnection(this);
706
for (Stream<?> s : streams.values()) {
707
try {
708
s.connectionClosing(t);
709
} catch (Throwable e) {
710
Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
711
}
712
}
713
connection.close();
714
}
715
716
/**
717
* Streams initiated by a client MUST use odd-numbered stream
718
* identifiers; those initiated by the server MUST use even-numbered
719
* stream identifiers.
720
*/
721
private static final boolean isServerInitiatedStream(int streamid) {
722
return (streamid & 0x1) == 0;
723
}
724
725
/**
726
* Handles stream 0 (common) frames that apply to whole connection and passes
727
* other stream specific frames to that Stream object.
728
*
729
* Invokes Stream.incoming() which is expected to process frame without
730
* blocking.
731
*/
732
void processFrame(Http2Frame frame) throws IOException {
733
Log.logFrames(frame, "IN");
734
int streamid = frame.streamid();
735
if (frame instanceof MalformedFrame) {
736
Log.logError(((MalformedFrame) frame).getMessage());
737
if (streamid == 0) {
738
framesDecoder.close("Malformed frame on stream 0");
739
protocolError(((MalformedFrame) frame).getErrorCode(),
740
((MalformedFrame) frame).getMessage());
741
} else {
742
if (debug.on())
743
debug.log(() -> "Reset stream: " + ((MalformedFrame) frame).getMessage());
744
resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
745
}
746
return;
747
}
748
if (streamid == 0) {
749
handleConnectionFrame(frame);
750
} else {
751
if (frame instanceof SettingsFrame) {
752
// The stream identifier for a SETTINGS frame MUST be zero
753
framesDecoder.close(
754
"The stream identifier for a SETTINGS frame MUST be zero");
755
protocolError(GoAwayFrame.PROTOCOL_ERROR);
756
return;
757
}
758
759
Stream<?> stream = getStream(streamid);
760
if (stream == null) {
761
// Should never receive a frame with unknown stream id
762
763
if (frame instanceof HeaderFrame) {
764
// always decode the headers as they may affect
765
// connection-level HPACK decoding state
766
DecodingCallback decoder = new ValidatingHeadersConsumer();
767
try {
768
decodeHeaders((HeaderFrame) frame, decoder);
769
} catch (UncheckedIOException e) {
770
protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
771
return;
772
}
773
}
774
775
if (!(frame instanceof ResetFrame)) {
776
if (frame instanceof DataFrame) {
777
dropDataFrame((DataFrame)frame);
778
}
779
if (isServerInitiatedStream(streamid)) {
780
if (streamid < nextPushStream) {
781
// trailing data on a cancelled push promise stream,
782
// reset will already have been sent, ignore
783
Log.logTrace("Ignoring cancelled push promise frame " + frame);
784
} else {
785
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
786
}
787
} else if (streamid >= nextstreamid) {
788
// otherwise the stream has already been reset/closed
789
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
790
}
791
}
792
return;
793
}
794
if (frame instanceof PushPromiseFrame) {
795
PushPromiseFrame pp = (PushPromiseFrame)frame;
796
try {
797
handlePushPromise(stream, pp);
798
} catch (UncheckedIOException e) {
799
protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
800
return;
801
}
802
} else if (frame instanceof HeaderFrame) {
803
// decode headers (or continuation)
804
try {
805
decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
806
} catch (UncheckedIOException e) {
807
debug.log("Error decoding headers: " + e.getMessage(), e);
808
protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
809
return;
810
}
811
stream.incoming(frame);
812
} else {
813
stream.incoming(frame);
814
}
815
}
816
}
817
818
final void dropDataFrame(DataFrame df) {
819
if (closed) return;
820
if (debug.on()) {
821
debug.log("Dropping data frame for stream %d (%d payload bytes)",
822
df.streamid(), df.payloadLength());
823
}
824
ensureWindowUpdated(df);
825
}
826
827
final void ensureWindowUpdated(DataFrame df) {
828
try {
829
if (closed) return;
830
int length = df.payloadLength();
831
if (length > 0) {
832
windowUpdater.update(length);
833
}
834
} catch(Throwable t) {
835
Log.logError("Unexpected exception while updating window: {0}", (Object)t);
836
}
837
}
838
839
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
840
throws IOException
841
{
842
// always decode the headers as they may affect connection-level HPACK
843
// decoding state
844
HeaderDecoder decoder = new HeaderDecoder();
845
decodeHeaders(pp, decoder);
846
847
HttpRequestImpl parentReq = parent.request;
848
int promisedStreamid = pp.getPromisedStream();
849
if (promisedStreamid != nextPushStream) {
850
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
851
return;
852
} else if (!reserveStream(false)) {
853
resetStream(promisedStreamid, ResetFrame.REFUSED_STREAM);
854
return;
855
} else {
856
nextPushStream += 2;
857
}
858
859
HttpHeaders headers = decoder.headers();
860
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
861
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
862
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
863
pushExch.exchImpl = pushStream;
864
pushStream.registerStream(promisedStreamid, true);
865
parent.incoming_pushPromise(pushReq, pushStream);
866
}
867
868
private void handleConnectionFrame(Http2Frame frame)
869
throws IOException
870
{
871
switch (frame.type()) {
872
case SettingsFrame.TYPE -> handleSettings((SettingsFrame) frame);
873
case PingFrame.TYPE -> handlePing((PingFrame) frame);
874
case GoAwayFrame.TYPE -> handleGoAway((GoAwayFrame) frame);
875
case WindowUpdateFrame.TYPE -> handleWindowUpdate((WindowUpdateFrame) frame);
876
877
default -> protocolError(ErrorFrame.PROTOCOL_ERROR);
878
}
879
}
880
881
void resetStream(int streamid, int code) {
882
try {
883
if (connection.channel().isOpen()) {
884
// no need to try & send a reset frame if the
885
// connection channel is already closed.
886
Log.logError(
887
"Resetting stream {0,number,integer} with error code {1,number,integer}",
888
streamid, code);
889
markStream(streamid, code);
890
ResetFrame frame = new ResetFrame(streamid, code);
891
sendFrame(frame);
892
} else if (debug.on()) {
893
debug.log("Channel already closed, no need to reset stream %d",
894
streamid);
895
}
896
} finally {
897
decrementStreamsCount(streamid);
898
closeStream(streamid);
899
}
900
}
901
902
private void markStream(int streamid, int code) {
903
Stream<?> s = streams.get(streamid);
904
if (s != null) s.markStream(code);
905
}
906
907
// reduce count of streams by 1 if stream still exists
908
synchronized void decrementStreamsCount(int streamid) {
909
Stream<?> s = streams.get(streamid);
910
if (s == null || !s.deRegister())
911
return;
912
if (streamid % 2 == 1) {
913
numReservedClientStreams--;
914
assert numReservedClientStreams >= 0 :
915
"negative client stream count for stream=" + streamid;
916
} else {
917
numReservedServerStreams--;
918
assert numReservedServerStreams >= 0 :
919
"negative server stream count for stream=" + streamid;
920
}
921
}
922
923
void closeStream(int streamid) {
924
if (debug.on()) debug.log("Closed stream %d", streamid);
925
boolean isClient = (streamid % 2) == 1;
926
Stream<?> s = streams.remove(streamid);
927
if (s != null) {
928
// decrement the reference count on the HttpClientImpl
929
// to allow the SelectorManager thread to exit if no
930
// other operation is pending and the facade is no
931
// longer referenced.
932
client().streamUnreference();
933
}
934
// ## Remove s != null. It is a hack for delayed cancellation,reset
935
if (s != null && !(s instanceof Stream.PushedStream)) {
936
// Since PushStreams have no request body, then they have no
937
// corresponding entry in the window controller.
938
windowController.removeStream(streamid);
939
}
940
if (finalStream() && streams.isEmpty()) {
941
// should be only 1 stream, but there might be more if server push
942
close();
943
}
944
}
945
946
/**
947
* Increments this connection's send Window by the amount in the given frame.
948
*/
949
private void handleWindowUpdate(WindowUpdateFrame f)
950
throws IOException
951
{
952
int amount = f.getUpdate();
953
if (amount <= 0) {
954
// ## temporarily disable to workaround a bug in Jetty where it
955
// ## sends Window updates with a 0 update value.
956
//protocolError(ErrorFrame.PROTOCOL_ERROR);
957
} else {
958
boolean success = windowController.increaseConnectionWindow(amount);
959
if (!success) {
960
protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow
961
}
962
}
963
}
964
965
private void protocolError(int errorCode)
966
throws IOException
967
{
968
protocolError(errorCode, null);
969
}
970
971
private void protocolError(int errorCode, String msg)
972
throws IOException
973
{
974
GoAwayFrame frame = new GoAwayFrame(0, errorCode);
975
sendFrame(frame);
976
shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
977
}
978
979
private void handleSettings(SettingsFrame frame)
980
throws IOException
981
{
982
assert frame.streamid() == 0;
983
if (!frame.getFlag(SettingsFrame.ACK)) {
984
int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
985
if (newWindowSize != -1) {
986
int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
987
int diff = newWindowSize - oldWindowSize;
988
if (diff != 0) {
989
windowController.adjustActiveStreams(diff);
990
}
991
}
992
993
serverSettings.update(frame);
994
sendFrame(new SettingsFrame(SettingsFrame.ACK));
995
}
996
}
997
998
private void handlePing(PingFrame frame)
999
throws IOException
1000
{
1001
frame.setFlag(PingFrame.ACK);
1002
sendUnorderedFrame(frame);
1003
}
1004
1005
private void handleGoAway(GoAwayFrame frame)
1006
throws IOException
1007
{
1008
shutdown(new IOException(
1009
String.valueOf(connection.channel().getLocalAddress())
1010
+": GOAWAY received"));
1011
}
1012
1013
/**
1014
* Max frame size we are allowed to send
1015
*/
1016
public int getMaxSendFrameSize() {
1017
int param = serverSettings.getParameter(MAX_FRAME_SIZE);
1018
if (param == -1) {
1019
param = DEFAULT_FRAME_SIZE;
1020
}
1021
return param;
1022
}
1023
1024
/**
1025
* Max frame size we will receive
1026
*/
1027
public int getMaxReceiveFrameSize() {
1028
return clientSettings.getParameter(MAX_FRAME_SIZE);
1029
}
1030
1031
private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1032
1033
private static final byte[] PREFACE_BYTES =
1034
CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
1035
1036
/**
1037
* Sends Connection preface and Settings frame with current preferred
1038
* values
1039
*/
1040
private void sendConnectionPreface() throws IOException {
1041
Log.logTrace("{0}: start sending connection preface to {1}",
1042
connection.channel().getLocalAddress(),
1043
connection.address());
1044
SettingsFrame sf = new SettingsFrame(clientSettings);
1045
ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
1046
Log.logFrames(sf, "OUT");
1047
// send preface bytes and SettingsFrame together
1048
HttpPublisher publisher = publisher();
1049
publisher.enqueueUnordered(List.of(buf));
1050
publisher.signalEnqueued();
1051
// mark preface sent.
1052
framesController.markPrefaceSent();
1053
Log.logTrace("PREFACE_BYTES sent");
1054
Log.logTrace("Settings Frame sent");
1055
1056
// send a Window update for the receive buffer we are using
1057
// minus the initial 64 K -1 specified in protocol:
1058
// RFC 7540, Section 6.9.2:
1059
// "[...] the connection flow-control window is set to the default
1060
// initial window size until a WINDOW_UPDATE frame is received."
1061
//
1062
// Note that the default initial window size, not to be confused
1063
// with the initial window size, is defined by RFC 7540 as
1064
// 64K -1.
1065
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
1066
if (len != 0) {
1067
if (Log.channel()) {
1068
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
1069
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
1070
}
1071
windowUpdater.sendWindowUpdate(len);
1072
}
1073
// there will be an ACK to the windows update - which should
1074
// cause any pending data stored before the preface was sent to be
1075
// flushed (see PrefaceController).
1076
Log.logTrace("finished sending connection preface");
1077
if (debug.on())
1078
debug.log("Triggering processing of buffered data"
1079
+ " after sending connection preface");
1080
subscriber.onNext(List.of(EMPTY_TRIGGER));
1081
}
1082
1083
/**
1084
* Returns an existing Stream with given id, or null if doesn't exist
1085
*/
1086
@SuppressWarnings("unchecked")
1087
<T> Stream<T> getStream(int streamid) {
1088
return (Stream<T>)streams.get(streamid);
1089
}
1090
1091
/**
1092
* Creates Stream with given id.
1093
*/
1094
final <T> Stream<T> createStream(Exchange<T> exchange) {
1095
Stream<T> stream = new Stream<>(this, exchange, windowController);
1096
return stream;
1097
}
1098
1099
<T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
1100
PushGroup<T> pg = parent.exchange.getPushGroup();
1101
return new Stream.PushedStream<>(pg, this, pushEx);
1102
}
1103
1104
<T> void putStream(Stream<T> stream, int streamid) {
1105
// increment the reference count on the HttpClientImpl
1106
// to prevent the SelectorManager thread from exiting until
1107
// the stream is closed.
1108
client().streamReference();
1109
streams.put(streamid, stream);
1110
}
1111
1112
/**
1113
* Encode the headers into a List<ByteBuffer> and then create HEADERS
1114
* and CONTINUATION frames from the list and return the List<Http2Frame>.
1115
*/
1116
private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
1117
// max value of frame size is clamped by default frame size to avoid OOM
1118
int bufferSize = Math.min(Math.max(getMaxSendFrameSize(), 1024), DEFAULT_FRAME_SIZE);
1119
List<ByteBuffer> buffers = encodeHeadersImpl(
1120
bufferSize,
1121
frame.getAttachment().getRequestPseudoHeaders(),
1122
frame.getUserHeaders(),
1123
frame.getSystemHeaders());
1124
1125
List<HeaderFrame> frames = new ArrayList<>(buffers.size());
1126
Iterator<ByteBuffer> bufIterator = buffers.iterator();
1127
HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
1128
frames.add(oframe);
1129
while(bufIterator.hasNext()) {
1130
oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
1131
frames.add(oframe);
1132
}
1133
oframe.setFlag(HeaderFrame.END_HEADERS);
1134
return frames;
1135
}
1136
1137
// Dedicated cache for headers encoding ByteBuffer.
1138
// There can be no concurrent access to this buffer as all access to this buffer
1139
// and its content happen within a single critical code block section protected
1140
// by the sendlock (see sendFrame()).
1141
// private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
1142
1143
private ByteBuffer getHeaderBuffer(int size) {
1144
ByteBuffer buf = ByteBuffer.allocate(size);
1145
buf.limit(size);
1146
return buf;
1147
}
1148
1149
/*
1150
* Encodes all the headers from the given HttpHeaders into the given List
1151
* of buffers.
1152
*
1153
* From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
1154
*
1155
* ...Just as in HTTP/1.x, header field names are strings of ASCII
1156
* characters that are compared in a case-insensitive fashion. However,
1157
* header field names MUST be converted to lowercase prior to their
1158
* encoding in HTTP/2...
1159
*/
1160
private List<ByteBuffer> encodeHeadersImpl(int bufferSize, HttpHeaders... headers) {
1161
ByteBuffer buffer = getHeaderBuffer(bufferSize);
1162
List<ByteBuffer> buffers = new ArrayList<>();
1163
for(HttpHeaders header : headers) {
1164
for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
1165
String lKey = e.getKey().toLowerCase(Locale.US);
1166
List<String> values = e.getValue();
1167
for (String value : values) {
1168
hpackOut.header(lKey, value);
1169
while (!hpackOut.encode(buffer)) {
1170
buffer.flip();
1171
buffers.add(buffer);
1172
buffer = getHeaderBuffer(bufferSize);
1173
}
1174
}
1175
}
1176
}
1177
buffer.flip();
1178
buffers.add(buffer);
1179
return buffers;
1180
}
1181
1182
1183
private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
1184
oh.streamid(stream.streamid);
1185
if (Log.headers()) {
1186
StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
1187
sb.append(stream.streamid).append(")\n");
1188
Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
1189
Log.dumpHeaders(sb, " ", oh.getSystemHeaders());
1190
Log.dumpHeaders(sb, " ", oh.getUserHeaders());
1191
Log.logHeaders(sb.toString());
1192
}
1193
List<HeaderFrame> frames = encodeHeaders(oh);
1194
return encodeFrames(frames);
1195
}
1196
1197
private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
1198
if (Log.frames()) {
1199
frames.forEach(f -> Log.logFrames(f, "OUT"));
1200
}
1201
return framesEncoder.encodeFrames(frames);
1202
}
1203
1204
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
1205
Stream<?> stream = oh.getAttachment();
1206
assert stream.streamid == 0;
1207
int streamid = nextstreamid;
1208
if (stream.registerStream(streamid, false)) {
1209
// set outgoing window here. This allows thread sending
1210
// body to proceed.
1211
nextstreamid += 2;
1212
windowController.registerStream(streamid, getInitialSendWindowSize());
1213
return stream;
1214
} else {
1215
stream.cancelImpl(new IOException("Request cancelled"));
1216
if (finalStream() && streams.isEmpty()) {
1217
close();
1218
}
1219
return null;
1220
}
1221
}
1222
1223
// Monitor held by sendFrame() while a frame is registered (for outgoing
// headers), encoded and enqueued on the publisher.
private final Object sendlock = new Object();
1224
1225
void sendFrame(Http2Frame frame) {
1226
try {
1227
HttpPublisher publisher = publisher();
1228
synchronized (sendlock) {
1229
if (frame instanceof OutgoingHeaders) {
1230
@SuppressWarnings("unchecked")
1231
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
1232
Stream<?> stream = registerNewStream(oh);
1233
// provide protection from inserting unordered frames between Headers and Continuation
1234
if (stream != null) {
1235
publisher.enqueue(encodeHeaders(oh, stream));
1236
}
1237
} else {
1238
publisher.enqueue(encodeFrame(frame));
1239
}
1240
}
1241
publisher.signalEnqueued();
1242
} catch (IOException e) {
1243
if (!closed) {
1244
Log.logError(e);
1245
shutdown(e);
1246
}
1247
}
1248
}
1249
1250
private List<ByteBuffer> encodeFrame(Http2Frame frame) {
1251
Log.logFrames(frame, "OUT");
1252
return framesEncoder.encodeFrame(frame);
1253
}
1254
1255
void sendDataFrame(DataFrame frame) {
1256
try {
1257
HttpPublisher publisher = publisher();
1258
publisher.enqueue(encodeFrame(frame));
1259
publisher.signalEnqueued();
1260
} catch (IOException e) {
1261
if (!closed) {
1262
Log.logError(e);
1263
shutdown(e);
1264
}
1265
}
1266
}
1267
1268
/*
1269
* Direct call of the method bypasses synchronization on "sendlock" and
1270
* allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
1271
* prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
1272
*/
1273
void sendUnorderedFrame(Http2Frame frame) {
1274
try {
1275
HttpPublisher publisher = publisher();
1276
publisher.enqueueUnordered(encodeFrame(frame));
1277
publisher.signalEnqueued();
1278
} catch (IOException e) {
1279
if (!closed) {
1280
Log.logError(e);
1281
shutdown(e);
1282
}
1283
}
1284
}
1285
1286
/**
1287
* A simple tube subscriber for reading from the connection flow.
1288
*/
1289
final class Http2TubeSubscriber implements TubeSubscriber {
1290
private volatile Flow.Subscription subscription;
1291
private volatile boolean completed;
1292
private volatile boolean dropped;
1293
private volatile Throwable error;
1294
private final ConcurrentLinkedQueue<ByteBuffer> queue
1295
= new ConcurrentLinkedQueue<>();
1296
private final SequentialScheduler scheduler =
1297
SequentialScheduler.lockingScheduler(this::processQueue);
1298
private final HttpClientImpl client;
1299
1300
Http2TubeSubscriber(HttpClientImpl client) {
1301
this.client = Objects.requireNonNull(client);
1302
}
1303
1304
final void processQueue() {
1305
try {
1306
while (!queue.isEmpty() && !scheduler.isStopped()) {
1307
ByteBuffer buffer = queue.poll();
1308
if (debug.on())
1309
debug.log("sending %d to Http2Connection.asyncReceive",
1310
buffer.remaining());
1311
asyncReceive(buffer);
1312
}
1313
} catch (Throwable t) {
1314
Throwable x = error;
1315
if (x == null) error = t;
1316
} finally {
1317
Throwable x = error;
1318
if (x != null) {
1319
if (debug.on()) debug.log("Stopping scheduler", x);
1320
scheduler.stop();
1321
Http2Connection.this.shutdown(x);
1322
}
1323
}
1324
}
1325
1326
private final void runOrSchedule() {
1327
if (client.isSelectorThread()) {
1328
scheduler.runOrSchedule(client.theExecutor());
1329
} else scheduler.runOrSchedule();
1330
}
1331
1332
@Override
1333
public void onSubscribe(Flow.Subscription subscription) {
1334
// supports being called multiple time.
1335
// doesn't cancel the previous subscription, since that is
1336
// most probably the same as the new subscription.
1337
assert this.subscription == null || dropped == false;
1338
this.subscription = subscription;
1339
dropped = false;
1340
// TODO FIXME: request(1) should be done by the delegate.
1341
if (!completed) {
1342
if (debug.on())
1343
debug.log("onSubscribe: requesting Long.MAX_VALUE for reading");
1344
subscription.request(Long.MAX_VALUE);
1345
} else {
1346
if (debug.on()) debug.log("onSubscribe: already completed");
1347
}
1348
}
1349
1350
@Override
1351
public void onNext(List<ByteBuffer> item) {
1352
if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item)
1353
+ " bytes in " + item.size() + " buffers");
1354
queue.addAll(item);
1355
runOrSchedule();
1356
}
1357
1358
@Override
1359
public void onError(Throwable throwable) {
1360
if (debug.on()) debug.log(() -> "onError: " + throwable);
1361
error = throwable;
1362
completed = true;
1363
runOrSchedule();
1364
}
1365
1366
@Override
1367
public void onComplete() {
1368
String msg = isActive()
1369
? "EOF reached while reading"
1370
: "Idle connection closed by HTTP/2 peer";
1371
if (debug.on()) debug.log(msg);
1372
error = new EOFException(msg);
1373
completed = true;
1374
runOrSchedule();
1375
}
1376
1377
@Override
1378
public void dropSubscription() {
1379
if (debug.on()) debug.log("dropSubscription");
1380
// we could probably set subscription to null here...
1381
// then we might not need the 'dropped' boolean?
1382
dropped = true;
1383
}
1384
}
1385
1386
synchronized boolean isActive() {
1387
return numReservedClientStreams > 0 || numReservedServerStreams > 0;
1388
}
1389
1390
@Override
1391
public final String toString() {
1392
return dbgString();
1393
}
1394
1395
final String dbgString() {
1396
return "Http2Connection("
1397
+ connection.getConnectionFlow() + ")";
1398
}
1399
1400
static class HeaderDecoder extends ValidatingHeadersConsumer {
1401
1402
HttpHeadersBuilder headersBuilder;
1403
1404
HeaderDecoder() {
1405
this.headersBuilder = new HttpHeadersBuilder();
1406
}
1407
1408
@Override
1409
public void onDecoded(CharSequence name, CharSequence value) {
1410
String n = name.toString();
1411
String v = value.toString();
1412
super.onDecoded(n, v);
1413
headersBuilder.addHeader(n, v);
1414
}
1415
1416
HttpHeaders headers() {
1417
return headersBuilder.build();
1418
}
1419
}
1420
1421
/*
1422
* Checks RFC 7540 rules (relaxed) compliance regarding pseudo-headers.
1423
*/
1424
static class ValidatingHeadersConsumer implements DecodingCallback {
1425
1426
private static final Set<String> PSEUDO_HEADERS =
1427
Set.of(":authority", ":method", ":path", ":scheme", ":status");
1428
1429
/** Used to check that if there are pseudo-headers, they go first */
1430
private boolean pseudoHeadersEnded;
1431
1432
/**
1433
* Called when END_HEADERS was received. This consumer may be invoked
1434
* again after reset() is called, but for a whole new set of headers.
1435
*/
1436
void reset() {
1437
pseudoHeadersEnded = false;
1438
}
1439
1440
@Override
1441
public void onDecoded(CharSequence name, CharSequence value)
1442
throws UncheckedIOException
1443
{
1444
String n = name.toString();
1445
if (n.startsWith(":")) {
1446
if (pseudoHeadersEnded) {
1447
throw newException("Unexpected pseudo-header '%s'", n);
1448
} else if (!PSEUDO_HEADERS.contains(n)) {
1449
throw newException("Unknown pseudo-header '%s'", n);
1450
}
1451
} else {
1452
pseudoHeadersEnded = true;
1453
if (!Utils.isValidName(n)) {
1454
throw newException("Bad header name '%s'", n);
1455
}
1456
}
1457
String v = value.toString();
1458
if (!Utils.isValidValue(v)) {
1459
throw newException("Bad header value '%s'", v);
1460
}
1461
}
1462
1463
private UncheckedIOException newException(String message, String header)
1464
{
1465
return new UncheckedIOException(
1466
new IOException(String.format(message, header)));
1467
}
1468
}
1469
1470
static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
1471
1472
final int initialWindowSize;
1473
public ConnectionWindowUpdateSender(Http2Connection connection,
1474
int initialWindowSize) {
1475
super(connection, initialWindowSize);
1476
this.initialWindowSize = initialWindowSize;
1477
}
1478
1479
@Override
1480
int getStreamId() {
1481
return 0;
1482
}
1483
}
1484
1485
/**
1486
* Thrown when https handshake negotiates http/1.1 alpn instead of h2
1487
*/
1488
static final class ALPNException extends IOException {
1489
private static final long serialVersionUID = 0L;
1490
final transient AbstractAsyncSSLConnection connection;
1491
1492
ALPNException(String msg, AbstractAsyncSSLConnection connection) {
1493
super(msg);
1494
this.connection = connection;
1495
}
1496
1497
AbstractAsyncSSLConnection getConnection() {
1498
return connection;
1499
}
1500
}
1501
}
1502
1503