Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/BufferingSubscriber.java
41171 views
/*1* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.nio.ByteBuffer;28import java.util.ArrayList;29import java.util.Collections;30import java.util.List;31import java.util.ListIterator;32import java.util.Objects;33import java.util.concurrent.CompletionStage;34import java.util.concurrent.Flow;35import java.util.concurrent.atomic.AtomicBoolean;36import java.net.http.HttpResponse.BodySubscriber;37import jdk.internal.net.http.common.Demand;38import jdk.internal.net.http.common.SequentialScheduler;39import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;4041/**42* A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given43* amount ( in bytes ) of a publisher's data before pushing it to a downstream44* subscriber.45*/46public class BufferingSubscriber<T> implements TrustedSubscriber<T>47{48/** The downstream consumer of the data. */49private final BodySubscriber<T> downstreamSubscriber;50/** The amount of data to be accumulate before pushing downstream. */51private final int bufferSize;5253/** The subscription, created lazily. */54private volatile Flow.Subscription subscription;55/** The downstream subscription, created lazily. */56private volatile DownstreamSubscription downstreamSubscription;5758/** Must be held when accessing the internal buffers. */59private final Object buffersLock = new Object();60/** The internal buffers holding the buffered data. */61private ArrayList<ByteBuffer> internalBuffers;62/** The actual accumulated remaining bytes in internalBuffers. */63private int accumulatedBytes;6465/** Holds the Throwable from upstream's onError. */66private volatile Throwable throwable;6768/** State of the buffering subscriber:69* 1) [UNSUBSCRIBED] when initially created70* 2) [ACTIVE] when subscribed and can receive data71* 3) [ERROR | CANCELLED | COMPLETE] (terminal state)72*/73static final int UNSUBSCRIBED = 0x01;74static final int ACTIVE = 0x02;75static final int ERROR = 0x04;76static final int CANCELLED = 0x08;77static final int COMPLETE = 0x10;7879private volatile int state;8081public BufferingSubscriber(BodySubscriber<T> downstreamSubscriber,82int bufferSize) {83this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber);84this.bufferSize = bufferSize;85synchronized (buffersLock) {86internalBuffers = new ArrayList<>();87}88state = UNSUBSCRIBED;89}9091/** Returns the number of bytes remaining in the given buffers. */92private static final long remaining(List<ByteBuffer> buffers) {93return buffers.stream().mapToLong(ByteBuffer::remaining).sum();94}9596@Override97public boolean needsExecutor() {98return TrustedSubscriber.needsExecutor(downstreamSubscriber);99}100101/**102* Tells whether, or not, there is at least a sufficient number of bytes103* accumulated in the internal buffers. If the subscriber is COMPLETE, and104* has some buffered data, then there is always enough ( to pass downstream ).105*/106private final boolean hasEnoughAccumulatedBytes() {107assert Thread.holdsLock(buffersLock);108return accumulatedBytes >= bufferSize109|| (state == COMPLETE && accumulatedBytes > 0);110}111112/**113* Returns a new, unmodifiable, List<ByteBuffer> containing exactly the114* amount of data as required before pushing downstream. The amount of data115* may be less than required ( bufferSize ), in the case where the subscriber116* is COMPLETE.117*/118private List<ByteBuffer> fromInternalBuffers() {119assert Thread.holdsLock(buffersLock);120int leftToFill = bufferSize;121int state = this.state;122assert (state == ACTIVE || state == CANCELLED)123? accumulatedBytes >= leftToFill : true;124List<ByteBuffer> dsts = new ArrayList<>();125126ListIterator<ByteBuffer> itr = internalBuffers.listIterator();127while (itr.hasNext()) {128ByteBuffer b = itr.next();129if (b.remaining() <= leftToFill) {130itr.remove();131if (b.position() != 0)132b = b.slice(); // ensure position = 0 when propagated133dsts.add(b);134leftToFill -= b.remaining();135accumulatedBytes -= b.remaining();136if (leftToFill == 0)137break;138} else {139int prevLimit = b.limit();140b.limit(b.position() + leftToFill);141ByteBuffer slice = b.slice();142dsts.add(slice);143b.limit(prevLimit);144b.position(b.position() + leftToFill);145accumulatedBytes -= leftToFill;146leftToFill = 0;147break;148}149}150assert (state == ACTIVE || state == CANCELLED)151? leftToFill == 0 : state == COMPLETE;152assert (state == ACTIVE || state == CANCELLED)153? remaining(dsts) == bufferSize : state == COMPLETE;154assert accumulatedBytes >= 0;155assert dsts.stream().noneMatch(b -> b.position() != 0);156return Collections.unmodifiableList(dsts);157}158159/** Subscription that is passed to the downstream subscriber. */160private class DownstreamSubscription implements Flow.Subscription {161private final AtomicBoolean cancelled = new AtomicBoolean(); // false162private final Demand demand = new Demand();163private volatile boolean illegalArg;164165@Override166public void request(long n) {167if (cancelled.get() || illegalArg) {168return;169}170if (n <= 0L) {171// pass the "bad" value upstream so the Publisher can deal with172// it appropriately, i.e. invoke onError173illegalArg = true;174subscription.request(n);175return;176}177178demand.increase(n);179180pushDemanded();181}182183private final SequentialScheduler pushDemandedScheduler =184new SequentialScheduler(new PushDemandedTask());185186void pushDemanded() {187if (cancelled.get())188return;189pushDemandedScheduler.runOrSchedule();190}191192class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {193@Override194public void run() {195try {196Throwable t = throwable;197if (t != null) {198pushDemandedScheduler.stop(); // stop the demand scheduler199downstreamSubscriber.onError(t);200return;201}202203while (true) {204List<ByteBuffer> item;205synchronized (buffersLock) {206if (cancelled.get())207return;208if (!hasEnoughAccumulatedBytes())209break;210if (!demand.tryDecrement())211break;212item = fromInternalBuffers();213}214assert item != null;215216downstreamSubscriber.onNext(item);217}218if (cancelled.get())219return;220221// complete only if all data consumed222boolean complete;223synchronized (buffersLock) {224complete = state == COMPLETE && internalBuffers.isEmpty();225}226if (complete) {227assert internalBuffers.isEmpty();228pushDemandedScheduler.stop(); // stop the demand scheduler229downstreamSubscriber.onComplete();230return;231}232} catch (Throwable t) {233cancel(); // cancel if there is any error234throw t;235}236237boolean requestMore = false;238synchronized (buffersLock) {239if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {240// request more upstream data241requestMore = true;242}243}244if (requestMore)245subscription.request(1);246}247}248249@Override250public void cancel() {251if (cancelled.compareAndExchange(false, true))252return; // already cancelled253254state = CANCELLED; // set CANCELLED state of upstream subscriber255subscription.cancel(); // cancel upstream subscription256pushDemandedScheduler.stop(); // stop the demand scheduler257}258}259260@Override261public void onSubscribe(Flow.Subscription subscription) {262Objects.requireNonNull(subscription);263if (this.subscription != null) {264subscription.cancel();265return;266}267268int s = this.state;269assert s == UNSUBSCRIBED;270state = ACTIVE;271this.subscription = subscription;272downstreamSubscription = new DownstreamSubscription();273downstreamSubscriber.onSubscribe(downstreamSubscription);274}275276@Override277public void onNext(List<ByteBuffer> item) {278Objects.requireNonNull(item);279280int s = state;281if (s == CANCELLED)282return;283284if (s != ACTIVE)285throw new InternalError("onNext on inactive subscriber");286287synchronized (buffersLock) {288internalBuffers.addAll(item);289accumulatedBytes += remaining(item);290}291292downstreamSubscription.pushDemanded();293}294295@Override296public void onError(Throwable incomingThrowable) {297Objects.requireNonNull(incomingThrowable);298int s = state;299assert s == ACTIVE : "Expected ACTIVE, got:" + s;300state = ERROR;301Throwable t = this.throwable;302assert t == null : "Expected null, got:" + t;303this.throwable = incomingThrowable;304downstreamSubscription.pushDemanded();305}306307@Override308public void onComplete() {309int s = state;310assert s == ACTIVE : "Expected ACTIVE, got:" + s;311state = COMPLETE;312downstreamSubscription.pushDemanded();313}314315@Override316public CompletionStage<T> getBody() {317return downstreamSubscriber.getBody();318}319}320321322