Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/BufferingSubscriber.java
41171 views
1
/*
2
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.nio.ByteBuffer;
29
import java.util.ArrayList;
30
import java.util.Collections;
31
import java.util.List;
32
import java.util.ListIterator;
33
import java.util.Objects;
34
import java.util.concurrent.CompletionStage;
35
import java.util.concurrent.Flow;
36
import java.util.concurrent.atomic.AtomicBoolean;
37
import java.net.http.HttpResponse.BodySubscriber;
38
import jdk.internal.net.http.common.Demand;
39
import jdk.internal.net.http.common.SequentialScheduler;
40
import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
41
42
/**
43
* A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given
44
* amount ( in bytes ) of a publisher's data before pushing it to a downstream
45
* subscriber.
46
*/
47
public class BufferingSubscriber<T> implements TrustedSubscriber<T>
48
{
49
/** The downstream consumer of the data. */
50
private final BodySubscriber<T> downstreamSubscriber;
51
/** The amount of data to be accumulate before pushing downstream. */
52
private final int bufferSize;
53
54
/** The subscription, created lazily. */
55
private volatile Flow.Subscription subscription;
56
/** The downstream subscription, created lazily. */
57
private volatile DownstreamSubscription downstreamSubscription;
58
59
/** Must be held when accessing the internal buffers. */
60
private final Object buffersLock = new Object();
61
/** The internal buffers holding the buffered data. */
62
private ArrayList<ByteBuffer> internalBuffers;
63
/** The actual accumulated remaining bytes in internalBuffers. */
64
private int accumulatedBytes;
65
66
/** Holds the Throwable from upstream's onError. */
67
private volatile Throwable throwable;
68
69
/** State of the buffering subscriber:
70
* 1) [UNSUBSCRIBED] when initially created
71
* 2) [ACTIVE] when subscribed and can receive data
72
* 3) [ERROR | CANCELLED | COMPLETE] (terminal state)
73
*/
74
static final int UNSUBSCRIBED = 0x01;
75
static final int ACTIVE = 0x02;
76
static final int ERROR = 0x04;
77
static final int CANCELLED = 0x08;
78
static final int COMPLETE = 0x10;
79
80
private volatile int state;
81
82
public BufferingSubscriber(BodySubscriber<T> downstreamSubscriber,
83
int bufferSize) {
84
this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber);
85
this.bufferSize = bufferSize;
86
synchronized (buffersLock) {
87
internalBuffers = new ArrayList<>();
88
}
89
state = UNSUBSCRIBED;
90
}
91
92
/** Returns the number of bytes remaining in the given buffers. */
93
private static final long remaining(List<ByteBuffer> buffers) {
94
return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
95
}
96
97
@Override
98
public boolean needsExecutor() {
99
return TrustedSubscriber.needsExecutor(downstreamSubscriber);
100
}
101
102
/**
103
* Tells whether, or not, there is at least a sufficient number of bytes
104
* accumulated in the internal buffers. If the subscriber is COMPLETE, and
105
* has some buffered data, then there is always enough ( to pass downstream ).
106
*/
107
private final boolean hasEnoughAccumulatedBytes() {
108
assert Thread.holdsLock(buffersLock);
109
return accumulatedBytes >= bufferSize
110
|| (state == COMPLETE && accumulatedBytes > 0);
111
}
112
113
/**
114
* Returns a new, unmodifiable, List<ByteBuffer> containing exactly the
115
* amount of data as required before pushing downstream. The amount of data
116
* may be less than required ( bufferSize ), in the case where the subscriber
117
* is COMPLETE.
118
*/
119
private List<ByteBuffer> fromInternalBuffers() {
120
assert Thread.holdsLock(buffersLock);
121
int leftToFill = bufferSize;
122
int state = this.state;
123
assert (state == ACTIVE || state == CANCELLED)
124
? accumulatedBytes >= leftToFill : true;
125
List<ByteBuffer> dsts = new ArrayList<>();
126
127
ListIterator<ByteBuffer> itr = internalBuffers.listIterator();
128
while (itr.hasNext()) {
129
ByteBuffer b = itr.next();
130
if (b.remaining() <= leftToFill) {
131
itr.remove();
132
if (b.position() != 0)
133
b = b.slice(); // ensure position = 0 when propagated
134
dsts.add(b);
135
leftToFill -= b.remaining();
136
accumulatedBytes -= b.remaining();
137
if (leftToFill == 0)
138
break;
139
} else {
140
int prevLimit = b.limit();
141
b.limit(b.position() + leftToFill);
142
ByteBuffer slice = b.slice();
143
dsts.add(slice);
144
b.limit(prevLimit);
145
b.position(b.position() + leftToFill);
146
accumulatedBytes -= leftToFill;
147
leftToFill = 0;
148
break;
149
}
150
}
151
assert (state == ACTIVE || state == CANCELLED)
152
? leftToFill == 0 : state == COMPLETE;
153
assert (state == ACTIVE || state == CANCELLED)
154
? remaining(dsts) == bufferSize : state == COMPLETE;
155
assert accumulatedBytes >= 0;
156
assert dsts.stream().noneMatch(b -> b.position() != 0);
157
return Collections.unmodifiableList(dsts);
158
}
159
160
/** Subscription that is passed to the downstream subscriber. */
161
private class DownstreamSubscription implements Flow.Subscription {
162
private final AtomicBoolean cancelled = new AtomicBoolean(); // false
163
private final Demand demand = new Demand();
164
private volatile boolean illegalArg;
165
166
@Override
167
public void request(long n) {
168
if (cancelled.get() || illegalArg) {
169
return;
170
}
171
if (n <= 0L) {
172
// pass the "bad" value upstream so the Publisher can deal with
173
// it appropriately, i.e. invoke onError
174
illegalArg = true;
175
subscription.request(n);
176
return;
177
}
178
179
demand.increase(n);
180
181
pushDemanded();
182
}
183
184
private final SequentialScheduler pushDemandedScheduler =
185
new SequentialScheduler(new PushDemandedTask());
186
187
void pushDemanded() {
188
if (cancelled.get())
189
return;
190
pushDemandedScheduler.runOrSchedule();
191
}
192
193
class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {
194
@Override
195
public void run() {
196
try {
197
Throwable t = throwable;
198
if (t != null) {
199
pushDemandedScheduler.stop(); // stop the demand scheduler
200
downstreamSubscriber.onError(t);
201
return;
202
}
203
204
while (true) {
205
List<ByteBuffer> item;
206
synchronized (buffersLock) {
207
if (cancelled.get())
208
return;
209
if (!hasEnoughAccumulatedBytes())
210
break;
211
if (!demand.tryDecrement())
212
break;
213
item = fromInternalBuffers();
214
}
215
assert item != null;
216
217
downstreamSubscriber.onNext(item);
218
}
219
if (cancelled.get())
220
return;
221
222
// complete only if all data consumed
223
boolean complete;
224
synchronized (buffersLock) {
225
complete = state == COMPLETE && internalBuffers.isEmpty();
226
}
227
if (complete) {
228
assert internalBuffers.isEmpty();
229
pushDemandedScheduler.stop(); // stop the demand scheduler
230
downstreamSubscriber.onComplete();
231
return;
232
}
233
} catch (Throwable t) {
234
cancel(); // cancel if there is any error
235
throw t;
236
}
237
238
boolean requestMore = false;
239
synchronized (buffersLock) {
240
if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {
241
// request more upstream data
242
requestMore = true;
243
}
244
}
245
if (requestMore)
246
subscription.request(1);
247
}
248
}
249
250
@Override
251
public void cancel() {
252
if (cancelled.compareAndExchange(false, true))
253
return; // already cancelled
254
255
state = CANCELLED; // set CANCELLED state of upstream subscriber
256
subscription.cancel(); // cancel upstream subscription
257
pushDemandedScheduler.stop(); // stop the demand scheduler
258
}
259
}
260
261
@Override
262
public void onSubscribe(Flow.Subscription subscription) {
263
Objects.requireNonNull(subscription);
264
if (this.subscription != null) {
265
subscription.cancel();
266
return;
267
}
268
269
int s = this.state;
270
assert s == UNSUBSCRIBED;
271
state = ACTIVE;
272
this.subscription = subscription;
273
downstreamSubscription = new DownstreamSubscription();
274
downstreamSubscriber.onSubscribe(downstreamSubscription);
275
}
276
277
@Override
278
public void onNext(List<ByteBuffer> item) {
279
Objects.requireNonNull(item);
280
281
int s = state;
282
if (s == CANCELLED)
283
return;
284
285
if (s != ACTIVE)
286
throw new InternalError("onNext on inactive subscriber");
287
288
synchronized (buffersLock) {
289
internalBuffers.addAll(item);
290
accumulatedBytes += remaining(item);
291
}
292
293
downstreamSubscription.pushDemanded();
294
}
295
296
@Override
297
public void onError(Throwable incomingThrowable) {
298
Objects.requireNonNull(incomingThrowable);
299
int s = state;
300
assert s == ACTIVE : "Expected ACTIVE, got:" + s;
301
state = ERROR;
302
Throwable t = this.throwable;
303
assert t == null : "Expected null, got:" + t;
304
this.throwable = incomingThrowable;
305
downstreamSubscription.pushDemanded();
306
}
307
308
@Override
309
public void onComplete() {
310
int s = state;
311
assert s == ACTIVE : "Expected ACTIVE, got:" + s;
312
state = COMPLETE;
313
downstreamSubscription.pushDemanded();
314
}
315
316
@Override
317
public CompletionStage<T> getBody() {
318
return downstreamSubscriber.getBody();
319
}
320
}
321
322