Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
41171 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.EOFException;
29
import java.io.IOException;
30
import java.io.UncheckedIOException;
31
import java.lang.invoke.MethodHandles;
32
import java.lang.invoke.VarHandle;
33
import java.net.URI;
34
import java.nio.ByteBuffer;
35
import java.util.ArrayList;
36
import java.util.Collections;
37
import java.util.List;
38
import java.util.concurrent.CompletableFuture;
39
import java.util.concurrent.ConcurrentLinkedDeque;
40
import java.util.concurrent.ConcurrentLinkedQueue;
41
import java.util.concurrent.Executor;
42
import java.util.concurrent.Flow;
43
import java.util.concurrent.Flow.Subscription;
44
import java.util.concurrent.atomic.AtomicReference;
45
import java.util.function.BiPredicate;
46
import java.net.http.HttpClient;
47
import java.net.http.HttpHeaders;
48
import java.net.http.HttpRequest;
49
import java.net.http.HttpResponse;
50
import java.net.http.HttpResponse.BodySubscriber;
51
import jdk.internal.net.http.common.*;
52
import jdk.internal.net.http.frame.*;
53
import jdk.internal.net.http.hpack.DecodingCallback;
54
55
/**
56
* Http/2 Stream handling.
57
*
58
* REQUESTS
59
*
60
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
61
*
62
* sendRequest() -- sendHeadersOnly() + sendBody()
63
*
64
* sendBodyAsync() -- calls sendBody() in an executor thread.
65
*
66
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block
67
*
68
* sendRequestAsync() -- calls sendRequest() in an executor thread
69
*
70
* RESPONSES
71
*
72
* Multiple responses can be received per request. Responses are queued up on
73
* a LinkedList of CF<HttpResponse> and the first one on the list is completed
74
* with the next response
75
*
76
* getResponseAsync() -- queries list of response CFs and returns first one
77
* if one exists. Otherwise, creates one and adds it to list
78
* and returns it. Completion is achieved through the
79
* incoming() upcall from connection reader thread.
80
*
81
* getResponse() -- calls getResponseAsync() and waits for CF to complete
82
*
83
* responseBodyAsync() -- calls responseBody() in an executor thread.
84
*
85
* incoming() -- entry point called from connection reader thread. Frames are
86
* either handled immediately without blocking or for data frames
87
* placed on the stream's inputQ which is consumed by the stream's
88
* reader thread.
89
*
90
* PushedStream sub class
91
* ======================
92
* Sending side methods are not used because the request comes from a PUSH_PROMISE
93
* frame sent by the server. When a PUSH_PROMISE is received the PushedStream
94
* is created. PushedStream does not use responseCF list as there can be only
95
* one response. The CF is created when the object is created and when the response
96
* HEADERS frame is received the object is completed.
97
*/
98
class Stream<T> extends ExchangeImpl<T> {
99
100
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
101
102
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
103
final SequentialScheduler sched =
104
SequentialScheduler.lockingScheduler(this::schedule);
105
final SubscriptionBase userSubscription =
106
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
107
108
/**
109
* This stream's identifier. Assigned lazily by the HTTP2Connection before
110
* the stream's first frame is sent.
111
*/
112
protected volatile int streamid;
113
114
long requestContentLen;
115
116
final Http2Connection connection;
117
final HttpRequestImpl request;
118
final HeadersConsumer rspHeadersConsumer;
119
final HttpHeadersBuilder responseHeadersBuilder;
120
final HttpHeaders requestPseudoHeaders;
121
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
122
final HttpRequest.BodyPublisher requestPublisher;
123
volatile RequestSubscriber requestSubscriber;
124
volatile int responseCode;
125
volatile Response response;
126
// The exception with which this stream was canceled.
127
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
128
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
129
volatile CompletableFuture<T> responseBodyCF;
130
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
131
volatile boolean stopRequested;
132
133
/** True if END_STREAM has been seen in a frame received on this stream. */
134
private volatile boolean remotelyClosed;
135
private volatile boolean closed;
136
private volatile boolean endStreamSent;
137
// Indicates the first reason that was invoked when sending a ResetFrame
138
// to the server. A streamState of 0 indicates that no reset was sent.
139
// (see markStream(int code)
140
private volatile int streamState; // assigned using STREAM_STATE varhandle.
141
private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.
142
143
// state flags
144
private boolean requestSent, responseReceived;
145
146
// send lock: prevent sending DataFrames after reset occurred.
147
private final Object sendLock = new Object();
148
149
/**
150
* A reference to this Stream's connection Send Window controller. The
151
* stream MUST acquire the appropriate amount of Send Window before
152
* sending any data. Will be null for PushStreams, as they cannot send data.
153
*/
154
private final WindowController windowController;
155
private final WindowUpdateSender windowUpdater;
156
157
@Override
158
HttpConnection connection() {
159
return connection.connection;
160
}
161
162
/**
163
* Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
164
* of after user subscription window has re-opened, from SubscriptionBase.request()
165
*/
166
private void schedule() {
167
boolean onCompleteCalled = false;
168
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
169
try {
170
if (subscriber == null) {
171
subscriber = responseSubscriber = pendingResponseSubscriber;
172
if (subscriber == null) {
173
// can't process anything yet
174
return;
175
} else {
176
if (debug.on()) debug.log("subscribing user subscriber");
177
subscriber.onSubscribe(userSubscription);
178
}
179
}
180
while (!inputQ.isEmpty()) {
181
Http2Frame frame = inputQ.peek();
182
if (frame instanceof ResetFrame) {
183
inputQ.remove();
184
handleReset((ResetFrame)frame, subscriber);
185
return;
186
}
187
DataFrame df = (DataFrame)frame;
188
boolean finished = df.getFlag(DataFrame.END_STREAM);
189
190
List<ByteBuffer> buffers = df.getData();
191
List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
192
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
193
if (size == 0 && finished) {
194
inputQ.remove();
195
connection.ensureWindowUpdated(df); // must update connection window
196
Log.logTrace("responseSubscriber.onComplete");
197
if (debug.on()) debug.log("incoming: onComplete");
198
sched.stop();
199
connection.decrementStreamsCount(streamid);
200
subscriber.onComplete();
201
onCompleteCalled = true;
202
setEndStreamReceived();
203
return;
204
} else if (userSubscription.tryDecrement()) {
205
inputQ.remove();
206
Log.logTrace("responseSubscriber.onNext {0}", size);
207
if (debug.on()) debug.log("incoming: onNext(%d)", size);
208
try {
209
subscriber.onNext(dsts);
210
} catch (Throwable t) {
211
connection.dropDataFrame(df); // must update connection window
212
throw t;
213
}
214
if (consumed(df)) {
215
Log.logTrace("responseSubscriber.onComplete");
216
if (debug.on()) debug.log("incoming: onComplete");
217
sched.stop();
218
connection.decrementStreamsCount(streamid);
219
subscriber.onComplete();
220
onCompleteCalled = true;
221
setEndStreamReceived();
222
return;
223
}
224
} else {
225
if (stopRequested) break;
226
return;
227
}
228
}
229
} catch (Throwable throwable) {
230
errorRef.compareAndSet(null, throwable);
231
} finally {
232
if (sched.isStopped()) drainInputQueue();
233
}
234
235
Throwable t = errorRef.get();
236
if (t != null) {
237
sched.stop();
238
try {
239
if (!onCompleteCalled) {
240
if (debug.on())
241
debug.log("calling subscriber.onError: %s", (Object) t);
242
subscriber.onError(t);
243
} else {
244
if (debug.on())
245
debug.log("already completed: dropping error %s", (Object) t);
246
}
247
} catch (Throwable x) {
248
Log.logError("Subscriber::onError threw exception: {0}", (Object) t);
249
} finally {
250
cancelImpl(t);
251
drainInputQueue();
252
}
253
}
254
}
255
256
// must only be called from the scheduler schedule() loop.
257
// ensure that all received data frames are accounted for
258
// in the connection window flow control if the scheduler
259
// is stopped before all the data is consumed.
260
private void drainInputQueue() {
261
Http2Frame frame;
262
while ((frame = inputQ.poll()) != null) {
263
if (frame instanceof DataFrame) {
264
connection.dropDataFrame((DataFrame)frame);
265
}
266
}
267
}
268
269
@Override
270
void nullBody(HttpResponse<T> resp, Throwable t) {
271
if (debug.on()) debug.log("nullBody: streamid=%d", streamid);
272
// We should have an END_STREAM data frame waiting in the inputQ.
273
// We need a subscriber to force the scheduler to process it.
274
pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null);
275
sched.runOrSchedule();
276
}
277
278
// Callback invoked after the Response BodySubscriber has consumed the
279
// buffers contained in a DataFrame.
280
// Returns true if END_STREAM is reached, false otherwise.
281
private boolean consumed(DataFrame df) {
282
// RFC 7540 6.1:
283
// The entire DATA frame payload is included in flow control,
284
// including the Pad Length and Padding fields if present
285
int len = df.payloadLength();
286
boolean endStream = df.getFlag(DataFrame.END_STREAM);
287
if (len == 0) return endStream;
288
289
connection.windowUpdater.update(len);
290
291
if (!endStream) {
292
// Don't send window update on a stream which is
293
// closed or half closed.
294
windowUpdater.update(len);
295
}
296
297
// true: end of stream; false: more data coming
298
return endStream;
299
}
300
301
boolean deRegister() {
302
return DEREGISTERED.compareAndSet(this, false, true);
303
}
304
305
@Override
306
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
307
boolean returnConnectionToPool,
308
Executor executor)
309
{
310
try {
311
Log.logTrace("Reading body on stream {0}", streamid);
312
debug.log("Getting BodySubscriber for: " + response);
313
BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
314
CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
315
316
PushGroup<?> pg = exchange.getPushGroup();
317
if (pg != null) {
318
// if an error occurs make sure it is recorded in the PushGroup
319
cf = cf.whenComplete((t, e) -> pg.pushError(e));
320
}
321
return cf;
322
} catch (Throwable t) {
323
// may be thrown by handler.apply
324
cancelImpl(t);
325
return MinimalFuture.failedFuture(t);
326
}
327
}
328
329
@Override
330
public String toString() {
331
StringBuilder sb = new StringBuilder();
332
sb.append("streamid: ")
333
.append(streamid);
334
return sb.toString();
335
}
336
337
private void receiveDataFrame(DataFrame df) {
338
inputQ.add(df);
339
sched.runOrSchedule();
340
}
341
342
/** Handles a RESET frame. RESET is always handled inline in the queue. */
343
private void receiveResetFrame(ResetFrame frame) {
344
inputQ.add(frame);
345
sched.runOrSchedule();
346
}
347
348
/**
349
* Records the first reason which was invoked when sending a ResetFrame
350
* to the server in the streamState, and return the previous value
351
* of the streamState. This is an atomic operation.
352
* A possible use of this method would be to send a ResetFrame only
353
* if no previous reset frame has been sent.
354
* For instance: <pre>{@code
355
* if (markStream(ResetFrame.CANCEL) == 0) {
356
* connection.sendResetFrame(streamId, ResetFrame.CANCEL);
357
* }
358
* }</pre>
359
* @param code the reason code as per HTTP/2 protocol
360
* @return the previous value of the stream state.
361
*/
362
int markStream(int code) {
363
if (code == 0) return streamState;
364
synchronized (sendLock) {
365
return (int) STREAM_STATE.compareAndExchange(this, 0, code);
366
}
367
}
368
369
private void sendDataFrame(DataFrame frame) {
370
synchronized (sendLock) {
371
// must not send DataFrame after reset.
372
if (streamState == 0) {
373
connection.sendDataFrame(frame);
374
}
375
}
376
}
377
378
// pushes entire response body into response subscriber
379
// blocking when required by local or remote flow control
380
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
381
// We want to allow the subscriber's getBody() method to block so it
382
// can work with InputStreams. So, we offload execution.
383
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
384
new MinimalFuture<>(), this::cancelImpl);
385
386
if (isCanceled()) {
387
Throwable t = getCancelCause();
388
responseBodyCF.completeExceptionally(t);
389
} else {
390
pendingResponseSubscriber = bodySubscriber;
391
sched.runOrSchedule(); // in case data waiting already to be processed
392
}
393
return responseBodyCF;
394
}
395
396
@Override
397
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
398
return sendBodyImpl().thenApply( v -> this);
399
}
400
401
/**
 * Creates a stream for the given exchange on the given HTTP/2 connection.
 *
 * @param connection the HTTP/2 connection this stream belongs to
 * @param e the exchange this stream serves
 * @param windowController the send window controller
 */
