Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/java/net/httpclient/BufferingSubscriberTest.java
41149 views
1
/*
 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
23
24
import java.nio.ByteBuffer;
25
import java.util.List;
26
import java.util.Random;
27
import java.util.concurrent.CompletableFuture;
28
import java.util.concurrent.CompletionStage;
29
import java.util.concurrent.Executor;
30
import java.util.concurrent.ExecutorService;
31
import java.util.concurrent.Executors;
32
import java.util.concurrent.Flow;
33
import java.util.concurrent.Flow.Subscription;
34
import java.util.concurrent.SubmissionPublisher;
35
import java.util.function.BiConsumer;
36
import java.net.http.HttpResponse.BodyHandler;
37
import java.net.http.HttpResponse.BodyHandlers;
38
import java.net.http.HttpResponse.BodySubscriber;
39
import java.net.http.HttpResponse.BodySubscribers;
40
import jdk.test.lib.RandomFactory;
41
import org.testng.annotations.DataProvider;
42
import org.testng.annotations.Test;
43
import static java.lang.Long.MAX_VALUE;
44
import static java.lang.Long.min;
45
import static java.lang.System.out;
46
import static java.util.concurrent.CompletableFuture.delayedExecutor;
47
import static java.util.concurrent.TimeUnit.MILLISECONDS;
48
import static org.testng.Assert.*;
49
50
/*
 * @test
 * @bug 8184285
 * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
 * @key randomness
 * @library /test/lib
 * @build jdk.test.lib.RandomFactory
 * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
 */
