Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
41171 views
1
/*
2
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.EOFException;
29
import java.io.IOException;
30
import java.nio.ByteBuffer;
31
import java.util.Arrays;
32
import java.util.HashSet;
33
import java.util.List;
34
import java.util.Set;
35
import java.util.concurrent.ConcurrentLinkedDeque;
36
import java.util.concurrent.Executor;
37
import java.util.concurrent.ExecutorService;
38
import java.util.concurrent.Flow;
39
import java.util.concurrent.atomic.AtomicBoolean;
40
import java.util.concurrent.atomic.AtomicLong;
41
import java.util.concurrent.atomic.AtomicReference;
42
import java.util.function.Consumer;
43
import jdk.internal.net.http.common.Demand;
44
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
45
import jdk.internal.net.http.common.Log;
46
import jdk.internal.net.http.common.Logger;
47
import jdk.internal.net.http.common.MinimalFuture;
48
import jdk.internal.net.http.common.SequentialScheduler;
49
import jdk.internal.net.http.common.ConnectionExpiredException;
50
import jdk.internal.net.http.common.Utils;
51
52
53
/**
54
* A helper class that will queue up incoming data until the receiving
55
* side is ready to handle it.
56
*/
57
class Http1AsyncReceiver {
58
59
// Debug logger for this receiver. dbgString() is handed to the logger
// factory (presumably as the supplier of the per-instance tag - confirm
// against Utils.getDebugLogger).
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
60
61
/**
62
* A delegate that can asynchronously receive data from an upstream flow,
63
* parse, it, then possibly transform it and either store it (response
64
* headers) or possibly pass it to a downstream subscriber (response body).
65
* Usually, there will be one Http1AsyncDelegate in charge of receiving
66
* and parsing headers, and another one in charge of receiving, parsing,
67
* and forwarding body. Each will sequentially subscribe with the
68
* Http1AsyncReceiver in turn. There may be additional delegates which
69
* subscribe to the Http1AsyncReceiver, mainly for the purpose of handling
70
* errors while the connection is busy transmitting the request body and the
71
* Http1Exchange::readBody method hasn't been called yet, and response
72
* delegates haven't subscribed yet.
73
*/
74
static interface Http1AsyncDelegate {
75
/**
76
* Receives and handles a byte buffer reference.
77
* @param ref A byte buffer reference coming from upstream.
78
* @return false, if the byte buffer reference should be kept in the queue.
79
* Usually, this means that either the byte buffer reference
80
* was handled and parsing is finished, or that the receiver
81
* didn't handle the byte reference at all.
82
* There may or may not be any remaining data in the
83
* byte buffer, and the byte buffer reference must not have
84
* been cleared.
85
* true, if the byte buffer reference was fully read and
86
* more data can be received.
87
*/
88
public boolean tryAsyncReceive(ByteBuffer ref);
89
90
/**
91
* Called when an exception is raised.
92
* @param ex The raised Throwable.
93
*/
94
public void onReadError(Throwable ex);
95
96
/**
97
* Must be called before any other method on the delegate.
98
* The subscription can be either used directly by the delegate
99
* to request more data (e.g. if the delegate is a header parser),
100
* or can be forwarded to a downstream subscriber (if the delegate
101
* is a body parser that wraps a response BodySubscriber).
102
* In all cases, it is the responsibility of the delegate to ensure
103
* that request(n) and demand.tryDecrement() are called appropriately.
104
* No data will be sent to {@code tryAsyncReceive} unless
105
* the subscription has some demand.
106
*
107
* @param s A subscription that allows the delegate to control the
108
* data flow.
109
*/
110
public void onSubscribe(AbstractSubscription s);
111
112
/**
113
* Returns the subscription that was passed to {@code onSubscribe}
114
* @return the subscription that was passed to {@code onSubscribe}..
115
*/
116
public AbstractSubscription subscription();
117
118
/**
119
* Called to make sure resources are released when the
120
* when the Http1AsyncReceiver is stopped.
121
* @param error The Http1AsyncReceiver pending error ref,
122
* if any.
123
*/
124
public void close(Throwable error);
125
126
}
127
128
/**
129
* A simple subclass of AbstractSubscription that ensures the
130
* SequentialScheduler will be run when request() is called and demand
131
* becomes positive again.
132
*/
133
private static final class Http1AsyncDelegateSubscription
134
extends AbstractSubscription
135
{
136
private final Runnable onCancel;
137
private final Consumer<Throwable> onError;
138
private final SequentialScheduler scheduler;
139
private volatile boolean cancelled;
140
Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
141
Runnable onCancel,
142
Consumer<Throwable> onError) {
143
this.scheduler = scheduler;
144
this.onCancel = onCancel;
145
this.onError = onError;
146
}
147
@Override
148
public void request(long n) {
149
if (cancelled) return;
150
try {
151
final Demand demand = demand();
152
if (demand.increase(n)) {
153
scheduler.runOrSchedule();
154
}
155
} catch (IllegalArgumentException x) {
156
cancelled = true;
157
onError.accept(x);
158
}
159
}
160
@Override
161
public void cancel() {
162
cancelled = true;
163
onCancel.run();
164
}
165
}
166
167
// Buffers received from upstream, in arrival order, that have not yet
// been fully consumed by a delegate.
private final ConcurrentLinkedDeque<ByteBuffer> queue = new ConcurrentLinkedDeque<>();
// Runs flush(), the main loop of this class.
private final SequentialScheduler scheduler = SequentialScheduler.lockingScheduler(this::flush);
// Completed by stop(), or exceptionally once an error has been forwarded
// to a delegate.
final MinimalFuture<Void> whenFinished = new MinimalFuture<>();
// Executor used to run the scheduler outside of the calling thread.
private final Executor executor;
// The subscriber reading from the connection flow.
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
// A delegate that has called subscribe() but has not been installed by
// the scheduler loop yet.
private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef = new AtomicReference<>();
// Total number of bytes put into the queue so far.
private final AtomicLong received = new AtomicLong();
// Whether one more item may be requested from upstream.
final AtomicBoolean canRequestMore = new AtomicBoolean();