@SuppressWarnings("unchecked")
Stream(Http2Connection connection,
       Exchange<T> e,
       WindowController windowController)
{
    super(e);
    this.connection = connection;
    this.windowController = windowController;

    // Request-side state.
    this.request = e.request();
    this.requestPublisher = request.requestPublisher;  // may be null

    // Response-side state.
    this.responseHeadersBuilder = new HttpHeadersBuilder();
    this.rspHeadersConsumer = new HeadersConsumer();

    this.requestPseudoHeaders = createPseudoHeaders(request);
    this.windowUpdater = new StreamWindowUpdateSender(connection);
}
416
417
private boolean checkRequestCancelled() {
418
if (exchange.multi.requestCancelled()) {
419
if (errorRef.get() == null) cancel();
420
else sendCancelStreamFrame();
421
return true;
422
}
423
return false;
424
}
425
426
/**
427
* Entry point from Http2Connection reader thread.
428
*
429
* Data frames will be removed by response body thread.
430
*/
431
void incoming(Http2Frame frame) throws IOException {
432
if (debug.on()) debug.log("incoming: %s", frame);
433
var cancelled = checkRequestCancelled() || closed;
434
if ((frame instanceof HeaderFrame)) {
435
HeaderFrame hframe = (HeaderFrame) frame;
436
if (hframe.endHeaders()) {
437
Log.logTrace("handling response (streamid={0})", streamid);
438
handleResponse();
439
}
440
if (hframe.getFlag(HeaderFrame.END_STREAM)) {
441
if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
442
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
443
}
444
} else if (frame instanceof DataFrame) {
445
if (cancelled) connection.dropDataFrame((DataFrame) frame);
446
else receiveDataFrame((DataFrame) frame);
447
} else {
448
if (!cancelled) otherFrame(frame);
449
}
450
}
451
452
void otherFrame(Http2Frame frame) throws IOException {
453
switch (frame.type()) {
454
case WindowUpdateFrame.TYPE -> incoming_windowUpdate((WindowUpdateFrame) frame);
455
case ResetFrame.TYPE -> incoming_reset((ResetFrame) frame);
456
case PriorityFrame.TYPE -> incoming_priority((PriorityFrame) frame);
457
458
default -> throw new IOException("Unexpected frame: " + frame.toString());
459
}
460
}
461
462
// The Hpack decoder decodes into one of these consumers of name,value pairs
463
464
DecodingCallback rspHeadersConsumer() {
465
return rspHeadersConsumer;
466
}
467
468
protected void handleResponse() throws IOException {
469
HttpHeaders responseHeaders = responseHeadersBuilder.build();
470
responseCode = (int)responseHeaders
471
.firstValueAsLong(":status")
472
.orElseThrow(() -> new IOException("no statuscode in response"));
473
474
response = new Response(
475
request, exchange, responseHeaders, connection(),
476
responseCode, HttpClient.Version.HTTP_2);
477
478
/* TODO: review if needs to be removed
479
the value is not used, but in case `content-length` doesn't parse as
480
long, there will be NumberFormatException. If left as is, make sure
481
code up the stack handles NFE correctly. */
482
responseHeaders.firstValueAsLong("content-length");
483
484
if (Log.headers()) {
485
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
486
Log.dumpHeaders(sb, " ", responseHeaders);
487
Log.logHeaders(sb.toString());
488
}
489
490
// this will clear the response headers
491
rspHeadersConsumer.reset();
492
493
completeResponse(response);
494
}
495
496
void incoming_reset(ResetFrame frame) {
497
Log.logTrace("Received RST_STREAM on stream {0}", streamid);
498
if (endStreamReceived()) {
499
Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
500
} else if (closed) {
501
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
502
} else {
503
Flow.Subscriber<?> subscriber =
504
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
505
if (response == null && subscriber == null) {
506
// we haven't receive the headers yet, and won't receive any!
507
// handle reset now.
508
handleReset(frame, subscriber);
509
} else {
510
// put it in the input queue in order to read all
511
// pending data frames first. Indeed, a server may send
512
// RST_STREAM after sending END_STREAM, in which case we should
513
// ignore it. However, we won't know if we have received END_STREAM
514
// or not until all pending data frames are read.
515
receiveResetFrame(frame);
516
// RST_STREAM was pushed to the queue. It will be handled by
517
// asyncReceive after all pending data frames have been
518
// processed.
519
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
520
}
521
}
522
}
523
524
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
525
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
526
if (!closed) {
527
synchronized (this) {
528
if (closed) {
529
if (debug.on()) debug.log("Stream already closed: ignoring RESET");
530
return;
531
}
532
closed = true;
533
}
534
try {
535
int error = frame.getErrorCode();
536
IOException e = new IOException("Received RST_STREAM: "
537
+ ErrorFrame.stringForCode(error));
538
if (errorRef.compareAndSet(null, e)) {
539
if (subscriber != null) {
540
subscriber.onError(e);
541
}
542
}
543
completeResponseExceptionally(e);
544
if (!requestBodyCF.isDone()) {
545
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
546
}
547
if (responseBodyCF != null) {
548
responseBodyCF.completeExceptionally(errorRef.get());
549
}
550
} finally {
551
connection.decrementStreamsCount(streamid);
552
connection.closeStream(streamid);
553
}
554
} else {
555
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
556
}
557
}
558
559
void incoming_priority(PriorityFrame frame) {
560
// TODO: implement priority
561
throw new UnsupportedOperationException("Not implemented");
562
}
563
564
private void incoming_windowUpdate(WindowUpdateFrame frame)
565
throws IOException
566
{
567
int amount = frame.getUpdate();
568
if (amount <= 0) {
569
Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
570
streamid, amount);
571
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
572
} else {
573
assert streamid != 0;
574
boolean success = windowController.increaseStreamWindow(amount, streamid);
575
if (!success) { // overflow
576
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
577
}
578
}
579
}
580
581
void incoming_pushPromise(HttpRequestImpl pushRequest,
582
PushedStream<T> pushStream)
583
throws IOException
584
{
585
if (Log.requests()) {
586
Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
587
}
588
PushGroup<T> pushGroup = exchange.getPushGroup();
589
if (pushGroup == null || exchange.multi.requestCancelled()) {
590
Log.logTrace("Rejecting push promise stream " + streamid);
591
connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
592
pushStream.close();
593
return;
594
}
595
596
PushGroup.Acceptor<T> acceptor = null;
597
boolean accepted = false;
598
try {
599
acceptor = pushGroup.acceptPushRequest(pushRequest);
600
accepted = acceptor.accepted();
601
} catch (Throwable t) {
602
if (debug.on())
603
debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
604
(Object)t);
605
}
606
if (!accepted) {
607
// cancel / reject
608
IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
609
if (Log.trace()) {
610
Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
611
ex.getMessage());
612
}
613
pushStream.cancelImpl(ex);
614
return;
615
}
616
617
assert accepted && acceptor != null;
618
CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
619
HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
620
assert pushHandler != null;
621
622
pushStream.requestSent();
623
pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
624
// setup housekeeping for when the push is received
625
// TODO: deal with ignoring of CF anti-pattern
626
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
627
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
628
t = Utils.getCompletionCause(t);
629
if (Log.trace()) {
630
Log.logTrace("Push completed on stream {0} for {1}{2}",
631
pushStream.streamid, resp,
632
((t==null) ? "": " with exception " + t));
633
}
634
if (t != null) {
635
pushGroup.pushError(t);
636
pushResponseCF.completeExceptionally(t);
637
} else {
638
pushResponseCF.complete(resp);
639
}
640
pushGroup.pushCompleted();
641
});
642
643
}
644
645
private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
646
HttpHeadersBuilder h = request.getSystemHeadersBuilder();
647
if (contentLength > 0) {
648
h.setHeader("content-length", Long.toString(contentLength));
649
}
650
HttpHeaders sysh = filterHeaders(h.build());
651
HttpHeaders userh = filterHeaders(request.getUserHeaders());
652
// Filter context restricted from userHeaders
653
userh = HttpHeaders.of(userh.map(), Utils.CONTEXT_RESTRICTED(client()));
654
655
final HttpHeaders uh = userh;
656
657
// Filter any headers from systemHeaders that are set in userHeaders
658
sysh = HttpHeaders.of(sysh.map(), (k,v) -> uh.firstValue(k).isEmpty());
659
660
OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
661
if (contentLength == 0) {
662
f.setFlag(HeadersFrame.END_STREAM);
663
endStreamSent = true;
664
}
665
return f;
666
}
667
668
private boolean hasProxyAuthorization(HttpHeaders headers) {
669
return headers.firstValue("proxy-authorization")
670
.isPresent();
671
}
672
673
// Determines whether we need to build a new HttpHeader object.
674
//
675
// Ideally we should pass the filter to OutgoingHeaders refactor the
676
// code that creates the HeaderFrame to honor the filter.
677
// We're not there yet - so depending on the filter we need to
678
// apply and the content of the header we will try to determine
679
// whether anything might need to be filtered.
680
// If nothing needs filtering then we can just use the
681
// original headers.
682
private boolean needsFiltering(HttpHeaders headers,
683
BiPredicate<String, String> filter) {
684
if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
685
// we're either connecting or proxying
686
// slight optimization: we only need to filter out
687
// disabled schemes, so if there are none just
688
// pass through.
689
return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
690
&& hasProxyAuthorization(headers);
691
} else {
692
// we're talking to a server, either directly or through
693
// a tunnel.
694
// Slight optimization: we only need to filter out
695
// proxy authorization headers, so if there are none just
696
// pass through.
697
return hasProxyAuthorization(headers);
698
}
699
}
700
701
private HttpHeaders filterHeaders(HttpHeaders headers) {
702
HttpConnection conn = connection();
703
BiPredicate<String, String> filter = conn.headerFilter(request);
704
if (needsFiltering(headers, filter)) {
705
return HttpHeaders.of(headers.map(), filter);
706
}
707
return headers;
708
}
709
710
private static HttpHeaders createPseudoHeaders(HttpRequest request) {
711
HttpHeadersBuilder hdrs = new HttpHeadersBuilder();
712
String method = request.method();
713
hdrs.setHeader(":method", method);
714
URI uri = request.uri();
715
hdrs.setHeader(":scheme", uri.getScheme());
716
// TODO: userinfo deprecated. Needs to be removed
717
hdrs.setHeader(":authority", uri.getAuthority());
718
// TODO: ensure header names beginning with : not in user headers
719
String query = uri.getRawQuery();
720
String path = uri.getRawPath();
721
if (path == null || path.isEmpty()) {
722
if (method.equalsIgnoreCase("OPTIONS")) {
723
path = "*";
724
} else {
725
path = "/";
726
}
727
}
728
if (query != null) {
729
path += "?" + query;
730
}
731
hdrs.setHeader(":path", Utils.encode(path));
732
return hdrs.build();
733
}
734
735
HttpHeaders getRequestPseudoHeaders() {
736
return requestPseudoHeaders;
737
}
738
739
/** Sets endStreamReceived. Should be called only once. */
740
void setEndStreamReceived() {
741
if (debug.on()) debug.log("setEndStreamReceived: streamid=%d", streamid);
742
assert remotelyClosed == false: "Unexpected endStream already set";
743
remotelyClosed = true;
744
responseReceived();
745
}
746
747
/** Tells whether, or not, the END_STREAM Flag has been seen in any frame
748
* received on this stream. */
749
private boolean endStreamReceived() {
750
return remotelyClosed;
751
}
752
753
@Override
754
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
755
if (debug.on()) debug.log("sendHeadersOnly()");
756
if (Log.requests() && request != null) {
757
Log.logRequest(request.toString());
758
}
759
if (requestPublisher != null) {
760
requestContentLen = requestPublisher.contentLength();
761
} else {
762
requestContentLen = 0;
763
}
764
765
// At this point the stream doesn't have a streamid yet.
766
// It will be allocated if we send the request headers.
767
Throwable t = errorRef.get();
768
if (t != null) {
769
if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);
770
return MinimalFuture.failedFuture(t);
771
}
772
773
// sending the headers will cause the allocation of the stream id
774
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
775
connection.sendFrame(f);
776
CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
777
cf.complete(this); // #### good enough for now
778
return cf;
779
}
780
781
@Override
782
void released() {
783
if (streamid > 0) {
784
if (debug.on()) debug.log("Released stream %d", streamid);
785
// remove this stream from the Http2Connection map.
786
connection.decrementStreamsCount(streamid);
787
connection.closeStream(streamid);
788
} else {
789
if (debug.on()) debug.log("Can't release stream %d", streamid);
790
}
791
}
792
793
@Override
794
void completed() {
795
// There should be nothing to do here: the stream should have
796
// been already closed (or will be closed shortly after).
797
}
798
799
boolean registerStream(int id, boolean registerIfCancelled) {
800
boolean cancelled = closed || exchange.multi.requestCancelled();
801
if (!cancelled || registerIfCancelled) {
802
this.streamid = id;
803
connection.putStream(this, streamid);
804
if (debug.on()) {
805
debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
806
streamid, cancelled, registerIfCancelled);
807
}
808
}
809
return !cancelled;
810
}
811
812
void signalWindowUpdate() {
813
RequestSubscriber subscriber = requestSubscriber;
814
assert subscriber != null;
815
if (debug.on()) debug.log("Signalling window update");
816
subscriber.sendScheduler.runOrSchedule();
817
}
818
819
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
820
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
821
// can be < 0 if the actual length is not known.
822
private final long contentLength;
823
private volatile long remainingContentLength;
824
private volatile Subscription subscription;
825
826
// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
827
// 1) The data that was published by the request body Publisher, and
828
// 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
829
final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
830
831
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
832
// A scheduler used to honor window updates. Writing must be paused
833
// when the window is exhausted, and resumed when the window acquires
834
// some space. The sendScheduler makes it possible to implement this
835
// behaviour in an asynchronous non-blocking way.
836
// See RequestSubscriber::trySend below.
837
final SequentialScheduler sendScheduler;
838
839
/**
 * Creates a subscriber for a request body of the given length.
 *
 * @param contentLen the content length, also used as the initial
 *        remaining length
 */
