Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java
41171 views
1
/*
2
* Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.util.Iterator;
29
import java.util.concurrent.Flow;
30
import jdk.internal.net.http.common.Demand;
31
import jdk.internal.net.http.common.SequentialScheduler;
32
33
/**
34
* A Publisher that publishes items obtained from the given Iterable. Each new
35
* subscription gets a new Iterator.
36
*/
37
class PullPublisher<T> implements Flow.Publisher<T> {
38
39
// Only one of `iterable` and `throwable` can be non-null. throwable is
40
// non-null when an error has been encountered, by the creator of
41
// PullPublisher, while subscribing the subscriber, but before subscribe has
42
// completed.
43
private final Iterable<T> iterable;
44
private final Throwable throwable;
45
46
PullPublisher(Iterable<T> iterable, Throwable throwable) {
47
this.iterable = iterable;
48
this.throwable = throwable;
49
}
50
51
PullPublisher(Iterable<T> iterable) {
52
this(iterable, null);
53
}
54
55
@Override
56
public void subscribe(Flow.Subscriber<? super T> subscriber) {
57
Subscription sub;
58
if (throwable != null) {
59
assert iterable == null : "non-null iterable: " + iterable;
60
sub = new Subscription(subscriber, null, throwable);
61
} else {
62
assert throwable == null : "non-null exception: " + throwable;
63
sub = new Subscription(subscriber, iterable.iterator(), null);
64
}
65
subscriber.onSubscribe(sub);
66
67
if (throwable != null) {
68
sub.pullScheduler.runOrSchedule();
69
}
70
}
71
72
private class Subscription implements Flow.Subscription {
73
74
private final Flow.Subscriber<? super T> subscriber;
75
private final Iterator<T> iter;
76
private volatile boolean completed;
77
private volatile boolean cancelled;
78
private volatile Throwable error;
79
final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
80
private final Demand demand = new Demand();
81
82
Subscription(Flow.Subscriber<? super T> subscriber,
83
Iterator<T> iter,
84
Throwable throwable) {
85
this.subscriber = subscriber;
86
this.iter = iter;
87
this.error = throwable;
88
}
89
90
final class PullTask extends SequentialScheduler.CompleteRestartableTask {
91
@Override
92
protected void run() {
93
if (completed || cancelled) {
94
return;
95
}
96
97
Throwable t = error;
98
if (t != null) {
99
completed = true;
100
pullScheduler.stop();
101
subscriber.onError(t);
102
return;
103
}
104
105
while (demand.tryDecrement() && !cancelled) {
106
T next;
107
try {
108
if (!iter.hasNext()) {
109
break;
110
}
111
next = iter.next();
112
} catch (Throwable t1) {
113
completed = true;
114
pullScheduler.stop();
115
subscriber.onError(t1);
116
return;
117
}
118
subscriber.onNext(next);
119
}
120
if (!iter.hasNext() && !cancelled) {
121
completed = true;
122
pullScheduler.stop();
123
subscriber.onComplete();
124
}
125
}
126
}
127
128
@Override
129
public void request(long n) {
130
if (cancelled)
131
return; // no-op
132
133
if (n <= 0) {
134
error = new IllegalArgumentException("non-positive subscription request: " + n);
135
} else {
136
demand.increase(n);
137
}
138
pullScheduler.runOrSchedule();
139
}
140
141
@Override
142
public void cancel() {
143
cancelled = true;
144
}
145
}
146
}
147
148