// First error recorded, if any.
private volatile Throwable error;
// The currently installed delegate, if any.
private volatile Http1AsyncDelegate delegate;
// This reference is only used to prevent early GC of the exchange.
private volatile Http1Exchange<?> owner;
// Only used for checking whether we run on the selector manager thread.
private final HttpClientImpl client;
// Accessed while holding the monitor of this object.
private boolean retry;
// Set when a delegate subscription reports an illegal request.
private volatile boolean stopRequested;

/**
 * Creates a receiver for the given exchange.
 *
 * @param executor the executor used to run the scheduler loop
 * @param owner    the exchange that owns this receiver; its client is
 *                 read eagerly, so it must not be null
 */
public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
    this.executor = executor;
    this.owner = owner;
    this.client = owner.client;
}
194
195
// This is the main loop called by the SequentialScheduler.
196
// It attempts to empty the queue until the scheduler is stopped,
197
// or the delegate is unregistered, or the delegate is unable to
198
// process the data (because it's not ready or already done), which
199
// it signals by returning 'true';
200
private void flush() {
201
ByteBuffer buf;
202
try {
203
// we should not be running in the selector here,
204
// except if the custom Executor supplied to the client is
205
// something like (r) -> r.run();
206
assert !client.isSelectorThread()
207
|| !(client.theExecutor().delegate() instanceof ExecutorService) :
208
"Http1AsyncReceiver::flush should not run in the selector: "
209
+ Thread.currentThread().getName();
210
211
// First check whether we have a pending delegate that has
212
// just subscribed, and if so, create a Subscription for it
213
// and call onSubscribe.
214
handlePendingDelegate();
215
216
// Then start emptying the queue, if possible.
217
while ((buf = queue.peek()) != null && !stopRequested) {
218
Http1AsyncDelegate delegate = this.delegate;
219
if (debug.on())
220
debug.log("Got %s bytes for delegate %s",
221
buf.remaining(), delegate);
222
if (!hasDemand(delegate)) {
223
// The scheduler will be invoked again later when the demand
224
// becomes positive.
225
return;
226
}
227
228
assert delegate != null;
229
if (debug.on())
230
debug.log("Forwarding %s bytes to delegate %s",
231
buf.remaining(), delegate);
232
// The delegate has demand: feed it the next buffer.
233
if (!delegate.tryAsyncReceive(buf)) {
234
final long remaining = buf.remaining();
235
if (debug.on()) debug.log(() -> {
236
// If the scheduler is stopped, the queue may already
237
// be empty and the reference may already be released.
238
String remstr = scheduler.isStopped() ? "" :
239
" remaining in ref: "
240
+ remaining;
241
remstr += remstr
242
+ " total remaining: " + remaining();
243
return "Delegate done: " + remaining;
244
});
245
canRequestMore.set(false);
246
// The last buffer parsed may have remaining unparsed bytes.
247
// Don't take it out of the queue.
248
return; // done.
249
}
250
251
// removed parsed buffer from queue, and continue with next
252
// if available
253
ByteBuffer parsed = queue.remove();
254
canRequestMore.set(queue.isEmpty() && !stopRequested);
255
assert parsed == buf;
256
}
257
258
// queue is empty: let's see if we should request more
259
checkRequestMore();
260
261
} catch (Throwable t) {
262
Throwable x = error;
263
if (x == null) error = t; // will be handled in the finally block
264
if (debug.on()) debug.log("Unexpected error caught in flush()", t);
265
} finally {
266
// Handles any pending error.
267
// The most recently subscribed delegate will get the error.
268
checkForErrors();
269
}
270
}
271
272
private String describe() {
273
Http1Exchange<?> exchange = owner;
274
if (exchange != null) {
275
return String.valueOf(exchange.request());
276
}
277
return "<uri unavailable>";
278
}
279
280
/**
281
* Must be called from within the scheduler main loop.
282
* Handles any pending errors by calling delegate.onReadError().
283
* If the error can be forwarded to the delegate, stops the scheduler.
284
*/
285
private void checkForErrors() {
286
// Handles any pending error.
287
// The most recently subscribed delegate will get the error.
288
// If the delegate is null, the error will be handled by the next
289
// delegate that subscribes.
290
// If the queue is not empty, wait until it is empty before
291
// handling the error.
292
Http1AsyncDelegate delegate = pendingDelegateRef.get();
293
if (delegate == null) delegate = this.delegate;
294
Throwable x = error;
295
if (delegate != null && x != null && (stopRequested || queue.isEmpty())) {
296
// forward error only after emptying the queue.
297
final Object captured = delegate;
298
if (debug.on())
299
debug.log(() -> "flushing " + x + "\n\t delegate: " + captured
300
+ "\t\t queue.isEmpty: " + queue.isEmpty());
301
scheduler.stop();
302
delegate.onReadError(x);
303
whenFinished.completeExceptionally(x);
304
if (Log.channel()) {
305
Log.logChannel("HTTP/1 read subscriber stopped for: {0}", describe());
306
}
307
if (stopRequested) {
308
// This is the special case where the subscriber
309
// has requested an illegal number of items.
310
// In this case, the error doesn't come from
311
// upstream, but from downstream, and we need to
312
// close the upstream connection.
313
Http1Exchange<?> exchg = owner;
314
stop();
315
if (exchg != null) exchg.connection().close();
316
}
317
}
318
}
319
320
/**
321
* Must be called from within the scheduler main loop.
322
* Figure out whether more data should be requested from the
323
* Http1TubeSubscriber.
324
*/
325
private void checkRequestMore() {
326
Http1AsyncDelegate delegate = this.delegate;
327
boolean more = this.canRequestMore.get();
328
boolean hasDemand = hasDemand(delegate);
329
if (debug.on())
330
debug.log("checkRequestMore: " + "canRequestMore=" + more
331
+ ", hasDemand=" + hasDemand
332
+ (delegate == null ? ", delegate=null" : ""));
333
if (hasDemand) {
334
subscriber.requestMore();
335
}
336
}
337
338
/**
339
* Must be called from within the scheduler main loop.
340
* Return true if the delegate is not null and has some demand.
341
* @param delegate The Http1AsyncDelegate delegate
342
* @return true if the delegate is not null and has some demand
343
*/
344
private boolean hasDemand(Http1AsyncDelegate delegate) {
345
if (delegate == null) return false;
346
AbstractSubscription subscription = delegate.subscription();
347
long demand = subscription.demand().get();
348
if (debug.on())
349
debug.log("downstream subscription demand is %s", demand);
350
return demand > 0;
351
}
352
353
/**
354
* Must be called from within the scheduler main loop.
355
* Handles pending delegate subscription.
356
* Return true if there was some pending delegate subscription and a new
357
* delegate was subscribed, false otherwise.
358
*
359
* @return true if there was some pending delegate subscription and a new
360
* delegate was subscribed, false otherwise.
361
*/
362
private boolean handlePendingDelegate() {
363
Http1AsyncDelegate pending = pendingDelegateRef.get();
364
if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
365
Http1AsyncDelegate delegate = this.delegate;
366
if (delegate != null) unsubscribe(delegate);
367
Consumer<Throwable> onSubscriptionError = (x) -> {
368
setRetryOnError(false);
369
stopRequested = true;
370
onReadError(x);
371
};
372
Runnable cancel = () -> {
373
if (debug.on())
374
debug.log("Downstream subscription cancelled by %s", pending);
375
// The connection should be closed, as some data may
376
// be left over in the stream.
377
try {
378
setRetryOnError(false);
379
pending.close(null);
380
onReadError(new IOException("subscription cancelled"));
381
unsubscribe(pending);
382
} finally {
383
Http1Exchange<?> exchg = owner;
384
stop();
385
if (exchg != null) exchg.connection().close();
386
}
387
};
388
// The subscription created by a delegate is only loosely
389
// coupled with the upstream subscription. This is partly because
390
// the header/body parser work with a flow of ByteBuffer, whereas
391
// we have a flow List<ByteBuffer> upstream.
392
Http1AsyncDelegateSubscription subscription =
393
new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
394
try {
395
pending.onSubscribe(subscription);
396
} finally {
397
this.delegate = delegate = pending;
398
}
399
final Object captured = delegate;
400
if (debug.on())
401
debug.log("delegate is now " + captured
402
+ ", demand=" + subscription.demand().get()
403
+ ", canRequestMore=" + canRequestMore.get()
404
+ ", queue.isEmpty=" + queue.isEmpty());
405
return true;
406
}
407
return false;
408
}
409
410
synchronized void setRetryOnError(boolean retry) {
411
this.retry = retry;
412
}
413
414
void clear() {
415
if (debug.on()) debug.log("cleared");
416
this.pendingDelegateRef.set(null);
417
this.delegate = null;
418
this.owner = null;
419
}
420
421
void subscribe(Http1AsyncDelegate delegate) {
422
synchronized(this) {
423
pendingDelegateRef.set(delegate);
424
}
425
if (queue.isEmpty()) {
426
canRequestMore.set(true);
427
}
428
if (debug.on())
429
debug.log("Subscribed pending " + delegate + " queue.isEmpty: "
430
+ queue.isEmpty());
431
// Everything may have been received already. Make sure
432
// we parse it.
433
if (client.isSelectorThread()) {
434
scheduler.runOrSchedule(executor);
435
} else {
436
scheduler.runOrSchedule();
437
}
438
}
439
440
// Used for debugging only!
441
long remaining() {
442
return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY));
443
}
444
445
void unsubscribe(Http1AsyncDelegate delegate) {
446
synchronized(this) {
447
if (this.delegate == delegate) {
448
if (debug.on()) debug.log("Unsubscribed %s", delegate);
449
this.delegate = null;
450
}
451
}
452
}
453
454
// Callback: Consumer of ByteBuffer
455
private void asyncReceive(ByteBuffer buf) {
456
if (debug.on())
457
debug.log("Putting %s bytes into the queue", buf.remaining());
458
received.addAndGet(buf.remaining());
459
queue.offer(buf);
460
461
// This callback is called from within the selector thread.
462
// Use an executor here to avoid doing the heavy lifting in the
463
// selector.
464
scheduler.runOrSchedule(executor);
465
}
466
467
// Callback: Consumer of Throwable
468
void onReadError(Throwable ex) {
469
Http1AsyncDelegate delegate;
470
Throwable recorded;
471
if (debug.on()) debug.log("onError: %s", (Object) ex);
472
synchronized (this) {
473
delegate = this.delegate;
474
recorded = error;
475
if (recorded == null) {
476
// retry is set to true by HttpExchange when the connection is
477
// already connected, which means it's been retrieved from
478
// the pool.
479
if (retry && (ex instanceof IOException)) {
480
// could be either EOFException, or
481
// IOException("connection reset by peer), or
482
// SSLHandshakeException resulting from the server having
483
// closed the SSL session.
484
if (received.get() == 0) {
485
// If we receive such an exception before having
486
// received any byte, then in this case, we will
487
// throw ConnectionExpiredException
488
// to try & force a retry of the request.
489
retry = false;
490
ex = new ConnectionExpiredException(ex);
491
}
492
}
493
error = ex;
494
}
495
}
496
497
final Throwable t = (recorded == null ? ex : recorded);
498
if (debug.on())
499
debug.log("recorded " + t + "\n\t delegate: " + delegate
500
+ "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
501
if (Log.errors()) {
502
Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
503
}
504
if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) {
505
// This callback is called from within the selector thread.
506
// Use an executor here to avoid doing the heavy lifting in the
507
// selector.
508
if (Log.errors()) {
509
Log.logError("HTTP/1 propagating recorded error: {0} - {1}", describe(), t);
510
}
511
scheduler.runOrSchedule(executor);
512
}
513
}
514
515
void stop() {
516
if (debug.on()) debug.log("stopping");
517
if (Log.channel() && !scheduler.isStopped()) {
518
Log.logChannel("HTTP/1 read subscriber stopped for {0}", describe());
519
}
520
scheduler.stop();
521
// make sure ref count is handled properly by
522
// closing the delegate.
523
Http1AsyncDelegate previous = delegate;
524
if (previous != null) previous.close(error);
525
delegate = null;
526
owner = null;
527
whenFinished.complete(null);
528
}
529
530
/**
531
* Returns the TubeSubscriber for reading from the connection flow.
532
* @return the TubeSubscriber for reading from the connection flow.
533
*/
534
TubeSubscriber subscriber() {
535
return subscriber;
536
}
537
538
/**
539
* A simple tube subscriber for reading from the connection flow.
540
*/
541
final class Http1TubeSubscriber implements TubeSubscriber {
542
volatile Flow.Subscription subscription;
543
volatile boolean completed;
544
volatile boolean dropped;
545
546
public void onSubscribe(Flow.Subscription subscription) {
547
// supports being called multiple time.
548
// doesn't cancel the previous subscription, since that is
549
// most probably the same as the new subscription.
550
if (debug.on()) debug.log("Received onSubscribed from upstream");
551
if (Log.channel()) {
552
Log.logChannel("HTTP/1 read subscriber got subscription from {0}", describe());
553
}
554
assert this.subscription == null || dropped == false;
555
this.subscription = subscription;
556
dropped = false;
557
canRequestMore.set(true);
558
if (delegate != null) {
559
scheduler.runOrSchedule(executor);
560
} else {
561
if (debug.on()) debug.log("onSubscribe: read delegate not present yet");
562
}
563
}
564
565
void requestMore() {
566
Flow.Subscription s = subscription;
567
if (s == null) return;
568
if (canRequestMore.compareAndSet(true, false)) {
569
if (!completed && !dropped) {
570
if (debug.on())
571
debug.log("Http1TubeSubscriber: requesting one more from upstream");
572
s.request(1);
573
return;
574
}
575
}
576
if (debug.on())
577
debug.log("Http1TubeSubscriber: no need to request more");
578
}
579
580
@Override
581
public void onNext(List<ByteBuffer> item) {
582
canRequestMore.set(item.isEmpty());
583
for (ByteBuffer buffer : item) {
584
asyncReceive(buffer);
585
}
586
}
587
588
@Override
589
public void onError(Throwable throwable) {
590
onReadError(throwable);
591
completed = true;
592
}
593
594
@Override
595
public void onComplete() {
596
onReadError(new EOFException("EOF reached while reading"));
597
completed = true;
598
}
599
600
public void dropSubscription() {
601
if (debug.on()) debug.log("Http1TubeSubscriber: dropSubscription");
602
// we could probably set subscription to null here...
603
// then we might not need the 'dropped' boolean?
604
dropped = true;
605
}
606
607
}
608
609
// Drains the content of the queue into a single ByteBuffer.
610
// The scheduler must be permanently stopped before calling drain().
611
ByteBuffer drain(ByteBuffer initial) {
612
// Revisit: need to clean that up.
613
//
614
ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial);
615
assert scheduler.isStopped();
616
617
if (queue.isEmpty()) return b;
618
619
// sanity check: we shouldn't have queued the same
620
// buffer twice.
621
ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]);
622
623
// the assertion looks suspicious, more investigation needed
624
//
625
// assert java.util.stream.Stream.of(qbb)
626
// .collect(Collectors.toSet())
627
// .size() == qbb.length : debugQBB(qbb);
628
629
// compute the number of bytes in the queue, the number of bytes
630
// in the initial buffer
631
// TODO: will need revisiting - as it is not guaranteed that all
632
// data will fit in single BB!
633
int size = Utils.remaining(qbb, Integer.MAX_VALUE);
634
int remaining = b.remaining();
635
int free = b.capacity() - b.position() - remaining;
636
if (debug.on())
637
debug.log("Flushing %s bytes from queue into initial buffer "
638
+ "(remaining=%s, free=%s)", size, remaining, free);
639
640
// check whether the initial buffer has enough space
641
if (size > free) {
642
if (debug.on())
643
debug.log("Allocating new buffer for initial: %s", (size + remaining));
644
// allocates a new buffer and copy initial to it
645
b = ByteBuffer.allocate(size + remaining);
646
Utils.copy(initial, b);
647
assert b.position() == remaining;
648
b.flip();
649
assert b.position() == 0;
650
assert b.limit() == remaining;
651
assert b.remaining() == remaining;
652
}
653
654
// store position and limit
655
int pos = b.position();
656
int limit = b.limit();
657
assert limit - pos == remaining;
658
assert b.capacity() >= remaining + size
659
: "capacity: " + b.capacity()
660
+ ", remaining: " + b.remaining()
661
+ ", size: " + size;
662
663
// prepare to copy the content of the queue
664
b.position(limit);
665
b.limit(pos + remaining + size);
666
assert b.remaining() >= size :
667
"remaining: " + b.remaining() + ", size: " + size;
668
669
// copy the content of the queue
670
int count = 0;
671
for (int i=0; i<qbb.length; i++) {
672
ByteBuffer b2 = qbb[i];
673
int r = b2.remaining();
674
assert b.remaining() >= r : "need at least " + r + " only "
675
+ b.remaining() + " available";
676
int copied = Utils.copy(b2, b);
677
assert copied == r : "copied="+copied+" available="+r;
678
assert b2.remaining() == 0;
679
count += copied;
680
}
681
assert count == size;
682
assert b.position() == pos + remaining + size :
683
"b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size;
684
685
// reset limit and position
686
b.limit(limit+size);
687
b.position(pos);
688
689
// we can clear the refs
690
queue.clear();
691
final ByteBuffer bb = b;
692
if (debug.on())
693
debug.log("Initial buffer now has " + bb.remaining()
694
+ " pos=" + bb.position() + " limit=" + bb.limit());
695
696
return b;
697
}
698
699
private String debugQBB(ByteBuffer[] qbb) {
700
StringBuilder msg = new StringBuilder();
701
List<ByteBuffer> lbb = Arrays.asList(qbb);
702
Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb));
703
704
int uniquebb = sbb.size();
705
msg.append("qbb: ").append(lbb.size())
706
.append(" (unique: ").append(uniquebb).append("), ")
707
.append("duplicates: ");
708
String sep = "";
709
for (ByteBuffer b : lbb) {
710
if (!sbb.remove(b)) {
711
msg.append(sep)
712
.append(String.valueOf(b))
713
.append("[remaining=")
714
.append(b.remaining())
715
.append(", position=")
716
.append(b.position())
717
.append(", capacity=")
718
.append(b.capacity())
719
.append("]");
720
sep = ", ";
721
}
722
}
723
return msg.toString();
724
}
725
726
volatile String dbgTag;
727
String dbgString() {
728
String tag = dbgTag;
729
if (tag == null) {
730
String flowTag = null;
731
Http1Exchange<?> exchg = owner;
732
Object flow = (exchg != null)
733
? exchg.connection().getConnectionFlow()
734
: null;
735
flowTag = tag = flow == null ? null: (String.valueOf(flow));
736
if (flowTag != null) {
737
dbgTag = tag = "Http1AsyncReceiver("+ flowTag + ")";
738
} else {
739
tag = "Http1AsyncReceiver(?)";
740
}
741
}
742
return tag;
743
}
744
}
745
746