RequestSubscriber(long contentLen) {
    this.sendScheduler = SequentialScheduler.lockingScheduler(this::trySend);
    this.contentLength = contentLen;
    this.remainingContentLength = contentLen;
}
845
846
@Override
847
public void onSubscribe(Flow.Subscription subscription) {
848
if (this.subscription != null) {
849
throw new IllegalStateException("already subscribed");
850
}
851
this.subscription = subscription;
852
if (debug.on())
853
debug.log("RequestSubscriber: onSubscribe, request 1");
854
subscription.request(1);
855
}
856
857
@Override
858
public void onNext(ByteBuffer item) {
859
if (debug.on())
860
debug.log("RequestSubscriber: onNext(%d)", item.remaining());
861
int size = outgoing.size();
862
assert size == 0 : "non-zero size: " + size;
863
onNextImpl(item);
864
}
865
866
private void onNextImpl(ByteBuffer item) {
867
// Got some more request body bytes to send.
868
if (requestBodyCF.isDone()) {
869
// stream already cancelled, probably in timeout
870
sendScheduler.stop();
871
subscription.cancel();
872
return;
873
}
874
outgoing.add(item);
875
sendScheduler.runOrSchedule();
876
}
877
878
@Override
879
public void onError(Throwable throwable) {
880
if (debug.on())
881
debug.log(() -> "RequestSubscriber: onError: " + throwable);
882
// ensure that errors are handled within the flow.
883
if (errorRef.compareAndSet(null, throwable)) {
884
sendScheduler.runOrSchedule();
885
}
886
}
887
888
@Override
889
public void onComplete() {
890
if (debug.on()) debug.log("RequestSubscriber: onComplete");
891
int size = outgoing.size();
892
assert size == 0 || size == 1 : "non-zero or one size: " + size;
893
// last byte of request body has been obtained.
894
// ensure that everything is completed within the flow.
895
onNextImpl(COMPLETED);
896
}
897
898
// Attempts to send the data, if any.
899
// Handles errors and completion state.
900
// Pause writing if the send window is exhausted, resume it if the
901
// send window has some bytes that can be acquired.
902
void trySend() {
903
try {
904
// handle errors raised by onError;
905
Throwable t = errorRef.get();
906
if (t != null) {
907
sendScheduler.stop();
908
if (requestBodyCF.isDone()) return;
909
subscription.cancel();
910
requestBodyCF.completeExceptionally(t);
911
cancelImpl(t);
912
return;
913
}
914
int state = streamState;
915
916
do {
917
// handle COMPLETED;
918
ByteBuffer item = outgoing.peekFirst();
919
if (item == null) return;
920
else if (item == COMPLETED) {
921
sendScheduler.stop();
922
complete();
923
return;
924
}
925
926
// handle bytes to send downstream
927
while (item.hasRemaining() && state == 0) {
928
if (debug.on()) debug.log("trySend: %d", item.remaining());
929
DataFrame df = getDataFrame(item);
930
if (df == null) {
931
if (debug.on())
932
debug.log("trySend: can't send yet: %d", item.remaining());
933
return; // the send window is exhausted: come back later
934
}
935
936
if (contentLength > 0) {
937
remainingContentLength -= df.getDataLength();
938
if (remainingContentLength < 0) {
939
String msg = connection().getConnectionFlow()
940
+ " stream=" + streamid + " "
941
+ "[" + Thread.currentThread().getName() + "] "
942
+ "Too many bytes in request body. Expected: "
943
+ contentLength + ", got: "
944
+ (contentLength - remainingContentLength);
945
assert streamid > 0;
946
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
947
throw new IOException(msg);
948
} else if (remainingContentLength == 0) {
949
assert !endStreamSent : "internal error, send data after END_STREAM flag";
950
df.setFlag(DataFrame.END_STREAM);
951
endStreamSent = true;
952
}
953
} else {
954
assert !endStreamSent : "internal error, send data after END_STREAM flag";
955
}
956
if ((state = streamState) != 0) {
957
if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
958
break;
959
}
960
if (debug.on())
961
debug.log("trySend: sending: %d", df.getDataLength());
962
sendDataFrame(df);
963
}
964
if (state != 0) break;
965
assert !item.hasRemaining();
966
ByteBuffer b = outgoing.removeFirst();
967
assert b == item;
968
} while (outgoing.peekFirst() != null);
969
970
if (state != 0) {
971
t = errorRef.get();
972
if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
973
throw t;
974
}
975
976
if (debug.on()) debug.log("trySend: request 1");
977
subscription.request(1);
978
} catch (Throwable ex) {
979
if (debug.on()) debug.log("trySend: ", ex);
980
sendScheduler.stop();
981
subscription.cancel();
982
requestBodyCF.completeExceptionally(ex);
983
// need to cancel the stream to 1. tell the server
984
// we don't want to receive any more data and
985
// 2. ensure that the operation ref count will be
986
// decremented on the HttpClient.
987
cancelImpl(ex);
988
}
989
}
990
991
private void complete() throws IOException {
992
long remaining = remainingContentLength;
993
long written = contentLength - remaining;
994
if (remaining > 0) {
995
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
996
// let trySend() handle the exception
997
throw new IOException(connection().getConnectionFlow()
998
+ " stream=" + streamid + " "
999
+ "[" + Thread.currentThread().getName() +"] "
1000
+ "Too few bytes returned by the publisher ("
1001
+ written + "/"
1002
+ contentLength + ")");
1003
}
1004
if (!endStreamSent) {
1005
endStreamSent = true;
1006
connection.sendDataFrame(getEmptyEndStreamDataFrame());
1007
}
1008
requestBodyCF.complete(null);
1009
}
1010
}
1011
1012
/**
1013
* Send a RESET frame to tell server to stop sending data on this stream
1014
*/
1015
@Override
1016
public CompletableFuture<Void> ignoreBody() {
1017
try {
1018
connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
1019
return MinimalFuture.completedFuture(null);
1020
} catch (Throwable e) {
1021
Log.logTrace("Error resetting stream {0}", e.toString());
1022
return MinimalFuture.failedFuture(e);
1023
}
1024
}
1025
1026
DataFrame getDataFrame(ByteBuffer buffer) {
1027
int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
1028
// blocks waiting for stream send window, if exhausted
1029
int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
1030
if (actualAmount <= 0) return null;
1031
ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);
1032
DataFrame df = new DataFrame(streamid, 0 , outBuf);
1033
return df;
1034
}
1035
1036
private DataFrame getEmptyEndStreamDataFrame() {
1037
return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
1038
}
1039
1040
/**
 * A List of responses relating to this stream. Normally there is only
 * one response, but intermediate responses like 100 are allowed
 * and must be passed up to higher level before continuing. Deals with races
 * such as if responses are returned before the CFs get created by
 * getResponseAsync().
 *
 * The methods of this class that read or modify the list
 * (getResponseAsync, completeResponse, completeResponseExceptionally)
 * do so while synchronized on the list itself.
 */
