Path: blob/master/test/jdk/java/net/httpclient/AggregateRequestBodyTest.java
41152 views
/*1* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/*24* @test25* @bug 825237426* @library /test/lib http2/server27* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters28* ReferenceTracker AggregateRequestBodyTest29* @modules java.base/sun.net.www.http30* java.net.http/jdk.internal.net.http.common31* java.net.http/jdk.internal.net.http.frame32* java.net.http/jdk.internal.net.http.hpack33* @run testng/othervm -Djdk.internal.httpclient.debug=true34* -Djdk.httpclient.HttpClient.log=requests,responses,errors35* AggregateRequestBodyTest36* @summary Tests HttpRequest.BodyPublishers::concat37*/3839import java.net.InetAddress;40import java.net.InetSocketAddress;41import java.net.URI;42import java.net.http.HttpClient;43import java.net.http.HttpRequest;44import java.net.http.HttpRequest.BodyPublisher;45import java.net.http.HttpRequest.BodyPublishers;46import java.net.http.HttpResponse;47import java.net.http.HttpResponse.BodyHandlers;48import java.nio.ByteBuffer;49import java.util.LinkedHashMap;50import java.util.List;51import java.util.Map;52import java.util.concurrent.CompletableFuture;53import java.util.concurrent.CompletionException;54import java.util.concurrent.ConcurrentHashMap;55import java.util.concurrent.ConcurrentLinkedDeque;56import java.util.concurrent.ConcurrentMap;57import java.util.concurrent.Executor;58import java.util.concurrent.Executors;59import java.util.concurrent.Flow;60import java.util.concurrent.Flow.Subscriber;61import java.util.concurrent.Flow.Subscription;62import java.util.concurrent.TimeUnit;63import java.util.concurrent.TimeoutException;64import java.util.concurrent.atomic.AtomicLong;65import java.util.function.Consumer;66import java.util.function.Supplier;67import java.util.stream.Collectors;68import java.util.stream.LongStream;69import java.util.stream.Stream;70import javax.net.ssl.SSLContext;7172import com.sun.net.httpserver.HttpServer;73import com.sun.net.httpserver.HttpsConfigurator;74import com.sun.net.httpserver.HttpsServer;75import jdk.test.lib.net.SimpleSSLContext;76import org.testng.Assert;77import org.testng.ITestContext;78import org.testng.annotations.AfterClass;79import org.testng.annotations.AfterTest;80import org.testng.annotations.BeforeMethod;81import org.testng.annotations.BeforeTest;82import org.testng.annotations.DataProvider;83import org.testng.annotations.Test;8485import static java.lang.System.out;86import static org.testng.Assert.assertEquals;87import static org.testng.Assert.assertFalse;88import static org.testng.Assert.assertTrue;89import static org.testng.Assert.expectThrows;9091public class AggregateRequestBodyTest implements HttpServerAdapters {9293SSLContext sslContext;94HttpTestServer http1TestServer; // HTTP/1.1 ( http )95HttpTestServer https1TestServer; // HTTPS/1.1 ( https )96HttpTestServer http2TestServer; // HTTP/2 ( h2c )97HttpTestServer https2TestServer; // HTTP/2 ( h2 )98String http1URI;99String https1URI;100String http2URI;101String https2URI;102103static final int RESPONSE_CODE = 200;104static final int ITERATION_COUNT = 4;105static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;106static final Class<CompletionException> CE = CompletionException.class;107// a shared executor helps reduce the amount of threads created by the test108static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());109static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();110static volatile boolean tasksFailed;111static final AtomicLong serverCount = new AtomicLong();112static final AtomicLong clientCount = new AtomicLong();113static final long start = System.nanoTime();114public static String now() {115long now = System.nanoTime() - start;116long secs = now / 1000_000_000;117long mill = (now % 1000_000_000) / 1000_000;118long nan = now % 1000_000;119return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);120}121122final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;123private volatile HttpClient sharedClient;124125static class TestExecutor implements Executor {126final AtomicLong tasks = new AtomicLong();127Executor executor;128TestExecutor(Executor executor) {129this.executor = executor;130}131132@Override133public void execute(Runnable command) {134long id = tasks.incrementAndGet();135executor.execute(() -> {136try {137command.run();138} catch (Throwable t) {139tasksFailed = true;140System.out.printf(now() + "Task %s failed: %s%n", id, t);141System.err.printf(now() + "Task %s failed: %s%n", id, t);142FAILURES.putIfAbsent("Task " + id, t);143throw t;144}145});146}147}148149protected boolean stopAfterFirstFailure() {150return Boolean.getBoolean("jdk.internal.httpclient.debug");151}152153@BeforeMethod154void beforeMethod(ITestContext context) {155if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {156throw new RuntimeException("some tests failed");157}158}159160@AfterClass161static final void printFailedTests() {162out.println("\n=========================");163try {164out.printf("%n%sCreated %d servers and %d clients%n",165now(), serverCount.get(), clientCount.get());166if (FAILURES.isEmpty()) return;167out.println("Failed tests: ");168FAILURES.entrySet().forEach((e) -> {169out.printf("\t%s: %s%n", e.getKey(), e.getValue());170e.getValue().printStackTrace(out);171e.getValue().printStackTrace();172});173if (tasksFailed) {174System.out.println("WARNING: Some tasks failed");175}176} finally {177out.println("\n=========================\n");178}179}180181private String[] uris() {182return new String[] {183http1URI,184https1URI,185http2URI,186https2URI,187};188}189190static AtomicLong URICOUNT = new AtomicLong();191192@DataProvider(name = "variants")193public Object[][] variants(ITestContext context) {194if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {195return new Object[0][];196}197String[] uris = uris();198Object[][] result = new Object[uris.length * 2][];199int i = 0;200for (boolean sameClient : List.of(false, true)) {201for (String uri : uris()) {202result[i++] = new Object[]{uri, sameClient};203}204}205assert i == uris.length * 2;206return result;207}208209private HttpClient makeNewClient() {210clientCount.incrementAndGet();211HttpClient client = HttpClient.newBuilder()212.proxy(HttpClient.Builder.NO_PROXY)213.executor(executor)214.sslContext(sslContext)215.build();216return TRACKER.track(client);217}218219HttpClient newHttpClient(boolean share) {220if (!share) return makeNewClient();221HttpClient shared = sharedClient;222if (shared != null) return shared;223synchronized (this) {224shared = sharedClient;225if (shared == null) {226shared = sharedClient = makeNewClient();227}228return shared;229}230}231232static final List<String> BODIES = List.of(233"Lorem ipsum",234"dolor sit amet",235"consectetur adipiscing elit, sed do eiusmod tempor",236"quis nostrud exercitation ullamco",237"laboris nisi",238"ut",239"aliquip ex ea commodo consequat." +240"Duis aute irure dolor in reprehenderit in voluptate velit esse" +241"cillum dolore eu fugiat nulla pariatur.",242"Excepteur sint occaecat cupidatat non proident."243);244245static BodyPublisher[] publishers(String... content) {246if (content == null) return null;247BodyPublisher[] result = new BodyPublisher[content.length];248for (int i=0; i < content.length ; i++) {249result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]);250}251return result;252}253254static String[] strings(String... s) {255return s;256}257258@DataProvider(name = "sparseContent")259Object[][] nulls() {260return new Object[][] {261{"null array", null},262{"null element", strings((String)null)},263{"null first element", strings(null, "one")},264{"null second element", strings( "one", null)},265{"null third element", strings( "one", "two", null)},266{"null fourth element", strings( "one", "two", "three", null)},267{"null random element", strings( "one", "two", "three", null, "five")},268};269}270271static List<Long> lengths(long... lengths) {272return LongStream.of(lengths)273.mapToObj(Long::valueOf)274.collect(Collectors.toList());275}276277@DataProvider(name = "contentLengths")278Object[][] contentLengths() {279return new Object[][] {280{-1, lengths(-1)},281{-42, lengths(-42)},282{42, lengths(42)},283{42, lengths(10, 0, 20, 0, 12)},284{-1, lengths(10, 0, 20, -1, 12)},285{-1, lengths(-1, 0, 20, 10, 12)},286{-1, lengths(10, 0, 20, 12, -1)},287{-1, lengths(10, 0, 20, -10, 12)},288{-1, lengths(-10, 0, 20, 10, 12)},289{-1, lengths(10, 0, 20, 12, -10)},290{-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)},291{-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)},292{-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)},293{Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)},294{-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)},295{-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)},296{-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)},297{-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)},298{-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)},299{-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},300{-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},301{-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)}302};303}304305@DataProvider(name="negativeRequests")306Object[][] negativeRequests() {307return new Object[][] {308{0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE}309};310}311312313static class ContentLengthPublisher implements BodyPublisher {314final long length;315ContentLengthPublisher(long length) {316this.length = length;317}318@Override319public long contentLength() {320return length;321}322323@Override324public void subscribe(Subscriber<? super ByteBuffer> subscriber) {325}326327static ContentLengthPublisher[] of(List<Long> lengths) {328return lengths.stream()329.map(ContentLengthPublisher::new)330.toArray(ContentLengthPublisher[]::new);331}332}333334/**335* A dummy publisher that allows to call onError on its subscriber (or not...).336*/337static class PublishWithError implements BodyPublisher {338final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>();339final long length;340final List<String> content;341final int errorAt;342final Supplier<? extends Throwable> errorSupplier;343PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) {344this.content = content;345this.errorAt = errorAt;346this.errorSupplier = supplier;347length = content.stream().mapToInt(String::length).sum();348}349350boolean hasErrors() {351return errorAt < content.size();352}353354@Override355public long contentLength() {356return length;357}358359@Override360public void subscribe(Subscriber<? super ByteBuffer> subscriber) {361ErrorSubscription subscription = new ErrorSubscription(subscriber);362subscribers.put(subscriber, subscription);363subscriber.onSubscribe(subscription);364}365366class ErrorSubscription implements Flow.Subscription {367volatile boolean cancelled;368volatile int at;369final Subscriber<? super ByteBuffer> subscriber;370ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) {371this.subscriber = subscriber;372}373@Override374public void request(long n) {375while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) {376if (at++ == errorAt) {377subscriber.onError(errorSupplier.get());378return;379} else if (at <= content.size()){380subscriber.onNext(ByteBuffer.wrap(381content.get(at-1).getBytes()));382if (at == content.size()) {383subscriber.onComplete();384return;385}386}387}388}389390@Override391public void cancel() {392cancelled = true;393}394}395}396397static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {398CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();399ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();400CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();401402@Override403public void onSubscribe(Subscription subscription) {404this.subscriptionCF.complete(subscription);405}406407@Override408public void onNext(ByteBuffer item) {409items.addLast(item);410}411412@Override413public void onError(Throwable throwable) {414resultCF.completeExceptionally(throwable);415}416417@Override418public void onComplete() {419resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));420}421422CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }423}424425static String stringFromBuffer(ByteBuffer buffer) {426byte[] bytes = new byte[buffer.remaining()];427buffer.get(bytes);428return new String(bytes);429}430431String stringFromBytes(Stream<ByteBuffer> buffers) {432return buffers.map(AggregateRequestBodyTest::stringFromBuffer)433.collect(Collectors.joining());434}435436static PublishWithError withNoError(String content) {437return new PublishWithError(List.of(content), 1,438() -> new AssertionError("Should not happen!"));439}440441static PublishWithError withNoError(List<String> content) {442return new PublishWithError(content, content.size(),443() -> new AssertionError("Should not happen!"));444}445446@Test(dataProvider = "sparseContent") // checks that NPE is thrown447public void testNullPointerException(String description, String[] content) {448BodyPublisher[] publishers = publishers(content);449Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers));450}451452// Verifies that an empty array creates a "noBody" publisher453@Test454public void testEmpty() {455BodyPublisher publisher = BodyPublishers.concat();456RequestSubscriber subscriber = new RequestSubscriber();457assertEquals(publisher.contentLength(), 0);458publisher.subscribe(subscriber);459subscriber.subscriptionCF.thenAccept(s -> s.request(1));460List<ByteBuffer> result = subscriber.resultCF.join();461assertEquals(result, List.of());462assertTrue(subscriber.items.isEmpty());;463}464465// verifies that error emitted by upstream publishers are propagated downstream.466@Test(dataProvider = "sparseContent") // nulls are replaced with error publisher467public void testOnError(String description, String[] content) {468final RequestSubscriber subscriber = new RequestSubscriber();469final PublishWithError errorPublisher;470final BodyPublisher[] publishers;471String result = BODIES.stream().collect(Collectors.joining());472if (content == null) {473content = List.of(result).toArray(String[]::new);474errorPublisher = new PublishWithError(BODIES, BODIES.size(),475() -> new AssertionError("Unexpected!!"));476publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]);477description = "No error";478} else {479publishers = publishers(content);480description = description.replace("null", "error at");481errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected"));482}483result = "";484boolean hasErrors = false;485for (int i=0; i < content.length; i++) {486if (content[i] == null) {487publishers[i] = errorPublisher;488if (hasErrors) continue;489if (!errorPublisher.hasErrors()) {490result = result + errorPublisher491.content.stream().collect(Collectors.joining());492} else {493result = result + errorPublisher.content494.stream().limit(errorPublisher.errorAt)495.collect(Collectors.joining());496result = result + "<error>";497hasErrors = true;498}499} else if (!hasErrors) {500result = result + content[i];501}502}503BodyPublisher publisher = BodyPublishers.concat(publishers);504publisher.subscribe(subscriber);505subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE));506if (errorPublisher.hasErrors()) {507CompletionException ce = expectThrows(CompletionException.class,508() -> subscriber.resultCF.join());509out.println(description + ": got expected " + ce);510assertEquals(ce.getCause().getClass(), Exception.class);511assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result);512} else {513assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result);514out.println(description + ": got expected result: " + result);515}516}517518// Verifies that if an upstream publisher has an unknown length, the519// aggregate publisher will have an unknown length as well. Otherwise520// the length should be known.521@Test(dataProvider = "sparseContent") // nulls are replaced with unknown length522public void testUnknownContentLength(String description, String[] content) {523if (content == null) {524content = BODIES.toArray(String[]::new);525description = "BODIES (known length)";526} else {527description = description.replace("null", "length(-1)");528}529BodyPublisher[] publishers = publishers(content);530BodyPublisher nolength = new BodyPublisher() {531final BodyPublisher missing = BodyPublishers.ofString("missing");532@Override533public long contentLength() { return -1; }534@Override535public void subscribe(Subscriber<? super ByteBuffer> subscriber) {536missing.subscribe(subscriber);537}538};539long length = 0;540for (int i=0; i < content.length; i++) {541if (content[i] == null) {542publishers[i] = nolength;543length = -1;544} else if (length >= 0) {545length += content[i].length();546}547}548out.printf("testUnknownContentLength(%s): %d%n", description, length);549BodyPublisher publisher = BodyPublishers.concat(publishers);550assertEquals(publisher.contentLength(), length,551description.replace("null", "length(-1)"));552}553554private static final Throwable completionCause(CompletionException x) {555while (x.getCause() instanceof CompletionException) {556x = (CompletionException)x.getCause();557}558return x.getCause();559}560561@Test(dataProvider = "negativeRequests")562public void testNegativeRequest(long n) {563assert n <= 0 : "test for negative request called with n > 0 : " + n;564BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L));565BodyPublisher publisher = BodyPublishers.concat(publishers);566RequestSubscriber subscriber = new RequestSubscriber();567publisher.subscribe(subscriber);568Subscription subscription = subscriber.subscriptionCF.join();569subscription.request(n);570CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());571Throwable cause = completionCause(expected);572if (cause instanceof IllegalArgumentException) {573System.out.printf("Got expected IAE for %d: %s%n", n, cause);574} else {575throw new AssertionError("Unexpected exception: " + cause,576(cause == null) ? expected : cause);577}578}579580static BodyPublisher[] ofStrings(String... strings) {581return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new);582}583584@Test585public void testPositiveRequests() {586// A composite array of publishers587BodyPublisher[] publishers = Stream.of(588Stream.of(ofStrings("Lorem", " ", "ipsum", " ")),589Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))),590Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))),591Stream.of(ofStrings(" ")),592Stream.of(BodyPublishers.concat(ofStrings("elit", ".")))593).flatMap((s) -> s).toArray(BodyPublisher[]::new);594BodyPublisher publisher = BodyPublishers.concat(publishers);595596// Test that we can request all 13 items in a single request call.597RequestSubscriber requestSubscriber1 = new RequestSubscriber();598publisher.subscribe(requestSubscriber1);599Subscription subscription1 = requestSubscriber1.subscriptionCF.join();600subscription1.request(16);601assertTrue(requestSubscriber1.resultCF().isDone());602List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();603String result1 = stringFromBytes(list1.stream());604assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");605System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));606607// Test that we can split our requests call any which way we want608// (whether in the 'middle of a publisher' or at the boundaries.609RequestSubscriber requestSubscriber2 = new RequestSubscriber();610publisher.subscribe(requestSubscriber2);611Subscription subscription2 = requestSubscriber2.subscriptionCF.join();612subscription2.request(1);613assertFalse(requestSubscriber2.resultCF().isDone());614subscription2.request(10);615assertFalse(requestSubscriber2.resultCF().isDone());616subscription2.request(4);617assertFalse(requestSubscriber2.resultCF().isDone());618subscription2.request(1);619assertTrue(requestSubscriber2.resultCF().isDone());620List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();621String result2 = stringFromBytes(list2.stream());622assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");623System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));624}625626@Test(dataProvider = "contentLengths")627public void testContentLength(long expected, List<Long> lengths) {628BodyPublisher[] publishers = ContentLengthPublisher.of(lengths);629BodyPublisher aggregate = BodyPublishers.concat(publishers);630assertEquals(aggregate.contentLength(), expected,631"Unexpected result for %s".formatted(lengths));632}633634// Verifies that cancelling the subscription ensure that downstream635// publishers are no longer subscribed etc...636@Test637public void testCancel() {638BodyPublisher[] publishers = BODIES.stream()639.map(BodyPublishers::ofString)640.toArray(BodyPublisher[]::new);641BodyPublisher publisher = BodyPublishers.concat(publishers);642643assertEquals(publisher.contentLength(),644BODIES.stream().mapToInt(String::length).sum());645Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();646647for (int n=0; n < BODIES.size(); n++) {648649String description = String.format(650"cancel after %d/%d onNext() invocations",651n, BODIES.size());652RequestSubscriber subscriber = new RequestSubscriber();653publisher.subscribe(subscriber);654Subscription subscription = subscriber.subscriptionCF.join();655subscribers.put(subscriber, description);656657// receive half the data658for (int i = 0; i < n; i++) {659subscription.request(1);660ByteBuffer buffer = subscriber.items.pop();661}662663// cancel subscription664subscription.cancel();665// request the rest...666subscription.request(Long.MAX_VALUE);667}668669CompletableFuture[] results = subscribers.keySet()670.stream().map(RequestSubscriber::resultCF)671.toArray(CompletableFuture[]::new);672CompletableFuture<?> any = CompletableFuture.anyOf(results);673674// subscription was cancelled, so nothing should be received...675try {676TimeoutException x = Assert.expectThrows(TimeoutException.class,677() -> any.get(5, TimeUnit.SECONDS));678out.println("Got expected " + x);679} finally {680subscribers.keySet().stream()681.filter(rs -> rs.resultCF.isDone())682.forEach(rs -> System.err.printf(683"Failed: %s completed with %s",684subscribers.get(rs), rs.resultCF));685}686Consumer<RequestSubscriber> check = (rs) -> {687Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");688Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");689out.println(subscribers.get(rs) + ": PASSED");690};691subscribers.keySet().stream().forEach(check);692}693694// Verifies that cancelling the subscription is propagated downstream695@Test696public void testCancelSubscription() {697PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(),698() -> new AssertionError("should not come here"));699BodyPublisher publisher = BodyPublishers.concat(upstream);700701assertEquals(publisher.contentLength(),702BODIES.stream().mapToInt(String::length).sum());703Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();704705for (int n=0; n < BODIES.size(); n++) {706707String description = String.format(708"cancel after %d/%d onNext() invocations",709n, BODIES.size());710RequestSubscriber subscriber = new RequestSubscriber();711publisher.subscribe(subscriber);712Subscription subscription = subscriber.subscriptionCF.join();713subscribers.put(subscriber, description);714715// receive half the data716for (int i = 0; i < n; i++) {717subscription.request(1);718ByteBuffer buffer = subscriber.items.pop();719}720721// cancel subscription722subscription.cancel();723// request the rest...724subscription.request(Long.MAX_VALUE);725assertTrue(upstream.subscribers.get(subscriber).cancelled,726description + " upstream subscription not cancelled");727out.println(description + " upstream subscription was properly cancelled");728}729730CompletableFuture[] results = subscribers.keySet()731.stream().map(RequestSubscriber::resultCF)732.toArray(CompletableFuture[]::new);733CompletableFuture<?> any = CompletableFuture.anyOf(results);734735// subscription was cancelled, so nothing should be received...736try {737TimeoutException x = Assert.expectThrows(TimeoutException.class,738() -> any.get(5, TimeUnit.SECONDS));739out.println("Got expected " + x);740} finally {741subscribers.keySet().stream()742.filter(rs -> rs.resultCF.isDone())743.forEach(rs -> System.err.printf(744"Failed: %s completed with %s",745subscribers.get(rs), rs.resultCF));746}747Consumer<RequestSubscriber> check = (rs) -> {748Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");749Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");750out.println(subscribers.get(rs) + ": PASSED");751};752subscribers.keySet().stream().forEach(check);753754}755756@Test(dataProvider = "variants")757public void test(String uri, boolean sameClient) throws Exception {758System.out.println("Request to " + uri);759760HttpClient client = newHttpClient(sameClient);761762BodyPublisher publisher = BodyPublishers.concat(763BODIES.stream()764.map(BodyPublishers::ofString)765.toArray(HttpRequest.BodyPublisher[]::new)766);767HttpRequest request = HttpRequest.newBuilder(URI.create(uri))768.POST(publisher)769.build();770for (int i = 0; i < ITERATION_COUNT; i++) {771System.out.println("Iteration: " + i);772HttpResponse<String> response = client.send(request, BodyHandlers.ofString());773int expectedResponse = RESPONSE_CODE;774if (response.statusCode() != expectedResponse)775throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode()));776assertEquals(response.body(), BODIES.stream().collect(Collectors.joining()));777}778System.out.println("test: DONE");779}780781@BeforeTest782public void setup() throws Exception {783sslContext = new SimpleSSLContext().get();784if (sslContext == null)785throw new AssertionError("Unexpected null sslContext");786787HttpTestHandler handler = new HttpTestEchoHandler();788InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);789790HttpServer http1 = HttpServer.create(loopback, 0);791http1TestServer = HttpTestServer.of(http1);792http1TestServer.addHandler(handler, "/http1/echo/");793http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x";794795HttpsServer https1 = HttpsServer.create(loopback, 0);796https1.setHttpsConfigurator(new HttpsConfigurator(sslContext));797https1TestServer = HttpTestServer.of(https1);798https1TestServer.addHandler(handler, "/https1/echo/");799https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x";800801// HTTP/2802http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));803http2TestServer.addHandler(handler, "/http2/echo/");804http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x";805806https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));807https2TestServer.addHandler(handler, "/https2/echo/");808https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x";809810serverCount.addAndGet(4);811http1TestServer.start();812https1TestServer.start();813http2TestServer.start();814https2TestServer.start();815}816817@AfterTest818public void teardown() throws Exception {819String sharedClientName =820sharedClient == null ? null : sharedClient.toString();821sharedClient = null;822Thread.sleep(100);823AssertionError fail = TRACKER.check(500);824try {825http1TestServer.stop();826https1TestServer.stop();827http2TestServer.stop();828https2TestServer.stop();829} finally {830if (fail != null) {831if (sharedClientName != null) {832System.err.println("Shared client name is: " + sharedClientName);833}834throw fail;835}836}837}838}839840841