Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelTube.java
41171 views
/*1* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import jdk.internal.net.http.common.Demand;28import jdk.internal.net.http.common.FlowTube;29import jdk.internal.net.http.common.Logger;30import jdk.internal.net.http.common.Utils;31import jdk.internal.net.http.websocket.RawChannel;3233import java.io.EOFException;34import java.io.IOException;35import java.lang.ref.Cleaner;36import java.nio.ByteBuffer;37import java.nio.channels.ClosedChannelException;38import java.nio.channels.SelectionKey;39import java.util.ArrayList;40import java.util.List;41import java.util.concurrent.ConcurrentLinkedQueue;42import java.util.concurrent.Flow;43import java.util.concurrent.atomic.AtomicBoolean;44import java.util.concurrent.atomic.AtomicReference;45import java.util.function.Supplier;46import java.lang.System.Logger.Level;4748/*49* I/O abstraction used to implement WebSocket.50*51*/52public class RawChannelTube implements RawChannel {5354final HttpConnection connection;55final FlowTube tube;56final WritePublisher writePublisher;57final ReadSubscriber readSubscriber;58final Supplier<ByteBuffer> initial;59final AtomicBoolean inited = new AtomicBoolean();60final AtomicBoolean outputClosed = new AtomicBoolean();61final AtomicBoolean inputClosed = new AtomicBoolean();62final AtomicBoolean closed = new AtomicBoolean();63final String dbgTag;64final Logger debug;65private static final Cleaner cleaner =66Utils.ASSERTIONSENABLED && Utils.DEBUG_WS ? Cleaner.create() : null;6768RawChannelTube(HttpConnection connection,69Supplier<ByteBuffer> initial) {70this.connection = connection;71this.tube = connection.getConnectionFlow();72this.initial = initial;73this.writePublisher = new WritePublisher();74this.readSubscriber = new ReadSubscriber();75dbgTag = "[WebSocket] RawChannelTube(" + tube +")";76debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);77connection.client().webSocketOpen();78connectFlows();79if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {80// this is just for debug...81cleaner.register(this, new CleanupChecker(closed, debug));82}83}8485// Make sure no back reference to RawChannelTube can exist86// from this class. In particular it would be dangerous87// to reference connection, since connection has a reference88// to SocketTube with which a RawChannelTube is registered.89// Ditto for HttpClientImpl, which might have a back reference90// to the connection.91static final class CleanupChecker implements Runnable {92final AtomicBoolean closed;93final System.Logger debug;94CleanupChecker(AtomicBoolean closed, System.Logger debug) {95this.closed = closed;96this.debug = debug;97}9899@Override100public void run() {101if (!closed.get()) {102debug.log(Level.DEBUG,103"RawChannelTube was not closed before being released");104}105}106}107108private void connectFlows() {109if (debug.on()) debug.log("connectFlows");110tube.connectFlows(writePublisher, readSubscriber);111}112113class WriteSubscription implements Flow.Subscription {114final Flow.Subscriber<? super List<ByteBuffer>> subscriber;115final Demand demand = new Demand();116volatile boolean cancelled;117WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {118this.subscriber = subscriber;119}120@Override121public void request(long n) {122if (debug.on()) debug.log("WriteSubscription::request %d", n);123demand.increase(n);124RawEvent event;125while ((event = writePublisher.events.poll()) != null) {126if (debug.on()) debug.log("WriteSubscriber: handling event");127event.handle();128if (demand.isFulfilled()) break;129}130}131@Override132public void cancel() {133cancelled = true;134if (debug.on()) debug.log("WriteSubscription::cancel");135shutdownOutput();136RawEvent event;137while ((event = writePublisher.events.poll()) != null) {138if (debug.on()) debug.log("WriteSubscriber: handling event");139event.handle();140}141}142}143144class WritePublisher implements FlowTube.TubePublisher {145final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();146volatile WriteSubscription writeSubscription;147@Override148public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {149if (debug.on()) debug.log("WritePublisher::subscribe");150WriteSubscription subscription = new WriteSubscription(subscriber);151subscriber.onSubscribe(subscription);152writeSubscription = subscription;153}154}155156class ReadSubscriber implements FlowTube.TubeSubscriber {157158volatile Flow.Subscription readSubscription;159volatile boolean completed;160long initialRequest;161final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();162final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();163final AtomicReference<Throwable> errorRef = new AtomicReference<>();164165void checkEvents() {166Flow.Subscription subscription = readSubscription;167if (subscription != null) {168Throwable error = errorRef.get();169while (!buffers.isEmpty() || error != null || closed.get() || completed) {170RawEvent event = events.poll();171if (event == null) break;172if (debug.on()) debug.log("ReadSubscriber: handling event");173event.handle();174}175}176}177178@Override179public void onSubscribe(Flow.Subscription subscription) {180//buffers.add(initial.get());181long n;182synchronized (this) {183readSubscription = subscription;184n = initialRequest;185initialRequest = 0;186}187if (debug.on()) debug.log("ReadSubscriber::onSubscribe");188if (n > 0) {189Throwable error = errorRef.get();190if (error == null && !closed.get() && !completed) {191if (debug.on()) debug.log("readSubscription: requesting " + n);192subscription.request(n);193}194}195checkEvents();196}197198@Override199public void onNext(List<ByteBuffer> item) {200if (debug.on()) debug.log(() -> "ReadSubscriber::onNext "201+ Utils.remaining(item) + " bytes");202buffers.addAll(item);203checkEvents();204}205206@Override207public void onError(Throwable throwable) {208if (closed.get() || errorRef.compareAndSet(null, throwable)) {209if (debug.on()) debug.log("ReadSubscriber::onError", throwable);210if (buffers.isEmpty()) {211checkEvents();212shutdownInput();213}214}215}216217@Override218public void onComplete() {219if (debug.on()) debug.log("ReadSubscriber::onComplete");220completed = true;221if (buffers.isEmpty()) {222checkEvents();223shutdownInput();224}225}226}227228229/*230* Registers given event whose callback will be called once only (i.e.231* register new event for each callback).232*233* Memory consistency effects: actions in a thread calling registerEvent234* happen-before any subsequent actions in the thread calling event.handle235*/236public void registerEvent(RawEvent event) throws IOException {237int interestOps = event.interestOps();238if ((interestOps & SelectionKey.OP_WRITE) != 0) {239if (debug.on()) debug.log("register write event");240if (outputClosed.get()) throw new IOException("closed output");241writePublisher.events.add(event);242WriteSubscription writeSubscription = writePublisher.writeSubscription;243if (writeSubscription != null) {244while (!writeSubscription.demand.isFulfilled()) {245event = writePublisher.events.poll();246if (event == null) break;247event.handle();248}249}250}251if ((interestOps & SelectionKey.OP_READ) != 0) {252if (debug.on()) debug.log("register read event");253if (inputClosed.get()) throw new IOException("closed input");254readSubscriber.events.add(event);255readSubscriber.checkEvents();256if (readSubscriber.buffers.isEmpty()257&& !readSubscriber.events.isEmpty()) {258Flow.Subscription readSubscription =259readSubscriber.readSubscription;260if (readSubscription == null) {261synchronized (readSubscriber) {262readSubscription = readSubscriber.readSubscription;263if (readSubscription == null) {264readSubscriber.initialRequest = 1;265return;266}267}268}269assert readSubscription != null;270if (debug.on()) debug.log("readSubscription: requesting 1");271readSubscription.request(1);272}273}274}275276/**277* Hands over the initial bytes. Once the bytes have been returned they are278* no longer available and the method will throw an {@link279* IllegalStateException} on each subsequent invocation.280*281* @return the initial bytes282* @throws IllegalStateException283* if the method has been already invoked284*/285public ByteBuffer initialByteBuffer() throws IllegalStateException {286if (inited.compareAndSet(false, true)) {287return initial.get();288} else throw new IllegalStateException("initial buffer already drained");289}290291/*292* Returns a ByteBuffer with the data read or null if EOF is reached. Has no293* remaining bytes if no data available at the moment.294*/295public ByteBuffer read() throws IOException {296if (debug.on()) debug.log("read");297Flow.Subscription readSubscription = readSubscriber.readSubscription;298if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;299ByteBuffer buffer = readSubscriber.buffers.poll();300if (buffer != null) {301if (debug.on()) debug.log("read: " + buffer.remaining());302return buffer;303}304Throwable error = readSubscriber.errorRef.get();305if (error != null) error = Utils.getIOException(error);306if (error instanceof EOFException) {307if (debug.on()) debug.log("read: EOFException");308shutdownInput();309return null;310}311if (error != null) {312if (debug.on()) debug.log("read: " + error);313if (closed.get()) {314return null;315}316shutdownInput();317throw Utils.getIOException(error);318}319if (readSubscriber.completed) {320if (debug.on()) debug.log("read: EOF");321shutdownInput();322return null;323}324if (inputClosed.get()) {325if (debug.on()) debug.log("read: CLOSED");326throw new IOException("closed output");327}328if (debug.on()) debug.log("read: nothing to read");329return Utils.EMPTY_BYTEBUFFER;330}331332/*333* Writes a sequence of bytes to this channel from a subsequence of the334* given buffers.335*/336public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {337if (outputClosed.get()) {338if (debug.on()) debug.log("write: CLOSED");339throw new IOException("closed output");340}341WriteSubscription writeSubscription = writePublisher.writeSubscription;342if (writeSubscription == null) {343if (debug.on()) debug.log("write: unsubscribed: 0");344return 0;345}346if (writeSubscription.cancelled) {347if (debug.on()) debug.log("write: CANCELLED");348shutdownOutput();349throw new IOException("closed output");350}351if (writeSubscription.demand.tryDecrement()) {352List<ByteBuffer> buffers = copy(srcs, offset, length);353long res = Utils.remaining(buffers);354if (debug.on()) debug.log("write: writing %d", res);355writeSubscription.subscriber.onNext(buffers);356return res;357} else {358if (debug.on()) debug.log("write: no demand: 0");359return 0;360}361}362363/**364* Shutdown the connection for reading without closing the channel.365*366* <p> Once shutdown for reading then further reads on the channel will367* return {@code null}, the end-of-stream indication. If the input side of368* the connection is already shutdown then invoking this method has no369* effect.370*371* @throws ClosedChannelException372* If this channel is closed373* @throws IOException374* If some other I/O error occurs375*/376public void shutdownInput() {377if (inputClosed.compareAndSet(false, true)) {378if (debug.on()) debug.log("shutdownInput");379// TransportImpl will eventually call RawChannel::close.380// We must not call it here as this would close the socket381// and can cause an exception to back fire before382// TransportImpl and WebSocketImpl have updated their state.383}384}385386/**387* Shutdown the connection for writing without closing the channel.388*389* <p> Once shutdown for writing then further attempts to write to the390* channel will throw {@link ClosedChannelException}. If the output side of391* the connection is already shutdown then invoking this method has no392* effect.393*394* @throws ClosedChannelException395* If this channel is closed396* @throws IOException397* If some other I/O error occurs398*/399public void shutdownOutput() {400if (outputClosed.compareAndSet(false, true)) {401if (debug.on()) debug.log("shutdownOutput");402// TransportImpl will eventually call RawChannel::close.403// We must not call it here as this would close the socket404// and can cause an exception to back fire before405// TransportImpl and WebSocketImpl have updated their state.406}407}408409/**410* Closes this channel.411*412* @throws IOException413* If an I/O error occurs414*/415@Override416public void close() {417if (closed.compareAndSet(false, true)) {418if (debug.on()) debug.log("close");419connection.client().webSocketClose();420connection.close();421}422}423424private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {425int count = Math.min(len, src.length - offset);426if (count <= 0) return Utils.EMPTY_BB_LIST;427if (count == 1) return List.of(Utils.copy(src[offset]));428if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));429List<ByteBuffer> list = new ArrayList<>(count);430for (int i = 0; i < count; i++) {431list.add(Utils.copy(src[offset + i]));432}433return list;434}435}436437438