1047
1048
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
1049
1050
@Override
1051
CompletableFuture<Response> getResponseAsync(Executor executor) {
1052
CompletableFuture<Response> cf;
1053
// The code below deals with race condition that can be caused when
1054
// completeResponse() is being called before getResponseAsync()
1055
synchronized (response_cfs) {
1056
if (!response_cfs.isEmpty()) {
1057
// This CompletableFuture was created by completeResponse().
1058
// it will be already completed.
1059
cf = response_cfs.remove(0);
1060
// if we find a cf here it should be already completed.
1061
// finding a non completed cf should not happen. just assert it.
1062
assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
1063
} else {
1064
// getResponseAsync() is called first. Create a CompletableFuture
1065
// that will be completed by completeResponse() when
1066
// completeResponse() is called.
1067
cf = new MinimalFuture<>();
1068
response_cfs.add(cf);
1069
}
1070
}
1071
if (executor != null && !cf.isDone()) {
1072
// protect from executing later chain of CompletableFuture operations from SelectorManager thread
1073
cf = cf.thenApplyAsync(r -> r, executor);
1074
}
1075
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
1076
PushGroup<?> pg = exchange.getPushGroup();
1077
if (pg != null) {
1078
// if an error occurs make sure it is recorded in the PushGroup
1079
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
1080
}
1081
return cf;
1082
}
1083
1084
/**
1085
* Completes the first uncompleted CF on list, and removes it. If there is no
1086
* uncompleted CF then creates one (completes it) and adds to list
1087
*/
1088
void completeResponse(Response resp) {
1089
synchronized (response_cfs) {
1090
CompletableFuture<Response> cf;
1091
int cfs_len = response_cfs.size();
1092
for (int i=0; i<cfs_len; i++) {
1093
cf = response_cfs.get(i);
1094
if (!cf.isDone()) {
1095
Log.logTrace("Completing response (streamid={0}): {1}",
1096
streamid, cf);
1097
if (debug.on())
1098
debug.log("Completing responseCF(%d) with response headers", i);
1099
response_cfs.remove(cf);
1100
cf.complete(resp);
1101
return;
1102
} // else we found the previous response: just leave it alone.
1103
}
1104
cf = MinimalFuture.completedFuture(resp);
1105
Log.logTrace("Created completed future (streamid={0}): {1}",
1106
streamid, cf);
1107
if (debug.on())
1108
debug.log("Adding completed responseCF(0) with response headers");
1109
response_cfs.add(cf);
1110
}
1111
}
1112
1113
// methods to update state and remove stream when finished
1114
1115
synchronized void requestSent() {
1116
requestSent = true;
1117
if (responseReceived) {
1118
if (debug.on()) debug.log("requestSent: streamid=%d", streamid);
1119
close();
1120
} else {
1121
if (debug.on()) {
1122
debug.log("requestSent: streamid=%d but response not received", streamid);
1123
}
1124
}
1125
}
1126
1127
synchronized void responseReceived() {
1128
responseReceived = true;
1129
if (requestSent) {
1130
if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);
1131
close();
1132
} else {
1133
if (debug.on()) {
1134
debug.log("responseReceived: streamid=%d but request not sent", streamid);
1135
}
1136
}
1137
}
1138
1139
/**
1140
* same as above but for errors
1141
*/
1142
void completeResponseExceptionally(Throwable t) {
1143
synchronized (response_cfs) {
1144
// use index to avoid ConcurrentModificationException
1145
// caused by removing the CF from within the loop.
1146
for (int i = 0; i < response_cfs.size(); i++) {
1147
CompletableFuture<Response> cf = response_cfs.get(i);
1148
if (!cf.isDone()) {
1149
response_cfs.remove(i);
1150
cf.completeExceptionally(t);
1151
return;
1152
}
1153
}
1154
response_cfs.add(MinimalFuture.failedFuture(t));
1155
}
1156
}
1157
1158
CompletableFuture<Void> sendBodyImpl() {
1159
requestBodyCF.whenComplete((v, t) -> requestSent());
1160
try {
1161
if (requestPublisher != null) {
1162
final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
1163
requestPublisher.subscribe(requestSubscriber = subscriber);
1164
} else {
1165
// there is no request body, therefore the request is complete,
1166
// END_STREAM has already sent with outgoing headers
1167
requestBodyCF.complete(null);
1168
}
1169
} catch (Throwable t) {
1170
cancelImpl(t);
1171
requestBodyCF.completeExceptionally(t);
1172
}
1173
return requestBodyCF;
1174
}
1175
1176
@Override
1177
void cancel() {
1178
if ((streamid == 0)) {
1179
cancel(new IOException("Stream cancelled before streamid assigned"));
1180
} else {
1181
cancel(new IOException("Stream " + streamid + " cancelled"));
1182
}
1183
}
1184
1185
void onSubscriptionError(Throwable t) {
1186
errorRef.compareAndSet(null, t);
1187
if (debug.on()) debug.log("Got subscription error: %s", (Object)t);
1188
// This is the special case where the subscriber
1189
// has requested an illegal number of items.
1190
// In this case, the error doesn't come from
1191
// upstream, but from downstream, and we need to
1192
// handle the error without waiting for the inputQ
1193
// to be exhausted.
1194
stopRequested = true;
1195
sched.runOrSchedule();
1196
}
1197
1198
@Override
1199
void cancel(IOException cause) {
1200
cancelImpl(cause);
1201
}
1202
1203
void connectionClosing(Throwable cause) {
1204
Flow.Subscriber<?> subscriber =
1205
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
1206
errorRef.compareAndSet(null, cause);
1207
if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
1208
sched.runOrSchedule();
1209
} else cancelImpl(cause);
1210
}
1211
1212
// This method sends a RST_STREAM frame
1213
void cancelImpl(Throwable e) {
1214
errorRef.compareAndSet(null, e);
1215
if (debug.on()) {
1216
if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
1217
else debug.log("cancelling stream %d: %s", streamid, e);
1218
}
1219
if (Log.trace()) {
1220
if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);
1221
else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
1222
}
1223
boolean closing;
1224
if (closing = !closed) { // assigning closing to !closed
1225
synchronized (this) {
1226
if (closing = !closed) { // assigning closing to !closed
1227
closed=true;
1228
}
1229
}
1230
}
1231
if (closing) { // true if the stream has not been closed yet
1232
if (responseSubscriber != null || pendingResponseSubscriber != null)
1233
sched.runOrSchedule();
1234
}
1235
completeResponseExceptionally(e);
1236
if (!requestBodyCF.isDone()) {
1237
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
1238
}
1239
if (responseBodyCF != null) {
1240
responseBodyCF.completeExceptionally(errorRef.get());
1241
}
1242
try {
1243
// will send a RST_STREAM frame
1244
if (streamid != 0 && streamState == 0) {
1245
e = Utils.getCompletionCause(e);
1246
if (e instanceof EOFException) {
1247
// read EOF: no need to try & send reset
1248
connection.decrementStreamsCount(streamid);
1249
connection.closeStream(streamid);
1250
} else {
1251
// no use to send CANCEL if already closed.
1252
sendCancelStreamFrame();
1253
}
1254
}
1255
} catch (Throwable ex) {
1256
Log.logError(ex);
1257
}
1258
}
1259
1260
void sendCancelStreamFrame() {
1261
// do not reset a stream until it has a streamid.
1262
if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {
1263
connection.resetStream(streamid, ResetFrame.CANCEL);
1264
}
1265
close();
1266
}
1267
1268
// This method doesn't send any frame
1269
void close() {
1270
if (closed) return;
1271
synchronized(this) {
1272
if (closed) return;
1273
closed = true;
1274
}
1275
if (debug.on()) debug.log("close stream %d", streamid);
1276
Log.logTrace("Closing stream {0}", streamid);
1277
connection.closeStream(streamid);
1278
Log.logTrace("Stream {0} closed", streamid);
1279
}
1280
1281
static class PushedStream<T> extends Stream<T> {
1282
final PushGroup<T> pushGroup;
1283
// push streams need the response CF allocated up front as it is
1284
// given directly to user via the multi handler callback function.
1285
final CompletableFuture<Response> pushCF;
1286
CompletableFuture<HttpResponse<T>> responseCF;
1287
final HttpRequestImpl pushReq;
1288
HttpResponse.BodyHandler<T> pushHandler;
1289
1290
PushedStream(PushGroup<T> pushGroup,
1291
Http2Connection connection,
1292
Exchange<T> pushReq) {
1293
// ## no request body possible, null window controller
1294
super(connection, pushReq, null);
1295
this.pushGroup = pushGroup;
1296
this.pushReq = pushReq.request();
1297
this.pushCF = new MinimalFuture<>();
1298
this.responseCF = new MinimalFuture<>();
1299
1300
}
1301
1302
CompletableFuture<HttpResponse<T>> responseCF() {
1303
return responseCF;
1304
}
1305
1306
synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
1307
this.pushHandler = pushHandler;
1308
}
1309
1310
synchronized HttpResponse.BodyHandler<T> getPushHandler() {
1311
// ignored parameters to function can be used as BodyHandler
1312
return this.pushHandler;
1313
}
1314
1315
// Following methods call the super class but in case of
1316
// error record it in the PushGroup. The error method is called
1317
// with a null value when no error occurred (is a no-op)
1318
@Override
1319
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
1320
return super.sendBodyAsync()
1321
.whenComplete((ExchangeImpl<T> v, Throwable t)
1322
-> pushGroup.pushError(Utils.getCompletionCause(t)));
1323
}
1324
1325
@Override
1326
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
1327
return super.sendHeadersAsync()
1328
.whenComplete((ExchangeImpl<T> ex, Throwable t)
1329
-> pushGroup.pushError(Utils.getCompletionCause(t)));
1330
}
1331
1332
@Override
1333
CompletableFuture<Response> getResponseAsync(Executor executor) {
1334
CompletableFuture<Response> cf = pushCF.whenComplete(
1335
(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
1336
if(executor!=null && !cf.isDone()) {
1337
cf = cf.thenApplyAsync( r -> r, executor);
1338
}
1339
return cf;
1340
}
1341
1342
@Override
1343
CompletableFuture<T> readBodyAsync(
1344
HttpResponse.BodyHandler<T> handler,
1345
boolean returnConnectionToPool,
1346
Executor executor)
1347
{
1348
return super.readBodyAsync(handler, returnConnectionToPool, executor)
1349
.whenComplete((v, t) -> pushGroup.pushError(t));
1350
}
1351
1352
@Override
1353
void completeResponse(Response r) {
1354
Log.logResponse(r::toString);
1355
pushCF.complete(r); // not strictly required for push API
1356
// start reading the body using the obtained BodySubscriber
1357
CompletableFuture<Void> start = new MinimalFuture<>();
1358
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
1359
.whenComplete((T body, Throwable t) -> {
1360
if (t != null) {
1361
responseCF.completeExceptionally(t);
1362
} else {
1363
HttpResponseImpl<T> resp =
1364
new HttpResponseImpl<>(r.request, r, null, body, getExchange());
1365
responseCF.complete(resp);
1366
}
1367
});
1368
start.completeAsync(() -> null, getExchange().executor());
1369
}
1370
1371
@Override
1372
void completeResponseExceptionally(Throwable t) {
1373
pushCF.completeExceptionally(t);
1374
}
1375
1376
// @Override
1377
// synchronized void responseReceived() {
1378
// super.responseReceived();
1379
// }
1380
1381
// create and return the PushResponseImpl
1382
@Override
1383
protected void handleResponse() {
1384
HttpHeaders responseHeaders = responseHeadersBuilder.build();
1385
responseCode = (int)responseHeaders
1386
.firstValueAsLong(":status")
1387
.orElse(-1);
1388
1389
if (responseCode == -1) {
1390
completeResponseExceptionally(new IOException("No status code"));
1391
}
1392
1393
this.response = new Response(
1394
pushReq, exchange, responseHeaders, connection(),
1395
responseCode, HttpClient.Version.HTTP_2);
1396
1397
/* TODO: review if needs to be removed
1398
the value is not used, but in case `content-length` doesn't parse
1399
as long, there will be NumberFormatException. If left as is, make
1400
sure code up the stack handles NFE correctly. */
1401
responseHeaders.firstValueAsLong("content-length");
1402
1403
if (Log.headers()) {
1404
StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
1405
sb.append(" (streamid=").append(streamid).append("):\n");
1406
Log.dumpHeaders(sb, " ", responseHeaders);
1407
Log.logHeaders(sb.toString());
1408
}
1409
1410
rspHeadersConsumer.reset();
1411
1412
// different implementations for normal streams and pushed streams
1413
completeResponse(response);
1414
}
1415
}
1416
1417
final class StreamWindowUpdateSender extends WindowUpdateSender {
1418
1419
StreamWindowUpdateSender(Http2Connection connection) {
1420
super(connection);
1421
}
1422
1423
@Override
1424
int getStreamId() {
1425
return streamid;
1426
}
1427
1428
@Override
1429
String dbgString() {
1430
String dbg = dbgString;
1431
if (dbg != null) return dbg;
1432
if (streamid == 0) {
1433
return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
1434
} else {
1435
dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
1436
return dbgString = dbg;
1437
}
1438
}
1439
}
1440
1441
/**
1442
* Returns true if this exchange was canceled.
1443
* @return true if this exchange was canceled.
1444
*/
1445
synchronized boolean isCanceled() {
1446
return errorRef.get() != null;
1447
}
1448
1449
/**
1450
* Returns the cause for which this exchange was canceled, if available.
1451
* @return the cause for which this exchange was canceled, if available.
1452
*/
1453
synchronized Throwable getCancelCause() {
1454
return errorRef.get();
1455
}
1456
1457
final String dbgString() {
1458
return connection.dbgString() + "/Stream("+streamid+")";
1459
}
1460
1461
private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {
1462
1463
void reset() {
1464
super.reset();
1465
responseHeadersBuilder.clear();
1466
debug.log("Response builder cleared, ready to receive new headers.");
1467
}
1468
1469
@Override
1470
public void onDecoded(CharSequence name, CharSequence value)
1471
throws UncheckedIOException
1472
{
1473
String n = name.toString();
1474
String v = value.toString();
1475
super.onDecoded(n, v);
1476
responseHeadersBuilder.addHeader(n, v);
1477
if (Log.headers() && Log.trace()) {
1478
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
1479
streamid, n, v);
1480
}
1481
}
1482
}
1483
1484
// VarHandles for the "streamState" and "deRegistered" fields of Stream.
private static final VarHandle STREAM_STATE;
private static final VarHandle DEREGISTERED;
static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        STREAM_STATE = lookup.findVarHandle(Stream.class, "streamState", int.class);
        DEREGISTERED = lookup.findVarHandle(Stream.class, "deRegistered", boolean.class);
    } catch (Exception x) {
        // A failed lookup makes the class unusable: fail initialization.
        throw new ExceptionInInitializerError(x);
    }
}
1496
}
1497
1498