Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java
41171 views
1
/*
2
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.nio.ByteBuffer;
29
import java.nio.CharBuffer;
30
import java.nio.charset.CharacterCodingException;
31
import java.nio.charset.Charset;
32
import java.nio.charset.CharsetDecoder;
33
import java.nio.charset.CoderResult;
34
import java.nio.charset.CodingErrorAction;
35
import java.util.List;
36
import java.util.Objects;
37
import java.util.concurrent.CompletableFuture;
38
import java.util.concurrent.CompletionStage;
39
import java.util.concurrent.ConcurrentLinkedDeque;
40
import java.util.concurrent.Flow;
41
import java.util.concurrent.Flow.Subscriber;
42
import java.util.concurrent.Flow.Subscription;
43
import java.util.concurrent.atomic.AtomicBoolean;
44
import java.util.concurrent.atomic.AtomicLong;
45
import java.util.concurrent.atomic.AtomicReference;
46
import java.util.function.Function;
47
import jdk.internal.net.http.common.Demand;
48
import java.net.http.HttpResponse.BodySubscriber;
49
import jdk.internal.net.http.common.MinimalFuture;
50
import jdk.internal.net.http.common.SequentialScheduler;
51
52
/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
53
public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
54
implements BodySubscriber<R> {
55
private final CompletableFuture<R> cf = new MinimalFuture<>();
56
private final S subscriber;
57
private final Function<? super S, ? extends R> finisher;
58
private final Charset charset;
59
private final String eol;
60
private final AtomicBoolean subscribed = new AtomicBoolean();
61
private volatile LineSubscription downstream;
62
63
/**
 * Creates an adapter that feeds decoded lines to {@code subscriber}.
 *
 * @param subscriber the line subscriber; must not be null
 * @param finisher   maps the subscriber to the body result; must not be null
 * @param charset    charset used to decode the bytes; must not be null
 * @param eol        line separator, or null; must not be the empty string
 * @throws IllegalArgumentException if {@code eol} is the empty string
 * @throws NullPointerException if subscriber, finisher or charset is null
 */
