Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
41171 views
/*1* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.IOException;28import java.nio.ByteBuffer;29import java.util.List;30import java.util.Objects;31import java.util.concurrent.Flow;32import java.util.concurrent.atomic.AtomicLong;33import java.util.concurrent.atomic.AtomicReference;34import java.nio.channels.SelectableChannel;35import java.nio.channels.SelectionKey;36import java.nio.channels.SocketChannel;37import java.util.ArrayList;38import java.util.concurrent.locks.Lock;39import java.util.concurrent.locks.ReentrantLock;40import java.util.function.Consumer;41import java.util.function.Supplier;42import jdk.internal.net.http.common.BufferSupplier;43import jdk.internal.net.http.common.Demand;44import jdk.internal.net.http.common.FlowTube;45import jdk.internal.net.http.common.Log;46import jdk.internal.net.http.common.Logger;47import jdk.internal.net.http.common.SequentialScheduler;48import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;49import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;50import jdk.internal.net.http.common.Utils;5152/**53* A SocketTube is a terminal tube plugged directly into the socket.54* The read subscriber should call {@code subscribe} on the SocketTube before55* the SocketTube is subscribed to the write publisher.56*/57final class SocketTube implements FlowTube {5859final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);60static final AtomicLong IDS = new AtomicLong();6162private final HttpClientImpl client;63private final SocketChannel channel;64private final SliceBufferSource sliceBuffersSource;65private final Object lock = new Object();66private final AtomicReference<Throwable> errorRef = new AtomicReference<>();67private final InternalReadPublisher readPublisher;68private final InternalWriteSubscriber writeSubscriber;69private final long id = IDS.incrementAndGet();7071public SocketTube(HttpClientImpl client, SocketChannel channel,72Supplier<ByteBuffer> buffersFactory) {73this.client = client;74this.channel = channel;75this.sliceBuffersSource = new SliceBufferSource(buffersFactory);7677this.readPublisher = new InternalReadPublisher();78this.writeSubscriber = new InternalWriteSubscriber();79}8081/**82* Returns {@code true} if this flow is finished.83* This happens when this flow internal read subscription is completed,84* either normally (EOF reading) or exceptionally (EOF writing, or85* underlying socket closed, or some exception occurred while reading or86* writing to the socket).87*88* @return {@code true} if this flow is finished.89*/90public boolean isFinished() {91InternalReadPublisher.InternalReadSubscription subscription =92readPublisher.subscriptionImpl;93return subscription != null && subscription.completed94|| subscription == null && errorRef.get() != null;95}9697// ===================================================================== //98// Flow.Publisher //99// ======================================================================//100101/**102* {@inheritDoc }103* @apiNote This method should be called first. In particular, the caller104* must ensure that this method must be called by the read105* subscriber before the write publisher can call {@code onSubscribe}.106* Failure to adhere to this contract may result in assertion errors.107*/108@Override109public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {110Objects.requireNonNull(s);111assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;112readPublisher.subscribe(s);113}114115116// ===================================================================== //117// Flow.Subscriber //118// ======================================================================//119120/**121* {@inheritDoc }122* @apiNote The caller must ensure that {@code subscribe} is called by123* the read subscriber before {@code onSubscribe} is called by124* the write publisher.125* Failure to adhere to this contract may result in assertion errors.126*/127@Override128public void onSubscribe(Flow.Subscription subscription) {129writeSubscriber.onSubscribe(subscription);130}131132@Override133public void onNext(List<ByteBuffer> item) {134writeSubscriber.onNext(item);135}136137@Override138public void onError(Throwable throwable) {139writeSubscriber.onError(throwable);140}141142@Override143public void onComplete() {144writeSubscriber.onComplete();145}146147// ===================================================================== //148// Events //149// ======================================================================//150151void signalClosed() {152// Ensures that the subscriber will be terminated and that future153// subscribers will be notified when the connection is closed.154if (Log.channel()) {155Log.logChannel("Connection close signalled: connection closed locally ({0})",156channelDescr());157}158readPublisher.subscriptionImpl.signalError(159new IOException("connection closed locally"));160}161162/**163* A restartable task used to process tasks in sequence.164*/165private static class SocketFlowTask implements RestartableTask {166final Runnable task;167private final Lock lock = new ReentrantLock();168SocketFlowTask(Runnable task) {169this.task = task;170}171@Override172public final void run(DeferredCompleter taskCompleter) {173try {174// The logics of the sequential scheduler should ensure that175// the restartable task is running in only one thread at176// a given time: there should never be contention.177boolean locked = lock.tryLock();178assert locked : "contention detected in SequentialScheduler";179try {180task.run();181} finally {182if (locked) lock.unlock();183}184} finally {185taskCompleter.complete();186}187}188}189190// This is best effort - there's no guarantee that the printed set of values191// is consistent. It should only be considered as weakly accurate - in192// particular in what concerns the events states, especially when displaying193// a read event state from a write event callback and conversely.194void debugState(String when) {195if (debug.on()) {196StringBuilder state = new StringBuilder();197198InternalReadPublisher.InternalReadSubscription sub =199readPublisher.subscriptionImpl;200InternalReadPublisher.ReadEvent readEvent =201sub == null ? null : sub.readEvent;202Demand rdemand = sub == null ? null : sub.demand;203InternalWriteSubscriber.WriteEvent writeEvent =204writeSubscriber.writeEvent;205Demand wdemand = writeSubscriber.writeDemand;206int rops = readEvent == null ? 0 : readEvent.interestOps();207long rd = rdemand == null ? 0 : rdemand.get();208int wops = writeEvent == null ? 0 : writeEvent.interestOps();209long wd = wdemand == null ? 0 : wdemand.get();210211state.append(when).append(" Reading: [ops=")212.append(rops).append(", demand=").append(rd)213.append(", stopped=")214.append((sub == null ? false : sub.readScheduler.isStopped()))215.append("], Writing: [ops=").append(wops)216.append(", demand=").append(wd)217.append("]");218debug.log(state.toString());219}220}221222/**223* A repeatable event that can be paused or resumed by changing its224* interestOps. When the event is fired, it is first paused before being225* signaled. It is the responsibility of the code triggered by226* {@code signalEvent} to resume the event if required.227*/228private static abstract class SocketFlowEvent extends AsyncEvent {229final SocketChannel channel;230final int defaultInterest;231volatile int interestOps;232volatile boolean registered;233SocketFlowEvent(int defaultInterest, SocketChannel channel) {234super(AsyncEvent.REPEATING);235this.defaultInterest = defaultInterest;236this.channel = channel;237}238final boolean registered() {return registered;}239final void resume() {240interestOps = defaultInterest;241registered = true;242}243final void pause() {interestOps = 0;}244@Override245public final SelectableChannel channel() {return channel;}246@Override247public final int interestOps() {return interestOps;}248249@Override250public final void handle() {251pause(); // pause, then signal252signalEvent(); // won't be fired again until resumed.253}254@Override255public final void abort(IOException error) {256debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);257pause(); // pause, then signal258signalError(error); // should not be resumed after abort (not checked)259}260261protected abstract void signalEvent();262protected abstract void signalError(Throwable error);263abstract Logger debug();264}265266// ===================================================================== //267// Writing //268// ======================================================================//269270// This class makes the assumption that the publisher will call onNext271// sequentially, and that onNext won't be called if the demand has not been272// incremented by request(1).273// It has a 'queue of 1' meaning that it will call request(1) in274// onSubscribe, and then only after its 'current' buffer list has been275// fully written and current set to null;276private final class InternalWriteSubscriber277implements Flow.Subscriber<List<ByteBuffer>> {278279volatile WriteSubscription subscription;280volatile List<ByteBuffer> current;281volatile boolean completed;282final AsyncTriggerEvent startSubscription =283new AsyncTriggerEvent(this::signalError, this::startSubscription);284final WriteEvent writeEvent = new WriteEvent(channel, this);285final Demand writeDemand = new Demand();286287@Override288public void onSubscribe(Flow.Subscription subscription) {289WriteSubscription previous = this.subscription;290if (debug.on()) debug.log("subscribed for writing");291try {292boolean needEvent = current == null;293if (needEvent) {294if (previous != null && previous.upstreamSubscription != subscription) {295previous.dropSubscription();296}297}298this.subscription = new WriteSubscription(subscription);299if (needEvent) {300if (debug.on())301debug.log("write: registering startSubscription event");302client.registerEvent(startSubscription);303}304} catch (Throwable t) {305signalError(t);306}307}308309@Override310public void onNext(List<ByteBuffer> bufs) {311assert current == null : dbgString() // this is a queue of 1.312+ "w.onNext current: " + current;313assert subscription != null : dbgString()314+ "w.onNext: subscription is null";315current = bufs;316tryFlushCurrent(client.isSelectorThread()); // may be in selector thread317// For instance in HTTP/2, a received SETTINGS frame might trigger318// the sending of a SETTINGS frame in turn which might cause319// onNext to be called from within the same selector thread that the320// original SETTINGS frames arrived on. If rs is the read-subscriber321// and ws is the write-subscriber then the following can occur:322// ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write323// client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent324debugState("leaving w.onNext");325}326327// Don't use a SequentialScheduler here: rely on onNext() being invoked328// sequentially, and not being invoked if there is no demand, request(1).329// onNext is usually called from within a user / executor thread.330// Initial writing will be performed in that thread. If for some reason,331// not all the data can be written, a writeEvent will be registered, and332// writing will resume in the selector manager thread when the333// writeEvent is fired.334//335// If this method is invoked in the selector manager thread (because of336// a writeEvent), then the executor will be used to invoke request(1),337// ensuring that onNext() won't be invoked from within the selector338// thread. If not in the selector manager thread, then request(1) is339// invoked directly.340void tryFlushCurrent(boolean inSelectorThread) {341List<ByteBuffer> bufs = current;342if (bufs == null) return;343try {344assert inSelectorThread == client.isSelectorThread() :345"should " + (inSelectorThread ? "" : "not ")346+ " be in the selector thread";347long remaining = Utils.remaining(bufs);348if (debug.on()) debug.log("trying to write: %d", remaining);349long written = writeAvailable(bufs);350if (debug.on()) debug.log("wrote: %d", written);351assert written >= 0 : "negative number of bytes written:" + written;352assert written <= remaining;353if (remaining - written == 0) {354current = null;355if (writeDemand.tryDecrement()) {356Runnable requestMore = this::requestMore;357if (inSelectorThread) {358assert client.isSelectorThread();359client.theExecutor().execute(requestMore);360} else {361assert !client.isSelectorThread();362requestMore.run();363}364}365} else {366resumeWriteEvent(inSelectorThread);367}368} catch (Throwable t) {369signalError(t);370}371}372373// Kick off the initial request:1 that will start the writing side.374// Invoked in the selector manager thread.375void startSubscription() {376try {377if (debug.on()) debug.log("write: starting subscription");378if (Log.channel()) {379Log.logChannel("Start requesting bytes for writing to channel: {0}",380channelDescr());381}382assert client.isSelectorThread();383// make sure read registrations are handled before;384readPublisher.subscriptionImpl.handlePending();385if (debug.on()) debug.log("write: offloading requestMore");386// start writing;387client.theExecutor().execute(this::requestMore);388} catch(Throwable t) {389signalError(t);390}391}392393void requestMore() {394WriteSubscription subscription = this.subscription;395subscription.requestMore();396}397398@Override399public void onError(Throwable throwable) {400signalError(throwable);401}402403@Override404public void onComplete() {405completed = true;406// no need to pause the write event here: the write event will407// be paused if there is nothing more to write.408List<ByteBuffer> bufs = current;409long remaining = bufs == null ? 0 : Utils.remaining(bufs);410if (debug.on())411debug.log( "write completed, %d yet to send", remaining);412debugState("InternalWriteSubscriber::onComplete");413}414415void resumeWriteEvent(boolean inSelectorThread) {416if (debug.on()) debug.log("scheduling write event");417resumeEvent(writeEvent, this::signalError);418}419420void signalWritable() {421if (debug.on()) debug.log("channel is writable");422tryFlushCurrent(true);423}424425void signalError(Throwable error) {426debug.log(() -> "write error: " + error);427if (Log.channel()) {428Log.logChannel("Failed to write to channel ({0}: {1})",429channelDescr(), error);430}431completed = true;432readPublisher.signalError(error);433Flow.Subscription subscription = this.subscription;434if (subscription != null) subscription.cancel();435}436437// A repeatable WriteEvent which is paused after firing and can438// be resumed if required - see SocketFlowEvent;439final class WriteEvent extends SocketFlowEvent {440final InternalWriteSubscriber sub;441WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {442super(SelectionKey.OP_WRITE, channel);443this.sub = sub;444}445@Override446protected final void signalEvent() {447try {448client.eventUpdated(this);449sub.signalWritable();450} catch(Throwable t) {451sub.signalError(t);452}453}454455@Override456protected void signalError(Throwable error) {457sub.signalError(error);458}459460@Override461Logger debug() { return debug; }462}463464final class WriteSubscription implements Flow.Subscription {465final Flow.Subscription upstreamSubscription;466volatile boolean cancelled;467WriteSubscription(Flow.Subscription subscription) {468this.upstreamSubscription = subscription;469}470471@Override472public void request(long n) {473if (cancelled) return;474upstreamSubscription.request(n);475}476477@Override478public void cancel() {479if (cancelled) return;480if (debug.on()) debug.log("write: cancel");481if (Log.channel()) {482Log.logChannel("Cancelling write subscription");483}484dropSubscription();485upstreamSubscription.cancel();486}487488void dropSubscription() {489synchronized (InternalWriteSubscriber.this) {490cancelled = true;491if (debug.on()) debug.log("write: resetting demand to 0");492writeDemand.reset();493}494}495496void requestMore() {497try {498if (completed || cancelled) return;499boolean requestMore;500long d;501// don't fiddle with demand after cancel.502// see dropSubscription.503synchronized (InternalWriteSubscriber.this) {504if (cancelled) return;505d = writeDemand.get();506requestMore = writeDemand.increaseIfFulfilled();507}508if (requestMore) {509if (debug.on()) debug.log("write: requesting more...");510upstreamSubscription.request(1);511} else {512if (debug.on())513debug.log("write: no need to request more: %d", d);514}515} catch (Throwable t) {516if (debug.on())517debug.log("write: error while requesting more: " + t);518signalError(t);519} finally {520debugState("leaving requestMore: ");521}522}523}524}525526// ===================================================================== //527// Reading //528// ===================================================================== //529530// The InternalReadPublisher uses a SequentialScheduler to ensure that531// onNext/onError/onComplete are called sequentially on the caller's532// subscriber.533// However, it relies on the fact that the only time where534// runOrSchedule() is called from a user/executor thread is in signalError,535// right after the errorRef has been set.536// Because the sequential scheduler's task always checks for errors first,537// and always terminate the scheduler on error, then it is safe to assume538// that if it reaches the point where it reads from the channel, then539// it is running in the SelectorManager thread. This is because all540// other invocation of runOrSchedule() are triggered from within a541// ReadEvent.542//543// When pausing/resuming the event, some shortcuts can then be taken544// when we know we're running in the selector manager thread545// (in that case there's no need to call client.eventUpdated(readEvent);546//547private final class InternalReadPublisher548implements Flow.Publisher<List<ByteBuffer>> {549private final InternalReadSubscription subscriptionImpl550= new InternalReadSubscription();551AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();552private volatile ReadSubscription subscription;553554@Override555public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {556Objects.requireNonNull(s);557558TubeSubscriber sub = FlowTube.asTubeSubscriber(s);559ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);560ReadSubscription previous = pendingSubscription.getAndSet(target);561562if (previous != null && previous != target) {563if (debug.on())564debug.log("read publisher: dropping pending subscriber: "565+ previous.subscriber);566previous.errorRef.compareAndSet(null, errorRef.get());567previous.signalOnSubscribe();568if (subscriptionImpl.completed) {569previous.signalCompletion();570} else {571previous.subscriber.dropSubscription();572}573}574575if (debug.on()) debug.log("read publisher got subscriber");576subscriptionImpl.signalSubscribe();577debugState("leaving read.subscribe: ");578}579580void signalError(Throwable error) {581if (debug.on()) debug.log("error signalled " + error);582if (!errorRef.compareAndSet(null, error)) {583return;584}585if (Log.channel()) {586Log.logChannel("Error signalled on channel {0}: {1}",587channelDescr(), error);588}589subscriptionImpl.handleError();590}591592final class ReadSubscription implements Flow.Subscription {593final InternalReadSubscription impl;594final TubeSubscriber subscriber;595final AtomicReference<Throwable> errorRef = new AtomicReference<>();596final BufferSource bufferSource;597volatile boolean subscribed;598volatile boolean cancelled;599volatile boolean completed;600601public ReadSubscription(InternalReadSubscription impl,602TubeSubscriber subscriber) {603this.impl = impl;604this.bufferSource = subscriber.supportsRecycling()605? new SSLDirectBufferSource(client)606: SocketTube.this.sliceBuffersSource;607this.subscriber = subscriber;608}609610@Override611public void cancel() {612cancelled = true;613}614615@Override616public void request(long n) {617if (!cancelled) {618impl.request(n);619} else {620if (debug.on())621debug.log("subscription cancelled, ignoring request %d", n);622}623}624625void signalCompletion() {626assert subscribed || cancelled;627if (completed || cancelled) return;628synchronized (this) {629if (completed) return;630completed = true;631}632Throwable error = errorRef.get();633if (error != null) {634if (debug.on())635debug.log("forwarding error to subscriber: " + error);636subscriber.onError(error);637} else {638if (debug.on()) debug.log("completing subscriber");639subscriber.onComplete();640}641}642643void signalOnSubscribe() {644if (subscribed || cancelled) return;645synchronized (this) {646if (subscribed || cancelled) return;647subscribed = true;648}649subscriber.onSubscribe(this);650if (debug.on()) debug.log("onSubscribe called");651if (errorRef.get() != null) {652signalCompletion();653}654}655}656657final class InternalReadSubscription implements Flow.Subscription {658659private final Demand demand = new Demand();660final SequentialScheduler readScheduler;661private volatile boolean completed;662private final ReadEvent readEvent;663private final AsyncEvent subscribeEvent;664665InternalReadSubscription() {666readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));667subscribeEvent = new AsyncTriggerEvent(this::signalError,668this::handleSubscribeEvent);669readEvent = new ReadEvent(channel, this);670}671672/*673* This method must be invoked before any other method of this class.674*/675final void signalSubscribe() {676if (readScheduler.isStopped() || completed) {677// if already completed or stopped we can handle any678// pending connection directly from here.679if (debug.on())680debug.log("handling pending subscription while completed");681handlePending();682} else {683try {684if (debug.on()) debug.log("registering subscribe event");685client.registerEvent(subscribeEvent);686} catch (Throwable t) {687signalError(t);688handlePending();689}690}691}692693final void handleSubscribeEvent() {694assert client.isSelectorThread();695debug.log("subscribe event raised");696if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());697readScheduler.runOrSchedule();698if (readScheduler.isStopped() || completed) {699// if already completed or stopped we can handle any700// pending connection directly from here.701if (debug.on())702debug.log("handling pending subscription when completed");703handlePending();704}705}706707708/*709* Although this method is thread-safe, the Reactive-Streams spec seems710* to not require it to be as such. It's a responsibility of the711* subscriber to signal demand in a thread-safe manner.712*713* See Reactive Streams specification, rules 2.7 and 3.4.714*/715@Override716public final void request(long n) {717if (n > 0L) {718boolean wasFulfilled = demand.increase(n);719if (wasFulfilled) {720if (debug.on()) debug.log("got some demand for reading");721resumeReadEvent();722// if demand has been changed from fulfilled723// to unfulfilled register read event;724}725} else {726signalError(new IllegalArgumentException("non-positive request"));727}728debugState("leaving request("+n+"): ");729}730731@Override732public final void cancel() {733pauseReadEvent();734if (debug.on()) debug.log("Read subscription cancelled");735if (Log.channel()) {736Log.logChannel("Read subscription cancelled for channel {0}",737channelDescr());738}739if (debug.on()) debug.log("Stopping read scheduler");740readScheduler.stop();741}742743private void resumeReadEvent() {744if (debug.on()) debug.log("resuming read event");745resumeEvent(readEvent, this::signalError);746}747748private void pauseReadEvent() {749if (debug.on()) debug.log("pausing read event");750pauseEvent(readEvent, this::signalError);751}752753754final void handleError() {755assert errorRef.get() != null;756readScheduler.runOrSchedule();757}758759final void signalError(Throwable error) {760if (debug.on()) debug.log("signal read error: " + error);761if (!errorRef.compareAndSet(null, error)) {762return;763}764if (debug.on()) debug.log("got read error: " + error);765if (Log.channel()) {766Log.logChannel("Read error signalled on channel {0}: {1}",767channelDescr(), error);768}769readScheduler.runOrSchedule();770}771772final void signalReadable() {773readScheduler.runOrSchedule();774}775776/** The body of the task that runs in SequentialScheduler. */777final void read() {778// It is important to only call pauseReadEvent() when stopping779// the scheduler. The event is automatically paused before780// firing, and trying to pause it again could cause a race781// condition between this loop, which calls tryDecrementDemand(),782// and the thread that calls request(n), which will try to resume783// reading.784try {785while(!readScheduler.isStopped()) {786if (completed) return;787788// make sure we have a subscriber789if (handlePending()) {790if (debug.on())791debug.log("pending subscriber subscribed");792return;793}794795// If an error was signaled, we might not be in the796// the selector thread, and that is OK, because we797// will just call onError and return.798ReadSubscription current = subscription;799Throwable error = errorRef.get();800if (current == null) {801assert error != null;802if (debug.on())803debug.log("error raised before subscriber subscribed: %s",804(Object)error);805return;806}807TubeSubscriber subscriber = current.subscriber;808if (error != null) {809completed = true;810// safe to pause here because we're finished anyway.811pauseReadEvent();812if (debug.on())813debug.log("Sending error " + error814+ " to subscriber " + subscriber);815if (Log.channel()) {816Log.logChannel("Raising error with subscriber for {0}: {1}",817channelDescr(), error);818}819current.errorRef.compareAndSet(null, error);820current.signalCompletion();821if (debug.on()) debug.log("Stopping read scheduler");822readScheduler.stop();823debugState("leaving read() loop with error: ");824return;825}826827// If we reach here then we must be in the selector thread.828assert client.isSelectorThread();829if (demand.tryDecrement()) {830// we have demand.831try {832List<ByteBuffer> bytes = readAvailable(current.bufferSource);833if (bytes == EOF) {834if (!completed) {835if (debug.on()) debug.log("got read EOF");836if (Log.channel()) {837Log.logChannel("EOF read from channel: {0}",838channelDescr());839}840completed = true;841// safe to pause here because we're finished842// anyway.843pauseReadEvent();844current.signalCompletion();845if (debug.on()) debug.log("Stopping read scheduler");846readScheduler.stop();847}848debugState("leaving read() loop after EOF: ");849return;850} else if (Utils.remaining(bytes) > 0) {851// the subscriber is responsible for offloading852// to another thread if needed.853if (debug.on())854debug.log("read bytes: " + Utils.remaining(bytes));855assert !current.completed;856subscriber.onNext(bytes);857// we could continue looping until the demand858// reaches 0. However, that would risk starving859// other connections (bound to other socket860// channels) - as other selected keys activated861// by the selector manager thread might be862// waiting for this event to terminate.863// So resume the read event and return now...864resumeReadEvent();865if (errorRef.get() != null) continue;866debugState("leaving read() loop after onNext: ");867return;868} else {869// nothing available!870if (debug.on()) debug.log("no more bytes available");871// re-increment the demand and resume the read872// event. This ensures that this loop is873// executed again when the socket becomes874// readable again.875demand.increase(1);876resumeReadEvent();877if (errorRef.get() != null) continue;878debugState("leaving read() loop with no bytes");879return;880}881} catch (Throwable x) {882signalError(x);883continue;884}885} else {886if (debug.on()) debug.log("no more demand for reading");887// the event is paused just after firing, so it should888// still be paused here, unless the demand was just889// incremented from 0 to n, in which case, the890// event will be resumed, causing this loop to be891// invoked again when the socket becomes readable:892// This is what we want.893// Trying to pause the event here would actually894// introduce a race condition between this loop and895// request(n).896if (errorRef.get() != null) continue;897debugState("leaving read() loop with no demand");898break;899}900}901} catch (Throwable t) {902if (debug.on()) debug.log("Unexpected exception in read loop", t);903signalError(t);904} finally {905if (readScheduler.isStopped()) {906if (debug.on()) debug.log("Read scheduler stopped");907if (Log.channel()) {908Log.logChannel("Stopped reading from channel {0}", channelDescr());909}910}911handlePending();912}913}914915boolean handlePending() {916ReadSubscription pending = pendingSubscription.getAndSet(null);917if (pending == null) return false;918if (debug.on())919debug.log("handling pending subscription for %s",920pending.subscriber);921ReadSubscription current = subscription;922if (current != null && current != pending && !completed) {923current.subscriber.dropSubscription();924}925if (debug.on()) debug.log("read demand reset to 0");926subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.927pending.errorRef.compareAndSet(null, errorRef.get());928if (!readScheduler.isStopped()) {929subscription = pending;930} else {931if (debug.on()) debug.log("socket tube is already stopped");932}933if (debug.on()) debug.log("calling onSubscribe");934pending.signalOnSubscribe();935if (completed) {936pending.errorRef.compareAndSet(null, errorRef.get());937pending.signalCompletion();938}939return true;940}941}942943944// A repeatable ReadEvent which is paused after firing and can945// be resumed if required - see SocketFlowEvent;946final class ReadEvent extends SocketFlowEvent {947final InternalReadSubscription sub;948ReadEvent(SocketChannel channel, InternalReadSubscription sub) {949super(SelectionKey.OP_READ, channel);950this.sub = sub;951}952@Override953protected final void signalEvent() {954try {955client.eventUpdated(this);956sub.signalReadable();957} catch(Throwable t) {958sub.signalError(t);959}960}961962@Override963protected final void signalError(Throwable error) {964if (debug.on()) debug.log("signalError to %s (%s)", sub, error);965sub.signalError(error);966}967968@Override969Logger debug() { return debug; }970}971}972973// ===================================================================== //974// Buffer Management //975// ===================================================================== //976977// This interface is used by readAvailable(BufferSource);978public interface BufferSource {979/**980* Returns a buffer to read data from the socket.981*982* @implNote983* Different implementation can have different strategies, as to984* which kind of buffer to return, or whether to return the same985* buffer. The only constraints are that:986* a. the buffer returned must not be null987* b. the buffer position indicates where to start reading988* c. the buffer limit indicates where to stop reading.989* d. the buffer is 'free' - that is - it is not used990* or retained by anybody else991*992* @return A buffer to read data from the socket.993*/994ByteBuffer getBuffer();995996/**997* Appends the read-data in {@code buffer} to the list of buffer to998* be sent downstream to the subscriber. May return a new999* list, or append to the given list.1000*1001* @implNote1002* Different implementation can have different strategies, but1003* must obviously be consistent with the implementation of the1004* getBuffer() method. For instance, an implementation could1005* decide to add the buffer to the list and return a new buffer1006* next time getBuffer() is called, or could decide to add a buffer1007* slice to the list and return the same buffer (if remaining1008* space is available) next time getBuffer() is called.1009*1010* @param list The list before adding the data. Can be null.1011* @param buffer The buffer containing the data to add to the list.1012* @param start The start position at which data were read.1013* The current buffer position indicates the end.1014* @return A possibly new list where a buffer containing the1015* data read from the socket has been added.1016*/1017List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);10181019/**1020* Returns the given unused {@code buffer}, previously obtained from1021* {@code getBuffer}.1022*1023* @implNote This method can be used, if necessary, to return1024* the unused buffer to the pull.1025*1026* @param buffer The unused buffer.1027*/1028default void returnUnused(ByteBuffer buffer) { }1029}10301031// An implementation of BufferSource used for unencrypted data.1032// This buffer source uses heap buffers and avoids wasting memory1033// by forwarding read-only buffer slices downstream.1034// Buffers allocated through this source are simply GC'ed when1035// they are no longer referenced.1036private static final class SliceBufferSource implements BufferSource {1037private final Supplier<ByteBuffer> factory;1038private volatile ByteBuffer current;10391040public SliceBufferSource() {1041this(Utils::getBuffer);1042}1043public SliceBufferSource(Supplier<ByteBuffer> factory) {1044this.factory = Objects.requireNonNull(factory);1045}10461047// Reuses the same buffer if some space remains available.1048// Otherwise, returns a new heap buffer.1049@Override1050public final ByteBuffer getBuffer() {1051ByteBuffer buf = current;1052buf = (buf == null || !buf.hasRemaining())1053? (current = factory.get()) : buf;1054assert buf.hasRemaining();1055return buf;1056}10571058// Adds a read-only slice to the list, potentially returning a1059// new list with that slice at the end.1060@Override1061public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {1062// creates a slice to add to the list1063int limit = buf.limit();1064buf.limit(buf.position());1065buf.position(start);1066ByteBuffer slice = buf.slice();10671068// restore buffer state to what it was before creating the slice1069buf.position(buf.limit());1070buf.limit(limit);10711072// add the buffer to the list1073return SocketTube.listOf(list, slice.asReadOnlyBuffer());1074}1075}107610771078// An implementation of BufferSource used for encrypted data.1079// This buffer source uses direct byte buffers that will be1080// recycled by the SocketTube subscriber.1081//1082private static final class SSLDirectBufferSource implements BufferSource {1083private final BufferSupplier factory;1084private final HttpClientImpl client;1085private ByteBuffer current;10861087public SSLDirectBufferSource(HttpClientImpl client) {1088this.client = Objects.requireNonNull(client);1089this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());1090}10911092// Obtains a 'free' byte buffer from the pool, or returns1093// the same buffer if nothing was read at the previous cycle.1094// The subscriber will be responsible for recycling this1095// buffer into the pool (see SSLFlowDelegate.Reader)1096@Override1097public final ByteBuffer getBuffer() {1098assert client.isSelectorThread();1099ByteBuffer buf = current;1100if (buf == null) {1101buf = current = factory.get();1102}1103assert buf.hasRemaining();1104assert buf.position() == 0;1105return buf;1106}11071108// Adds the buffer to the list. The buffer will be later returned to the1109// pool by the subscriber (see SSLFlowDelegate.Reader).1110// The next buffer returned by getBuffer() will be obtained from the1111// pool. It might be the same buffer or another one.1112// Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because1113// recycling will happen in the flow before onNext returns, then the1114// pool can not grow larger than MAX_BUFFERS = 3 buffers, even though1115// it's shared by all SSL connections opened on that client.1116@Override1117public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {1118assert client.isSelectorThread();1119assert buf.isDirect();1120assert start == 0;1121assert current == buf;1122current = null;1123buf.limit(buf.position());1124buf.position(start);1125// add the buffer to the list1126return SocketTube.listOf(list, buf);1127}11281129@Override1130public void returnUnused(ByteBuffer buffer) {1131// if current is null, then the buffer will have been added to the1132// list, through append. Otherwise, current is not null, and needs1133// to be returned to prevent the buffer supplier pool from growing1134// to more than MAX_BUFFERS.1135assert buffer == current;1136ByteBuffer buf = current;1137if (buf != null) {1138assert buf.position() == 0;1139current = null;1140// the supplier assert if buf has remaining1141buf.limit(buf.position());1142factory.recycle(buf);1143}1144}1145}11461147// ===================================================================== //1148// Socket Channel Read/Write //1149// ===================================================================== //1150static final int MAX_BUFFERS = 3;1151static final List<ByteBuffer> EOF = List.of();1152static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);11531154// readAvailable() will read bytes into the 'current' ByteBuffer until1155// the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().1156// When that happens, a slice of the data that has been read so far1157// is inserted into the returned buffer list, and if the current buffer1158// has remaining space, that space will be used to read more data when1159// the channel becomes readable again.1160private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {1161ByteBuffer buf = buffersSource.getBuffer();1162assert buf.hasRemaining();11631164int read;1165int pos = buf.position();1166List<ByteBuffer> list = null;1167while (buf.hasRemaining()) {1168try {1169while ((read = channel.read(buf)) > 0) {1170if (!buf.hasRemaining())1171break;1172}1173} catch (IOException x) {1174if (buf.position() == pos && list == null) {1175// make sure that the buffer source will recycle1176// 'buf' if needed1177buffersSource.returnUnused(buf);1178// no bytes have been read, just throw...1179throw x;1180} else {1181// some bytes have been read, return them and fail next time1182errorRef.compareAndSet(null, x);1183read = 0; // ensures outer loop will exit1184}1185}11861187// nothing read;1188if (buf.position() == pos) {1189// An empty list signals the end of data, and should only be1190// returned if read == -1. If some data has already been read,1191// then it must be returned. -1 will be returned next time1192// the caller attempts to read something.1193buffersSource.returnUnused(buf);1194if (list == null) {1195// nothing read - list was null - return EOF or NOTHING1196list = read == -1 ? EOF : NOTHING;1197}1198break;1199}12001201// check whether this buffer has still some free space available.1202// if so, we will keep it for the next round.1203list = buffersSource.append(list, buf, pos);12041205if (read <= 0 || list.size() == MAX_BUFFERS) {1206break;1207}12081209buf = buffersSource.getBuffer();1210pos = buf.position();1211assert buf.hasRemaining();1212}1213return list;1214}12151216private static <T> List<T> listOf(List<T> list, T item) {1217int size = list == null ? 0 : list.size();1218switch (size) {1219case 0: return List.of(item);1220case 1: return List.of(list.get(0), item);1221case 2: return List.of(list.get(0), list.get(1), item);1222default: // slow path if MAX_BUFFERS > 31223List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);1224res.add(item);1225return res;1226}1227}12281229private long writeAvailable(List<ByteBuffer> bytes) throws IOException {1230ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);1231final long remaining = Utils.remaining(srcs);1232long written = 0;1233while (remaining > written) {1234try {1235long w = channel.write(srcs);1236assert w >= 0 : "negative number of bytes written:" + w;1237if (w == 0) {1238break;1239}1240written += w;1241} catch (IOException x) {1242if (written == 0) {1243// no bytes were written just throw1244throw x;1245} else {1246// return how many bytes were written, will fail next time1247break;1248}1249}1250}1251return written;1252}12531254private void resumeEvent(SocketFlowEvent event,1255Consumer<Throwable> errorSignaler) {1256boolean registrationRequired;1257synchronized(lock) {1258registrationRequired = !event.registered();1259event.resume();1260}1261try {1262if (registrationRequired) {1263client.registerEvent(event);1264} else {1265client.eventUpdated(event);1266}1267} catch(Throwable t) {1268errorSignaler.accept(t);1269}1270}12711272private void pauseEvent(SocketFlowEvent event,1273Consumer<Throwable> errorSignaler) {1274synchronized(lock) {1275event.pause();1276}1277try {1278client.eventUpdated(event);1279} catch(Throwable t) {1280errorSignaler.accept(t);1281}1282}12831284@Override1285public void connectFlows(TubePublisher writePublisher,1286TubeSubscriber readSubscriber) {1287if (debug.on()) debug.log("connecting flows");1288this.subscribe(readSubscriber);1289writePublisher.subscribe(this);1290}129112921293@Override1294public String toString() {1295return dbgString();1296}12971298final String dbgString() {1299return "SocketTube("+id+")";1300}13011302final String channelDescr() {1303return String.valueOf(channel);1304}1305}130613071308