Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
41171 views
1
/*
2
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.FileInputStream;
29
import java.io.FileNotFoundException;
30
import java.io.FilePermission;
31
import java.io.IOException;
32
import java.io.InputStream;
33
import java.io.UncheckedIOException;
34
import java.lang.reflect.UndeclaredThrowableException;
35
import java.net.http.HttpRequest.BodyPublisher;
36
import java.nio.ByteBuffer;
37
import java.nio.charset.Charset;
38
import java.nio.file.Files;
39
import java.nio.file.Path;
40
import java.security.AccessControlContext;
41
import java.security.AccessController;
42
import java.security.Permission;
43
import java.security.PrivilegedActionException;
44
import java.security.PrivilegedExceptionAction;
45
import java.util.ArrayList;
46
import java.util.Collections;
47
import java.util.Iterator;
48
import java.util.List;
49
import java.util.NoSuchElementException;
50
import java.util.Objects;
51
import java.util.Queue;
52
import java.util.concurrent.ConcurrentLinkedQueue;
53
import java.util.concurrent.Flow;
54
import java.util.concurrent.Flow.Publisher;
55
import java.util.concurrent.atomic.AtomicReference;
56
import java.util.function.Function;
57
import java.util.function.Supplier;
58
59
import jdk.internal.net.http.common.Demand;
60
import jdk.internal.net.http.common.SequentialScheduler;
61
import jdk.internal.net.http.common.Utils;
62
63
public final class RequestPublishers {
64
65
private RequestPublishers() { }
66
67
public static class ByteArrayPublisher implements BodyPublisher {
68
private final int length;
69
private final byte[] content;
70
private final int offset;
71
private final int bufSize;
72
73
public ByteArrayPublisher(byte[] content) {
74
this(content, 0, content.length);
75
}
76
77
public ByteArrayPublisher(byte[] content, int offset, int length) {
78
this(content, offset, length, Utils.BUFSIZE);
79
}
80
81
/* bufSize exposed for testing purposes */
82
ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
83
this.content = content;
84
this.offset = offset;
85
this.length = length;
86
this.bufSize = bufSize;
87
}
88
89
List<ByteBuffer> copy(byte[] content, int offset, int length) {
90
List<ByteBuffer> bufs = new ArrayList<>();
91
while (length > 0) {
92
ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));
93
int max = b.capacity();
94
int tocopy = Math.min(max, length);
95
b.put(content, offset, tocopy);
96
offset += tocopy;
97
length -= tocopy;
98
b.flip();
99
bufs.add(b);
100
}
101
return bufs;
102
}
103
104
@Override
105
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
106
List<ByteBuffer> copy = copy(content, offset, length);
107
var delegate = new PullPublisher<>(copy);
108
delegate.subscribe(subscriber);
109
}
110
111
@Override
112
public long contentLength() {
113
return length;
114
}
115
}
116
117
// This implementation has lots of room for improvement.
118
public static class IterablePublisher implements BodyPublisher {
119
private final Iterable<byte[]> content;
120
private volatile long contentLength;
121
122
public IterablePublisher(Iterable<byte[]> content) {
123
this.content = Objects.requireNonNull(content);
124
}
125
126
// The ByteBufferIterator will iterate over the byte[] arrays in
127
// the content one at the time.
128
//
129
class ByteBufferIterator implements Iterator<ByteBuffer> {
130
final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
131
final Iterator<byte[]> iterator = content.iterator();
132
@Override
133
public boolean hasNext() {
134
return !buffers.isEmpty() || iterator.hasNext();
135
}
136
137
@Override
138
public ByteBuffer next() {
139
ByteBuffer buffer = buffers.poll();
140
while (buffer == null) {
141
copy();
142
buffer = buffers.poll();
143
}
144
return buffer;
145
}
146
147
ByteBuffer getBuffer() {
148
return Utils.getBuffer();
149
}
150
151
void copy() {
152
byte[] bytes = iterator.next();
153
int length = bytes.length;
154
if (length == 0 && iterator.hasNext()) {
155
// avoid inserting empty buffers, except
156
// if that's the last.
157
return;
158
}
159
int offset = 0;
160
do {
161
ByteBuffer b = getBuffer();
162
int max = b.capacity();
163
164
int tocopy = Math.min(max, length);
165
b.put(bytes, offset, tocopy);
166
offset += tocopy;
167
length -= tocopy;
168
b.flip();
169
buffers.add(b);
170
} while (length > 0);
171
}
172
}
173
174
public Iterator<ByteBuffer> iterator() {
175
return new ByteBufferIterator();
176
}
177
178
@Override
179
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
180
Iterable<ByteBuffer> iterable = this::iterator;
181
var delegate = new PullPublisher<>(iterable);
182
delegate.subscribe(subscriber);
183
}
184
185
static long computeLength(Iterable<byte[]> bytes) {
186
// Avoid iterating just for the purpose of computing
187
// a length, in case iterating is a costly operation
188
// For HTTP/1.1 it means we will be using chunk encoding
189
// when sending the request body.
190
// For HTTP/2 it means we will not send the optional
191
// Content-length header.
192
return -1;
193
}
194
195
@Override
196
public long contentLength() {
197
if (contentLength == 0) {
198
synchronized(this) {
199
if (contentLength == 0) {
200
contentLength = computeLength(content);
201
}
202
}
203
}
204
return contentLength;
205
}
206
}
207
208
public static class StringPublisher extends ByteArrayPublisher {
209
public StringPublisher(String content, Charset charset) {
210
super(content.getBytes(charset));
211
}
212
}
213
214
public static class EmptyPublisher implements BodyPublisher {
215
private final Flow.Publisher<ByteBuffer> delegate =
216
new PullPublisher<ByteBuffer>(Collections.emptyList(), null);
217
218
@Override
219
public long contentLength() {
220
return 0;
221
}
222
223
@Override
224
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
225
delegate.subscribe(subscriber);
226
}
227
}
228
229
/**
230
* Publishes the content of a given file.
231
* <p>
232
* Privileged actions are performed within a limited doPrivileged that only
233
* asserts the specific, read, file permission that was checked during the
234
* construction of this FilePublisher. This only applies if the file system
235
* that created the file provides interoperability with {@code java.io.File}.
236
*/
237
public static class FilePublisher implements BodyPublisher {
238
239
private final Path path;
240
private final long length;
241
private final Function<Path, InputStream> inputStreamSupplier;
242
243
private static String pathForSecurityCheck(Path path) {
244
return path.toFile().getPath();
245
}
246
247
/**
248
* Factory for creating FilePublisher.
249
*
250
* Permission checks are performed here before construction of the
251
* FilePublisher. Permission checking and construction are deliberately
252
* and tightly co-located.
253
*/
254
public static FilePublisher create(Path path)
255
throws FileNotFoundException {
256
@SuppressWarnings("removal")
257
SecurityManager sm = System.getSecurityManager();
258
FilePermission filePermission = null;
259
boolean defaultFS = true;
260
261
try {
262
String fn = pathForSecurityCheck(path);
263
if (sm != null) {
264
FilePermission readPermission = new FilePermission(fn, "read");
265
sm.checkPermission(readPermission);
266
filePermission = readPermission;
267
}
268
} catch (UnsupportedOperationException uoe) {
269
defaultFS = false;
270
// Path not associated with the default file system
271
// Test early if an input stream can still be obtained
272
try {
273
if (sm != null) {
274
Files.newInputStream(path).close();
275
}
276
} catch (IOException ioe) {
277
if (ioe instanceof FileNotFoundException) {
278
throw (FileNotFoundException) ioe;
279
} else {
280
var ex = new FileNotFoundException(ioe.getMessage());
281
ex.initCause(ioe);
282
throw ex;
283
}
284
}
285
}
286
287
// existence check must be after permission checks
288
if (Files.notExists(path))
289
throw new FileNotFoundException(path + " not found");
290
291
Permission perm = filePermission;
292
assert perm == null || perm.getActions().equals("read");
293
@SuppressWarnings("removal")
294
AccessControlContext acc = sm != null ?
295
AccessController.getContext() : null;
296
boolean finalDefaultFS = defaultFS;
297
Function<Path, InputStream> inputStreamSupplier = (p) ->
298
createInputStream(p, acc, perm, finalDefaultFS);
299
300
long length;
301
try {
302
length = Files.size(path);
303
} catch (IOException ioe) {
304
length = -1;
305
}
306
307
return new FilePublisher(path, length, inputStreamSupplier);
308
}
309
310
@SuppressWarnings("removal")
311
private static InputStream createInputStream(Path path,
312
AccessControlContext acc,
313
Permission perm,
314
boolean defaultFS) {
315
try {
316
if (acc != null) {
317
PrivilegedExceptionAction<InputStream> pa = defaultFS
318
? () -> new FileInputStream(path.toFile())
319
: () -> Files.newInputStream(path);
320
return perm != null
321
? AccessController.doPrivileged(pa, acc, perm)
322
: AccessController.doPrivileged(pa, acc);
323
} else {
324
return defaultFS
325
? new FileInputStream(path.toFile())
326
: Files.newInputStream(path);
327
}
328
} catch (PrivilegedActionException pae) {
329
throw toUncheckedException(pae.getCause());
330
} catch (IOException io) {
331
throw new UncheckedIOException(io);
332
}
333
}
334
335
private static RuntimeException toUncheckedException(Throwable t) {
336
if (t instanceof RuntimeException)
337
throw (RuntimeException) t;
338
if (t instanceof Error)
339
throw (Error) t;
340
if (t instanceof IOException)
341
throw new UncheckedIOException((IOException) t);
342
throw new UndeclaredThrowableException(t);
343
}
344
345
private FilePublisher(Path name,
346
long length,
347
Function<Path, InputStream> inputStreamSupplier) {
348
path = name;
349
this.length = length;
350
this.inputStreamSupplier = inputStreamSupplier;
351
}
352
353
@Override
354
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
355
InputStream is = null;
356
Throwable t = null;
357
try {
358
is = inputStreamSupplier.apply(path);
359
} catch (UncheckedIOException | UndeclaredThrowableException ue) {
360
t = ue.getCause();
361
} catch (Throwable th) {
362
t = th;
363
}
364
final InputStream fis = is;
365
PullPublisher<ByteBuffer> publisher;
366
if (t == null) {
367
publisher = new PullPublisher<>(() -> new StreamIterator(fis));
368
} else {
369
publisher = new PullPublisher<>(null, t);
370
}
371
publisher.subscribe(subscriber);
372
}
373
374
@Override
375
public long contentLength() {
376
return length;
377
}
378
}
379
380
/**
381
* Reads one buffer ahead all the time, blocking in hasNext()
382
*/
383
public static class StreamIterator implements Iterator<ByteBuffer> {
384
final InputStream is;
385
final Supplier<? extends ByteBuffer> bufSupplier;
386
private volatile boolean eof;
387
volatile ByteBuffer nextBuffer;
388
volatile boolean need2Read = true;
389
volatile boolean haveNext;
390
391
StreamIterator(InputStream is) {
392
this(is, Utils::getBuffer);
393
}
394
395
StreamIterator(InputStream is, Supplier<? extends ByteBuffer> bufSupplier) {
396
this.is = is;
397
this.bufSupplier = bufSupplier;
398
}
399
400
// Throwable error() {
401
// return error;
402
// }
403
404
private int read() throws IOException {
405
if (eof)
406
return -1;
407
nextBuffer = bufSupplier.get();
408
nextBuffer.clear();
409
byte[] buf = nextBuffer.array();
410
int offset = nextBuffer.arrayOffset();
411
int cap = nextBuffer.capacity();
412
int n = is.read(buf, offset, cap);
413
if (n == -1) {
414
eof = true;
415
return -1;
416
}
417
//flip
418
nextBuffer.limit(n);
419
nextBuffer.position(0);
420
return n;
421
}
422
423
/**
424
* Close stream in this instance.
425
* UncheckedIOException may be thrown if IOE happens at InputStream::close.
426
*/
427
private void closeStream() {
428
try {
429
is.close();
430
} catch (IOException e) {
431
throw new UncheckedIOException(e);
432
}
433
}
434
435
@Override
436
public synchronized boolean hasNext() {
437
if (need2Read) {
438
try {
439
haveNext = read() != -1;
440
if (haveNext) {
441
need2Read = false;
442
}
443
} catch (IOException e) {
444
haveNext = false;
445
need2Read = false;
446
throw new UncheckedIOException(e);
447
} finally {
448
if (!haveNext) {
449
closeStream();
450
}
451
}
452
}
453
return haveNext;
454
}
455
456
@Override
457
public synchronized ByteBuffer next() {
458
if (!hasNext()) {
459
throw new NoSuchElementException();
460
}
461
need2Read = true;
462
return nextBuffer;
463
}
464
465
}
466
467
public static class InputStreamPublisher implements BodyPublisher {
468
private final Supplier<? extends InputStream> streamSupplier;
469
470
public InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
471
this.streamSupplier = Objects.requireNonNull(streamSupplier);
472
}
473
474
@Override
475
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
476
PullPublisher<ByteBuffer> publisher;
477
InputStream is = streamSupplier.get();
478
if (is == null) {
479
Throwable t = new IOException("streamSupplier returned null");
480
publisher = new PullPublisher<>(null, t);
481
} else {
482
publisher = new PullPublisher<>(iterableOf(is), null);
483
}
484
publisher.subscribe(subscriber);
485
}
486
487
protected Iterable<ByteBuffer> iterableOf(InputStream is) {
488
return () -> new StreamIterator(is);
489
}
490
491
@Override
492
public long contentLength() {
493
return -1;
494
}
495
}
496
497
public static final class PublisherAdapter implements BodyPublisher {
498
499
private final Publisher<? extends ByteBuffer> publisher;
500
private final long contentLength;
501
502
public PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
503
long contentLength) {
504
this.publisher = Objects.requireNonNull(publisher);
505
this.contentLength = contentLength;
506
}
507
508
@Override
509
public final long contentLength() {
510
return contentLength;
511
}
512
513
@Override
514
public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
515
publisher.subscribe(subscriber);
516
}
517
}
518
519
520
public static BodyPublisher concat(BodyPublisher... publishers) {
521
if (publishers.length == 0) {
522
return new EmptyPublisher();
523
} else if (publishers.length == 1) {
524
return Objects.requireNonNull(publishers[0]);
525
} else {
526
return new AggregatePublisher(List.of(publishers));
527
}
528
}
529
530
/**
531
* An aggregate publisher acts as a proxy between a subscriber
532
* and a list of publishers. It lazily subscribes to each publisher
533
* in sequence in order to publish a request body that is
534
* composed from all the bytes obtained from each publisher.
535
* For instance, the following two publishers are equivalent, even
536
* though they may result in a different count of {@code onNext}
537
* invocations.
538
* <pre>{@code
539
* var bp1 = BodyPublishers.ofString("ab");
540
* var bp2 = BodyPublishers.concat(BodyPublishers.ofString("a"),
541
* BodyPublisher.ofByteArray(new byte[] {(byte)'b'}));
542
* }</pre>
543
*
544
*/
545
private static final class AggregatePublisher implements BodyPublisher {
546
final List<BodyPublisher> bodies;
547
AggregatePublisher(List<BodyPublisher> bodies) {
548
this.bodies = bodies;
549
}
550
551
// -1 must be returned if any publisher returns -1
552
// Otherwise, we can just sum the contents.
553
@Override
554
public long contentLength() {
555
long length = bodies.stream()
556
.mapToLong(BodyPublisher::contentLength)
557
.reduce((a,b) -> a < 0 || b < 0 ? -1 : a + b)
558
.orElse(0);
559
// In case of overflow in any operation but the last, length
560
// will be -1.
561
// In case of overflow in the last reduce operation, length
562
// will be negative, but not necessarily -1: in that case,
563
// return -1
564
if (length < 0) return -1;
565
return length;
566
}
567
568
@Override
569
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
570
subscriber.onSubscribe(new AggregateSubscription(bodies, subscriber));
571
}
572
}
573
574
private static final class AggregateSubscription
575
implements Flow.Subscription, Flow.Subscriber<ByteBuffer> {
576
final Flow.Subscriber<? super ByteBuffer> subscriber; // upstream
577
final Queue<BodyPublisher> bodies;
578
final SequentialScheduler scheduler;
579
final Demand demand = new Demand(); // from upstream
580
final Demand demanded = new Demand(); // requested downstream
581
final AtomicReference<Throwable> error = new AtomicReference<>();
582
volatile Throwable illegalRequest;
583
volatile BodyPublisher publisher; // downstream
584
volatile Flow.Subscription subscription; // downstream
585
volatile boolean cancelled;
586
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
587
this.bodies = new ConcurrentLinkedQueue<>(bodies);
588
this.subscriber = subscriber;
589
this.scheduler = SequentialScheduler.lockingScheduler(this::run);
590
}
591
592
@Override
593
public void request(long n) {
594
if (cancelled || publisher == null && bodies.isEmpty()) {
595
return;
596
}
597
try {
598
demand.increase(n);
599
} catch (IllegalArgumentException x) {
600
illegalRequest = x;
601
}
602
scheduler.runOrSchedule();
603
}
604
605
@Override
606
public void cancel() {
607
cancelled = true;
608
scheduler.runOrSchedule();
609
}
610
611
private boolean cancelSubscription() {
612
Flow.Subscription subscription = this.subscription;
613
if (subscription != null) {
614
this.subscription = null;
615
this.publisher = null;
616
subscription.cancel();
617
}
618
scheduler.stop();
619
return subscription != null;
620
}
621
622
public void run() {
623
try {
624
while (error.get() == null
625
&& (!demand.isFulfilled()
626
|| (publisher == null && !bodies.isEmpty()))) {
627
boolean cancelled = this.cancelled;
628
BodyPublisher publisher = this.publisher;
629
Flow.Subscription subscription = this.subscription;
630
Throwable illegalRequest = this.illegalRequest;
631
if (cancelled) {
632
bodies.clear();
633
cancelSubscription();
634
return;
635
}
636
if (publisher == null && !bodies.isEmpty()) {
637
this.publisher = publisher = bodies.poll();
638
publisher.subscribe(this);
639
subscription = this.subscription;
640
} else if (publisher == null) {
641
return;
642
}
643
if (illegalRequest != null) {
644
onError(illegalRequest);
645
return;
646
}
647
if (subscription == null) return;
648
if (!demand.isFulfilled()) {
649
long n = demand.decreaseAndGet(demand.get());
650
demanded.increase(n);
651
subscription.request(n);
652
}
653
}
654
} catch (Throwable t) {
655
onError(t);
656
}
657
}
658
659
660
@Override
661
public void onSubscribe(Flow.Subscription subscription) {
662
this.subscription = subscription;
663
scheduler.runOrSchedule();
664
}
665
666
@Override
667
public void onNext(ByteBuffer item) {
668
// make sure to cancel the subscription if we receive
669
// an item after the subscription was cancelled or
670
// an error was reported.
671
if (cancelled || error.get() != null) {
672
cancelSubscription();
673
return;
674
}
675
demanded.tryDecrement();
676
subscriber.onNext(item);
677
}
678
679
@Override
680
public void onError(Throwable throwable) {
681
if (error.compareAndSet(null, throwable)) {
682
publisher = null;
683
subscription = null;
684
subscriber.onError(throwable);
685
scheduler.stop();
686
}
687
}
688
689
@Override
690
public void onComplete() {
691
if (publisher != null && !bodies.isEmpty()) {
692
while (!demanded.isFulfilled()) {
693
demand.increase(demanded.decreaseAndGet(demanded.get()));
694
}
695
publisher = null;
696
subscription = null;
697
scheduler.runOrSchedule();
698
} else {
699
publisher = null;
700
subscription = null;
701
if (!cancelled) {
702
subscriber.onComplete();
703
}
704
scheduler.stop();
705
}
706
}
707
}
708
}
709
710