private LineSubscriberAdapter(S subscriber,
                              Function<? super S, ? extends R> finisher,
                              Charset charset,
                              String eol) {
    // A null separator is accepted; only an explicit empty one is rejected.
    if ("".equals(eol)) {
        throw new IllegalArgumentException("empty line separator");
    }
    this.subscriber = Objects.requireNonNull(subscriber);
    this.finisher = Objects.requireNonNull(finisher);
    this.charset = Objects.requireNonNull(charset);
    this.eol = eol;
}
74
75
@Override
76
public void onSubscribe(Subscription subscription) {
77
Objects.requireNonNull(subscription);
78
if (!subscribed.compareAndSet(false, true)) {
79
subscription.cancel();
80
return;
81
}
82
83
downstream = LineSubscription.create(subscription,
84
charset,
85
eol,
86
subscriber,
87
cf);
88
subscriber.onSubscribe(downstream);
89
}
90
91
@Override
92
public void onNext(List<ByteBuffer> item) {
93
Objects.requireNonNull(item);
94
try {
95
downstream.submit(item);
96
} catch (Throwable t) {
97
onError(t);
98
}
99
}
100
101
@Override
102
public void onError(Throwable throwable) {
103
Objects.requireNonNull(throwable);
104
try {
105
downstream.signalError(throwable);
106
} finally {
107
cf.completeExceptionally(throwable);
108
}
109
}
110
111
@Override
112
public void onComplete() {
113
try {
114
downstream.signalComplete();
115
} finally {
116
cf.complete(finisher.apply(subscriber));
117
}
118
}
119
120
@Override
121
public CompletionStage<R> getBody() {
122
return cf;
123
}
124
125
public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
126
create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)
127
{
128
if (eol != null && eol.isEmpty())
129
throw new IllegalArgumentException("empty line separator");
130
return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
131
Objects.requireNonNull(finisher),
132
Objects.requireNonNull(charset),
133
eol);
134
}
135
136
static final class LineSubscription implements Flow.Subscription {
137
final Flow.Subscription upstreamSubscription;
138
final CharsetDecoder decoder;
139
final String newline;
140
final Demand downstreamDemand;
141
final ConcurrentLinkedDeque<ByteBuffer> queue;
142
final SequentialScheduler scheduler;
143
final Flow.Subscriber<? super String> upstream;
144
final CompletableFuture<?> cf;
145
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
146
private final AtomicLong demanded = new AtomicLong();
147
private volatile boolean completed;
148
private volatile boolean cancelled;
149
150
private final char[] chars = new char[1024];
151
private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
152
private final CharBuffer buffer = CharBuffer.wrap(chars);
153
private final StringBuilder builder = new StringBuilder();
154
private String nextLine;
155
156
// Wires the subscription state together. The separator may be null;
// every other argument must be non-null (NullPointerException otherwise).
private LineSubscription(Flow.Subscription s,
                         CharsetDecoder dec,
                         String separator,
                         Flow.Subscriber<? super String> subscriber,
                         CompletableFuture<?> completion) {
    this.downstreamDemand = new Demand();
    this.queue = new ConcurrentLinkedDeque<>();
    this.upstreamSubscription = Objects.requireNonNull(s);
    this.decoder = Objects.requireNonNull(dec);
    this.newline = separator;
    this.upstream = Objects.requireNonNull(subscriber);
    this.cf = Objects.requireNonNull(completion);
    // All processing happens in loop(), run by this scheduler.
    this.scheduler = SequentialScheduler.lockingScheduler(this::loop);
}
170
171
@Override
172
public void request(long n) {
173
if (cancelled) return;
174
if (downstreamDemand.increase(n)) {
175
scheduler.runOrSchedule();
176
}
177
}
178
179
@Override
180
public void cancel() {
181
cancelled = true;
182
upstreamSubscription.cancel();
183
}
184
185
public void submit(List<ByteBuffer> list) {
186
queue.addAll(list);
187
demanded.decrementAndGet();
188
scheduler.runOrSchedule();
189
}
190
191
public void signalComplete() {
192
completed = true;
193
scheduler.runOrSchedule();
194
}
195
196
public void signalError(Throwable error) {
197
if (errorRef.compareAndSet(null,
198
Objects.requireNonNull(error))) {
199
scheduler.runOrSchedule();
200
}
201
}
202
203
// This method looks at whether some bytes where left over (in leftover)
204
// from decoding the previous buffer when the previous buffer was in
205
// underflow. If so, it takes bytes one by one from the new buffer 'in'
206
// and combines them with the leftover bytes until 'in' is exhausted or a
207
// character was produced in 'out', resolving the previous underflow.
208
// Returns true if the buffer is still in underflow, false otherwise.
209
// However, in both situation some chars might have been produced in 'out'.
210
private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
211
throws CharacterCodingException {
212
int limit = leftover.position();
213
if (limit == 0) {
214
// no leftover
215
return false;
216
} else {
217
CoderResult res = null;
218
while (in.hasRemaining()) {
219
leftover.position(limit);
220
leftover.limit(++limit);
221
leftover.put(in.get());
222
leftover.position(0);
223
res = decoder.decode(leftover, out,
224
endOfInput && !in.hasRemaining());
225
int remaining = leftover.remaining();
226
if (remaining > 0) {
227
assert leftover.position() == 0;
228
leftover.position(remaining);
229
} else {
230
leftover.position(0);
231
}
232
leftover.limit(leftover.capacity());
233
if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
234
continue;
235
}
236
if (res.isError()) {
237
res.throwException();
238
}
239
assert !res.isOverflow();
240
return false;
241
}
242
return !endOfInput;
243
}
244
}
245
246
// extract characters from start to end and remove them from
247
// the StringBuilder
248
private static String take(StringBuilder b, int start, int end) {
249
assert start == 0;
250
String line;
251
if (end == start) return "";
252
line = b.substring(start, end);
253
b.delete(start, end);
254
return line;
255
}
256
257
// finds end of line, returns -1 if not found, or the position after
258
// the line delimiter if found, removing the delimiter in the process.
259
private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
260
int len = b.length();
261
if (eol != null) { // delimiter explicitly specified
262
int i = b.indexOf(eol);
263
if (i >= 0) {
264
// remove the delimiter and returns the position
265
// of the char after it.
266
b.delete(i, i + eol.length());
267
return i;
268
}
269
} else { // no delimiter specified, behaves as BufferedReader::readLine
270
boolean crfound = false;
271
for (int i = 0; i < len; i++) {
272
char c = b.charAt(i);
273
if (c == '\n') {
274
// '\n' or '\r\n' found.
275
// remove the delimiter and returns the position
276
// of the char after it.
277
b.delete(crfound ? i - 1 : i, i + 1);
278
return crfound ? i - 1 : i;
279
} else if (crfound) {
280
// previous char was '\r', c != '\n'
281
assert i != 0;
282
// remove the delimiter and returns the position
283
// of the char after it.
284
b.delete(i - 1, i);
285
return i - 1;
286
}
287
crfound = c == '\r';
288
}
289
if (crfound && endOfInput) {
290
// remove the delimiter and returns the position
291
// of the char after it.
292
b.delete(len - 1, len);
293
return len - 1;
294
}
295
}
296
return endOfInput && len > 0 ? len : -1;
297
}
298
299
// Looks at whether the StringBuilder contains a line.
300
// Returns null if more character are needed.
301
private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
302
int next = endOfLine(b, eol, endOfInput);
303
return (next > -1) ? take(b, 0, next) : null;
304
}
305
306
// Attempts to read the next line. Returns the next line if
307
// the delimiter was found, null otherwise. The delimiters are
308
// consumed.
309
private String nextLine()
310
throws CharacterCodingException {
311
assert nextLine == null;
312
LINES:
313
while (nextLine == null) {
314
boolean endOfInput = completed && queue.isEmpty();
315
nextLine = nextLine(builder, newline,
316
endOfInput && leftover.position() == 0);
317
if (nextLine != null) return nextLine;
318
ByteBuffer b;
319
BUFFERS:
320
while ((b = queue.peek()) != null) {
321
if (!b.hasRemaining()) {
322
queue.poll();
323
continue BUFFERS;
324
}
325
BYTES:
326
while (b.hasRemaining()) {
327
buffer.position(0);
328
buffer.limit(buffer.capacity());
329
boolean endofInput = completed && queue.size() <= 1;
330
if (isUnderFlow(b, buffer, endofInput)) {
331
assert !b.hasRemaining();
332
if (buffer.position() > 0) {
333
buffer.flip();
334
builder.append(buffer);
335
}
336
continue BUFFERS;
337
}
338
CoderResult res = decoder.decode(b, buffer, endofInput);
339
if (res.isError()) res.throwException();
340
if (buffer.position() > 0) {
341
buffer.flip();
342
builder.append(buffer);
343
continue LINES;
344
}
345
if (res.isUnderflow() && b.hasRemaining()) {
346
//System.out.println("underflow: adding " + b.remaining() + " bytes");
347
leftover.put(b);
348
assert !b.hasRemaining();
349
continue BUFFERS;
350
}
351
}
352
}
353
354
assert queue.isEmpty();
355
if (endOfInput) {
356
// Time to cleanup: there may be some undecoded leftover bytes
357
// We need to flush them out.
358
// The decoder has been configured to replace malformed/unmappable
359
// chars with some replacement, in order to behave like
360
// InputStreamReader.
361
leftover.flip();
362
buffer.position(0);
363
buffer.limit(buffer.capacity());
364
365
// decode() must be called just before flush, even if there
366
// is nothing to decode. We must do this even if leftover
367
// has no remaining bytes.
368
CoderResult res = decoder.decode(leftover, buffer, endOfInput);
369
if (buffer.position() > 0) {
370
buffer.flip();
371
builder.append(buffer);
372
}
373
if (res.isError()) res.throwException();
374
375
// Now call decoder.flush()
376
buffer.position(0);
377
buffer.limit(buffer.capacity());
378
res = decoder.flush(buffer);
379
if (buffer.position() > 0) {
380
buffer.flip();
381
builder.append(buffer);
382
}
383
if (res.isError()) res.throwException();
384
385
// It's possible that we reach here twice - just for the
386
// purpose of checking that no bytes were left over, so
387
// we reset leftover/decoder to make the function reentrant.
388
leftover.position(0);
389
leftover.limit(leftover.capacity());
390
decoder.reset();
391
392
// if some chars were produced then this call will
393
// return them.
394
return nextLine = nextLine(builder, newline, endOfInput);
395
}
396
return null;
397
}
398
return null;
399
}
400
401
// The main sequential scheduler loop.
402
private void loop() {
403
try {
404
while (!cancelled) {
405
Throwable error = errorRef.get();
406
if (error != null) {
407
cancelled = true;
408
scheduler.stop();
409
upstream.onError(error);
410
cf.completeExceptionally(error);
411
return;
412
}
413
if (nextLine == null) nextLine = nextLine();
414
if (nextLine == null) {
415
if (completed) {
416
scheduler.stop();
417
if (leftover.position() != 0) {
418
// Underflow: not all bytes could be
419
// decoded, but no more bytes will be coming.
420
// This should not happen as we should already
421
// have got a MalformedInputException, or
422
// replaced the unmappable chars.
423
errorRef.compareAndSet(null,
424
new IllegalStateException(
425
"premature end of input ("
426
+ leftover.position()
427
+ " undecoded bytes)"));
428
continue;
429
} else {
430
upstream.onComplete();
431
}
432
return;
433
} else if (demanded.get() == 0
434
&& !downstreamDemand.isFulfilled()) {
435
long incr = Math.max(1, downstreamDemand.get());
436
demanded.addAndGet(incr);
437
upstreamSubscription.request(incr);
438
continue;
439
} else return;
440
}
441
assert nextLine != null;
442
assert newline != null && !nextLine.endsWith(newline)
443
|| !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
444
if (downstreamDemand.tryDecrement()) {
445
String forward = nextLine;
446
nextLine = null;
447
upstream.onNext(forward);
448
} else return; // no demand: come back later
449
}
450
} catch (Throwable t) {
451
try {
452
upstreamSubscription.cancel();
453
} finally {
454
signalError(t);
455
}
456
}
457
}
458
459
static LineSubscription create(Flow.Subscription s,
460
Charset charset,
461
String lineSeparator,
462
Flow.Subscriber<? super String> upstream,
463
CompletableFuture<?> cf) {
464
return new LineSubscription(Objects.requireNonNull(s),
465
Objects.requireNonNull(charset).newDecoder()
466
// use the same decoder configuration than
467
// java.io.InputStreamReader
468
.onMalformedInput(CodingErrorAction.REPLACE)
469
.onUnmappableCharacter(CodingErrorAction.REPLACE),
470
lineSeparator,
471
Objects.requireNonNull(upstream),
472
Objects.requireNonNull(cf));
473
}
474
}
475
}
476
477
478