Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
41171 views
1
/*
2
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.IOException;
29
import java.nio.ByteBuffer;
30
import java.util.List;
31
import java.util.Objects;
32
import java.util.concurrent.Flow;
33
import java.util.concurrent.atomic.AtomicLong;
34
import java.util.concurrent.atomic.AtomicReference;
35
import java.nio.channels.SelectableChannel;
36
import java.nio.channels.SelectionKey;
37
import java.nio.channels.SocketChannel;
38
import java.util.ArrayList;
39
import java.util.concurrent.locks.Lock;
40
import java.util.concurrent.locks.ReentrantLock;
41
import java.util.function.Consumer;
42
import java.util.function.Supplier;
43
import jdk.internal.net.http.common.BufferSupplier;
44
import jdk.internal.net.http.common.Demand;
45
import jdk.internal.net.http.common.FlowTube;
46
import jdk.internal.net.http.common.Log;
47
import jdk.internal.net.http.common.Logger;
48
import jdk.internal.net.http.common.SequentialScheduler;
49
import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
50
import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
51
import jdk.internal.net.http.common.Utils;
52
53
/**
54
* A SocketTube is a terminal tube plugged directly into the socket.
55
* The read subscriber should call {@code subscribe} on the SocketTube before
56
* the SocketTube is subscribed to the write publisher.
57
*/
58
final class SocketTube implements FlowTube {
59
60
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
61
static final AtomicLong IDS = new AtomicLong();
62
63
private final HttpClientImpl client;
64
private final SocketChannel channel;
65
private final SliceBufferSource sliceBuffersSource;
66
private final Object lock = new Object();
67
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
68
private final InternalReadPublisher readPublisher;
69
private final InternalWriteSubscriber writeSubscriber;
70
private final long id = IDS.incrementAndGet();
71
72
/**
 * Creates a tube plugged into the given socket channel.
 *
 * @param client         the HTTP client used to register events and run tasks
 * @param channel        the socket channel this tube reads from and writes to
 * @param buffersFactory supplier wrapped by the tube's {@code SliceBufferSource}
 */
public SocketTube(HttpClientImpl client, SocketChannel channel,
                  Supplier<ByteBuffer> buffersFactory) {
    this.channel = channel;
    this.client = client;
    this.sliceBuffersSource = new SliceBufferSource(buffersFactory);

    // The publisher and subscriber capture 'channel' when they are built,
    // so they must be created after the channel has been assigned.
    this.readPublisher = new InternalReadPublisher();
    this.writeSubscriber = new InternalWriteSubscriber();
}
81
82
/**
83
* Returns {@code true} if this flow is finished.
84
* This happens when this flow internal read subscription is completed,
85
* either normally (EOF reading) or exceptionally (EOF writing, or
86
* underlying socket closed, or some exception occurred while reading or
87
* writing to the socket).
88
*
89
* @return {@code true} if this flow is finished.
90
*/
91
public boolean isFinished() {
92
InternalReadPublisher.InternalReadSubscription subscription =
93
readPublisher.subscriptionImpl;
94
return subscription != null && subscription.completed
95
|| subscription == null && errorRef.get() != null;
96
}
97
98
// ===================================================================== //
99
// Flow.Publisher //
100
// ======================================================================//
101
102
/**
103
* {@inheritDoc }
104
* @apiNote This method should be called first. In particular, the caller
105
* must ensure that this method must be called by the read
106
* subscriber before the write publisher can call {@code onSubscribe}.
107
* Failure to adhere to this contract may result in assertion errors.
108
*/
109
@Override
110
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
111
Objects.requireNonNull(s);
112
assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
113
readPublisher.subscribe(s);
114
}
115
116
117
// ===================================================================== //
118
// Flow.Subscriber //
119
// ======================================================================//
120
121
/**
122
* {@inheritDoc }
123
* @apiNote The caller must ensure that {@code subscribe} is called by
124
* the read subscriber before {@code onSubscribe} is called by
125
* the write publisher.
126
* Failure to adhere to this contract may result in assertion errors.
127
*/
128
@Override
129
public void onSubscribe(Flow.Subscription subscription) {
130
writeSubscriber.onSubscribe(subscription);
131
}
132
133
@Override
134
public void onNext(List<ByteBuffer> item) {
135
writeSubscriber.onNext(item);
136
}
137
138
@Override
139
public void onError(Throwable throwable) {
140
writeSubscriber.onError(throwable);
141
}
142
143
@Override
144
public void onComplete() {
145
writeSubscriber.onComplete();
146
}
147
148
// ===================================================================== //
149
// Events //
150
// ======================================================================//
151
152
void signalClosed() {
153
// Ensures that the subscriber will be terminated and that future
154
// subscribers will be notified when the connection is closed.
155
if (Log.channel()) {
156
Log.logChannel("Connection close signalled: connection closed locally ({0})",
157
channelDescr());
158
}
159
readPublisher.subscriptionImpl.signalError(
160
new IOException("connection closed locally"));
161
}
162
163
/**
164
* A restartable task used to process tasks in sequence.
165
*/
166
private static class SocketFlowTask implements RestartableTask {
167
final Runnable task;
168
private final Lock lock = new ReentrantLock();
169
SocketFlowTask(Runnable task) {
170
this.task = task;
171
}
172
@Override
173
public final void run(DeferredCompleter taskCompleter) {
174
try {
175
// The logics of the sequential scheduler should ensure that
176
// the restartable task is running in only one thread at
177
// a given time: there should never be contention.
178
boolean locked = lock.tryLock();
179
assert locked : "contention detected in SequentialScheduler";
180
try {
181
task.run();
182
} finally {
183
if (locked) lock.unlock();
184
}
185
} finally {
186
taskCompleter.complete();
187
}
188
}
189
}
190
191
// This is best effort - there's no guarantee that the printed set of values
192
// is consistent. It should only be considered as weakly accurate - in
193
// particular in what concerns the events states, especially when displaying
194
// a read event state from a write event callback and conversely.
195
void debugState(String when) {
196
if (debug.on()) {
197
StringBuilder state = new StringBuilder();
198
199
InternalReadPublisher.InternalReadSubscription sub =
200
readPublisher.subscriptionImpl;
201
InternalReadPublisher.ReadEvent readEvent =
202
sub == null ? null : sub.readEvent;
203
Demand rdemand = sub == null ? null : sub.demand;
204
InternalWriteSubscriber.WriteEvent writeEvent =
205
writeSubscriber.writeEvent;
206
Demand wdemand = writeSubscriber.writeDemand;
207
int rops = readEvent == null ? 0 : readEvent.interestOps();
208
long rd = rdemand == null ? 0 : rdemand.get();
209
int wops = writeEvent == null ? 0 : writeEvent.interestOps();
210
long wd = wdemand == null ? 0 : wdemand.get();
211
212
state.append(when).append(" Reading: [ops=")
213
.append(rops).append(", demand=").append(rd)
214
.append(", stopped=")
215
.append((sub == null ? false : sub.readScheduler.isStopped()))
216
.append("], Writing: [ops=").append(wops)
217
.append(", demand=").append(wd)
218
.append("]");
219
debug.log(state.toString());
220
}
221
}
222
223
/**
224
* A repeatable event that can be paused or resumed by changing its
225
* interestOps. When the event is fired, it is first paused before being
226
* signaled. It is the responsibility of the code triggered by
227
* {@code signalEvent} to resume the event if required.
228
*/
229
private static abstract class SocketFlowEvent extends AsyncEvent {
230
final SocketChannel channel;
231
final int defaultInterest;
232
volatile int interestOps;
233
volatile boolean registered;
234
SocketFlowEvent(int defaultInterest, SocketChannel channel) {
235
super(AsyncEvent.REPEATING);
236
this.defaultInterest = defaultInterest;
237
this.channel = channel;
238
}
239
final boolean registered() {return registered;}
240
final void resume() {
241
interestOps = defaultInterest;
242
registered = true;
243
}
244
final void pause() {interestOps = 0;}
245
@Override
246
public final SelectableChannel channel() {return channel;}
247
@Override
248
public final int interestOps() {return interestOps;}
249
250
@Override
251
public final void handle() {
252
pause(); // pause, then signal
253
signalEvent(); // won't be fired again until resumed.
254
}
255
@Override
256
public final void abort(IOException error) {
257
debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);
258
pause(); // pause, then signal
259
signalError(error); // should not be resumed after abort (not checked)
260
}
261
262
protected abstract void signalEvent();
263
protected abstract void signalError(Throwable error);
264
abstract Logger debug();
265
}
266
267
// ===================================================================== //
268
// Writing //
269
// ======================================================================//
270
271
// This class makes the assumption that the publisher will call onNext
272
// sequentially, and that onNext won't be called if the demand has not been
273
// incremented by request(1).
274
// It has a 'queue of 1' meaning that it will call request(1) in
275
// onSubscribe, and then only after its 'current' buffer list has been
276
// fully written and current set to null;
277
private final class InternalWriteSubscriber
278
implements Flow.Subscriber<List<ByteBuffer>> {
279
280
volatile WriteSubscription subscription;
281
volatile List<ByteBuffer> current;
282
volatile boolean completed;
283
final AsyncTriggerEvent startSubscription =
284
new AsyncTriggerEvent(this::signalError, this::startSubscription);
285
final WriteEvent writeEvent = new WriteEvent(channel, this);
286
final Demand writeDemand = new Demand();
287
288
@Override
289
public void onSubscribe(Flow.Subscription subscription) {
290
WriteSubscription previous = this.subscription;
291
if (debug.on()) debug.log("subscribed for writing");
292
try {
293
boolean needEvent = current == null;
294
if (needEvent) {
295
if (previous != null && previous.upstreamSubscription != subscription) {
296
previous.dropSubscription();
297
}
298
}
299
this.subscription = new WriteSubscription(subscription);
300
if (needEvent) {
301
if (debug.on())
302
debug.log("write: registering startSubscription event");
303
client.registerEvent(startSubscription);
304
}
305
} catch (Throwable t) {
306
signalError(t);
307
}
308
}
309
310
@Override
311
public void onNext(List<ByteBuffer> bufs) {
312
assert current == null : dbgString() // this is a queue of 1.
313
+ "w.onNext current: " + current;
314
assert subscription != null : dbgString()
315
+ "w.onNext: subscription is null";
316
current = bufs;
317
tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
318
// For instance in HTTP/2, a received SETTINGS frame might trigger
319
// the sending of a SETTINGS frame in turn which might cause
320
// onNext to be called from within the same selector thread that the
321
// original SETTINGS frames arrived on. If rs is the read-subscriber
322
// and ws is the write-subscriber then the following can occur:
323
// ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
324
// client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
325
debugState("leaving w.onNext");
326
}
327
328
// Don't use a SequentialScheduler here: rely on onNext() being invoked
329
// sequentially, and not being invoked if there is no demand, request(1).
330
// onNext is usually called from within a user / executor thread.
331
// Initial writing will be performed in that thread. If for some reason,
332
// not all the data can be written, a writeEvent will be registered, and
333
// writing will resume in the selector manager thread when the
334
// writeEvent is fired.
335
//
336
// If this method is invoked in the selector manager thread (because of
337
// a writeEvent), then the executor will be used to invoke request(1),
338
// ensuring that onNext() won't be invoked from within the selector
339
// thread. If not in the selector manager thread, then request(1) is
340
// invoked directly.
341
void tryFlushCurrent(boolean inSelectorThread) {
342
List<ByteBuffer> bufs = current;
343
if (bufs == null) return;
344
try {
345
assert inSelectorThread == client.isSelectorThread() :
346
"should " + (inSelectorThread ? "" : "not ")
347
+ " be in the selector thread";
348
long remaining = Utils.remaining(bufs);
349
if (debug.on()) debug.log("trying to write: %d", remaining);
350
long written = writeAvailable(bufs);
351
if (debug.on()) debug.log("wrote: %d", written);
352
assert written >= 0 : "negative number of bytes written:" + written;
353
assert written <= remaining;
354
if (remaining - written == 0) {
355
current = null;
356
if (writeDemand.tryDecrement()) {
357
Runnable requestMore = this::requestMore;
358
if (inSelectorThread) {
359
assert client.isSelectorThread();
360
client.theExecutor().execute(requestMore);
361
} else {
362
assert !client.isSelectorThread();
363
requestMore.run();
364
}
365
}
366
} else {
367
resumeWriteEvent(inSelectorThread);
368
}
369
} catch (Throwable t) {
370
signalError(t);
371
}
372
}
373
374
// Kick off the initial request:1 that will start the writing side.
375
// Invoked in the selector manager thread.
376
void startSubscription() {
377
try {
378
if (debug.on()) debug.log("write: starting subscription");
379
if (Log.channel()) {
380
Log.logChannel("Start requesting bytes for writing to channel: {0}",
381
channelDescr());
382
}
383
assert client.isSelectorThread();
384
// make sure read registrations are handled before;
385
readPublisher.subscriptionImpl.handlePending();
386
if (debug.on()) debug.log("write: offloading requestMore");
387
// start writing;
388
client.theExecutor().execute(this::requestMore);
389
} catch(Throwable t) {
390
signalError(t);
391
}
392
}
393
394
void requestMore() {
395
WriteSubscription subscription = this.subscription;
396
subscription.requestMore();
397
}
398
399
@Override
400
public void onError(Throwable throwable) {
401
signalError(throwable);
402
}
403
404
@Override
405
public void onComplete() {
406
completed = true;
407
// no need to pause the write event here: the write event will
408
// be paused if there is nothing more to write.
409
List<ByteBuffer> bufs = current;
410
long remaining = bufs == null ? 0 : Utils.remaining(bufs);
411
if (debug.on())
412
debug.log( "write completed, %d yet to send", remaining);
413
debugState("InternalWriteSubscriber::onComplete");
414
}
415
416
void resumeWriteEvent(boolean inSelectorThread) {
417
if (debug.on()) debug.log("scheduling write event");
418
resumeEvent(writeEvent, this::signalError);
419
}
420
421
void signalWritable() {
422
if (debug.on()) debug.log("channel is writable");
423
tryFlushCurrent(true);
424
}
425
426
void signalError(Throwable error) {
427
debug.log(() -> "write error: " + error);
428
if (Log.channel()) {
429
Log.logChannel("Failed to write to channel ({0}: {1})",
430
channelDescr(), error);
431
}
432
completed = true;
433
readPublisher.signalError(error);
434
Flow.Subscription subscription = this.subscription;
435
if (subscription != null) subscription.cancel();
436
}
437
438
// A repeatable WriteEvent which is paused after firing and can
439
// be resumed if required - see SocketFlowEvent;
440
final class WriteEvent extends SocketFlowEvent {
441
final InternalWriteSubscriber sub;
442
WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
443
super(SelectionKey.OP_WRITE, channel);
444
this.sub = sub;
445
}
446
@Override
447
protected final void signalEvent() {
448
try {
449
client.eventUpdated(this);
450
sub.signalWritable();
451
} catch(Throwable t) {
452
sub.signalError(t);
453
}
454
}
455
456
@Override
457
protected void signalError(Throwable error) {
458
sub.signalError(error);
459
}
460
461
@Override
462
Logger debug() { return debug; }
463
}
464
465
final class WriteSubscription implements Flow.Subscription {
466
final Flow.Subscription upstreamSubscription;
467
volatile boolean cancelled;
468
WriteSubscription(Flow.Subscription subscription) {
469
this.upstreamSubscription = subscription;
470
}
471
472
@Override
473
public void request(long n) {
474
if (cancelled) return;
475
upstreamSubscription.request(n);
476
}
477
478
@Override
479
public void cancel() {
480
if (cancelled) return;
481
if (debug.on()) debug.log("write: cancel");
482
if (Log.channel()) {
483
Log.logChannel("Cancelling write subscription");
484
}
485
dropSubscription();
486
upstreamSubscription.cancel();
487
}
488
489
void dropSubscription() {
490
synchronized (InternalWriteSubscriber.this) {
491
cancelled = true;
492
if (debug.on()) debug.log("write: resetting demand to 0");
493
writeDemand.reset();
494
}
495
}
496
497
void requestMore() {
498
try {
499
if (completed || cancelled) return;
500
boolean requestMore;
501
long d;
502
// don't fiddle with demand after cancel.
503
// see dropSubscription.
504
synchronized (InternalWriteSubscriber.this) {
505
if (cancelled) return;
506
d = writeDemand.get();
507
requestMore = writeDemand.increaseIfFulfilled();
508
}
509
if (requestMore) {
510
if (debug.on()) debug.log("write: requesting more...");
511
upstreamSubscription.request(1);
512
} else {
513
if (debug.on())
514
debug.log("write: no need to request more: %d", d);
515
}
516
} catch (Throwable t) {
517
if (debug.on())
518
debug.log("write: error while requesting more: " + t);
519
signalError(t);
520
} finally {
521
debugState("leaving requestMore: ");
522
}
523
}
524
}
525
}
526
527
// ===================================================================== //
528
// Reading //
529
// ===================================================================== //
530
531
// The InternalReadPublisher uses a SequentialScheduler to ensure that
532
// onNext/onError/onComplete are called sequentially on the caller's
533
// subscriber.
534
// However, it relies on the fact that the only time where
535
// runOrSchedule() is called from a user/executor thread is in signalError,
536
// right after the errorRef has been set.
537
// Because the sequential scheduler's task always checks for errors first,
538
// and always terminates the scheduler on error, it is safe to assume
539
// that if it reaches the point where it reads from the channel, then
540
// it is running in the SelectorManager thread. This is because all
541
// other invocation of runOrSchedule() are triggered from within a
542
// ReadEvent.
543
//
544
// When pausing/resuming the event, some shortcuts can then be taken
545
// when we know we're running in the selector manager thread
546
// (in that case there's no need to call client.eventUpdated(readEvent)).
547
//
548
private final class InternalReadPublisher
549
implements Flow.Publisher<List<ByteBuffer>> {
550
private final InternalReadSubscription subscriptionImpl
551
= new InternalReadSubscription();
552
AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
553
private volatile ReadSubscription subscription;
554
555
@Override
556
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
557
Objects.requireNonNull(s);
558
559
TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
560
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
561
ReadSubscription previous = pendingSubscription.getAndSet(target);
562
563
if (previous != null && previous != target) {
564
if (debug.on())
565
debug.log("read publisher: dropping pending subscriber: "
566
+ previous.subscriber);
567
previous.errorRef.compareAndSet(null, errorRef.get());
568
previous.signalOnSubscribe();
569
if (subscriptionImpl.completed) {
570
previous.signalCompletion();
571
} else {
572
previous.subscriber.dropSubscription();
573
}
574
}
575
576
if (debug.on()) debug.log("read publisher got subscriber");
577
subscriptionImpl.signalSubscribe();
578
debugState("leaving read.subscribe: ");
579
}
580
581
void signalError(Throwable error) {
582
if (debug.on()) debug.log("error signalled " + error);
583
if (!errorRef.compareAndSet(null, error)) {
584
return;
585
}
586
if (Log.channel()) {
587
Log.logChannel("Error signalled on channel {0}: {1}",
588
channelDescr(), error);
589
}
590
subscriptionImpl.handleError();
591
}
592
593
final class ReadSubscription implements Flow.Subscription {
594
final InternalReadSubscription impl;
595
final TubeSubscriber subscriber;
596
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
597
final BufferSource bufferSource;
598
volatile boolean subscribed;
599
volatile boolean cancelled;
600
volatile boolean completed;
601
602
public ReadSubscription(InternalReadSubscription impl,
603
TubeSubscriber subscriber) {
604
this.impl = impl;
605
this.bufferSource = subscriber.supportsRecycling()
606
? new SSLDirectBufferSource(client)
607
: SocketTube.this.sliceBuffersSource;
608
this.subscriber = subscriber;
609
}
610
611
@Override
612
public void cancel() {
613
cancelled = true;
614
}
615
616
@Override
617
public void request(long n) {
618
if (!cancelled) {
619
impl.request(n);
620
} else {
621
if (debug.on())
622
debug.log("subscription cancelled, ignoring request %d", n);
623
}
624
}
625
626
void signalCompletion() {
627
assert subscribed || cancelled;
628
if (completed || cancelled) return;
629
synchronized (this) {
630
if (completed) return;
631
completed = true;
632
}
633
Throwable error = errorRef.get();
634
if (error != null) {
635
if (debug.on())
636
debug.log("forwarding error to subscriber: " + error);
637
subscriber.onError(error);
638
} else {
639
if (debug.on()) debug.log("completing subscriber");
640
subscriber.onComplete();
641
}
642
}
643
644
void signalOnSubscribe() {
645
if (subscribed || cancelled) return;
646
synchronized (this) {
647
if (subscribed || cancelled) return;
648
subscribed = true;
649
}
650
subscriber.onSubscribe(this);
651
if (debug.on()) debug.log("onSubscribe called");
652
if (errorRef.get() != null) {
653
signalCompletion();
654
}
655
}
656
}
657
658
final class InternalReadSubscription implements Flow.Subscription {
659
660
private final Demand demand = new Demand();
661
final SequentialScheduler readScheduler;
662
private volatile boolean completed;
663
private final ReadEvent readEvent;
664
private final AsyncEvent subscribeEvent;
665
666
/**
 * Sets up the read scheduler (running {@code read}), the subscribe
 * trigger event and the read event bound to the tube's channel.
 */
InternalReadSubscription() {
    Runnable readLoop = this::read;
    readScheduler = new SequentialScheduler(new SocketFlowTask(readLoop));
    subscribeEvent = new AsyncTriggerEvent(this::signalError,
            this::handleSubscribeEvent);
    readEvent = new ReadEvent(channel, this);
}
672
673
/*
674
* This method must be invoked before any other method of this class.
675
*/
676
final void signalSubscribe() {
677
if (readScheduler.isStopped() || completed) {
678
// if already completed or stopped we can handle any
679
// pending connection directly from here.
680
if (debug.on())
681
debug.log("handling pending subscription while completed");
682
handlePending();
683
} else {
684
try {
685
if (debug.on()) debug.log("registering subscribe event");
686
client.registerEvent(subscribeEvent);
687
} catch (Throwable t) {
688
signalError(t);
689
handlePending();
690
}
691
}
692
}
693
694
final void handleSubscribeEvent() {
695
assert client.isSelectorThread();
696
debug.log("subscribe event raised");
697
if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());
698
readScheduler.runOrSchedule();
699
if (readScheduler.isStopped() || completed) {
700
// if already completed or stopped we can handle any
701
// pending connection directly from here.
702
if (debug.on())
703
debug.log("handling pending subscription when completed");
704
handlePending();
705
}
706
}
707
708
709
/*
710
* Although this method is thread-safe, the Reactive-Streams spec seems
711
* to not require it to be as such. It's a responsibility of the
712
* subscriber to signal demand in a thread-safe manner.
713
*
714
* See Reactive Streams specification, rules 2.7 and 3.4.
715
*/
716
@Override
717
public final void request(long n) {
718
if (n > 0L) {
719
boolean wasFulfilled = demand.increase(n);
720
if (wasFulfilled) {
721
if (debug.on()) debug.log("got some demand for reading");
722
resumeReadEvent();
723
// if demand has been changed from fulfilled
724
// to unfulfilled register read event;
725
}
726
} else {
727
signalError(new IllegalArgumentException("non-positive request"));
728
}
729
debugState("leaving request("+n+"): ");
730
}
731
732
@Override
733
public final void cancel() {
734
pauseReadEvent();
735
if (debug.on()) debug.log("Read subscription cancelled");
736
if (Log.channel()) {
737
Log.logChannel("Read subscription cancelled for channel {0}",
738
channelDescr());
739
}
740
if (debug.on()) debug.log("Stopping read scheduler");
741
readScheduler.stop();
742
}
743
744
private void resumeReadEvent() {
745
if (debug.on()) debug.log("resuming read event");
746
resumeEvent(readEvent, this::signalError);
747
}
748
749
private void pauseReadEvent() {
750
if (debug.on()) debug.log("pausing read event");
751
pauseEvent(readEvent, this::signalError);
752
}
753
754
755
final void handleError() {
756
assert errorRef.get() != null;
757
readScheduler.runOrSchedule();
758
}
759
760
final void signalError(Throwable error) {
761
if (debug.on()) debug.log("signal read error: " + error);
762
if (!errorRef.compareAndSet(null, error)) {
763
return;
764
}
765
if (debug.on()) debug.log("got read error: " + error);
766
if (Log.channel()) {
767
Log.logChannel("Read error signalled on channel {0}: {1}",
768
channelDescr(), error);
769
}
770
readScheduler.runOrSchedule();
771
}
772
773
final void signalReadable() {
774
readScheduler.runOrSchedule();
775
}
776
777
/** The body of the task that runs in SequentialScheduler. */
778
final void read() {
779
// It is important to only call pauseReadEvent() when stopping
780
// the scheduler. The event is automatically paused before
781
// firing, and trying to pause it again could cause a race
782
// condition between this loop, which calls tryDecrementDemand(),
783
// and the thread that calls request(n), which will try to resume
784
// reading.
785
try {
786
while(!readScheduler.isStopped()) {
787
if (completed) return;
788
789
// make sure we have a subscriber
790
if (handlePending()) {
791
if (debug.on())
792
debug.log("pending subscriber subscribed");
793
return;
794
}
795
796
// If an error was signaled, we might not be in the
797
// the selector thread, and that is OK, because we
798
// will just call onError and return.
799
ReadSubscription current = subscription;
800
Throwable error = errorRef.get();
801
if (current == null) {
802
assert error != null;
803
if (debug.on())
804
debug.log("error raised before subscriber subscribed: %s",
805
(Object)error);
806
return;
807
}
808
TubeSubscriber subscriber = current.subscriber;
809
if (error != null) {
810
completed = true;
811
// safe to pause here because we're finished anyway.
812
pauseReadEvent();
813
if (debug.on())
814
debug.log("Sending error " + error
815
+ " to subscriber " + subscriber);
816
if (Log.channel()) {
817
Log.logChannel("Raising error with subscriber for {0}: {1}",
818
channelDescr(), error);
819
}
820
current.errorRef.compareAndSet(null, error);
821
current.signalCompletion();
822
if (debug.on()) debug.log("Stopping read scheduler");
823
readScheduler.stop();
824
debugState("leaving read() loop with error: ");
825
return;
826
}
827
828
// If we reach here then we must be in the selector thread.
829
assert client.isSelectorThread();
830
if (demand.tryDecrement()) {
831
// we have demand.
832
try {
833
List<ByteBuffer> bytes = readAvailable(current.bufferSource);
834
if (bytes == EOF) {
835
if (!completed) {
836
if (debug.on()) debug.log("got read EOF");
837
if (Log.channel()) {
838
Log.logChannel("EOF read from channel: {0}",
839
channelDescr());
840
}
841
completed = true;
842
// safe to pause here because we're finished
843
// anyway.
844
pauseReadEvent();
845
current.signalCompletion();
846
if (debug.on()) debug.log("Stopping read scheduler");
847
readScheduler.stop();
848
}
849
debugState("leaving read() loop after EOF: ");
850
return;
851
} else if (Utils.remaining(bytes) > 0) {
852
// the subscriber is responsible for offloading
853
// to another thread if needed.
854
if (debug.on())
855
debug.log("read bytes: " + Utils.remaining(bytes));
856
assert !current.completed;
857
subscriber.onNext(bytes);
858
// we could continue looping until the demand
859
// reaches 0. However, that would risk starving
860
// other connections (bound to other socket
861
// channels) - as other selected keys activated
862
// by the selector manager thread might be
863
// waiting for this event to terminate.
864
// So resume the read event and return now...
865
resumeReadEvent();
866
if (errorRef.get() != null) continue;
867
debugState("leaving read() loop after onNext: ");
868
return;
869
} else {
870
// nothing available!
871
if (debug.on()) debug.log("no more bytes available");
872
// re-increment the demand and resume the read
873
// event. This ensures that this loop is
874
// executed again when the socket becomes
875
// readable again.
876
demand.increase(1);
877
resumeReadEvent();
878
if (errorRef.get() != null) continue;
879
debugState("leaving read() loop with no bytes");
880
return;
881
}
882
} catch (Throwable x) {
883
signalError(x);
884
continue;
885
}
886
} else {
887
if (debug.on()) debug.log("no more demand for reading");
888
// the event is paused just after firing, so it should
889
// still be paused here, unless the demand was just
890
// incremented from 0 to n, in which case, the
891
// event will be resumed, causing this loop to be
892
// invoked again when the socket becomes readable:
893
// This is what we want.
894
// Trying to pause the event here would actually
895
// introduce a race condition between this loop and
896
// request(n).
897
if (errorRef.get() != null) continue;
898
debugState("leaving read() loop with no demand");
899
break;
900
}
901
}
902
} catch (Throwable t) {
903
if (debug.on()) debug.log("Unexpected exception in read loop", t);
904
signalError(t);
905
} finally {
906
if (readScheduler.isStopped()) {
907
if (debug.on()) debug.log("Read scheduler stopped");
908
if (Log.channel()) {
909
Log.logChannel("Stopped reading from channel {0}", channelDescr());
910
}
911
}
912
handlePending();
913
}
914
}
915
916
boolean handlePending() {
917
ReadSubscription pending = pendingSubscription.getAndSet(null);
918
if (pending == null) return false;
919
if (debug.on())
920
debug.log("handling pending subscription for %s",
921
pending.subscriber);
922
ReadSubscription current = subscription;
923
if (current != null && current != pending && !completed) {
924
current.subscriber.dropSubscription();
925
}
926
if (debug.on()) debug.log("read demand reset to 0");
927
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
928
pending.errorRef.compareAndSet(null, errorRef.get());
929
if (!readScheduler.isStopped()) {
930
subscription = pending;
931
} else {
932
if (debug.on()) debug.log("socket tube is already stopped");
933
}
934
if (debug.on()) debug.log("calling onSubscribe");
935
pending.signalOnSubscribe();
936
if (completed) {
937
pending.errorRef.compareAndSet(null, errorRef.get());
938
pending.signalCompletion();
939
}
940
return true;
941
}
942
}
943
944
945
// A repeatable ReadEvent which is paused after firing and can
946
// be resumed if required - see SocketFlowEvent;
947
final class ReadEvent extends SocketFlowEvent {
948
final InternalReadSubscription sub;
949
ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
950
super(SelectionKey.OP_READ, channel);
951
this.sub = sub;
952
}
953
@Override
954
protected final void signalEvent() {
955
try {
956
client.eventUpdated(this);
957
sub.signalReadable();
958
} catch(Throwable t) {
959
sub.signalError(t);
960
}
961
}
962
963
@Override
964
protected final void signalError(Throwable error) {
965
if (debug.on()) debug.log("signalError to %s (%s)", sub, error);
966
sub.signalError(error);
967
}
968
969
@Override
970
Logger debug() { return debug; }
971
}
972
}
973
974
// ===================================================================== //
975
// Buffer Management //
976
// ===================================================================== //
977
978
/**
 * Strategy used by readAvailable(BufferSource) to obtain the buffers into
 * which socket data is read, and to package the data that was read for
 * the downstream subscriber.
 */
