Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/java/net/httpclient/AggregateRequestBodyTest.java
41152 views
1
/*
2
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/*
25
* @test
26
* @bug 8252374
27
* @library /test/lib http2/server
28
* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters
29
* ReferenceTracker AggregateRequestBodyTest
30
* @modules java.base/sun.net.www.http
31
* java.net.http/jdk.internal.net.http.common
32
* java.net.http/jdk.internal.net.http.frame
33
* java.net.http/jdk.internal.net.http.hpack
34
* @run testng/othervm -Djdk.internal.httpclient.debug=true
35
* -Djdk.httpclient.HttpClient.log=requests,responses,errors
36
* AggregateRequestBodyTest
37
* @summary Tests HttpRequest.BodyPublishers::concat
38
*/
39
40
import java.net.InetAddress;
41
import java.net.InetSocketAddress;
42
import java.net.URI;
43
import java.net.http.HttpClient;
44
import java.net.http.HttpRequest;
45
import java.net.http.HttpRequest.BodyPublisher;
46
import java.net.http.HttpRequest.BodyPublishers;
47
import java.net.http.HttpResponse;
48
import java.net.http.HttpResponse.BodyHandlers;
49
import java.nio.ByteBuffer;
50
import java.util.LinkedHashMap;
51
import java.util.List;
52
import java.util.Map;
53
import java.util.concurrent.CompletableFuture;
54
import java.util.concurrent.CompletionException;
55
import java.util.concurrent.ConcurrentHashMap;
56
import java.util.concurrent.ConcurrentLinkedDeque;
57
import java.util.concurrent.ConcurrentMap;
58
import java.util.concurrent.Executor;
59
import java.util.concurrent.Executors;
60
import java.util.concurrent.Flow;
61
import java.util.concurrent.Flow.Subscriber;
62
import java.util.concurrent.Flow.Subscription;
63
import java.util.concurrent.TimeUnit;
64
import java.util.concurrent.TimeoutException;
65
import java.util.concurrent.atomic.AtomicLong;
66
import java.util.function.Consumer;
67
import java.util.function.Supplier;
68
import java.util.stream.Collectors;
69
import java.util.stream.LongStream;
70
import java.util.stream.Stream;
71
import javax.net.ssl.SSLContext;
72
73
import com.sun.net.httpserver.HttpServer;
74
import com.sun.net.httpserver.HttpsConfigurator;
75
import com.sun.net.httpserver.HttpsServer;
76
import jdk.test.lib.net.SimpleSSLContext;
77
import org.testng.Assert;
78
import org.testng.ITestContext;
79
import org.testng.annotations.AfterClass;
80
import org.testng.annotations.AfterTest;
81
import org.testng.annotations.BeforeMethod;
82
import org.testng.annotations.BeforeTest;
83
import org.testng.annotations.DataProvider;
84
import org.testng.annotations.Test;
85
86
import static java.lang.System.out;
87
import static org.testng.Assert.assertEquals;
88
import static org.testng.Assert.assertFalse;
89
import static org.testng.Assert.assertTrue;
90
import static org.testng.Assert.expectThrows;
91
92
public class AggregateRequestBodyTest implements HttpServerAdapters {
93
94
SSLContext sslContext;
95
HttpTestServer http1TestServer; // HTTP/1.1 ( http )
96
HttpTestServer https1TestServer; // HTTPS/1.1 ( https )
97
HttpTestServer http2TestServer; // HTTP/2 ( h2c )
98
HttpTestServer https2TestServer; // HTTP/2 ( h2 )
99
String http1URI;
100
String https1URI;
101
String http2URI;
102
String https2URI;
103
104
static final int RESPONSE_CODE = 200;
105
static final int ITERATION_COUNT = 4;
106
static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
107
static final Class<CompletionException> CE = CompletionException.class;
108
// a shared executor helps reduce the amount of threads created by the test
109
static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
110
static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
111
static volatile boolean tasksFailed;
112
static final AtomicLong serverCount = new AtomicLong();
113
static final AtomicLong clientCount = new AtomicLong();
114
static final long start = System.nanoTime();
115
public static String now() {
116
long now = System.nanoTime() - start;
117
long secs = now / 1000_000_000;
118
long mill = (now % 1000_000_000) / 1000_000;
119
long nan = now % 1000_000;
120
return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
121
}
122
123
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
124
private volatile HttpClient sharedClient;
125
126
static class TestExecutor implements Executor {
127
final AtomicLong tasks = new AtomicLong();
128
Executor executor;
129
TestExecutor(Executor executor) {
130
this.executor = executor;
131
}
132
133
@Override
134
public void execute(Runnable command) {
135
long id = tasks.incrementAndGet();
136
executor.execute(() -> {
137
try {
138
command.run();
139
} catch (Throwable t) {
140
tasksFailed = true;
141
System.out.printf(now() + "Task %s failed: %s%n", id, t);
142
System.err.printf(now() + "Task %s failed: %s%n", id, t);
143
FAILURES.putIfAbsent("Task " + id, t);
144
throw t;
145
}
146
});
147
}
148
}
149
150
protected boolean stopAfterFirstFailure() {
151
return Boolean.getBoolean("jdk.internal.httpclient.debug");
152
}
153
154
@BeforeMethod
155
void beforeMethod(ITestContext context) {
156
if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
157
throw new RuntimeException("some tests failed");
158
}
159
}
160
161
@AfterClass
162
static final void printFailedTests() {
163
out.println("\n=========================");
164
try {
165
out.printf("%n%sCreated %d servers and %d clients%n",
166
now(), serverCount.get(), clientCount.get());
167
if (FAILURES.isEmpty()) return;
168
out.println("Failed tests: ");
169
FAILURES.entrySet().forEach((e) -> {
170
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
171
e.getValue().printStackTrace(out);
172
e.getValue().printStackTrace();
173
});
174
if (tasksFailed) {
175
System.out.println("WARNING: Some tasks failed");
176
}
177
} finally {
178
out.println("\n=========================\n");
179
}
180
}
181
182
private String[] uris() {
183
return new String[] {
184
http1URI,
185
https1URI,
186
http2URI,
187
https2URI,
188
};
189
}
190
191
// Shared counter; the reference is never reassigned, so it is declared final
// like the other static counters in this class.
static final AtomicLong URICOUNT = new AtomicLong();
192
193
@DataProvider(name = "variants")
194
public Object[][] variants(ITestContext context) {
195
if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
196
return new Object[0][];
197
}
198
String[] uris = uris();
199
Object[][] result = new Object[uris.length * 2][];
200
int i = 0;
201
for (boolean sameClient : List.of(false, true)) {
202
for (String uri : uris()) {
203
result[i++] = new Object[]{uri, sameClient};
204
}
205
}
206
assert i == uris.length * 2;
207
return result;
208
}
209
210
private HttpClient makeNewClient() {
211
clientCount.incrementAndGet();
212
HttpClient client = HttpClient.newBuilder()
213
.proxy(HttpClient.Builder.NO_PROXY)
214
.executor(executor)
215
.sslContext(sslContext)
216
.build();
217
return TRACKER.track(client);
218
}
219
220
HttpClient newHttpClient(boolean share) {
221
if (!share) return makeNewClient();
222
HttpClient shared = sharedClient;
223
if (shared != null) return shared;
224
synchronized (this) {
225
shared = sharedClient;
226
if (shared == null) {
227
shared = sharedClient = makeNewClient();
228
}
229
return shared;
230
}
231
}
232
233
static final List<String> BODIES = List.of(
234
"Lorem ipsum",
235
"dolor sit amet",
236
"consectetur adipiscing elit, sed do eiusmod tempor",
237
"quis nostrud exercitation ullamco",
238
"laboris nisi",
239
"ut",
240
"aliquip ex ea commodo consequat." +
241
"Duis aute irure dolor in reprehenderit in voluptate velit esse" +
242
"cillum dolore eu fugiat nulla pariatur.",
243
"Excepteur sint occaecat cupidatat non proident."
244
);
245
246
static BodyPublisher[] publishers(String... content) {
247
if (content == null) return null;
248
BodyPublisher[] result = new BodyPublisher[content.length];
249
for (int i=0; i < content.length ; i++) {
250
result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]);
251
}
252
return result;
253
}
254
255
static String[] strings(String... s) {
256
return s;
257
}
258
259
@DataProvider(name = "sparseContent")
260
Object[][] nulls() {
261
return new Object[][] {
262
{"null array", null},
263
{"null element", strings((String)null)},
264
{"null first element", strings(null, "one")},
265
{"null second element", strings( "one", null)},
266
{"null third element", strings( "one", "two", null)},
267
{"null fourth element", strings( "one", "two", "three", null)},
268
{"null random element", strings( "one", "two", "three", null, "five")},
269
};
270
}
271
272
static List<Long> lengths(long... lengths) {
273
return LongStream.of(lengths)
274
.mapToObj(Long::valueOf)
275
.collect(Collectors.toList());
276
}
277
278
@DataProvider(name = "contentLengths")
279
Object[][] contentLengths() {
280
return new Object[][] {
281
{-1, lengths(-1)},
282
{-42, lengths(-42)},
283
{42, lengths(42)},
284
{42, lengths(10, 0, 20, 0, 12)},
285
{-1, lengths(10, 0, 20, -1, 12)},
286
{-1, lengths(-1, 0, 20, 10, 12)},
287
{-1, lengths(10, 0, 20, 12, -1)},
288
{-1, lengths(10, 0, 20, -10, 12)},
289
{-1, lengths(-10, 0, 20, 10, 12)},
290
{-1, lengths(10, 0, 20, 12, -10)},
291
{-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)},
292
{-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)},
293
{-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)},
294
{Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)},
295
{-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)},
296
{-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)},
297
{-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)},
298
{-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)},
299
{-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)},
300
{-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
301
{-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
302
{-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)}
303
};
304
}
305
306
@DataProvider(name="negativeRequests")
307
Object[][] negativeRequests() {
308
return new Object[][] {
309
{0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE}
310
};
311
}
312
313
314
static class ContentLengthPublisher implements BodyPublisher {
315
final long length;
316
ContentLengthPublisher(long length) {
317
this.length = length;
318
}
319
@Override
320
public long contentLength() {
321
return length;
322
}
323
324
@Override
325
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
326
}
327
328
static ContentLengthPublisher[] of(List<Long> lengths) {
329
return lengths.stream()
330
.map(ContentLengthPublisher::new)
331
.toArray(ContentLengthPublisher[]::new);
332
}
333
}
334
335
/**
336
* A dummy publisher that allows to call onError on its subscriber (or not...).
337
*/
338
static class PublishWithError implements BodyPublisher {
339
final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>();
340
final long length;
341
final List<String> content;
342
final int errorAt;
343
final Supplier<? extends Throwable> errorSupplier;
344
PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) {
345
this.content = content;
346
this.errorAt = errorAt;
347
this.errorSupplier = supplier;
348
length = content.stream().mapToInt(String::length).sum();
349
}
350
351
boolean hasErrors() {
352
return errorAt < content.size();
353
}
354
355
@Override
356
public long contentLength() {
357
return length;
358
}
359
360
@Override
361
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
362
ErrorSubscription subscription = new ErrorSubscription(subscriber);
363
subscribers.put(subscriber, subscription);
364
subscriber.onSubscribe(subscription);
365
}
366
367
class ErrorSubscription implements Flow.Subscription {
368
volatile boolean cancelled;
369
volatile int at;
370
final Subscriber<? super ByteBuffer> subscriber;
371
ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) {
372
this.subscriber = subscriber;
373
}
374
@Override
375
public void request(long n) {
376
while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) {
377
if (at++ == errorAt) {
378
subscriber.onError(errorSupplier.get());
379
return;
380
} else if (at <= content.size()){
381
subscriber.onNext(ByteBuffer.wrap(
382
content.get(at-1).getBytes()));
383
if (at == content.size()) {
384
subscriber.onComplete();
385
return;
386
}
387
}
388
}
389
}
390
391
@Override
392
public void cancel() {
393
cancelled = true;
394
}
395
}
396
}
397
398
static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
399
CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
400
ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();
401
CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();
402
403
@Override
404
public void onSubscribe(Subscription subscription) {
405
this.subscriptionCF.complete(subscription);
406
}
407
408
@Override
409
public void onNext(ByteBuffer item) {
410
items.addLast(item);
411
}
412
413
@Override
414
public void onError(Throwable throwable) {
415
resultCF.completeExceptionally(throwable);
416
}
417
418
@Override
419
public void onComplete() {
420
resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));
421
}
422
423
CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }
424
}
425
426
static String stringFromBuffer(ByteBuffer buffer) {
427
byte[] bytes = new byte[buffer.remaining()];
428
buffer.get(bytes);
429
return new String(bytes);
430
}
431
432
String stringFromBytes(Stream<ByteBuffer> buffers) {
433
return buffers.map(AggregateRequestBodyTest::stringFromBuffer)
434
.collect(Collectors.joining());
435
}
436
437
static PublishWithError withNoError(String content) {
438
return new PublishWithError(List.of(content), 1,
439
() -> new AssertionError("Should not happen!"));
440
}
441
442
static PublishWithError withNoError(List<String> content) {
443
return new PublishWithError(content, content.size(),
444
() -> new AssertionError("Should not happen!"));
445
}
446
447
@Test(dataProvider = "sparseContent") // checks that NPE is thrown
448
public void testNullPointerException(String description, String[] content) {
449
BodyPublisher[] publishers = publishers(content);
450
Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers));
451
}
452
453
// Verifies that an empty array creates a "noBody" publisher
454
@Test
455
public void testEmpty() {
456
BodyPublisher publisher = BodyPublishers.concat();
457
RequestSubscriber subscriber = new RequestSubscriber();
458
assertEquals(publisher.contentLength(), 0);
459
publisher.subscribe(subscriber);
460
subscriber.subscriptionCF.thenAccept(s -> s.request(1));
461
List<ByteBuffer> result = subscriber.resultCF.join();
462
assertEquals(result, List.of());
463
assertTrue(subscriber.items.isEmpty());;
464
}
465
466
// verifies that error emitted by upstream publishers are propagated downstream.
467
@Test(dataProvider = "sparseContent") // nulls are replaced with error publisher
468
public void testOnError(String description, String[] content) {
469
final RequestSubscriber subscriber = new RequestSubscriber();
470
final PublishWithError errorPublisher;
471
final BodyPublisher[] publishers;
472
String result = BODIES.stream().collect(Collectors.joining());
473
if (content == null) {
474
content = List.of(result).toArray(String[]::new);
475
errorPublisher = new PublishWithError(BODIES, BODIES.size(),
476
() -> new AssertionError("Unexpected!!"));
477
publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]);
478
description = "No error";
479
} else {
480
publishers = publishers(content);
481
description = description.replace("null", "error at");
482
errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected"));
483
}
484
result = "";
485
boolean hasErrors = false;
486
for (int i=0; i < content.length; i++) {
487
if (content[i] == null) {
488
publishers[i] = errorPublisher;
489
if (hasErrors) continue;
490
if (!errorPublisher.hasErrors()) {
491
result = result + errorPublisher
492
.content.stream().collect(Collectors.joining());
493
} else {
494
result = result + errorPublisher.content
495
.stream().limit(errorPublisher.errorAt)
496
.collect(Collectors.joining());
497
result = result + "<error>";
498
hasErrors = true;
499
}
500
} else if (!hasErrors) {
501
result = result + content[i];
502
}
503
}
504
BodyPublisher publisher = BodyPublishers.concat(publishers);
505
publisher.subscribe(subscriber);
506
subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE));
507
if (errorPublisher.hasErrors()) {
508
CompletionException ce = expectThrows(CompletionException.class,
509
() -> subscriber.resultCF.join());
510
out.println(description + ": got expected " + ce);
511
assertEquals(ce.getCause().getClass(), Exception.class);
512
assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result);
513
} else {
514
assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result);
515
out.println(description + ": got expected result: " + result);
516
}
517
}
518
519
// Verifies that if an upstream publisher has an unknown length, the
520
// aggregate publisher will have an unknown length as well. Otherwise
521
// the length should be known.
522
@Test(dataProvider = "sparseContent") // nulls are replaced with unknown length
523
public void testUnknownContentLength(String description, String[] content) {
524
if (content == null) {
525
content = BODIES.toArray(String[]::new);
526
description = "BODIES (known length)";
527
} else {
528
description = description.replace("null", "length(-1)");
529
}
530
BodyPublisher[] publishers = publishers(content);
531
BodyPublisher nolength = new BodyPublisher() {
532
final BodyPublisher missing = BodyPublishers.ofString("missing");
533
@Override
534
public long contentLength() { return -1; }
535
@Override
536
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
537
missing.subscribe(subscriber);
538
}
539
};
540
long length = 0;
541
for (int i=0; i < content.length; i++) {
542
if (content[i] == null) {
543
publishers[i] = nolength;
544
length = -1;
545
} else if (length >= 0) {
546
length += content[i].length();
547
}
548
}
549
out.printf("testUnknownContentLength(%s): %d%n", description, length);
550
BodyPublisher publisher = BodyPublishers.concat(publishers);
551
assertEquals(publisher.contentLength(), length,
552
description.replace("null", "length(-1)"));
553
}
554
555
private static final Throwable completionCause(CompletionException x) {
556
while (x.getCause() instanceof CompletionException) {
557
x = (CompletionException)x.getCause();
558
}
559
return x.getCause();
560
}
561
562
@Test(dataProvider = "negativeRequests")
563
public void testNegativeRequest(long n) {
564
assert n <= 0 : "test for negative request called with n > 0 : " + n;
565
BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L));
566
BodyPublisher publisher = BodyPublishers.concat(publishers);
567
RequestSubscriber subscriber = new RequestSubscriber();
568
publisher.subscribe(subscriber);
569
Subscription subscription = subscriber.subscriptionCF.join();
570
subscription.request(n);
571
CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());
572
Throwable cause = completionCause(expected);
573
if (cause instanceof IllegalArgumentException) {
574
System.out.printf("Got expected IAE for %d: %s%n", n, cause);
575
} else {
576
throw new AssertionError("Unexpected exception: " + cause,
577
(cause == null) ? expected : cause);
578
}
579
}
580
581
static BodyPublisher[] ofStrings(String... strings) {
582
return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new);
583
}
584
585
@Test
586
public void testPositiveRequests() {
587
// A composite array of publishers
588
BodyPublisher[] publishers = Stream.of(
589
Stream.of(ofStrings("Lorem", " ", "ipsum", " ")),
590
Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))),
591
Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))),
592
Stream.of(ofStrings(" ")),
593
Stream.of(BodyPublishers.concat(ofStrings("elit", ".")))
594
).flatMap((s) -> s).toArray(BodyPublisher[]::new);
595
BodyPublisher publisher = BodyPublishers.concat(publishers);
596
597
// Test that we can request all 13 items in a single request call.
598
RequestSubscriber requestSubscriber1 = new RequestSubscriber();
599
publisher.subscribe(requestSubscriber1);
600
Subscription subscription1 = requestSubscriber1.subscriptionCF.join();
601
subscription1.request(16);
602
assertTrue(requestSubscriber1.resultCF().isDone());
603
List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();
604
String result1 = stringFromBytes(list1.stream());
605
assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
606
System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));
607
608
// Test that we can split our requests call any which way we want
609
// (whether in the 'middle of a publisher' or at the boundaries.
610
RequestSubscriber requestSubscriber2 = new RequestSubscriber();
611
publisher.subscribe(requestSubscriber2);
612
Subscription subscription2 = requestSubscriber2.subscriptionCF.join();
613
subscription2.request(1);
614
assertFalse(requestSubscriber2.resultCF().isDone());
615
subscription2.request(10);
616
assertFalse(requestSubscriber2.resultCF().isDone());
617
subscription2.request(4);
618
assertFalse(requestSubscriber2.resultCF().isDone());
619
subscription2.request(1);
620
assertTrue(requestSubscriber2.resultCF().isDone());
621
List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();
622
String result2 = stringFromBytes(list2.stream());
623
assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
624
System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));
625
}
626
627
@Test(dataProvider = "contentLengths")
628
public void testContentLength(long expected, List<Long> lengths) {
629
BodyPublisher[] publishers = ContentLengthPublisher.of(lengths);
630
BodyPublisher aggregate = BodyPublishers.concat(publishers);
631
assertEquals(aggregate.contentLength(), expected,
632
"Unexpected result for %s".formatted(lengths));
633
}
634
635
// Verifies that cancelling the subscription ensure that downstream
636
// publishers are no longer subscribed etc...
637
@Test
638
public void testCancel() {
639
BodyPublisher[] publishers = BODIES.stream()
640
.map(BodyPublishers::ofString)
641
.toArray(BodyPublisher[]::new);
642
BodyPublisher publisher = BodyPublishers.concat(publishers);
643
644
assertEquals(publisher.contentLength(),
645
BODIES.stream().mapToInt(String::length).sum());
646
Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
647
648
for (int n=0; n < BODIES.size(); n++) {
649
650
String description = String.format(
651
"cancel after %d/%d onNext() invocations",
652
n, BODIES.size());
653
RequestSubscriber subscriber = new RequestSubscriber();
654
publisher.subscribe(subscriber);
655
Subscription subscription = subscriber.subscriptionCF.join();
656
subscribers.put(subscriber, description);
657
658
// receive half the data
659
for (int i = 0; i < n; i++) {
660
subscription.request(1);
661
ByteBuffer buffer = subscriber.items.pop();
662
}
663
664
// cancel subscription
665
subscription.cancel();
666
// request the rest...
667
subscription.request(Long.MAX_VALUE);
668
}
669
670
CompletableFuture[] results = subscribers.keySet()
671
.stream().map(RequestSubscriber::resultCF)
672
.toArray(CompletableFuture[]::new);
673
CompletableFuture<?> any = CompletableFuture.anyOf(results);
674
675
// subscription was cancelled, so nothing should be received...
676
try {
677
TimeoutException x = Assert.expectThrows(TimeoutException.class,
678
() -> any.get(5, TimeUnit.SECONDS));
679
out.println("Got expected " + x);
680
} finally {
681
subscribers.keySet().stream()
682
.filter(rs -> rs.resultCF.isDone())
683
.forEach(rs -> System.err.printf(
684
"Failed: %s completed with %s",
685
subscribers.get(rs), rs.resultCF));
686
}
687
Consumer<RequestSubscriber> check = (rs) -> {
688
Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
689
Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
690
out.println(subscribers.get(rs) + ": PASSED");
691
};
692
subscribers.keySet().stream().forEach(check);
693
}
694
695
// Verifies that cancelling the subscription is propagated downstream
696
@Test
697
public void testCancelSubscription() {
698
PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(),
699
() -> new AssertionError("should not come here"));
700
BodyPublisher publisher = BodyPublishers.concat(upstream);
701
702
assertEquals(publisher.contentLength(),
703
BODIES.stream().mapToInt(String::length).sum());
704
Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
705
706
for (int n=0; n < BODIES.size(); n++) {
707
708
String description = String.format(
709
"cancel after %d/%d onNext() invocations",
710
n, BODIES.size());
711
RequestSubscriber subscriber = new RequestSubscriber();
712
publisher.subscribe(subscriber);
713
Subscription subscription = subscriber.subscriptionCF.join();
714
subscribers.put(subscriber, description);
715
716
// receive half the data
717
for (int i = 0; i < n; i++) {
718
subscription.request(1);
719
ByteBuffer buffer = subscriber.items.pop();
720
}
721
722
// cancel subscription
723
subscription.cancel();
724
// request the rest...
725
subscription.request(Long.MAX_VALUE);
726
assertTrue(upstream.subscribers.get(subscriber).cancelled,
727
description + " upstream subscription not cancelled");
728
out.println(description + " upstream subscription was properly cancelled");
729
}
730
731
CompletableFuture[] results = subscribers.keySet()
732
.stream().map(RequestSubscriber::resultCF)
733
.toArray(CompletableFuture[]::new);
734
CompletableFuture<?> any = CompletableFuture.anyOf(results);
735
736
// subscription was cancelled, so nothing should be received...
737
try {
738
TimeoutException x = Assert.expectThrows(TimeoutException.class,
739
() -> any.get(5, TimeUnit.SECONDS));
740
out.println("Got expected " + x);
741
} finally {
742
subscribers.keySet().stream()
743
.filter(rs -> rs.resultCF.isDone())
744
.forEach(rs -> System.err.printf(
745
"Failed: %s completed with %s",
746
subscribers.get(rs), rs.resultCF));
747
}
748
Consumer<RequestSubscriber> check = (rs) -> {
749
Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
750
Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
751
out.println(subscribers.get(rs) + ": PASSED");
752
};
753
subscribers.keySet().stream().forEach(check);
754
755
}
756
757
@Test(dataProvider = "variants")
758
public void test(String uri, boolean sameClient) throws Exception {
759
System.out.println("Request to " + uri);
760
761
HttpClient client = newHttpClient(sameClient);
762
763
BodyPublisher publisher = BodyPublishers.concat(
764
BODIES.stream()
765
.map(BodyPublishers::ofString)
766
.toArray(HttpRequest.BodyPublisher[]::new)
767
);
768
HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
769
.POST(publisher)
770
.build();
771
for (int i = 0; i < ITERATION_COUNT; i++) {
772
System.out.println("Iteration: " + i);
773
HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
774
int expectedResponse = RESPONSE_CODE;
775
if (response.statusCode() != expectedResponse)
776
throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode()));
777
assertEquals(response.body(), BODIES.stream().collect(Collectors.joining()));
778
}
779
System.out.println("test: DONE");
780
}
781
782
@BeforeTest
783
public void setup() throws Exception {
784
sslContext = new SimpleSSLContext().get();
785
if (sslContext == null)
786
throw new AssertionError("Unexpected null sslContext");
787
788
HttpTestHandler handler = new HttpTestEchoHandler();
789
InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
790
791
HttpServer http1 = HttpServer.create(loopback, 0);
792
http1TestServer = HttpTestServer.of(http1);
793
http1TestServer.addHandler(handler, "/http1/echo/");
794
http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x";
795
796
HttpsServer https1 = HttpsServer.create(loopback, 0);
797
https1.setHttpsConfigurator(new HttpsConfigurator(sslContext));
798
https1TestServer = HttpTestServer.of(https1);
799
https1TestServer.addHandler(handler, "/https1/echo/");
800
https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x";
801
802
// HTTP/2
803
http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
804
http2TestServer.addHandler(handler, "/http2/echo/");
805
http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x";
806
807
https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
808
https2TestServer.addHandler(handler, "/https2/echo/");
809
https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x";
810
811
serverCount.addAndGet(4);
812
http1TestServer.start();
813
https1TestServer.start();
814
http2TestServer.start();
815
https2TestServer.start();
816
}
817
818
@AfterTest
819
public void teardown() throws Exception {
820
String sharedClientName =
821
sharedClient == null ? null : sharedClient.toString();
822
sharedClient = null;
823
Thread.sleep(100);
824
AssertionError fail = TRACKER.check(500);
825
try {
826
http1TestServer.stop();
827
https1TestServer.stop();
828
http2TestServer.stop();
829
https2TestServer.stop();
830
} finally {
831
if (fail != null) {
832
if (sharedClientName != null) {
833
System.err.println("Shared client name is: " + sharedClientName);
834
}
835
throw fail;
836
}
837
}
838
}
839
}
840
841