Path: blob/master/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java
41149 views
/*1* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import com.sun.net.httpserver.HttpServer;24import com.sun.net.httpserver.HttpsConfigurator;25import com.sun.net.httpserver.HttpsServer;26import jdk.test.lib.net.SimpleSSLContext;27import org.testng.ITestContext;28import org.testng.annotations.AfterTest;29import org.testng.annotations.AfterClass;30import org.testng.annotations.BeforeMethod;31import org.testng.annotations.BeforeTest;32import org.testng.annotations.DataProvider;33import org.testng.annotations.Test;3435import javax.net.ssl.SSLContext;36import java.io.BufferedReader;37import java.io.IOException;38import java.io.InputStream;39import java.io.InputStreamReader;40import java.io.OutputStream;41import java.io.UncheckedIOException;42import java.net.InetAddress;43import java.net.InetSocketAddress;44import java.net.URI;45import java.net.http.HttpClient;46import java.net.http.HttpHeaders;47import java.net.http.HttpRequest;48import java.net.http.HttpResponse;49import java.net.http.HttpResponse.BodyHandler;50import java.net.http.HttpResponse.BodyHandlers;51import java.net.http.HttpResponse.BodySubscriber;52import java.nio.ByteBuffer;53import java.nio.charset.StandardCharsets;54import java.util.EnumSet;55import java.util.List;56import java.util.concurrent.CompletableFuture;57import java.util.concurrent.CompletionStage;58import java.util.concurrent.ConcurrentHashMap;59import java.util.concurrent.ConcurrentMap;60import java.util.concurrent.Executor;61import java.util.concurrent.Executors;62import java.util.concurrent.Flow;63import java.util.concurrent.atomic.AtomicLong;64import java.util.function.Consumer;65import java.util.function.Predicate;66import java.util.function.Supplier;67import java.util.stream.Collectors;68import java.util.stream.Stream;6970import static java.lang.System.out;71import static java.lang.String.format;72import static java.nio.charset.StandardCharsets.UTF_8;73import static org.testng.Assert.assertEquals;74import static org.testng.Assert.assertTrue;7576public abstract class AbstractThrowingSubscribers implements HttpServerAdapters {7778SSLContext sslContext;79HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]80HttpTestServer httpsTestServer; // HTTPS/1.181HttpTestServer http2TestServer; // HTTP/2 ( h2c )82HttpTestServer https2TestServer; // HTTP/2 ( h2 )83String httpURI_fixed;84String httpURI_chunk;85String httpsURI_fixed;86String httpsURI_chunk;87String http2URI_fixed;88String http2URI_chunk;89String https2URI_fixed;90String https2URI_chunk;9192static final int ITERATION_COUNT = 1;93// a shared executor helps reduce the amount of threads created by the test94static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());95static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();96static volatile boolean tasksFailed;97static final AtomicLong serverCount = new AtomicLong();98static final AtomicLong clientCount = new AtomicLong();99static final long start = System.nanoTime();100public static String now() {101long now = System.nanoTime() - start;102long secs = now / 1000_000_000;103long mill = (now % 1000_000_000) / 1000_000;104long nan = now % 1000_000;105return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);106}107108final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;109private volatile HttpClient sharedClient;110111static class TestExecutor implements Executor {112final AtomicLong tasks = new AtomicLong();113Executor executor;114TestExecutor(Executor executor) {115this.executor = executor;116}117118@Override119public void execute(Runnable command) {120long id = tasks.incrementAndGet();121executor.execute(() -> {122try {123command.run();124} catch (Throwable t) {125tasksFailed = true;126System.out.printf(now() + "Task %s failed: %s%n", id, t);127System.err.printf(now() + "Task %s failed: %s%n", id, t);128FAILURES.putIfAbsent("Task " + id, t);129throw t;130}131});132}133}134135protected boolean stopAfterFirstFailure() {136return Boolean.getBoolean("jdk.internal.httpclient.debug");137}138139@BeforeMethod140void beforeMethod(ITestContext context) {141if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {142throw new RuntimeException("some tests failed");143}144}145146@AfterClass147static final void printFailedTests() {148out.println("\n=========================");149try {150out.printf("%n%sCreated %d servers and %d clients%n",151now(), serverCount.get(), clientCount.get());152if (FAILURES.isEmpty()) return;153out.println("Failed tests: ");154FAILURES.entrySet().forEach((e) -> {155out.printf("\t%s: %s%n", e.getKey(), e.getValue());156e.getValue().printStackTrace(out);157e.getValue().printStackTrace();158});159if (tasksFailed) {160System.out.println("WARNING: Some tasks failed");161}162} finally {163out.println("\n=========================\n");164}165}166167private String[] uris() {168return new String[] {169httpURI_fixed,170httpURI_chunk,171httpsURI_fixed,172httpsURI_chunk,173http2URI_fixed,174http2URI_chunk,175https2URI_fixed,176https2URI_chunk,177};178}179180static AtomicLong URICOUNT = new AtomicLong();181182@DataProvider(name = "sanity")183public Object[][] sanity() {184String[] uris = uris();185Object[][] result = new Object[uris.length * 2][];186int i = 0;187for (boolean sameClient : List.of(false, true)) {188for (String uri: uris()) {189result[i++] = new Object[] {uri, sameClient};190}191}192assert i == uris.length * 2;193return result;194}195196@DataProvider(name = "variants")197public Object[][] variants(ITestContext context) {198if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {199return new Object[0][];200}201String[] uris = uris();202Object[][] result = new Object[uris.length * 2 * 2][];203int i = 0;204for (Thrower thrower : List.of(205new UncheckedIOExceptionThrower(),206new UncheckedCustomExceptionThrower())) {207for (boolean sameClient : List.of(false, true)) {208for (String uri : uris()) {209result[i++] = new Object[]{uri, sameClient, thrower};210}211}212}213assert i == uris.length * 2 * 2;214return result;215}216217private HttpClient makeNewClient() {218clientCount.incrementAndGet();219HttpClient client = HttpClient.newBuilder()220.proxy(HttpClient.Builder.NO_PROXY)221.executor(executor)222.sslContext(sslContext)223.build();224return TRACKER.track(client);225}226227HttpClient newHttpClient(boolean share) {228if (!share) return makeNewClient();229HttpClient shared = sharedClient;230if (shared != null) return shared;231synchronized (this) {232shared = sharedClient;233if (shared == null) {234shared = sharedClient = makeNewClient();235}236return shared;237}238}239240enum SubscriberType {241INLINE, // In line subscribers complete their CF on ON_COMPLETE242// e.g. BodySubscribers::ofString243OFFLINE; // Off line subscribers complete their CF immediately244// but require the client to pull the data after the245// CF completes (e.g. BodySubscribers::ofInputStream)246}247248static EnumSet<Where> excludes(SubscriberType type) {249EnumSet<Where> set = EnumSet.noneOf(Where.class);250251if (type == SubscriberType.OFFLINE) {252// Throwing on onSubscribe needs some more work253// for the case of InputStream, where the body has already254// completed by the time the subscriber is subscribed.255// The only way we have at that point to relay the exception256// is to call onError on the subscriber, but should we if257// Subscriber::onSubscribed has thrown an exception and258// not completed normally?259set.add(Where.ON_SUBSCRIBE);260}261262// Don't know how to make the stack reliably cause onError263// to be called without closing the connection.264// And how do we get the exception if onError throws anyway?265set.add(Where.ON_ERROR);266267return set;268}269270//@Test(dataProvider = "sanity")271protected void testSanityImpl(String uri, boolean sameClient)272throws Exception {273HttpClient client = null;274String uri2 = uri + "-" + URICOUNT.incrementAndGet() + "/sanity";275out.printf("%ntestSanity(%s, %b)%n", uri2, sameClient);276for (int i=0; i< ITERATION_COUNT; i++) {277if (!sameClient || client == null)278client = newHttpClient(sameClient);279280HttpRequest req = HttpRequest.newBuilder(URI.create(uri2))281.build();282BodyHandler<String> handler =283new ThrowingBodyHandler((w) -> {},284BodyHandlers.ofString());285HttpResponse<String> response = client.send(req, handler);286String body = response.body();287assertEquals(URI.create(body).getPath(), URI.create(uri2).getPath());288}289}290291//@Test(dataProvider = "variants")292protected void testThrowingAsStringImpl(String uri,293boolean sameClient,294Thrower thrower)295throws Exception296{297uri = uri + "-" + URICOUNT.incrementAndGet();298String test = format("testThrowingAsString(%s, %b, %s)",299uri, sameClient, thrower);300testThrowing(test, uri, sameClient, BodyHandlers::ofString,301this::shouldHaveThrown, thrower,false,302excludes(SubscriberType.INLINE));303}304305//@Test(dataProvider = "variants")306protected void testThrowingAsLinesImpl(String uri,307boolean sameClient,308Thrower thrower)309throws Exception310{311uri = uri + "-" + URICOUNT.incrementAndGet();312String test = format("testThrowingAsLines(%s, %b, %s)",313uri, sameClient, thrower);314testThrowing(test, uri, sameClient, BodyHandlers::ofLines,315this::checkAsLines, thrower,false,316excludes(SubscriberType.OFFLINE));317}318319//@Test(dataProvider = "variants")320protected void testThrowingAsInputStreamImpl(String uri,321boolean sameClient,322Thrower thrower)323throws Exception324{325uri = uri + "-" + URICOUNT.incrementAndGet();326String test = format("testThrowingAsInputStream(%s, %b, %s)",327uri, sameClient, thrower);328testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,329this::checkAsInputStream, thrower,false,330excludes(SubscriberType.OFFLINE));331}332333//@Test(dataProvider = "variants")334protected void testThrowingAsStringAsyncImpl(String uri,335boolean sameClient,336Thrower thrower)337throws Exception338{339uri = uri + "-" + URICOUNT.incrementAndGet();340String test = format("testThrowingAsStringAsync(%s, %b, %s)",341uri, sameClient, thrower);342testThrowing(test, uri, sameClient, BodyHandlers::ofString,343this::shouldHaveThrown, thrower, true,344excludes(SubscriberType.INLINE));345}346347//@Test(dataProvider = "variants")348protected void testThrowingAsLinesAsyncImpl(String uri,349boolean sameClient,350Thrower thrower)351throws Exception352{353uri = uri + "-" + URICOUNT.incrementAndGet();354String test = format("testThrowingAsLinesAsync(%s, %b, %s)",355uri, sameClient, thrower);356testThrowing(test, uri, sameClient, BodyHandlers::ofLines,357this::checkAsLines, thrower,true,358excludes(SubscriberType.OFFLINE));359}360361//@Test(dataProvider = "variants")362protected void testThrowingAsInputStreamAsyncImpl(String uri,363boolean sameClient,364Thrower thrower)365throws Exception366{367uri = uri + "-" + URICOUNT.incrementAndGet();368String test = format("testThrowingAsInputStreamAsync(%s, %b, %s)",369uri, sameClient, thrower);370testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,371this::checkAsInputStream, thrower,true,372excludes(SubscriberType.OFFLINE));373}374375private <T,U> void testThrowing(String name, String uri, boolean sameClient,376Supplier<BodyHandler<T>> handlers,377Finisher finisher, Thrower thrower,378boolean async, EnumSet<Where> excludes)379throws Exception380{381out.printf("%n%s%s%n", now(), name);382try {383testThrowing(uri, sameClient, handlers, finisher, thrower, async, excludes);384} catch (Error | Exception x) {385FAILURES.putIfAbsent(name, x);386throw x;387}388}389390private <T,U> void testThrowing(String uri, boolean sameClient,391Supplier<BodyHandler<T>> handlers,392Finisher finisher, Thrower thrower,393boolean async,394EnumSet<Where> excludes)395throws Exception396{397HttpClient client = null;398for (Where where : EnumSet.complementOf(excludes)) {399400if (!sameClient || client == null)401client = newHttpClient(sameClient);402String uri2 = uri + "-" + where;403HttpRequest req = HttpRequest.404newBuilder(URI.create(uri2))405.build();406BodyHandler<T> handler =407new ThrowingBodyHandler(where.select(thrower), handlers.get());408System.out.println("try throwing in " + where);409HttpResponse<T> response = null;410if (async) {411try {412response = client.sendAsync(req, handler).join();413} catch (Error | Exception x) {414Throwable cause = findCause(x, thrower);415if (cause == null) throw causeNotFound(where, x);416System.out.println(now() + "Got expected exception: " + cause);417}418} else {419try {420response = client.send(req, handler);421} catch (Error | Exception t) {422// synchronous send will rethrow exceptions423Throwable throwable = t.getCause();424assert throwable != null;425426if (thrower.test(throwable)) {427System.out.println(now() + "Got expected exception: " + throwable);428} else throw causeNotFound(where, t);429}430}431if (response != null) {432finisher.finish(where, response, thrower);433}434}435}436437enum Where {438BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;439public Consumer<Where> select(Consumer<Where> consumer) {440return new Consumer<Where>() {441@Override442public void accept(Where where) {443if (Where.this == where) {444consumer.accept(where);445}446}447};448}449}450451static AssertionError causeNotFound(Where w, Throwable t) {452return new AssertionError("Expected exception not found in " + w, t);453}454455interface Thrower extends Consumer<Where>, Predicate<Throwable> {456457}458459interface Finisher<T,U> {460U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;461}462463final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {464String msg = "Expected exception not thrown in " + w465+ "\n\tReceived: " + resp466+ "\n\tWith body: " + resp.body();467System.out.println(msg);468throw new RuntimeException(msg);469}470471final List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {472switch(w) {473case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);474case GET_BODY: return shouldHaveThrown(w, resp, thrower);475case BODY_CF: return shouldHaveThrown(w, resp, thrower);476default: break;477}478List<String> result = null;479try {480result = resp.body().collect(Collectors.toList());481} catch (Error | Exception x) {482Throwable cause = findCause(x, thrower);483if (cause != null) {484out.println(now() + "Got expected exception in " + w + ": " + cause);485return result;486}487throw causeNotFound(w, x);488}489return shouldHaveThrown(w, resp, thrower);490}491492final List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,493Thrower thrower)494throws IOException495{496switch(w) {497case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);498case GET_BODY: return shouldHaveThrown(w, resp, thrower);499case BODY_CF: return shouldHaveThrown(w, resp, thrower);500default: break;501}502List<String> result = null;503try (InputStreamReader r1 = new InputStreamReader(resp.body(), UTF_8);504BufferedReader r = new BufferedReader(r1)) {505try {506result = r.lines().collect(Collectors.toList());507} catch (Error | Exception x) {508Throwable cause = findCause(x, thrower);509if (cause != null) {510out.println(now() + "Got expected exception in " + w + ": " + cause);511return result;512}513throw causeNotFound(w, x);514}515}516return shouldHaveThrown(w, resp, thrower);517}518519private static Throwable findCause(Throwable x,520Predicate<Throwable> filter) {521while (x != null && !filter.test(x)) x = x.getCause();522return x;523}524525static final class UncheckedCustomExceptionThrower implements Thrower {526@Override527public void accept(Where where) {528out.println(now() + "Throwing in " + where);529throw new UncheckedCustomException(where.name());530}531532@Override533public boolean test(Throwable throwable) {534return UncheckedCustomException.class.isInstance(throwable);535}536537@Override538public String toString() {539return "UncheckedCustomExceptionThrower";540}541}542543static final class UncheckedIOExceptionThrower implements Thrower {544@Override545public void accept(Where where) {546out.println(now() + "Throwing in " + where);547throw new UncheckedIOException(new CustomIOException(where.name()));548}549550@Override551public boolean test(Throwable throwable) {552return UncheckedIOException.class.isInstance(throwable)553&& CustomIOException.class.isInstance(throwable.getCause());554}555556@Override557public String toString() {558return "UncheckedIOExceptionThrower";559}560}561562static final class UncheckedCustomException extends RuntimeException {563UncheckedCustomException(String message) {564super(message);565}566UncheckedCustomException(String message, Throwable cause) {567super(message, cause);568}569}570571static final class CustomIOException extends IOException {572CustomIOException(String message) {573super(message);574}575CustomIOException(String message, Throwable cause) {576super(message, cause);577}578}579580static final class ThrowingBodyHandler<T> implements BodyHandler<T> {581final Consumer<Where> throwing;582final BodyHandler<T> bodyHandler;583ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {584this.throwing = throwing;585this.bodyHandler = bodyHandler;586}587@Override588public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {589throwing.accept(Where.BODY_HANDLER);590BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);591return new ThrowingBodySubscriber(throwing, subscriber);592}593}594595static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {596private final BodySubscriber<T> subscriber;597volatile boolean onSubscribeCalled;598final Consumer<Where> throwing;599ThrowingBodySubscriber(Consumer<Where> throwing, BodySubscriber<T> subscriber) {600this.throwing = throwing;601this.subscriber = subscriber;602}603604@Override605public void onSubscribe(Flow.Subscription subscription) {606//out.println("onSubscribe ");607onSubscribeCalled = true;608throwing.accept(Where.ON_SUBSCRIBE);609subscriber.onSubscribe(subscription);610}611612@Override613public void onNext(List<ByteBuffer> item) {614// out.println("onNext " + item);615assertTrue(onSubscribeCalled);616throwing.accept(Where.ON_NEXT);617subscriber.onNext(item);618}619620@Override621public void onError(Throwable throwable) {622//out.println("onError");623assertTrue(onSubscribeCalled);624throwing.accept(Where.ON_ERROR);625subscriber.onError(throwable);626}627628@Override629public void onComplete() {630//out.println("onComplete");631assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");632throwing.accept(Where.ON_COMPLETE);633subscriber.onComplete();634}635636@Override637public CompletionStage<T> getBody() {638throwing.accept(Where.GET_BODY);639try {640throwing.accept(Where.BODY_CF);641} catch (Throwable t) {642return CompletableFuture.failedFuture(t);643}644return subscriber.getBody();645}646}647648649@BeforeTest650public void setup() throws Exception {651sslContext = new SimpleSSLContext().get();652if (sslContext == null)653throw new AssertionError("Unexpected null sslContext");654655// HTTP/1.1656HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();657HttpTestHandler h1_chunkHandler = new HTTP_ChunkedHandler();658InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);659httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));660httpTestServer.addHandler(h1_fixedLengthHandler, "/http1/fixed");661httpTestServer.addHandler(h1_chunkHandler, "/http1/chunk");662httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed/x";663httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk/x";664665HttpsServer httpsServer = HttpsServer.create(sa, 0);666httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));667httpsTestServer = HttpTestServer.of(httpsServer);668httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");669httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");670httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed/x";671httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk/x";672673// HTTP/2674HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();675HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();676677http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));678http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");679http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");680http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";681http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";682683https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));684https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");685https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");686https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";687https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";688689serverCount.addAndGet(4);690httpTestServer.start();691httpsTestServer.start();692http2TestServer.start();693https2TestServer.start();694}695696@AfterTest697public void teardown() throws Exception {698String sharedClientName =699sharedClient == null ? null : sharedClient.toString();700sharedClient = null;701Thread.sleep(100);702AssertionError fail = TRACKER.check(500);703try {704httpTestServer.stop();705httpsTestServer.stop();706http2TestServer.stop();707https2TestServer.stop();708} finally {709if (fail != null) {710if (sharedClientName != null) {711System.err.println("Shared client name is: " + sharedClientName);712}713throw fail;714}715}716}717718static class HTTP_FixedLengthHandler implements HttpTestHandler {719@Override720public void handle(HttpTestExchange t) throws IOException {721out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());722try (InputStream is = t.getRequestBody()) {723is.readAllBytes();724}725byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);726t.sendResponseHeaders(200, resp.length); //fixed content length727try (OutputStream os = t.getResponseBody()) {728os.write(resp);729}730}731}732733static class HTTP_ChunkedHandler implements HttpTestHandler {734@Override735public void handle(HttpTestExchange t) throws IOException {736out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());737byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);738try (InputStream is = t.getRequestBody()) {739is.readAllBytes();740}741t.sendResponseHeaders(200, -1); // chunked/variable742try (OutputStream os = t.getResponseBody()) {743os.write(resp);744}745}746}747748}749750751