public interface BufferSource {

    /**
     * Supplies the buffer that the next socket read will fill.
     *
     * @implNote
     * Implementations are free to choose what kind of buffer they hand
     * out, and whether they hand out the same buffer several times.
     * The returned buffer must however satisfy the following:
     * <ol>
     *   <li>it is not null;</li>
     *   <li>its position is where reading starts;</li>
     *   <li>its limit is where reading stops;</li>
     *   <li>it is 'free' - that is - nobody else is using or
     *       retaining it.</li>
     * </ol>
     *
     * @return A buffer to read data from the socket.
     */
    ByteBuffer getBuffer();

    /**
     * Adds the data that was read into {@code buffer} to the list of
     * buffers that will be sent downstream to the subscriber. The result
     * may be the given list with one more element, or a new list.
     *
     * @implNote
     * Implementations may use different strategies, provided that they
     * stay consistent with what their getBuffer() method does. One
     * implementation could add the buffer itself to the list and supply
     * a new buffer on the next call to getBuffer(); another could add a
     * slice of the buffer to the list and supply the same buffer again
     * (if it has remaining space) on the next call to getBuffer().
     *
     * @param list   The list before adding the data. Can be null.
     * @param buffer The buffer containing the data to add to the list.
     * @param start  The start position at which data were read.
     *               The current buffer position indicates the end.
     * @return A possibly new list where a buffer containing the
     *         data read from the socket has been added.
     */
    List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);

    /**
     * Gives back an unused {@code buffer} that was previously obtained
     * from {@code getBuffer}.
     *
     * @implNote An implementation can use this method, if necessary, to
     *           put the unused buffer back into its pool. The default
     *           implementation does nothing.
     *
     * @param buffer The unused buffer.
     */
    default void returnUnused(ByteBuffer buffer) { }
}
1031
1032
// An implementation of BufferSource used for unencrypted data.
1033
// This buffer source uses heap buffers and avoids wasting memory
1034
// by forwarding read-only buffer slices downstream.
1035
// Buffers allocated through this source are simply GC'ed when
1036
// they are no longer referenced.
1037
private static final class SliceBufferSource implements BufferSource {
1038
private final Supplier<ByteBuffer> factory;
1039
private volatile ByteBuffer current;
1040
1041
public SliceBufferSource() {
1042
this(Utils::getBuffer);
1043
}
1044
public SliceBufferSource(Supplier<ByteBuffer> factory) {
1045
this.factory = Objects.requireNonNull(factory);
1046
}
1047
1048
// Reuses the same buffer if some space remains available.
1049
// Otherwise, returns a new heap buffer.
1050
@Override
1051
public final ByteBuffer getBuffer() {
1052
ByteBuffer buf = current;
1053
buf = (buf == null || !buf.hasRemaining())
1054
? (current = factory.get()) : buf;
1055
assert buf.hasRemaining();
1056
return buf;
1057
}
1058
1059
// Adds a read-only slice to the list, potentially returning a
1060
// new list with that slice at the end.
1061
@Override
1062
public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
1063
// creates a slice to add to the list
1064
int limit = buf.limit();
1065
buf.limit(buf.position());
1066
buf.position(start);
1067
ByteBuffer slice = buf.slice();
1068
1069
// restore buffer state to what it was before creating the slice
1070
buf.position(buf.limit());
1071
buf.limit(limit);
1072
1073
// add the buffer to the list
1074
return SocketTube.listOf(list, slice.asReadOnlyBuffer());
1075
}
1076
}
1077
1078
1079
// An implementation of BufferSource used for encrypted data.
1080
// This buffer source uses direct byte buffers that will be
1081
// recycled by the SocketTube subscriber.
1082
//
1083
private static final class SSLDirectBufferSource implements BufferSource {
1084
private final BufferSupplier factory;
1085
private final HttpClientImpl client;
1086
private ByteBuffer current;
1087
1088
public SSLDirectBufferSource(HttpClientImpl client) {
1089
this.client = Objects.requireNonNull(client);
1090
this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
1091
}
1092
1093
// Obtains a 'free' byte buffer from the pool, or returns
1094
// the same buffer if nothing was read at the previous cycle.
1095
// The subscriber will be responsible for recycling this
1096
// buffer into the pool (see SSLFlowDelegate.Reader)
1097
@Override
1098
public final ByteBuffer getBuffer() {
1099
assert client.isSelectorThread();
1100
ByteBuffer buf = current;
1101
if (buf == null) {
1102
buf = current = factory.get();
1103
}
1104
assert buf.hasRemaining();
1105
assert buf.position() == 0;
1106
return buf;
1107
}
1108
1109
// Adds the buffer to the list. The buffer will be later returned to the
1110
// pool by the subscriber (see SSLFlowDelegate.Reader).
1111
// The next buffer returned by getBuffer() will be obtained from the
1112
// pool. It might be the same buffer or another one.
1113
// Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
1114
// recycling will happen in the flow before onNext returns, then the
1115
// pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
1116
// it's shared by all SSL connections opened on that client.
1117
@Override
1118
public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
1119
assert client.isSelectorThread();
1120
assert buf.isDirect();
1121
assert start == 0;
1122
assert current == buf;
1123
current = null;
1124
buf.limit(buf.position());
1125
buf.position(start);
1126
// add the buffer to the list
1127
return SocketTube.listOf(list, buf);
1128
}
1129
1130
@Override
1131
public void returnUnused(ByteBuffer buffer) {
1132
// if current is null, then the buffer will have been added to the
1133
// list, through append. Otherwise, current is not null, and needs
1134
// to be returned to prevent the buffer supplier pool from growing
1135
// to more than MAX_BUFFERS.
1136
assert buffer == current;
1137
ByteBuffer buf = current;
1138
if (buf != null) {
1139
assert buf.position() == 0;
1140
current = null;
1141
// the supplier assert if buf has remaining
1142
buf.limit(buf.position());
1143
factory.recycle(buf);
1144
}
1145
}
1146
}
1147
1148
// ===================================================================== //
1149
// Socket Channel Read/Write //
1150
// ===================================================================== //
1151
static final int MAX_BUFFERS = 3;
1152
static final List<ByteBuffer> EOF = List.of();
1153
static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
1154
1155
// readAvailable() will read bytes into the 'current' ByteBuffer until
1156
// the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
1157
// When that happens, a slice of the data that has been read so far
1158
// is inserted into the returned buffer list, and if the current buffer
1159
// has remaining space, that space will be used to read more data when
1160
// the channel becomes readable again.
1161
private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
1162
ByteBuffer buf = buffersSource.getBuffer();
1163
assert buf.hasRemaining();
1164
1165
int read;
1166
int pos = buf.position();
1167
List<ByteBuffer> list = null;
1168
while (buf.hasRemaining()) {
1169
try {
1170
while ((read = channel.read(buf)) > 0) {
1171
if (!buf.hasRemaining())
1172
break;
1173
}
1174
} catch (IOException x) {
1175
if (buf.position() == pos && list == null) {
1176
// make sure that the buffer source will recycle
1177
// 'buf' if needed
1178
buffersSource.returnUnused(buf);
1179
// no bytes have been read, just throw...
1180
throw x;
1181
} else {
1182
// some bytes have been read, return them and fail next time
1183
errorRef.compareAndSet(null, x);
1184
read = 0; // ensures outer loop will exit
1185
}
1186
}
1187
1188
// nothing read;
1189
if (buf.position() == pos) {
1190
// An empty list signals the end of data, and should only be
1191
// returned if read == -1. If some data has already been read,
1192
// then it must be returned. -1 will be returned next time
1193
// the caller attempts to read something.
1194
buffersSource.returnUnused(buf);
1195
if (list == null) {
1196
// nothing read - list was null - return EOF or NOTHING
1197
list = read == -1 ? EOF : NOTHING;
1198
}
1199
break;
1200
}
1201
1202
// check whether this buffer has still some free space available.
1203
// if so, we will keep it for the next round.
1204
list = buffersSource.append(list, buf, pos);
1205
1206
if (read <= 0 || list.size() == MAX_BUFFERS) {
1207
break;
1208
}
1209
1210
buf = buffersSource.getBuffer();
1211
pos = buf.position();
1212
assert buf.hasRemaining();
1213
}
1214
return list;
1215
}
1216
1217
private static <T> List<T> listOf(List<T> list, T item) {
1218
int size = list == null ? 0 : list.size();
1219
switch (size) {
1220
case 0: return List.of(item);
1221
case 1: return List.of(list.get(0), item);
1222
case 2: return List.of(list.get(0), list.get(1), item);
1223
default: // slow path if MAX_BUFFERS > 3
1224
List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
1225
res.add(item);
1226
return res;
1227
}
1228
}
1229
1230
private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
1231
ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
1232
final long remaining = Utils.remaining(srcs);
1233
long written = 0;
1234
while (remaining > written) {
1235
try {
1236
long w = channel.write(srcs);
1237
assert w >= 0 : "negative number of bytes written:" + w;
1238
if (w == 0) {
1239
break;
1240
}
1241
written += w;
1242
} catch (IOException x) {
1243
if (written == 0) {
1244
// no bytes were written just throw
1245
throw x;
1246
} else {
1247
// return how many bytes were written, will fail next time
1248
break;
1249
}
1250
}
1251
}
1252
return written;
1253
}
1254
1255
private void resumeEvent(SocketFlowEvent event,
1256
Consumer<Throwable> errorSignaler) {
1257
boolean registrationRequired;
1258
synchronized(lock) {
1259
registrationRequired = !event.registered();
1260
event.resume();
1261
}
1262
try {
1263
if (registrationRequired) {
1264
client.registerEvent(event);
1265
} else {
1266
client.eventUpdated(event);
1267
}
1268
} catch(Throwable t) {
1269
errorSignaler.accept(t);
1270
}
1271
}
1272
1273
private void pauseEvent(SocketFlowEvent event,
1274
Consumer<Throwable> errorSignaler) {
1275
synchronized(lock) {
1276
event.pause();
1277
}
1278
try {
1279
client.eventUpdated(event);
1280
} catch(Throwable t) {
1281
errorSignaler.accept(t);
1282
}
1283
}
1284
1285
@Override
1286
public void connectFlows(TubePublisher writePublisher,
1287
TubeSubscriber readSubscriber) {
1288
if (debug.on()) debug.log("connecting flows");
1289
this.subscribe(readSubscriber);
1290
writePublisher.subscribe(this);
1291
}
1292
1293
1294
@Override
1295
public String toString() {
1296
return dbgString();
1297
}
1298
1299
final String dbgString() {
1300
return "SocketTube("+id+")";
1301
}
1302
1303
final String channelDescr() {
1304
return String.valueOf(channel);
1305
}
1306
}
1307
1308