Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
41171 views
1
/*
2
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.EOFException;
29
import java.lang.System.Logger.Level;
30
import java.nio.ByteBuffer;
31
import java.util.List;
32
import java.util.concurrent.CompletableFuture;
33
import java.util.concurrent.CompletionStage;
34
import java.util.concurrent.Executor;
35
import java.util.concurrent.Flow;
36
import java.util.concurrent.atomic.AtomicBoolean;
37
import java.util.concurrent.atomic.AtomicLong;
38
import java.util.function.Consumer;
39
import java.util.function.Function;
40
import java.net.http.HttpHeaders;
41
import java.net.http.HttpResponse;
42
import jdk.internal.net.http.ResponseContent.BodyParser;
43
import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser;
44
import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
45
import jdk.internal.net.http.common.Log;
46
import jdk.internal.net.http.common.Logger;
47
import jdk.internal.net.http.common.MinimalFuture;
48
import jdk.internal.net.http.common.Utils;
49
import static java.net.http.HttpClient.Version.HTTP_1_1;
50
import static java.net.http.HttpResponse.BodySubscribers.discarding;
51
import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
52
import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED;
53
54
/**
55
* Handles a HTTP/1.1 response (headers + body).
56
* There can be more than one of these per Http exchange.
57
*/
58
class Http1Response<T> {
59
60
private volatile ResponseContent content;
61
private final HttpRequestImpl request;
62
private Response response;
63
private final HttpConnection connection;
64
private HttpHeaders headers;
65
private int responseCode;
66
private final Http1Exchange<T> exchange;
67
private boolean return2Cache; // return connection to cache when finished
68
private final HeadersReader headersReader; // used to read the headers
69
private final BodyReader bodyReader; // used to read the body
70
private final Http1AsyncReceiver asyncReceiver;
71
private volatile EOFException eof;
72
private volatile BodyParser bodyParser;
73
// max number of bytes of (fixed length) body to ignore on redirect
74
private final static int MAX_IGNORE = 1024;
75
76
// Revisit: can we get rid of this?
77
static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
78
private volatile State readProgress = State.INITIAL;
79
80
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
81
final static AtomicLong responseCount = new AtomicLong();
82
final long id = responseCount.incrementAndGet();
83
private Http1HeaderParser hd;
84
85
Http1Response(HttpConnection conn,
86
Http1Exchange<T> exchange,
87
Http1AsyncReceiver asyncReceiver) {
88
this.readProgress = State.INITIAL;
89
this.request = exchange.request();
90
this.exchange = exchange;
91
this.connection = conn;
92
this.asyncReceiver = asyncReceiver;
93
headersReader = new HeadersReader(this::advance);
94
bodyReader = new BodyReader(this::advance);
95
96
hd = new Http1HeaderParser();
97
readProgress = State.READING_HEADERS;
98
headersReader.start(hd);
99
asyncReceiver.subscribe(headersReader);
100
}
101
102
String dbgTag;
103
private String dbgString() {
104
String dbg = dbgTag;
105
if (dbg == null) {
106
String cdbg = connection.dbgTag;
107
if (cdbg != null) {
108
dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";
109
} else {
110
dbg = "Http1Response(id=" + id + ")";
111
}
112
}
113
return dbg;
114
}
115
116
// The ClientRefCountTracker is used to track the state
117
// of a pending operation. Altough there usually is a single
118
// point where the operation starts, it may terminate at
119
// different places.
120
private final class ClientRefCountTracker {
121
final HttpClientImpl client = connection.client();
122
// state & 0x01 != 0 => acquire called
123
// state & 0x02 != 0 => tryRelease called
124
byte state;
125
126
public synchronized void acquire() {
127
if (state == 0) {
128
// increment the reference count on the HttpClientImpl
129
// to prevent the SelectorManager thread from exiting
130
// until our operation is complete.
131
if (debug.on())
132
debug.log("Operation started: incrementing ref count for %s", client);
133
client.reference();
134
state = 0x01;
135
} else {
136
if (debug.on())
137
debug.log("Operation ref count for %s is already %s",
138
client, ((state & 0x2) == 0x2) ? "released." : "incremented!" );
139
assert (state & 0x01) == 0 : "reference count already incremented";
140
}
141
}
142
143
public synchronized void tryRelease() {
144
if (state == 0x01) {
145
// decrement the reference count on the HttpClientImpl
146
// to allow the SelectorManager thread to exit if no
147
// other operation is pending and the facade is no
148
// longer referenced.
149
if (debug.on())
150
debug.log("Operation finished: decrementing ref count for %s", client);
151
client.unreference();
152
} else if (state == 0) {
153
if (debug.on())
154
debug.log("Operation finished: releasing ref count for %s", client);
155
} else if ((state & 0x02) == 0x02) {
156
if (debug.on())
157
debug.log("ref count for %s already released", client);
158
}
159
state |= 0x02;
160
}
161
}
162
163
private volatile boolean firstTimeAround = true;
164
165
public CompletableFuture<Response> readHeadersAsync(Executor executor) {
166
if (debug.on())
167
debug.log("Reading Headers: (remaining: "
168
+ asyncReceiver.remaining() +") " + readProgress);
169
170
if (firstTimeAround) {
171
if (debug.on()) debug.log("First time around");
172
firstTimeAround = false;
173
} else {
174
// with expect continue we will resume reading headers + body.
175
asyncReceiver.unsubscribe(bodyReader);
176
bodyReader.reset();
177
178
hd = new Http1HeaderParser();
179
readProgress = State.READING_HEADERS;
180
headersReader.reset();
181
headersReader.start(hd);
182
asyncReceiver.subscribe(headersReader);
183
}
184
185
CompletableFuture<State> cf = headersReader.completion();
186
assert cf != null : "parsing not started";
187
if (debug.on()) {
188
debug.log("headersReader is %s",
189
cf == null ? "not yet started"
190
: cf.isDone() ? "already completed"
191
: "not yet completed");
192
}
193
194
Function<State, Response> lambda = (State completed) -> {
195
assert completed == State.READING_HEADERS;
196
if (debug.on())
197
debug.log("Reading Headers: creating Response object;"
198
+ " state is now " + readProgress);
199
asyncReceiver.unsubscribe(headersReader);
200
responseCode = hd.responseCode();
201
headers = hd.headers();
202
203
response = new Response(request,
204
exchange.getExchange(),
205
headers,
206
connection,
207
responseCode,
208
HTTP_1_1);
209
210
if (Log.headers()) {
211
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
212
Log.dumpHeaders(sb, " ", headers);
213
Log.logHeaders(sb.toString());
214
}
215
216
return response;
217
};
218
219
if (executor != null) {
220
return cf.thenApplyAsync(lambda, executor);
221
} else {
222
return cf.thenApply(lambda);
223
}
224
}
225
226
private boolean finished;
227
228
synchronized void completed() {
229
finished = true;
230
}
231
232
synchronized boolean finished() {
233
return finished;
234
}
235
236
/**
237
* Return known fixed content length or -1 if chunked, or -2 if no content-length
238
* information in which case, connection termination delimits the response body
239
*/
240
long fixupContentLen(long clen) {
241
if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) {
242
return 0L;
243
}
244
if (clen == -1L) {
245
if (headers.firstValue("Transfer-encoding").orElse("")
246
.equalsIgnoreCase("chunked")) {
247
return -1L;
248
}
249
if (responseCode == 101) {
250
// this is a h2c or websocket upgrade, contentlength must be zero
251
return 0L;
252
}
253
return -2L;
254
}
255
return clen;
256
}
257
258
/**
259
* Read up to MAX_IGNORE bytes discarding
260
*/
261
public CompletableFuture<Void> ignoreBody(Executor executor) {
262
int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
263
if (clen == -1 || clen > MAX_IGNORE) {
264
connection.close();
265
return MinimalFuture.completedFuture(null); // not treating as error
266
} else {
267
return readBody(discarding(), !request.isWebSocket(), executor);
268
}
269
}
270
271
// Used for those response codes that have no body associated
272
public void nullBody(HttpResponse<T> resp, Throwable t) {
273
if (t != null) connection.close();
274
else {
275
return2Cache = !request.isWebSocket();
276
onFinished();
277
}
278
}
279
280
static final Flow.Subscription NOP = new Flow.Subscription() {
281
@Override
282
public void request(long n) { }
283
public void cancel() { }
284
};
285
286
/**
287
* The Http1AsyncReceiver ensures that all calls to
288
* the subscriber, including onSubscribe, occur sequentially.
289
* There could however be some race conditions that could happen
290
* in case of unexpected errors thrown at unexpected places, which
291
* may cause onError to be called multiple times.
292
* The Http1BodySubscriber will ensure that the user subscriber
293
* is actually completed only once - and only after it is
294
* subscribed.
295
* @param <U> The type of response.
296
*/
297
final static class Http1BodySubscriber<U> implements TrustedSubscriber<U> {
298
final HttpResponse.BodySubscriber<U> userSubscriber;
299
final AtomicBoolean completed = new AtomicBoolean();
300
volatile Throwable withError;
301
volatile boolean subscribed;
302
Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
303
this.userSubscriber = userSubscriber;
304
}
305
306
@Override
307
public boolean needsExecutor() {
308
return TrustedSubscriber.needsExecutor(userSubscriber);
309
}
310
311
// propagate the error to the user subscriber, even if not
312
// subscribed yet.
313
private void propagateError(Throwable t) {
314
assert t != null;
315
try {
316
// if unsubscribed at this point, it will not
317
// get subscribed later - so do it now and
318
// propagate the error
319
if (subscribed == false) {
320
subscribed = true;
321
userSubscriber.onSubscribe(NOP);
322
}
323
} finally {
324
// if onError throws then there is nothing to do
325
// here: let the caller deal with it by logging
326
// and closing the connection.
327
userSubscriber.onError(t);
328
}
329
}
330
331
// complete the subscriber, either normally or exceptionally
332
// ensure that the subscriber is completed only once.
333
private void complete(Throwable t) {
334
if (completed.compareAndSet(false, true)) {
335
t = withError = Utils.getCompletionCause(t);
336
if (t == null) {
337
assert subscribed;
338
try {
339
userSubscriber.onComplete();
340
} catch (Throwable x) {
341
// Simply propagate the error by calling
342
// onError on the user subscriber, and let the
343
// connection be reused since we should have received
344
// and parsed all the bytes when we reach here.
345
// If onError throws in turn, then we will simply
346
// let that new exception flow up to the caller
347
// and let it deal with it.
348
// (i.e: log and close the connection)
349
// Note that rethrowing here could introduce a
350
// race that might cause the next send() operation to
351
// fail as the connection has already been put back
352
// into the cache when we reach here.
353
propagateError(t = withError = Utils.getCompletionCause(x));
354
}
355
} else {
356
propagateError(t);
357
}
358
}
359
}
360
361
@Override
362
public CompletionStage<U> getBody() {
363
return userSubscriber.getBody();
364
}
365
366
@Override
367
public void onSubscribe(Flow.Subscription subscription) {
368
if (!subscribed) {
369
subscribed = true;
370
userSubscriber.onSubscribe(subscription);
371
} else {
372
// could be already subscribed and completed
373
// if an unexpected error occurred before the actual
374
// subscription - though that's not supposed
375
// happen.
376
assert completed.get();
377
}
378
}
379
@Override
380
public void onNext(List<ByteBuffer> item) {
381
assert !completed.get();
382
userSubscriber.onNext(item);
383
}
384
@Override
385
public void onError(Throwable throwable) {
386
complete(throwable);
387
}
388
@Override
389
public void onComplete() {
390
complete(null);
391
}
392
}
393
394
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
395
boolean return2Cache,
396
Executor executor) {
397
if (debug.on()) {
398
debug.log("readBody: return2Cache: " + return2Cache);
399
if (request.isWebSocket() && return2Cache && connection != null) {
400
debug.log("websocket connection will be returned to cache: "
401
+ connection.getClass() + "/" + connection );
402
}
403
}
404
assert !return2Cache || !request.isWebSocket();
405
this.return2Cache = return2Cache;
406
final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
407
408
final CompletableFuture<U> cf = new MinimalFuture<>();
409
410
long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);
411
final long clen = fixupContentLen(clen0);
412
413
// expect-continue reads headers and body twice.
414
// if we reach here, we must reset the headersReader state.
415
asyncReceiver.unsubscribe(headersReader);
416
headersReader.reset();
417
ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
418
419
// We need to keep hold on the client facade until the
420
// tracker has been incremented.
421
connection.client().reference();
422
executor.execute(() -> {
423
try {
424
content = new ResponseContent(
425
connection, clen, headers, subscriber,
426
this::onFinished
427
);
428
if (cf.isCompletedExceptionally()) {
429
// if an error occurs during subscription
430
connection.close();
431
return;
432
}
433
// increment the reference count on the HttpClientImpl
434
// to prevent the SelectorManager thread from exiting until
435
// the body is fully read.
436
refCountTracker.acquire();
437
bodyParser = content.getBodyParser(
438
(t) -> {
439
try {
440
if (t != null) {
441
try {
442
subscriber.onError(t);
443
} finally {
444
cf.completeExceptionally(t);
445
}
446
}
447
} finally {
448
bodyReader.onComplete(t);
449
if (t != null) {
450
connection.close();
451
}
452
}
453
});
454
bodyReader.start(bodyParser);
455
CompletableFuture<State> bodyReaderCF = bodyReader.completion();
456
asyncReceiver.subscribe(bodyReader);
457
assert bodyReaderCF != null : "parsing not started";
458
// Make sure to keep a reference to asyncReceiver from
459
// within this
460
CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> {
461
t = Utils.getCompletionCause(t);
462
try {
463
if (t == null) {
464
if (debug.on()) debug.log("Finished reading body: " + s);
465
assert s == State.READING_BODY;
466
}
467
if (t != null) {
468
subscriber.onError(t);
469
cf.completeExceptionally(t);
470
}
471
} catch (Throwable x) {
472
// not supposed to happen
473
asyncReceiver.onReadError(x);
474
} finally {
475
// we're done: release the ref count for
476
// the current operation.
477
refCountTracker.tryRelease();
478
}
479
});
480
connection.addTrailingOperation(trailingOp);
481
} catch (Throwable t) {
482
if (debug.on()) debug.log("Failed reading body: " + t);
483
try {
484
subscriber.onError(t);
485
cf.completeExceptionally(t);
486
} finally {
487
asyncReceiver.onReadError(t);
488
}
489
} finally {
490
connection.client().unreference();
491
}
492
});
493
494
ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> {
495
cf.completeExceptionally(t);
496
asyncReceiver.setRetryOnError(false);
497
asyncReceiver.onReadError(t);
498
});
499
500
return cf.whenComplete((s,t) -> {
501
if (t != null) {
502
// If an exception occurred, release the
503
// ref count for the current operation, as
504
// it may never be triggered otherwise
505
// (BodySubscriber ofInputStream)
506
// If there was no exception then the
507
// ref count will be/have been released when
508
// the last byte of the response is/was received
509
refCountTracker.tryRelease();
510
}
511
});
512
}
513
514
515
private void onFinished() {
516
asyncReceiver.clear();
517
if (return2Cache) {
518
Log.logTrace("Attempting to return connection to the pool: {0}", connection);
519
// TODO: need to do something here?
520
// connection.setAsyncCallbacks(null, null, null);
521
522
// don't return the connection to the cache if EOF happened.
523
if (debug.on())
524
debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool");
525
connection.closeOrReturnToCache(eof == null ? headers : null);
526
}
527
}
528
529
HttpHeaders responseHeaders() {
530
return headers;
531
}
532
533
int responseCode() {
534
return responseCode;
535
}
536
537
// ================ Support for plugging into Http1Receiver =================
538
// ============================================================================
539
540
// Callback: Error receiver: Consumer of Throwable.
541
void onReadError(Throwable t) {
542
Log.logError(t);
543
Receiver<?> receiver = receiver(readProgress);
544
if (t instanceof EOFException) {
545
debug.log(Level.DEBUG, "onReadError: received EOF");
546
eof = (EOFException) t;
547
}
548
CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
549
debug.log(Level.DEBUG, () -> "onReadError: cf is "
550
+ (cf == null ? "null"
551
: (cf.isDone() ? "already completed"
552
: "not yet completed")));
553
if (cf != null) {
554
cf.completeExceptionally(t);
555
} else {
556
debug.log(Level.DEBUG, "onReadError", t);
557
}
558
debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
559
connection.close();
560
}
561
562
// ========================================================================
563
564
private State advance(State previous) {
565
assert readProgress == previous;
566
switch(previous) {
567
case READING_HEADERS:
568
asyncReceiver.unsubscribe(headersReader);
569
return readProgress = State.READING_BODY;
570
case READING_BODY:
571
asyncReceiver.unsubscribe(bodyReader);
572
return readProgress = State.DONE;
573
default:
574
throw new InternalError("can't advance from " + previous);
575
}
576
}
577
578
Receiver<?> receiver(State state) {
579
return switch (state) {
580
case READING_HEADERS -> headersReader;
581
case READING_BODY -> bodyReader;
582
583
default -> null;
584
};
585
586
}
587
588
static abstract class Receiver<T>
589
implements Http1AsyncReceiver.Http1AsyncDelegate {
590
abstract void start(T parser);
591
abstract CompletableFuture<State> completion();
592
// accepts a buffer from upstream.
593
// this should be implemented as a simple call to
594
// accept(ref, parser, cf)
595
public abstract boolean tryAsyncReceive(ByteBuffer buffer);
596
public abstract void onReadError(Throwable t);
597
// handle a byte buffer received from upstream.
598
// this method should set the value of Http1Response.buffer
599
// to ref.get() before beginning parsing.
600
abstract void handle(ByteBuffer buf, T parser,
601
CompletableFuture<State> cf);
602
// resets this objects state so that it can be reused later on
603
// typically puts the reference to parser and completion to null
604
abstract void reset();
605
606
// accepts a byte buffer received from upstream
607
// returns true if the buffer is fully parsed and more data can
608
// be accepted, false otherwise.
609
final boolean accept(ByteBuffer buf, T parser,
610
CompletableFuture<State> cf) {
611
if (cf == null || parser == null || cf.isDone()) return false;
612
handle(buf, parser, cf);
613
return !cf.isDone();
614
}
615
public abstract void onSubscribe(AbstractSubscription s);
616
public abstract AbstractSubscription subscription();
617
618
}
619
620
// Invoked with each new ByteBuffer when reading headers...
621
final class HeadersReader extends Receiver<Http1HeaderParser> {
622
final Consumer<State> onComplete;
623
volatile Http1HeaderParser parser;
624
volatile CompletableFuture<State> cf;
625
volatile long count; // bytes parsed (for debug)
626
volatile AbstractSubscription subscription;
627
628
HeadersReader(Consumer<State> onComplete) {
629
this.onComplete = onComplete;
630
}
631
632
@Override
633
public AbstractSubscription subscription() {
634
return subscription;
635
}
636
637
@Override
638
public void onSubscribe(AbstractSubscription s) {
639
this.subscription = s;
640
s.request(1);
641
}
642
643
@Override
644
void reset() {
645
cf = null;
646
parser = null;
647
count = 0;
648
subscription = null;
649
}
650
651
// Revisit: do we need to support restarting?
652
@Override
653
final void start(Http1HeaderParser hp) {
654
count = 0;
655
cf = new MinimalFuture<>();
656
parser = hp;
657
}
658
659
@Override
660
CompletableFuture<State> completion() {
661
return cf;
662
}
663
664
@Override
665
public final boolean tryAsyncReceive(ByteBuffer ref) {
666
boolean hasDemand = subscription.demand().tryDecrement();
667
assert hasDemand;
668
boolean needsMore = accept(ref, parser, cf);
669
if (needsMore) subscription.request(1);
670
return needsMore;
671
}
672
673
@Override
674
public final void onReadError(Throwable t) {
675
t = wrapWithExtraDetail(t, parser::currentStateMessage);
676
Http1Response.this.onReadError(t);
677
}
678
679
@Override
680
final void handle(ByteBuffer b,
681
Http1HeaderParser parser,
682
CompletableFuture<State> cf) {
683
assert cf != null : "parsing not started";
684
assert parser != null : "no parser";
685
try {
686
count += b.remaining();
687
if (debug.on())
688
debug.log("Sending " + b.remaining() + "/" + b.capacity()
689
+ " bytes to header parser");
690
if (parser.parse(b)) {
691
count -= b.remaining();
692
if (debug.on())
693
debug.log("Parsing headers completed. bytes=" + count);
694
onComplete.accept(State.READING_HEADERS);
695
cf.complete(State.READING_HEADERS);
696
}
697
} catch (Throwable t) {
698
if (debug.on())
699
debug.log("Header parser failed to handle buffer: " + t);
700
cf.completeExceptionally(t);
701
}
702
}
703
704
@Override
705
public void close(Throwable error) {
706
// if there's no error nothing to do: the cf should/will
707
// be completed.
708
if (error != null) {
709
CompletableFuture<State> cf = this.cf;
710
if (cf != null) {
711
if (debug.on())
712
debug.log("close: completing header parser CF with " + error);
713
cf.completeExceptionally(error);
714
}
715
}
716
}
717
}
718
719
// Invoked with each new ByteBuffer when reading bodies...
720
final class BodyReader extends Receiver<BodyParser> {
721
final Consumer<State> onComplete;
722
volatile BodyParser parser;
723
volatile CompletableFuture<State> cf;
724
volatile AbstractSubscription subscription;
725
BodyReader(Consumer<State> onComplete) {
726
this.onComplete = onComplete;
727
}
728
729
@Override
730
void reset() {
731
parser = null;
732
cf = null;
733
subscription = null;
734
}
735
736
// Revisit: do we need to support restarting?
737
@Override
738
final void start(BodyParser parser) {
739
cf = new MinimalFuture<>();
740
this.parser = parser;
741
}
742
743
@Override
744
CompletableFuture<State> completion() {
745
return cf;
746
}
747
748
@Override
749
public final boolean tryAsyncReceive(ByteBuffer b) {
750
return accept(b, parser, cf);
751
}
752
753
@Override
754
public final void onReadError(Throwable t) {
755
if (t instanceof EOFException && bodyParser != null &&
756
bodyParser instanceof UnknownLengthBodyParser) {
757
((UnknownLengthBodyParser)bodyParser).complete();
758
return;
759
}
760
t = wrapWithExtraDetail(t, parser::currentStateMessage);
761
Http1Response.this.onReadError(t);
762
}
763
764
@Override
765
public AbstractSubscription subscription() {
766
return subscription;
767
}
768
769
@Override
770
public void onSubscribe(AbstractSubscription s) {
771
this.subscription = s;
772
try {
773
parser.onSubscribe(s);
774
} catch (Throwable t) {
775
cf.completeExceptionally(t);
776
throw t;
777
}
778
}
779
780
@Override
781
final void handle(ByteBuffer b,
782
BodyParser parser,
783
CompletableFuture<State> cf) {
784
assert cf != null : "parsing not started";
785
assert parser != null : "no parser";
786
try {
787
if (debug.on())
788
debug.log("Sending " + b.remaining() + "/" + b.capacity()
789
+ " bytes to body parser");
790
parser.accept(b);
791
} catch (Throwable t) {
792
if (debug.on())
793
debug.log("Body parser failed to handle buffer: " + t);
794
if (!cf.isDone()) {
795
cf.completeExceptionally(t);
796
}
797
}
798
}
799
800
final void onComplete(Throwable closedExceptionally) {
801
if (cf.isDone()) return;
802
if (closedExceptionally != null) {
803
cf.completeExceptionally(closedExceptionally);
804
} else {
805
onComplete.accept(State.READING_BODY);
806
cf.complete(State.READING_BODY);
807
}
808
}
809
810
@Override
811
public final void close(Throwable error) {
812
CompletableFuture<State> cf = this.cf;
813
if (cf != null && !cf.isDone()) {
814
// we want to make sure dependent actions are triggered
815
// in order to make sure the client reference count
816
// is decremented
817
if (error != null) {
818
if (debug.on())
819
debug.log("close: completing body parser CF with " + error);
820
cf.completeExceptionally(error);
821
} else {
822
if (debug.on())
823
debug.log("close: completing body parser CF");
824
cf.complete(State.READING_BODY);
825
}
826
}
827
}
828
829
@Override
830
public String toString() {
831
return super.toString() + "/parser=" + String.valueOf(parser);
832
}
833
}
834
}
835
836