Path: blob/master/test/jdk/java/net/httpclient/DependentActionsTest.java
41149 views
/*1* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/*24* @test25* @summary Verify that dependent synchronous actions added before the CF26* completes are executed either asynchronously in an executor when the27* CF later completes, or in the user thread that joins.28* @library /test/lib http2/server29* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters DependentActionsTest30* @modules java.base/sun.net.www.http31* java.net.http/jdk.internal.net.http.common32* java.net.http/jdk.internal.net.http.frame33* java.net.http/jdk.internal.net.http.hpack34* @run testng/othervm -Djdk.internal.httpclient.debug=true DependentActionsTest35* @run testng/othervm/java.security.policy=dependent.policy36* -Djdk.internal.httpclient.debug=true DependentActionsTest37*/3839import java.io.BufferedReader;40import java.io.InputStreamReader;41import java.lang.StackWalker.StackFrame;42import com.sun.net.httpserver.HttpServer;43import com.sun.net.httpserver.HttpsConfigurator;44import com.sun.net.httpserver.HttpsServer;45import jdk.test.lib.net.SimpleSSLContext;46import org.testng.annotations.AfterTest;47import org.testng.annotations.AfterClass;48import org.testng.annotations.BeforeTest;49import org.testng.annotations.DataProvider;50import org.testng.annotations.Test;5152import javax.net.ssl.SSLContext;53import java.io.IOException;54import java.io.InputStream;55import java.io.OutputStream;56import java.net.InetAddress;57import java.net.InetSocketAddress;58import java.net.URI;59import java.net.http.HttpClient;60import java.net.http.HttpHeaders;61import java.net.http.HttpRequest;62import java.net.http.HttpResponse;63import java.net.http.HttpResponse.BodyHandler;64import java.net.http.HttpResponse.BodyHandlers;65import java.net.http.HttpResponse.BodySubscriber;66import java.nio.ByteBuffer;67import java.nio.charset.StandardCharsets;68import java.util.EnumSet;69import java.util.List;70import java.util.Optional;71import java.util.concurrent.CompletableFuture;72import java.util.concurrent.CompletionException;73import java.util.concurrent.CompletionStage;74import java.util.concurrent.ConcurrentHashMap;75import java.util.concurrent.ConcurrentMap;76import java.util.concurrent.Executor;77import java.util.concurrent.Executors;78import java.util.concurrent.Flow;79import java.util.concurrent.Semaphore;80import java.util.concurrent.atomic.AtomicBoolean;81import java.util.concurrent.atomic.AtomicLong;82import java.util.concurrent.atomic.AtomicReference;83import java.util.function.Consumer;84import java.util.function.Predicate;85import java.util.function.Supplier;86import java.util.stream.Collectors;87import java.util.stream.Stream;8889import static java.lang.System.out;90import static java.lang.String.format;91import static java.util.stream.Collectors.toList;92import static org.testng.Assert.assertEquals;93import static org.testng.Assert.assertTrue;9495public class DependentActionsTest implements HttpServerAdapters {9697SSLContext sslContext;98HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]99HttpTestServer httpsTestServer; // HTTPS/1.1100HttpTestServer http2TestServer; // HTTP/2 ( h2c )101HttpTestServer https2TestServer; // HTTP/2 ( h2 )102String httpURI_fixed;103String httpURI_chunk;104String httpsURI_fixed;105String httpsURI_chunk;106String http2URI_fixed;107String http2URI_chunk;108String https2URI_fixed;109String https2URI_chunk;110111static final StackWalker WALKER =112StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE);113114static final int ITERATION_COUNT = 1;115// a shared executor helps reduce the amount of threads created by the test116static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());117static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();118static volatile boolean tasksFailed;119static final AtomicLong serverCount = new AtomicLong();120static final AtomicLong clientCount = new AtomicLong();121static final long start = System.nanoTime();122public static String now() {123long now = System.nanoTime() - start;124long secs = now / 1000_000_000;125long mill = (now % 1000_000_000) / 1000_000;126long nan = now % 1000_000;127return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);128}129130private volatile HttpClient sharedClient;131132static class TestExecutor implements Executor {133final AtomicLong tasks = new AtomicLong();134Executor executor;135TestExecutor(Executor executor) {136this.executor = executor;137}138139@Override140public void execute(Runnable command) {141long id = tasks.incrementAndGet();142executor.execute(() -> {143try {144command.run();145} catch (Throwable t) {146tasksFailed = true;147System.out.printf(now() + "Task %s failed: %s%n", id, t);148System.err.printf(now() + "Task %s failed: %s%n", id, t);149FAILURES.putIfAbsent("Task " + id, t);150throw t;151}152});153}154}155156@AfterClass157static final void printFailedTests() {158out.println("\n=========================");159try {160out.printf("%n%sCreated %d servers and %d clients%n",161now(), serverCount.get(), clientCount.get());162if (FAILURES.isEmpty()) return;163out.println("Failed tests: ");164FAILURES.entrySet().forEach((e) -> {165out.printf("\t%s: %s%n", e.getKey(), e.getValue());166e.getValue().printStackTrace(out);167e.getValue().printStackTrace();168});169if (tasksFailed) {170System.out.println("WARNING: Some tasks failed");171}172} finally {173out.println("\n=========================\n");174}175}176177private String[] uris() {178return new String[] {179httpURI_fixed,180httpURI_chunk,181httpsURI_fixed,182httpsURI_chunk,183http2URI_fixed,184http2URI_chunk,185https2URI_fixed,186https2URI_chunk,187};188}189190static final class SemaphoreStallerSupplier191implements Supplier<SemaphoreStaller> {192@Override193public SemaphoreStaller get() {194return new SemaphoreStaller();195}196@Override197public String toString() {198return "SemaphoreStaller";199}200}201202@DataProvider(name = "noStalls")203public Object[][] noThrows() {204String[] uris = uris();205Object[][] result = new Object[uris.length * 2][];206int i = 0;207for (boolean sameClient : List.of(false, true)) {208for (String uri: uris()) {209result[i++] = new Object[] {uri, sameClient};210}211}212assert i == uris.length * 2;213return result;214}215216@DataProvider(name = "variants")217public Object[][] variants() {218String[] uris = uris();219Object[][] result = new Object[uris.length * 2][];220int i = 0;221Supplier<? extends Staller> s = new SemaphoreStallerSupplier();222for (Supplier<? extends Staller> staller : List.of(s)) {223for (boolean sameClient : List.of(false, true)) {224for (String uri : uris()) {225result[i++] = new Object[]{uri, sameClient, staller};226}227}228}229assert i == uris.length * 2;230return result;231}232233private HttpClient makeNewClient() {234clientCount.incrementAndGet();235return HttpClient.newBuilder()236.executor(executor)237.sslContext(sslContext)238.build();239}240241HttpClient newHttpClient(boolean share) {242if (!share) return makeNewClient();243HttpClient shared = sharedClient;244if (shared != null) return shared;245synchronized (this) {246shared = sharedClient;247if (shared == null) {248shared = sharedClient = makeNewClient();249}250return shared;251}252}253254@Test(dataProvider = "noStalls")255public void testNoStalls(String uri, boolean sameClient)256throws Exception {257HttpClient client = null;258out.printf("%ntestNoStalls(%s, %b)%n", uri, sameClient);259for (int i=0; i< ITERATION_COUNT; i++) {260if (!sameClient || client == null)261client = newHttpClient(sameClient);262263HttpRequest req = HttpRequest.newBuilder(URI.create(uri))264.build();265BodyHandler<String> handler =266new StallingBodyHandler((w) -> {},267BodyHandlers.ofString());268HttpResponse<String> response = client.send(req, handler);269String body = response.body();270assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());271}272}273274@Test(dataProvider = "variants")275public void testAsStringAsync(String uri,276boolean sameClient,277Supplier<Staller> s)278throws Exception279{280Staller staller = s.get();281String test = format("testAsStringAsync(%s, %b, %s)",282uri, sameClient, staller);283testDependent(test, uri, sameClient, BodyHandlers::ofString,284this::finish, this::extractString, staller);285}286287@Test(dataProvider = "variants")288public void testAsLinesAsync(String uri,289boolean sameClient,290Supplier<Staller> s)291throws Exception292{293Staller staller = s.get();294String test = format("testAsLinesAsync(%s, %b, %s)",295uri, sameClient, staller);296testDependent(test, uri, sameClient, BodyHandlers::ofLines,297this::finish, this::extractStream, staller);298}299300@Test(dataProvider = "variants")301public void testAsInputStreamAsync(String uri,302boolean sameClient,303Supplier<Staller> s)304throws Exception305{306Staller staller = s.get();307String test = format("testAsInputStreamAsync(%s, %b, %s)",308uri, sameClient, staller);309testDependent(test, uri, sameClient, BodyHandlers::ofInputStream,310this::finish, this::extractInputStream, staller);311}312313private <T,U> void testDependent(String name, String uri, boolean sameClient,314Supplier<BodyHandler<T>> handlers,315Finisher finisher,316Extractor extractor,317Staller staller)318throws Exception319{320out.printf("%n%s%s%n", now(), name);321try {322testDependent(uri, sameClient, handlers, finisher, extractor, staller);323} catch (Error | Exception x) {324FAILURES.putIfAbsent(name, x);325throw x;326}327}328329private <T,U> void testDependent(String uri, boolean sameClient,330Supplier<BodyHandler<T>> handlers,331Finisher finisher,332Extractor extractor,333Staller staller)334throws Exception335{336HttpClient client = null;337for (Where where : EnumSet.of(Where.BODY_HANDLER)) {338if (!sameClient || client == null)339client = newHttpClient(sameClient);340341HttpRequest req = HttpRequest.342newBuilder(URI.create(uri))343.build();344BodyHandler<T> handler =345new StallingBodyHandler(where.select(staller), handlers.get());346System.out.println("try stalling in " + where);347staller.acquire();348assert staller.willStall();349CompletableFuture<HttpResponse<T>> responseCF = client.sendAsync(req, handler);350assert !responseCF.isDone();351finisher.finish(where, responseCF, staller, extractor);352}353}354355enum Where {356BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;357public Consumer<Where> select(Consumer<Where> consumer) {358return new Consumer<Where>() {359@Override360public void accept(Where where) {361if (Where.this == where) {362consumer.accept(where);363}364}365};366}367}368369interface Extractor<T> {370public List<String> extract(HttpResponse<T> resp);371}372373final List<String> extractString(HttpResponse<String> resp) {374return List.of(resp.body());375}376377final List<String> extractStream(HttpResponse<Stream<String>> resp) {378return resp.body().collect(toList());379}380381final List<String> extractInputStream(HttpResponse<InputStream> resp) {382try (InputStream is = resp.body()) {383return new BufferedReader(new InputStreamReader(is))384.lines().collect(toList());385} catch (IOException x) {386throw new CompletionException(x);387}388}389390interface Finisher<T> {391public void finish(Where w,392CompletableFuture<HttpResponse<T>> cf,393Staller staller,394Extractor extractor);395}396397Optional<StackFrame> findFrame(Stream<StackFrame> s, String name) {398return s.filter((f) -> f.getClassName().contains(name))399.filter((f) -> f.getDeclaringClass().getModule().equals(HttpClient.class.getModule()))400.findFirst();401}402403static final Predicate<StackFrame> DAT = sfe ->404sfe.getClassName().startsWith("DependentActionsTest");405static final Predicate<StackFrame> JUC = sfe ->406sfe.getClassName().startsWith("java.util.concurrent");407static final Predicate<StackFrame> JLT = sfe ->408sfe.getClassName().startsWith("java.lang.Thread");409static final Predicate<StackFrame> NotDATorJUCorJLT = Predicate.not(DAT.or(JUC).or(JLT));410411412<T> void checkThreadAndStack(Thread thread,413AtomicReference<RuntimeException> failed,414T result,415Throwable error) {416//failed.set(new RuntimeException("Dependant action was executed in " + thread));417List<StackFrame> otherFrames = WALKER.walk(s -> s.filter(NotDATorJUCorJLT).collect(toList()));418if (!otherFrames.isEmpty()) {419System.out.println("Found unexpected trace: ");420otherFrames.forEach(f -> System.out.printf("\t%s%n", f));421failed.set(new RuntimeException("Dependant action has unexpected frame in " +422Thread.currentThread() + ": " + otherFrames.get(0)));423424}425}426427<T> void finish(Where w, CompletableFuture<HttpResponse<T>> cf,428Staller staller,429Extractor<T> extractor) {430Thread thread = Thread.currentThread();431AtomicReference<RuntimeException> failed = new AtomicReference<>();432CompletableFuture<HttpResponse<T>> done = cf.whenComplete(433(r,t) -> checkThreadAndStack(thread, failed, r, t));434assert !cf.isDone();435try {436Thread.sleep(100);437} catch (Throwable t) {/* don't care */}438assert !cf.isDone();439staller.release();440try {441HttpResponse<T> response = done.join();442List<String> result = extractor.extract(response);443RuntimeException error = failed.get();444if (error != null) {445throw new RuntimeException("Test failed in "446+ w + ": " + response, error);447}448assertEquals(result, List.of(response.request().uri().getPath()));449} finally {450staller.reset();451}452}453454interface Staller extends Consumer<Where> {455void release();456void acquire();457void reset();458boolean willStall();459}460461static final class SemaphoreStaller implements Staller {462final Semaphore sem = new Semaphore(1);463@Override464public void accept(Where where) {465System.out.println("Acquiring semaphore in "466+ where + " permits=" + sem.availablePermits());467sem.acquireUninterruptibly();468System.out.println("Semaphored acquired in " + where);469}470471@Override472public void release() {473System.out.println("Releasing semaphore: permits="474+ sem.availablePermits());475sem.release();476}477478@Override479public void acquire() {480sem.acquireUninterruptibly();481System.out.println("Semaphored acquired");482}483484@Override485public void reset() {486System.out.println("Reseting semaphore: permits="487+ sem.availablePermits());488sem.drainPermits();489sem.release();490System.out.println("Semaphore reset: permits="491+ sem.availablePermits());492}493494@Override495public boolean willStall() {496return sem.availablePermits() <= 0;497}498499@Override500public String toString() {501return "SemaphoreStaller";502}503}504505static final class StallingBodyHandler<T> implements BodyHandler<T> {506final Consumer<Where> stalling;507final BodyHandler<T> bodyHandler;508StallingBodyHandler(Consumer<Where> stalling, BodyHandler<T> bodyHandler) {509this.stalling = stalling;510this.bodyHandler = bodyHandler;511}512@Override513public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {514stalling.accept(Where.BODY_HANDLER);515BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);516return new StallingBodySubscriber(stalling, subscriber);517}518}519520static final class StallingBodySubscriber<T> implements BodySubscriber<T> {521private final BodySubscriber<T> subscriber;522volatile boolean onSubscribeCalled;523final Consumer<Where> stalling;524StallingBodySubscriber(Consumer<Where> stalling, BodySubscriber<T> subscriber) {525this.stalling = stalling;526this.subscriber = subscriber;527}528529@Override530public void onSubscribe(Flow.Subscription subscription) {531//out.println("onSubscribe ");532onSubscribeCalled = true;533stalling.accept(Where.ON_SUBSCRIBE);534subscriber.onSubscribe(subscription);535}536537@Override538public void onNext(List<ByteBuffer> item) {539// out.println("onNext " + item);540assertTrue(onSubscribeCalled);541stalling.accept(Where.ON_NEXT);542subscriber.onNext(item);543}544545@Override546public void onError(Throwable throwable) {547//out.println("onError");548assertTrue(onSubscribeCalled);549stalling.accept(Where.ON_ERROR);550subscriber.onError(throwable);551}552553@Override554public void onComplete() {555//out.println("onComplete");556assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");557stalling.accept(Where.ON_COMPLETE);558subscriber.onComplete();559}560561@Override562public CompletionStage<T> getBody() {563stalling.accept(Where.GET_BODY);564try {565stalling.accept(Where.BODY_CF);566} catch (Throwable t) {567return CompletableFuture.failedFuture(t);568}569return subscriber.getBody();570}571}572573574@BeforeTest575public void setup() throws Exception {576sslContext = new SimpleSSLContext().get();577if (sslContext == null)578throw new AssertionError("Unexpected null sslContext");579580// HTTP/1.1581HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();582HttpTestHandler h1_chunkHandler = new HTTP_ChunkedHandler();583InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);584httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));585httpTestServer.addHandler(h1_fixedLengthHandler, "/http1/fixed");586httpTestServer.addHandler(h1_chunkHandler, "/http1/chunk");587httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed/x";588httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk/x";589590HttpsServer httpsServer = HttpsServer.create(sa, 0);591httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));592httpsTestServer = HttpTestServer.of(httpsServer);593httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");594httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");595httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed/x";596httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk/x";597598// HTTP/2599HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();600HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();601602http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));603http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");604http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");605http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";606http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";607608https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));609https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");610https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");611https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";612https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";613614serverCount.addAndGet(4);615httpTestServer.start();616httpsTestServer.start();617http2TestServer.start();618https2TestServer.start();619}620621@AfterTest622public void teardown() throws Exception {623sharedClient = null;624httpTestServer.stop();625httpsTestServer.stop();626http2TestServer.stop();627https2TestServer.stop();628}629630static class HTTP_FixedLengthHandler implements HttpTestHandler {631@Override632public void handle(HttpTestExchange t) throws IOException {633out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());634try (InputStream is = t.getRequestBody()) {635is.readAllBytes();636}637byte[] resp = t.getRequestURI().getPath().getBytes(StandardCharsets.UTF_8);638t.sendResponseHeaders(200, resp.length); //fixed content length639try (OutputStream os = t.getResponseBody()) {640os.write(resp);641}642}643}644645static class HTTP_ChunkedHandler implements HttpTestHandler {646@Override647public void handle(HttpTestExchange t) throws IOException {648out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());649byte[] resp = t.getRequestURI().getPath().toString().getBytes(StandardCharsets.UTF_8);650try (InputStream is = t.getRequestBody()) {651is.readAllBytes();652}653t.sendResponseHeaders(200, -1); // chunked/variable654try (OutputStream os = t.getResponseBody()) {655os.write(resp);656}657}658}659}660661662