Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java
41171 views
/*
 * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.util.Iterator;
import java.util.concurrent.Flow;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.SequentialScheduler;

/**
 * A Publisher that publishes items obtained from the given Iterable. Each new
 * subscription gets a new Iterator.
 *
 * <p> Alternatively, the publisher can be created with a {@code Throwable}
 * instead of an {@code Iterable}; in that case every subscriber receives
 * {@code onSubscribe} followed by {@code onError} with that throwable.
 */
class PullPublisher<T> implements Flow.Publisher<T> {

    // Only one of `iterable` and `throwable` can be non-null. throwable is
    // non-null when an error has been encountered, by the creator of
    // PullPublisher, while subscribing the subscriber, but before subscribe has
    // completed.
    private final Iterable<T> iterable;
    private final Throwable throwable;

    /**
     * Creates a publisher backed by either an iterable or a pre-existing error.
     *
     * @param iterable  the source of items, or {@code null} if {@code throwable}
     *                  is non-null
     * @param throwable the error to report to every subscriber, or {@code null}
     *                  if {@code iterable} is non-null
     */
    PullPublisher(Iterable<T> iterable, Throwable throwable) {
        this.iterable = iterable;
        this.throwable = throwable;
    }

    /**
     * Creates a publisher that publishes the items of the given iterable.
     *
     * @param iterable the source of items
     */
    PullPublisher(Iterable<T> iterable) {
        this(iterable, null);
    }

    /**
     * Creates a new subscription for {@code subscriber} and hands it over via
     * {@code onSubscribe}. When this publisher was created with an error, the
     * pull task is triggered right away so that the error is reported without
     * waiting for the subscriber to request anything.
     *
     * @param subscriber the subscriber to attach
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Subscription sub;
        if (throwable != null) {
            assert iterable == null : "non-null iterable: " + iterable;
            sub = new Subscription(subscriber, null, throwable);
        } else {
            assert throwable == null : "non-null exception: " + throwable;
            sub = new Subscription(subscriber, iterable.iterator(), null);
        }
        subscriber.onSubscribe(sub);

        if (throwable != null) {
            sub.pullScheduler.runOrSchedule();
        }
    }

    /**
     * The subscription handed to each subscriber. Items are pulled from the
     * iterator by {@link PullTask}, which is driven through
     * {@code pullScheduler} (presumably so that runs of the task never
     * overlap -- confirm against SequentialScheduler).
     */
    private class Subscription implements Flow.Subscription {

        private final Flow.Subscriber<? super T> subscriber;
        // null when the subscription was created with an error
        private final Iterator<T> iter;
        // set once a terminal signal (onComplete / onError) has been issued
        private volatile boolean completed;
        private volatile boolean cancelled;
        // initial error supplied by the publisher, or an error raised by an
        // invalid request(n); reported by the next run of the pull task
        private volatile Throwable error;
        final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
        private final Demand demand = new Demand();

        Subscription(Flow.Subscriber<? super T> subscriber,
                     Iterator<T> iter,
                     Throwable throwable) {
            this.subscriber = subscriber;
            this.iter = iter;
            this.error = throwable;
        }

        /**
         * Terminates this subscription with an error: marks it completed,
         * stops the scheduler and then signals {@code onError(t)}.
         */
        private void completeWithError(Throwable t) {
            completed = true;
            pullScheduler.stop();
            subscriber.onError(t);
        }

        /**
         * Delivers a pending error if there is one; otherwise pushes items to
         * the subscriber for as long as demand can be decremented, then
         * signals completion once the iterator is exhausted.
         */
        final class PullTask extends SequentialScheduler.CompleteRestartableTask {
            @Override
            protected void run() {
                if (completed || cancelled) {
                    return;
                }

                // A pending error takes precedence over everything else. This
                // also guarantees `iter` (which is null in the error case) is
                // never dereferenced below.
                Throwable t = error;
                if (t != null) {
                    completeWithError(t);
                    return;
                }

                while (demand.tryDecrement() && !cancelled) {
                    T next;
                    try {
                        if (!iter.hasNext()) {
                            break;
                        }
                        next = iter.next();
                    } catch (Throwable t1) {
                        completeWithError(t1);
                        return;
                    }
                    subscriber.onNext(next);
                }

                // The iterator may throw from hasNext() here just as it may
                // inside the loop; route the failure to onError instead of
                // letting it escape from the task.
                boolean exhausted;
                try {
                    exhausted = !iter.hasNext();
                } catch (Throwable t1) {
                    completeWithError(t1);
                    return;
                }
                if (exhausted && !cancelled) {
                    completed = true;
                    pullScheduler.stop();
                    subscriber.onComplete();
                }
            }
        }

        /**
         * Adds {@code n} to the outstanding demand and triggers the pull task.
         * A non-positive {@code n} is not thrown to the caller; instead an
         * {@code IllegalArgumentException} is recorded and reported to the
         * subscriber through {@code onError} by the pull task.
         *
         * @param n the number of additional items requested
         */
        @Override
        public void request(long n) {
            if (cancelled)
                return; // no-op

            if (n <= 0) {
                error = new IllegalArgumentException("non-positive subscription request: " + n);
            } else {
                demand.increase(n);
            }
            pullScheduler.runOrSchedule();
        }

        /**
         * Marks this subscription as cancelled. The pull task checks the flag
         * before each item and before signalling completion.
         */
        @Override
        public void cancel() {
            cancelled = true;
        }
    }
}