59
60
public class BufferingSubscriberTest {
61
62
// If we compute that a test will take less that 10s
63
// we judge it acceptable
64
static final long LOWER_THRESHOLD = 10_000; // 10 sec.
65
// If we compute that a test will take more than 20 sec
66
// we judge it problematic: we will try to adjust the
67
// buffer sizes, and if we can't we will print a warning
68
static final long UPPER_THRESHOLD = 20_000; // 20 sec.
69
70
static final Random random = RandomFactory.getRandom();
71
static final long start = System.nanoTime();
72
static final String START = "start";
73
static final String END = "end ";
74
/** Returns the number of milliseconds elapsed since {@code start} was captured. */
static long elapsed() {
    long nanos = System.nanoTime() - start;
    return nanos / 1_000_000;
}
75
static void printStamp(String what, String fmt, Object... args) {
76
long elapsed = elapsed();
77
long sec = elapsed/1000;
78
long ms = elapsed % 1000;
79
String time = sec > 0 ? sec + "sec " : "";
80
time = time + ms + "ms";
81
out.println(what + "\t ["+time+"]\t "+ String.format(fmt,args));
82
}
83
@DataProvider(name = "negatives")
84
public Object[][] negatives() {
85
return new Object[][] { { 0 }, { -1 }, { -1000 } };
86
}
87
88
@Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
89
public void subscriberThrowsIAE(int bufferSize) {
90
printStamp(START, "subscriberThrowsIAE(%d)", bufferSize);
91
try {
92
BodySubscriber<?> bp = BodySubscribers.ofByteArray();
93
BodySubscribers.buffering(bp, bufferSize);
94
} finally {
95
printStamp(END, "subscriberThrowsIAE(%d)", bufferSize);
96
}
97
}
98
99
@Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
100
public void handlerThrowsIAE(int bufferSize) {
101
printStamp(START, "handlerThrowsIAE(%d)", bufferSize);
102
try {
103
BodyHandler<?> bp = BodyHandlers.ofByteArray();
104
BodyHandlers.buffering(bp, bufferSize);
105
} finally {
106
printStamp(END, "handlerThrowsIAE(%d)", bufferSize);
107
}
108
}
109
110
// ---
111
112
@DataProvider(name = "config")
113
public Object[][] config() {
114
return new Object[][] {
115
// iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
116
{ 1, 0, 1, 1, 2, 1 },
117
{ 1, 5, 1, 100, 2, 1 },
118
{ 1, 0, 1, 10, 1000, 1 },
119
{ 1, 10, 1, 10, 1000, 1 },
120
{ 1, 0, 1, 1000, 1000, 10 },
121
{ 1, 0, 10, 1000, 1000, 50 },
122
{ 1, 0, 1000, 10 , 1000, 50 },
123
{ 1, 100, 1, 1000 * 4, 1000, 50 },
124
{ 100, 0, 1000, 1, 2, 1 },
125
{ 3, 0, 4, 5006, 1000, 50 },
126
{ 20, 0, 100, 4888, 1000, 100 },
127
{ 16, 10, 1000, 50 , 1000, 100 },
128
{ 16, 10, 1000, 50 , 657, 657 },
129
};
130
}
131
132
@Test(dataProvider = "config")
133
public void test(int iterations,
134
int delayMillis,
135
int numBuffers,
136
int bufferSize,
137
int maxBufferSize,
138
int minbufferSize) {
139
for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
140
test(iterations,
141
delayMillis,
142
numBuffers,
143
bufferSize,
144
maxBufferSize,
145
minbufferSize,
146
perRequestAmount);
147
}
148
149
volatile boolean onNextThrew;
150
151
BiConsumer<Flow.Subscriber<?>, ? super Throwable> onNextThrowHandler =
152
(sub, ex) -> {
153
onNextThrew = true;
154
System.out.println("onNext threw " + ex);
155
ex.printStackTrace();
156
};
157
158
public void test(int iterations,
159
int delayMillis,
160
int numBuffers,
161
int bufferSize,
162
int maxBufferSize,
163
int minBufferSize,
164
long requestAmount) {
165
ExecutorService executor = Executors.newFixedThreadPool(1);
166
try {
167
out.printf("Iterations %d\n", iterations);
168
for (int i=0; i<iterations; i++ ) {
169
printStamp(START, "Iteration %d", i);
170
try {
171
SubmissionPublisher<List<ByteBuffer>> publisher =
172
new SubmissionPublisher<>(executor,
173
1, // lock-step with the publisher, for now
174
onNextThrowHandler);
175
CompletableFuture<?> cf = sink(publisher,
176
delayMillis,
177
numBuffers * bufferSize,
178
requestAmount,
179
maxBufferSize,
180
minBufferSize);
181
source(publisher, numBuffers, bufferSize);
182
publisher.close();
183
cf.join();
184
} finally {
185
printStamp(END, "Iteration %d\n", i);
186
}
187
}
188
189
assertFalse(onNextThrew, "Unexpected onNextThrew, check output");
190
191
out.println("OK");
192
} finally {
193
executor.shutdown();
194
}
195
}
196
197
static long accumulatedDataSize(List<ByteBuffer> bufs) {
198
return bufs.stream().mapToLong(ByteBuffer::remaining).sum();
199
}
200
201
/** Returns a new BB with its contents set to monotonically increasing
202
* values, staring at the given start index and wrapping every 100. */
203
static ByteBuffer allocateBuffer(int size, int startIdx) {
204
ByteBuffer b = ByteBuffer.allocate(size);
205
for (int i=0; i<size; i++)
206
b.put((byte)((startIdx + i) % 100));
207
b.position(0);
208
return b;
209
}
210
211
static class TestSubscriber implements BodySubscriber<Integer> {
212
final int delayMillis;
213
final int bufferSize;
214
final int expectedTotalSize;
215
final long requestAmount;
216
final CompletableFuture<Integer> completion;
217
final Executor delayedExecutor;
218
volatile Flow.Subscription subscription;
219
220
TestSubscriber(int bufferSize,
221
int delayMillis,
222
int expectedTotalSize,
223
long requestAmount) {
224
this.bufferSize = bufferSize;
225
this.completion = new CompletableFuture<>();
226
this.delayMillis = delayMillis;
227
this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
228
this.expectedTotalSize = expectedTotalSize;
229
this.requestAmount = requestAmount;
230
}
231
232
/**
233
* Example of a factory method which would decorate a buffering
234
* subscriber to create a new subscriber dependent on buffering capability.
235
* <p>
236
* The integer type parameter simulates the body just by counting the
237
* number of bytes in the body.
238
*/
239
static BodySubscriber<Integer> createSubscriber(int bufferSize,
240
int delay,
241
int expectedTotalSize,
242
long requestAmount) {
243
TestSubscriber s = new TestSubscriber(bufferSize,
244
delay,
245
expectedTotalSize,
246
requestAmount);
247
return BodySubscribers.buffering(s, bufferSize);
248
}
249
250
private void requestMore() {
251
subscription.request(requestAmount);
252
}
253
254
@Override
255
public void onSubscribe(Subscription subscription) {
256
assertNull(this.subscription);
257
this.subscription = subscription;
258
if (delayMillis > 0)
259
delayedExecutor.execute(this::requestMore);
260
else
261
requestMore();
262
}
263
264
volatile int wrongSizes;
265
volatile int totalBytesReceived;
266
volatile int onNextInvocations;
267
volatile long lastSeenSize = -1;
268
volatile boolean noMoreOnNext; // false
269
volatile int index; // 0
270
volatile long count;
271
272
@Override
273
public void onNext(List<ByteBuffer> items) {
274
try {
275
long sz = accumulatedDataSize(items);
276
boolean printStamp = delayMillis > 0
277
&& requestAmount < Long.MAX_VALUE
278
&& count % 20 == 0;
279
if (printStamp) {
280
printStamp("stamp", "count=%d sz=%d accumulated=%d",
281
count, sz, (totalBytesReceived + sz));
282
}
283
count++;
284
onNextInvocations++;
285
assertNotEquals(sz, 0L, "Unexpected empty buffers");
286
items.stream().forEach(b -> assertEquals(b.position(), 0));
287
assertFalse(noMoreOnNext);
288
289
if (sz != bufferSize) {
290
String msg = sz + ", should be less than bufferSize, " + bufferSize;
291
assertTrue(sz < bufferSize, msg);
292
assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
293
noMoreOnNext = true;
294
wrongSizes++;
295
printStamp("onNext",
296
"Possibly received last buffer: sz=%d, accumulated=%d, total=%d",
297
sz, totalBytesReceived, totalBytesReceived + sz);
298
} else {
299
assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
300
}
301
lastSeenSize = sz;
302
303
// Ensure expected contents
304
for (ByteBuffer b : items) {
305
while (b.hasRemaining()) {
306
assertEquals(b.get(), (byte) (index % 100));
307
index++;
308
}
309
}
310
311
totalBytesReceived += sz;
312
assertEquals(totalBytesReceived, index);
313
if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize))
314
delayedExecutor.execute(this::requestMore);
315
else
316
requestMore();
317
} catch (Throwable t) {
318
completion.completeExceptionally(t);
319
}
320
}
321
322
@Override
323
public void onError(Throwable throwable) {
324
completion.completeExceptionally(throwable);
325
}
326
327
@Override
328
public void onComplete() {
329
if (wrongSizes > 1) { // allow just the final item to be smaller
330
String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
331
completion.completeExceptionally(new Throwable(msg));
332
}
333
if (totalBytesReceived != expectedTotalSize) {
334
String msg = "Wrong number of bytes. [" + this + "]";
335
completion.completeExceptionally(new Throwable(msg));
336
} else {
337
completion.complete(totalBytesReceived);
338
}
339
}
340
341
@Override
342
public CompletionStage<Integer> getBody() {
343
return completion;
344
}
345
346
@Override
347
public String toString() {
348
StringBuilder sb = new StringBuilder();
349
sb.append(super.toString());
350
sb.append(", bufferSize=").append(bufferSize);
351
sb.append(", onNextInvocations=").append(onNextInvocations);
352
sb.append(", totalBytesReceived=").append(totalBytesReceived);
353
sb.append(", expectedTotalSize=").append(expectedTotalSize);
354
sb.append(", requestAmount=").append(requestAmount);
355
sb.append(", lastSeenSize=").append(lastSeenSize);
356
sb.append(", wrongSizes=").append(wrongSizes);
357
sb.append(", index=").append(index);
358
return sb.toString();
359
}
360
}
361
362
/**
363
* Publishes data, through the given publisher, using the main thread.
364
*
365
* Note: The executor supplied when creating the SubmissionPublisher provides
366
* the threads for executing the Subscribers.
367
*
368
* @param publisher the publisher
369
* @param numBuffers the number of buffers to send ( before splitting in two )
370
* @param bufferSize the total size of the data to send ( before splitting in two )
371
*/
372
static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
373
int numBuffers,
374
int bufferSize) {
375
printStamp("source","Publishing %d buffers of size %d each", numBuffers, bufferSize);
376
int index = 0;
377
for (int i=0; i<numBuffers; i++) {
378
int chunkSize = random.nextInt(bufferSize);
379
ByteBuffer buf1 = allocateBuffer(chunkSize, index);
380
index += chunkSize;
381
ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
382
index += bufferSize - chunkSize;
383
publisher.submit(List.of(buf1, buf2));
384
}
385
printStamp("source", "complete");
386
}
387
388
/**
389
* Creates and subscribes Subscribers that receive data from the given
390
* publisher.
391
*
392
* @param publisher the publisher
393
* @param delayMillis time, in milliseconds, to delay the Subscription
394
* requesting more bytes ( for simulating slow consumption )
395
* @param expectedTotalSize the total number of bytes expected to be received
396
* by the subscribers
397
* @return a CompletableFuture which completes when the subscription is complete
398
*/
399
static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
400
int delayMillis,
401
int expectedTotalSize,
402
long requestAmount,
403
int maxBufferSize,
404
int minBufferSize) {
405
int bufferSize = chooseBufferSize(maxBufferSize,
406
minBufferSize,
407
delayMillis,
408
expectedTotalSize,
409
requestAmount);
410
assert bufferSize > 0;
411
assert bufferSize >= minBufferSize;
412
assert bufferSize <= maxBufferSize;
413
BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
414
delayMillis,
415
expectedTotalSize,
416
requestAmount);
417
publisher.subscribe(sub);
418
printStamp("sink","Subscriber reads data with buffer size: %d", bufferSize);
419
out.printf("Subscription delay is %d msec\n", delayMillis);
420
long delay = (((long)delayMillis * expectedTotalSize) / bufferSize) / requestAmount;
421
out.printf("Minimum total delay is %d sec %d ms\n", delay / 1000, delay % 1000);
422
out.printf("Request amount is %d items\n", requestAmount);
423
return sub.getBody().toCompletableFuture();
424
}
425
426
static int chooseBufferSize(int maxBufferSize,
427
int minBufferSize,
428
int delaysMillis,
429
int expectedTotalSize,
430
long requestAmount) {
431
assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;
432
int bufferSize = maxBufferSize == minBufferSize ? maxBufferSize :
433
(random.nextInt(maxBufferSize - minBufferSize)
434
+ minBufferSize);
435
if (requestAmount == Long.MAX_VALUE) return bufferSize;
436
long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)
437
/ requestAmount;
438
long maxDelay = (((long)delaysMillis * expectedTotalSize) / minBufferSize)
439
/ requestAmount;
440
// if the maximum delay is < 10s just take a random number between min and max.
441
if (maxDelay <= LOWER_THRESHOLD) {
442
return bufferSize;
443
}
444
// if minimum delay is greater than 20s then print a warning and use max buffer.
445
if (minDelay >= UPPER_THRESHOLD) {
446
System.out.println("Warning: minimum delay is "
447
+ minDelay/1000 + "sec " + minDelay%1000 + "ms");
448
System.err.println("Warning: minimum delay is "
449
+ minDelay/1000 + "sec " + minDelay%1000 + "ms");
450
return maxBufferSize;
451
}
452
// maxDelay could be anything, but minDelay is below the UPPER_THRESHOLD
453
// try to pick up a buffer size that keeps the delay below the
454
// UPPER_THRESHOLD
455
while (minBufferSize < maxBufferSize) {
456
bufferSize = random.nextInt(maxBufferSize - minBufferSize)
457
+ minBufferSize;
458
long delay = (((long)delaysMillis * expectedTotalSize) / bufferSize)
459
/ requestAmount;
460
if (delay < UPPER_THRESHOLD) return bufferSize;
461
minBufferSize++;
462
}
463
return minBufferSize;
464
}
465
466
// ---
467
468
/* Main entry point for standalone testing of the main functional test. */
469
public static void main(String... args) {
470
BufferingSubscriberTest t = new BufferingSubscriberTest();
471
for (Object[] objs : t.config())
472
t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
473
}
474
}
475
476