Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
41171 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.IOException;
29
import java.net.InetSocketAddress;
30
import java.net.http.HttpClient;
31
import java.net.http.HttpResponse;
32
import java.net.http.HttpResponse.BodyHandler;
33
import java.net.http.HttpResponse.BodySubscriber;
34
import java.nio.ByteBuffer;
35
import java.util.Objects;
36
import java.util.concurrent.CompletableFuture;
37
import java.util.LinkedList;
38
import java.util.List;
39
import java.util.concurrent.ConcurrentLinkedDeque;
40
import java.util.concurrent.Executor;
41
import java.util.concurrent.Flow;
42
import jdk.internal.net.http.common.Demand;
43
import jdk.internal.net.http.common.Log;
44
import jdk.internal.net.http.common.FlowTube;
45
import jdk.internal.net.http.common.Logger;
46
import jdk.internal.net.http.common.SequentialScheduler;
47
import jdk.internal.net.http.common.MinimalFuture;
48
import jdk.internal.net.http.common.Utils;
49
import static java.net.http.HttpClient.Version.HTTP_1_1;
50
import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
51
52
/**
53
* Encapsulates one HTTP/1.1 request/response exchange.
54
*/
55
class Http1Exchange<T> extends ExchangeImpl<T> {
56
57
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
58
final HttpRequestImpl request; // main request
59
final Http1Request requestAction;
60
private volatile Http1Response<T> response;
61
final HttpConnection connection;
62
final HttpClientImpl client;
63
final Executor executor;
64
private final Http1AsyncReceiver asyncReceiver;
65
private volatile boolean upgraded;
66
67
/** Records a possible cancellation raised before any operation
68
* has been initiated, or an error received while sending the request. */
69
private Throwable failed;
70
private final List<CompletableFuture<?>> operations; // used for cancel
71
72
/** Must be held when operating on any internal state or data. */
73
private final Object lock = new Object();
74
75
/** Holds the outgoing data, either the headers or a request body part. Or
76
* an error from the request body publisher. At most there can be ~2 pieces
77
* of outgoing data ( onComplete|onError can be invoked without demand ).*/
78
final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
79
80
/** The write publisher, responsible for writing the complete request ( both
81
* headers and body ( if any ). */
82
private final Http1Publisher writePublisher = new Http1Publisher();
83
84
/** Completed when the header have been published, or there is an error */
85
private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>();
86
/** Completed when the body has been published, or there is an error */
87
private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
88
89
/** The subscriber to the request's body published. Maybe null. */
90
private volatile Http1BodySubscriber bodySubscriber;
91
92
enum State { INITIAL,
93
HEADERS,
94
BODY,
95
ERROR, // terminal state
96
COMPLETING,
97
COMPLETED } // terminal state
98
99
private State state = State.INITIAL;
100
101
/** A carrier for either data or an error. Used to carry data, and communicate
102
* errors from the request ( both headers and body ) to the exchange. */
103
static class DataPair {
104
Throwable throwable;
105
List<ByteBuffer> data;
106
DataPair(List<ByteBuffer> data, Throwable throwable){
107
this.data = data;
108
this.throwable = throwable;
109
}
110
@Override
111
public String toString() {
112
return "DataPair [data=" + data + ", throwable=" + throwable + "]";
113
}
114
}
115
116
/** An abstract supertype for HTTP/1.1 body subscribers. There are two
117
* concrete implementations: {@link Http1Request.StreamSubscriber}, and
118
* {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
119
* fixed length bodies, respectively. */
120
static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
121
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
122
private volatile Flow.Subscription subscription;
123
volatile boolean complete;
124
private final Logger debug;
125
Http1BodySubscriber(Logger debug) {
126
assert debug != null;
127
this.debug = debug;
128
}
129
130
/** Final sentinel in the stream of request body. */
131
static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
132
133
final void request(long n) {
134
if (debug.on())
135
debug.log("Http1BodySubscriber requesting %d, from %s",
136
n, subscription);
137
subscription.request(n);
138
}
139
140
/** A current-state message suitable for inclusion in an exception detail message. */
141
abstract String currentStateMessage();
142
143
final boolean isSubscribed() {
144
return subscription != null;
145
}
146
147
final void setSubscription(Flow.Subscription subscription) {
148
this.subscription = subscription;
149
whenSubscribed.complete(subscription);
150
}
151
152
final void cancelSubscription() {
153
try {
154
subscription.cancel();
155
} catch(Throwable t) {
156
String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
157
if (debug.on()) debug.log("%s: %s", msg, t);
158
Log.logError("{0}: {1}", msg, (Object)t);
159
}
160
}
161
162
static Http1BodySubscriber completeSubscriber(Logger debug) {
163
return new Http1BodySubscriber(debug) {
164
@Override public void onSubscribe(Flow.Subscription subscription) { error(); }
165
@Override public void onNext(ByteBuffer item) { error(); }
166
@Override public void onError(Throwable throwable) { error(); }
167
@Override public void onComplete() { error(); }
168
@Override String currentStateMessage() { return null; }
169
private void error() {
170
throw new InternalError("should not reach here");
171
}
172
};
173
}
174
}
175
176
@Override
177
public String toString() {
178
return "HTTP/1.1 " + request.toString();
179
}
180
181
HttpRequestImpl request() {
182
return request;
183
}
184
185
Http1Exchange(Exchange<T> exchange, HttpConnection connection)
186
throws IOException
187
{
188
super(exchange);
189
this.request = exchange.request();
190
this.client = exchange.client();
191
this.executor = exchange.executor();
192
this.operations = new LinkedList<>();
193
operations.add(headersSentCF);
194
operations.add(bodySentCF);
195
if (connection != null) {
196
this.connection = connection;
197
} else {
198
InetSocketAddress addr = request.getAddress();
199
this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
200
}
201
this.requestAction = new Http1Request(request, this);
202
this.asyncReceiver = new Http1AsyncReceiver(executor, this);
203
}
204
205
@Override
206
HttpConnection connection() {
207
return connection;
208
}
209
210
private void connectFlows(HttpConnection connection) {
211
FlowTube tube = connection.getConnectionFlow();
212
if (debug.on()) debug.log("%s connecting flows", tube);
213
214
// Connect the flow to our Http1TubeSubscriber:
215
// asyncReceiver.subscriber().
216
tube.connectFlows(writePublisher,
217
asyncReceiver.subscriber());
218
}
219
220
@Override
221
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
222
// create the response before sending the request headers, so that
223
// the response can set the appropriate receivers.
224
if (debug.on()) debug.log("Sending headers only");
225
// If the first attempt to read something triggers EOF, or
226
// IOException("channel reset by peer"), we're going to retry.
227
// Instruct the asyncReceiver to throw ConnectionExpiredException
228
// to force a retry.
229
asyncReceiver.setRetryOnError(true);
230
if (response == null) {
231
response = new Http1Response<>(connection, this, asyncReceiver);
232
}
233
234
if (debug.on()) debug.log("response created in advance");
235
236
CompletableFuture<Void> connectCF;
237
if (!connection.connected()) {
238
if (debug.on()) debug.log("initiating connect async");
239
connectCF = connection.connectAsync(exchange)
240
.thenCompose(unused -> connection.finishConnect());
241
Throwable cancelled;
242
synchronized (lock) {
243
if ((cancelled = failed) == null) {
244
operations.add(connectCF);
245
}
246
}
247
if (cancelled != null) {
248
if (client.isSelectorThread()) {
249
executor.execute(() ->
250
connectCF.completeExceptionally(cancelled));
251
} else {
252
connectCF.completeExceptionally(cancelled);
253
}
254
}
255
} else {
256
connectCF = new MinimalFuture<>();
257
connectCF.complete(null);
258
}
259
260
return connectCF
261
.thenCompose(unused -> {
262
CompletableFuture<Void> cf = new MinimalFuture<>();
263
try {
264
asyncReceiver.whenFinished.whenComplete((r,t) -> {
265
if (t != null) {
266
if (debug.on())
267
debug.log("asyncReceiver finished (failed=%s)", (Object)t);
268
if (!headersSentCF.isDone())
269
headersSentCF.completeAsync(() -> this, executor);
270
}
271
});
272
connectFlows(connection);
273
274
if (debug.on()) debug.log("requestAction.headers");
275
List<ByteBuffer> data = requestAction.headers();
276
synchronized (lock) {
277
state = State.HEADERS;
278
}
279
if (debug.on()) debug.log("setting outgoing with headers");
280
assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
281
appendToOutgoing(data);
282
cf.complete(null);
283
return cf;
284
} catch (Throwable t) {
285
if (debug.on()) debug.log("Failed to send headers: %s", t);
286
headersSentCF.completeExceptionally(t);
287
bodySentCF.completeExceptionally(t);
288
connection.close();
289
cf.completeExceptionally(t);
290
return cf;
291
} })
292
.thenCompose(unused -> headersSentCF);
293
}
294
295
private void cancelIfFailed(Flow.Subscription s) {
296
asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
297
if (debug.on())
298
debug.log("asyncReceiver finished (failed=%s)", (Object)t);
299
if (t != null) {
300
s.cancel();
301
// Don't complete exceptionally here as 't'
302
// might not be the right exception: it will
303
// not have been decorated yet.
304
// t is an exception raised by the read side,
305
// an EOFException or Broken Pipe...
306
// We are cancelling the BodyPublisher subscription
307
// and completing bodySentCF to allow the next step
308
// to flow and call readHeaderAsync, which will
309
// get the right exception from the asyncReceiver.
310
bodySentCF.complete(this);
311
}
312
}, executor);
313
}
314
315
@Override
316
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
317
assert headersSentCF.isDone();
318
if (debug.on()) debug.log("sendBodyAsync");
319
try {
320
bodySubscriber = requestAction.continueRequest();
321
if (debug.on()) debug.log("bodySubscriber is %s",
322
bodySubscriber == null ? null : bodySubscriber.getClass());
323
if (bodySubscriber == null) {
324
bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
325
appendToOutgoing(Http1BodySubscriber.COMPLETED);
326
} else {
327
// start
328
bodySubscriber.whenSubscribed
329
.thenAccept((s) -> cancelIfFailed(s))
330
.thenAccept((s) -> requestMoreBody());
331
}
332
} catch (Throwable t) {
333
cancelImpl(t);
334
bodySentCF.completeExceptionally(t);
335
}
336
return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
337
}
338
339
@Override
340
CompletableFuture<Response> getResponseAsync(Executor executor) {
341
if (debug.on()) debug.log("reading headers");
342
CompletableFuture<Response> cf = response.readHeadersAsync(executor);
343
Throwable cause;
344
synchronized (lock) {
345
operations.add(cf);
346
cause = failed;
347
failed = null;
348
}
349
350
if (cause != null) {
351
Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
352
+ "\n\tCompleting exceptionally with {2}\n",
353
request.uri(),
354
request.timeout().isPresent() ?
355
// calling duration.toMillis() can throw an exception.
356
// this is just debugging, we don't care if it overflows.
357
(request.timeout().get().getSeconds() * 1000
358
+ request.timeout().get().getNano() / 1000000) : -1,
359
cause);
360
boolean acknowledged = cf.completeExceptionally(cause);
361
if (debug.on())
362
debug.log(acknowledged ? ("completed response with " + cause)
363
: ("response already completed, ignoring " + cause));
364
}
365
return Utils.wrapForDebug(debug, "getResponseAsync", cf);
366
}
367
368
@Override
369
CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
370
boolean returnConnectionToPool,
371
Executor executor)
372
{
373
BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
374
response.responseHeaders(),
375
HTTP_1_1));
376
CompletableFuture<T> bodyCF = response.readBody(bs,
377
returnConnectionToPool,
378
executor);
379
return bodyCF;
380
}
381
382
@Override
383
CompletableFuture<Void> ignoreBody() {
384
return response.ignoreBody(executor);
385
}
386
387
// Used for those response codes that have no body associated
388
@Override
389
public void nullBody(HttpResponse<T> resp, Throwable t) {
390
response.nullBody(resp, t);
391
}
392
393
394
ByteBuffer drainLeftOverBytes() {
395
synchronized (lock) {
396
asyncReceiver.stop();
397
return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
398
}
399
}
400
401
void released() {
402
Http1Response<T> resp = this.response;
403
if (resp != null) resp.completed();
404
asyncReceiver.clear();
405
}
406
407
void completed() {
408
Http1Response<T> resp = this.response;
409
if (resp != null) resp.completed();
410
}
411
412
/**
413
* Cancel checks to see if request and responseAsync finished already.
414
* If not it closes the connection and completes all pending operations
415
*/
416
@Override
417
void cancel() {
418
cancelImpl(new IOException("Request cancelled"));
419
}
420
421
/**
422
* Cancel checks to see if request and responseAsync finished already.
423
* If not it closes the connection and completes all pending operations
424
*/
425
@Override
426
void cancel(IOException cause) {
427
cancelImpl(cause);
428
}
429
430
private void cancelImpl(Throwable cause) {
431
LinkedList<CompletableFuture<?>> toComplete = null;
432
int count = 0;
433
Throwable error;
434
synchronized (lock) {
435
if ((error = failed) == null) {
436
failed = error = cause;
437
}
438
if (debug.on()) {
439
debug.log(request.uri() + ": " + error);
440
}
441
if (requestAction != null && requestAction.finished()
442
&& response != null && response.finished()) {
443
return;
444
}
445
writePublisher.writeScheduler.stop();
446
if (operations.isEmpty()) {
447
Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
448
+ "\n\tCan''t cancel yet with {2}",
449
request.uri(),
450
request.timeout().isPresent() ?
451
// calling duration.toMillis() can throw an exception.
452
// this is just debugging, we don't care if it overflows.
453
(request.timeout().get().getSeconds() * 1000
454
+ request.timeout().get().getNano() / 1000000) : -1,
455
cause);
456
} else {
457
for (CompletableFuture<?> cf : operations) {
458
if (!cf.isDone()) {
459
if (toComplete == null) toComplete = new LinkedList<>();
460
toComplete.add(cf);
461
count++;
462
}
463
}
464
operations.clear();
465
}
466
}
467
try {
468
Log.logError("Http1Exchange.cancel: count=" + count);
469
if (toComplete != null) {
470
// We might be in the selector thread in case of timeout, when
471
// the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
472
// There may or may not be other places that reach here
473
// from the SelectorManager thread, so just make sure we
474
// don't complete any CF from within the selector manager
475
// thread.
476
Executor exec = client.isSelectorThread()
477
? executor
478
: this::runInline;
479
Throwable x = error;
480
while (!toComplete.isEmpty()) {
481
CompletableFuture<?> cf = toComplete.poll();
482
exec.execute(() -> {
483
if (cf.completeExceptionally(x)) {
484
if (debug.on())
485
debug.log("%s: completed cf with %s", request.uri(), x);
486
}
487
});
488
}
489
}
490
} finally {
491
if (!upgraded)
492
connection.close();
493
}
494
}
495
496
void upgraded() {
497
upgraded = true;
498
}
499
500
private void runInline(Runnable run) {
501
assert !client.isSelectorThread();
502
run.run();
503
}
504
505
/** Returns true if this exchange was canceled. */
506
boolean isCanceled() {
507
synchronized (lock) {
508
return failed != null;
509
}
510
}
511
512
/** Returns the cause for which this exchange was canceled, if available. */
513
Throwable getCancelCause() {
514
synchronized (lock) {
515
return failed;
516
}
517
}
518
519
/** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
520
void appendToOutgoing(Throwable throwable) {
521
appendToOutgoing(new DataPair(null, throwable));
522
}
523
524
/** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
525
void appendToOutgoing(List<ByteBuffer> item) {
526
appendToOutgoing(new DataPair(item, null));
527
}
528
529
private void appendToOutgoing(DataPair dp) {
530
if (debug.on()) debug.log("appending to outgoing " + dp);
531
outgoing.add(dp);
532
writePublisher.writeScheduler.runOrSchedule();
533
}
534
535
/** Tells whether, or not, there is any outgoing data that can be published,
536
* or if there is an error. */
537
private boolean hasOutgoing() {
538
return !outgoing.isEmpty();
539
}
540
541
private void requestMoreBody() {
542
try {
543
if (debug.on()) debug.log("requesting more request body from the subscriber");
544
bodySubscriber.request(1);
545
} catch (Throwable t) {
546
if (debug.on()) debug.log("Subscription::request failed", t);
547
cancelImpl(t);
548
bodySentCF.completeExceptionally(t);
549
}
550
}
551
552
private void cancelUpstreamSubscription() {
553
final Executor exec = client.theExecutor();
554
if (debug.on()) debug.log("cancelling upstream publisher");
555
if (bodySubscriber != null) {
556
exec.execute(bodySubscriber::cancelSubscription);
557
} else if (debug.on()) {
558
debug.log("bodySubscriber is null");
559
}
560
}
561
562
// Invoked only by the publisher
563
// ALL tasks should execute off the Selector-Manager thread
564
/** Returns the next portion of the HTTP request, or the error. */
565
private DataPair getOutgoing() {
566
final Executor exec = client.theExecutor();
567
final DataPair dp = outgoing.pollFirst();
568
569
if (writePublisher.cancelled) {
570
cancelUpstreamSubscription();
571
headersSentCF.completeAsync(() -> this, exec);
572
bodySentCF.completeAsync(() -> this, exec);
573
return null;
574
}
575
576
if (dp == null) // publisher has not published anything yet
577
return null;
578
579
if (dp.throwable != null) {
580
synchronized (lock) {
581
state = State.ERROR;
582
}
583
exec.execute(() -> {
584
headersSentCF.completeExceptionally(dp.throwable);
585
bodySentCF.completeExceptionally(dp.throwable);
586
connection.close();
587
});
588
return dp;
589
}
590
591
switch (state) {
592
case HEADERS:
593
synchronized (lock) {
594
state = State.BODY;
595
}
596
// completeAsync, since dependent tasks should run in another thread
597
if (debug.on()) debug.log("initiating completion of headersSentCF");
598
headersSentCF.completeAsync(() -> this, exec);
599
break;
600
case BODY:
601
if (dp.data == Http1BodySubscriber.COMPLETED) {
602
synchronized (lock) {
603
state = State.COMPLETING;
604
}
605
if (debug.on()) debug.log("initiating completion of bodySentCF");
606
bodySentCF.completeAsync(() -> this, exec);
607
} else {
608
exec.execute(this::requestMoreBody);
609
}
610
break;
611
case INITIAL:
612
case ERROR:
613
case COMPLETING:
614
case COMPLETED:
615
default:
616
assert false : "Unexpected state:" + state;
617
}
618
619
return dp;
620
}
621
622
/** A Publisher of HTTP/1.1 headers and request body. */
623
final class Http1Publisher implements FlowTube.TubePublisher {
624
625
final Logger debug = Utils.getDebugLogger(this::dbgString);
626
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
627
volatile boolean cancelled;
628
final Http1WriteSubscription subscription = new Http1WriteSubscription();
629
final Demand demand = new Demand();
630
final SequentialScheduler writeScheduler =
631
SequentialScheduler.lockingScheduler(new WriteTask());
632
633
@Override
634
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
635
assert state == State.INITIAL;
636
Objects.requireNonNull(s);
637
assert subscriber == null;
638
639
subscriber = s;
640
if (debug.on()) debug.log("got subscriber: %s", s);
641
s.onSubscribe(subscription);
642
}
643
644
volatile String dbgTag;
645
String dbgString() {
646
String tag = dbgTag;
647
Object flow = connection.getConnectionFlow();
648
if (tag == null && flow != null) {
649
dbgTag = tag = "Http1Publisher(" + flow + ")";
650
} else if (tag == null) {
651
tag = "Http1Publisher(?)";
652
}
653
return tag;
654
}
655
656
@SuppressWarnings("fallthrough")
657
private boolean checkRequestCancelled() {
658
if (exchange.multi.requestCancelled()) {
659
if (debug.on()) debug.log("request cancelled");
660
if (subscriber == null) {
661
if (debug.on()) debug.log("no subscriber yet");
662
return true;
663
}
664
switch (state) {
665
case BODY:
666
cancelUpstreamSubscription();
667
// fall trough to HEADERS
668
case HEADERS:
669
Throwable cause = getCancelCause();
670
if (cause == null) cause = new IOException("Request cancelled");
671
subscriber.onError(cause);
672
writeScheduler.stop();
673
return true;
674
}
675
}
676
return false;
677
}
678
679
680
final class WriteTask implements Runnable {
681
@Override
682
public void run() {
683
assert state != State.COMPLETED : "Unexpected state:" + state;
684
if (debug.on()) debug.log("WriteTask");
685
686
if (cancelled) {
687
if (debug.on()) debug.log("handling cancellation");
688
writeScheduler.stop();
689
getOutgoing();
690
return;
691
}
692
693
if (checkRequestCancelled()) return;
694
695
if (subscriber == null) {
696
if (debug.on()) debug.log("no subscriber yet");
697
return;
698
}
699
700
if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
701
while (hasOutgoing() && demand.tryDecrement()) {
702
DataPair dp = getOutgoing();
703
if (dp == null)
704
break;
705
706
if (dp.throwable != null) {
707
if (debug.on()) debug.log("onError");
708
// Do not call the subscriber's onError, it is not required.
709
writeScheduler.stop();
710
} else {
711
List<ByteBuffer> data = dp.data;
712
if (data == Http1BodySubscriber.COMPLETED) {
713
synchronized (lock) {
714
assert state == State.COMPLETING : "Unexpected state:" + state;
715
state = State.COMPLETED;
716
}
717
if (debug.on())
718
debug.log("completed, stopping %s", writeScheduler);
719
writeScheduler.stop();
720
// Do nothing more. Just do not publish anything further.
721
// The next Subscriber will eventually take over.
722
723
} else {
724
if (checkRequestCancelled()) return;
725
if (debug.on())
726
debug.log("onNext with " + Utils.remaining(data) + " bytes");
727
subscriber.onNext(data);
728
}
729
}
730
}
731
}
732
}
733
734
final class Http1WriteSubscription implements Flow.Subscription {
735
736
@Override
737
public void request(long n) {
738
if (cancelled)
739
return; //no-op
740
demand.increase(n);
741
if (debug.on())
742
debug.log("subscription request(%d), demand=%s", n, demand);
743
writeScheduler.runOrSchedule(client.theExecutor());
744
}
745
746
@Override
747
public void cancel() {
748
if (debug.on()) debug.log("subscription cancelled");
749
if (cancelled)
750
return; //no-op
751
cancelled = true;
752
writeScheduler.runOrSchedule(client.theExecutor());
753
}
754
}
755
}
756
757
HttpClient client() {
758
return client;
759
}
760
761
String dbgString() {
762
return "Http1Exchange";
763
}
764
}
765
766