// Path: blob/master/test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java
// 41149 views
/*1* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.io.ByteArrayOutputStream;24import java.io.IOException;25import java.io.InputStream;26import java.io.OutputStream;27import java.io.UncheckedIOException;28import java.net.InetAddress;29import java.net.InetSocketAddress;30import java.net.URI;31import java.nio.ByteBuffer;32import java.util.Collection;33import java.util.List;34import java.util.concurrent.CompletableFuture;35import java.util.concurrent.Flow;36import java.util.concurrent.Flow.Subscriber;37import java.util.function.Function;38import java.util.function.Supplier;39import com.sun.net.httpserver.HttpExchange;40import com.sun.net.httpserver.HttpHandler;41import com.sun.net.httpserver.HttpServer;42import com.sun.net.httpserver.HttpsConfigurator;43import com.sun.net.httpserver.HttpsServer;44import java.net.http.HttpClient;45import java.net.http.HttpRequest;46import java.net.http.HttpRequest.BodyPublishers;47import java.net.http.HttpResponse;48import 
java.net.http.HttpResponse.BodyHandlers;49import java.net.http.HttpResponse.BodySubscribers;50import jdk.test.lib.net.SimpleSSLContext;51import org.testng.annotations.AfterTest;52import org.testng.annotations.BeforeTest;53import org.testng.annotations.DataProvider;54import org.testng.annotations.Test;55import javax.net.ssl.SSLContext;56import static java.nio.charset.StandardCharsets.UTF_8;57import static org.testng.Assert.assertEquals;58import static org.testng.Assert.assertThrows;59import static org.testng.Assert.assertTrue;6061/*62* @test63* @summary Basic tests for Flow adapter Subscribers64* @modules java.base/sun.net.www.http65* java.net.http/jdk.internal.net.http.common66* java.net.http/jdk.internal.net.http.frame67* java.net.http/jdk.internal.net.http.hpack68* java.logging69* jdk.httpserver70* @library /test/lib http2/server71* @build Http2TestServer72* @build jdk.test.lib.net.SimpleSSLContext73* @run testng/othervm -Djdk.internal.httpclient.debug=true FlowAdapterSubscriberTest74*/7576public class FlowAdapterSubscriberTest {7778SSLContext sslContext;79HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ]80HttpsServer httpsTestServer; // HTTPS/1.181Http2TestServer http2TestServer; // HTTP/2 ( h2c )82Http2TestServer https2TestServer; // HTTP/2 ( h2 )83String httpURI;84String httpsURI;85String http2URI;86String https2URI;87static final long start = System.nanoTime();88public static String now() {89long now = System.nanoTime() - start;90long secs = now / 1000_000_000;91long mill = (now % 1000_000_000) / 1000_000;92long nan = now % 1000_000;93return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);94}9596@DataProvider(name = "uris")97public Object[][] variants() {98return new Object[][]{99{ httpURI },100{ httpsURI },101{ http2URI },102{ https2URI },103};104}105106static final Class<NullPointerException> NPE = NullPointerException.class;107108@Test109public void testNull() {110System.out.printf(now() + "testNull() starting%n");111assertThrows(NPE, () -> 
BodyHandlers.fromSubscriber(null));112assertThrows(NPE, () -> BodyHandlers.fromSubscriber(null, Function.identity()));113assertThrows(NPE, () -> BodyHandlers.fromSubscriber(new ListSubscriber(), null));114assertThrows(NPE, () -> BodyHandlers.fromSubscriber(null, null));115116assertThrows(NPE, () -> BodySubscribers.fromSubscriber(null));117assertThrows(NPE, () -> BodySubscribers.fromSubscriber(null, Function.identity()));118assertThrows(NPE, () -> BodySubscribers.fromSubscriber(new ListSubscriber(), null));119assertThrows(NPE, () -> BodySubscribers.fromSubscriber(null, null));120121Subscriber subscriber = BodySubscribers.fromSubscriber(new ListSubscriber());122assertThrows(NPE, () -> subscriber.onSubscribe(null));123assertThrows(NPE, () -> subscriber.onNext(null));124assertThrows(NPE, () -> subscriber.onError(null));125}126127// List<ByteBuffer>128129@Test(dataProvider = "uris")130void testListWithFinisher(String url) {131System.out.printf(now() + "testListWithFinisher(%s) starting%n", url);132HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();133HttpRequest request = HttpRequest.newBuilder(URI.create(url))134.POST(BodyPublishers.ofString("May the luck of the Irish be with you!")).build();135136ListSubscriber subscriber = new ListSubscriber();137HttpResponse<String> response = client.sendAsync(request,138BodyHandlers.fromSubscriber(subscriber, Supplier::get)).join();139String text = response.body();140System.out.println(text);141assertEquals(response.statusCode(), 200);142assertEquals(text, "May the luck of the Irish be with you!");143}144145@Test(dataProvider = "uris")146void testListWithoutFinisher(String url) {147System.out.printf(now() + "testListWithoutFinisher(%s) starting%n", url);148HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();149HttpRequest request = HttpRequest.newBuilder(URI.create(url))150.POST(BodyPublishers.ofString("May the luck of the Irish be with you!")).build();151152ListSubscriber subscriber = 
new ListSubscriber();153HttpResponse<Void> response = client.sendAsync(request,154BodyHandlers.fromSubscriber(subscriber)).join();155String text = subscriber.get();156System.out.println(text);157assertEquals(response.statusCode(), 200);158assertEquals(text, "May the luck of the Irish be with you!");159}160161@Test(dataProvider = "uris")162void testListWithFinisherBlocking(String url) throws Exception {163System.out.printf(now() + "testListWithFinisherBlocking(%s) starting%n", url);164HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();165HttpRequest request = HttpRequest.newBuilder(URI.create(url))166.POST(BodyPublishers.ofString("May the luck of the Irish be with you!")).build();167168ListSubscriber subscriber = new ListSubscriber();169HttpResponse<String> response = client.send(request,170BodyHandlers.fromSubscriber(subscriber, Supplier::get));171String text = response.body();172System.out.println(text);173assertEquals(response.statusCode(), 200);174assertEquals(text, "May the luck of the Irish be with you!");175}176177@Test(dataProvider = "uris")178void testListWithoutFinisherBlocking(String url) throws Exception {179System.out.printf(now() + "testListWithoutFinisherBlocking(%s) starting%n", url);180HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();181HttpRequest request = HttpRequest.newBuilder(URI.create(url))182.POST(BodyPublishers.ofString("May the luck of the Irish be with you!")).build();183184ListSubscriber subscriber = new ListSubscriber();185HttpResponse<Void> response = client.send(request,186BodyHandlers.fromSubscriber(subscriber));187String text = subscriber.get();188System.out.println(text);189assertEquals(response.statusCode(), 200);190assertEquals(text, "May the luck of the Irish be with you!");191}192193// Collection<ByteBuffer>194195@Test(dataProvider = "uris")196void testCollectionWithFinisher(String url) {197System.out.printf(now() + "testCollectionWithFinisher(%s) starting%n", url);198HttpClient 
client = HttpClient.newBuilder().sslContext(sslContext).build();199HttpRequest request = HttpRequest.newBuilder(URI.create(url))200.POST(BodyPublishers.ofString("What's the craic?")).build();201202CollectionSubscriber subscriber = new CollectionSubscriber();203HttpResponse<String> response = client.sendAsync(request,204BodyHandlers.fromSubscriber(subscriber, CollectionSubscriber::get)).join();205String text = response.body();206System.out.println(text);207assertEquals(response.statusCode(), 200);208assertEquals(text, "What's the craic?");209}210211@Test(dataProvider = "uris")212void testCollectionWithoutFinisher(String url) {213System.out.printf(now() + "testCollectionWithoutFinisher(%s) starting%n", url);214HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();215HttpRequest request = HttpRequest.newBuilder(URI.create(url))216.POST(BodyPublishers.ofString("What's the craic?")).build();217218CollectionSubscriber subscriber = new CollectionSubscriber();219HttpResponse<Void> response = client.sendAsync(request,220BodyHandlers.fromSubscriber(subscriber)).join();221String text = subscriber.get();222System.out.println(text);223assertEquals(response.statusCode(), 200);224assertEquals(text, "What's the craic?");225}226227@Test(dataProvider = "uris")228void testCollectionWithFinisherBlocking(String url) throws Exception {229System.out.printf(now() + "testCollectionWithFinisherBlocking(%s) starting%n", url);230HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();231HttpRequest request = HttpRequest.newBuilder(URI.create(url))232.POST(BodyPublishers.ofString("What's the craic?")).build();233234CollectionSubscriber subscriber = new CollectionSubscriber();235HttpResponse<String> response = client.send(request,236BodyHandlers.fromSubscriber(subscriber, CollectionSubscriber::get));237String text = response.body();238System.out.println(text);239assertEquals(response.statusCode(), 200);240assertEquals(text, "What's the 
craic?");241}242243@Test(dataProvider = "uris")244void testCollectionWithoutFinisheBlocking(String url) throws Exception {245System.out.printf(now() + "testCollectionWithoutFinisheBlocking(%s) starting%n", url);246HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();247HttpRequest request = HttpRequest.newBuilder(URI.create(url))248.POST(BodyPublishers.ofString("What's the craic?")).build();249250CollectionSubscriber subscriber = new CollectionSubscriber();251HttpResponse<Void> response = client.send(request,252BodyHandlers.fromSubscriber(subscriber));253String text = subscriber.get();254System.out.println(text);255assertEquals(response.statusCode(), 200);256assertEquals(text, "What's the craic?");257}258259// Iterable<ByteBuffer>260261@Test(dataProvider = "uris")262void testIterableWithFinisher(String url) {263System.out.printf(now() + "testIterableWithFinisher(%s) starting%n", url);264HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();265HttpRequest request = HttpRequest.newBuilder(URI.create(url))266.POST(BodyPublishers.ofString("We're sucking diesel now!")).build();267268IterableSubscriber subscriber = new IterableSubscriber();269HttpResponse<String> response = client.sendAsync(request,270BodyHandlers.fromSubscriber(subscriber, Supplier::get)).join();271String text = response.body();272System.out.println(text);273assertEquals(response.statusCode(), 200);274assertEquals(text, "We're sucking diesel now!");275}276277@Test(dataProvider = "uris")278void testIterableWithoutFinisher(String url) {279System.out.printf(now() + "testIterableWithoutFinisher(%s) starting%n", url);280HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();281HttpRequest request = HttpRequest.newBuilder(URI.create(url))282.POST(BodyPublishers.ofString("We're sucking diesel now!")).build();283284IterableSubscriber subscriber = new IterableSubscriber();285HttpResponse<Void> response = 
client.sendAsync(request,286BodyHandlers.fromSubscriber(subscriber)).join();287String text = subscriber.get();288System.out.println(text);289assertEquals(response.statusCode(), 200);290assertEquals(text, "We're sucking diesel now!");291}292293@Test(dataProvider = "uris")294void testIterableWithFinisherBlocking(String url) throws Exception {295System.out.printf(now() + "testIterableWithFinisherBlocking(%s) starting%n", url);296HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();297HttpRequest request = HttpRequest.newBuilder(URI.create(url))298.POST(BodyPublishers.ofString("We're sucking diesel now!")).build();299300IterableSubscriber subscriber = new IterableSubscriber();301HttpResponse<String> response = client.send(request,302BodyHandlers.fromSubscriber(subscriber, Supplier::get));303String text = response.body();304System.out.println(text);305assertEquals(response.statusCode(), 200);306assertEquals(text, "We're sucking diesel now!");307}308309@Test(dataProvider = "uris")310void testIterableWithoutFinisherBlocking(String url) throws Exception {311System.out.printf(now() + "testIterableWithoutFinisherBlocking(%s) starting%n", url);312HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();313HttpRequest request = HttpRequest.newBuilder(URI.create(url))314.POST(BodyPublishers.ofString("We're sucking diesel now!")).build();315316IterableSubscriber subscriber = new IterableSubscriber();317HttpResponse<Void> response = client.send(request,318BodyHandlers.fromSubscriber(subscriber));319String text = subscriber.get();320System.out.println(text);321assertEquals(response.statusCode(), 200);322assertEquals(text, "We're sucking diesel now!");323}324325// Subscriber<Object>326327@Test(dataProvider = "uris")328void testObjectWithFinisher(String url) {329System.out.printf(now() + "testObjectWithFinisher(%s) starting%n", url);330HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();331HttpRequest request = 
HttpRequest.newBuilder(URI.create(url))332.POST(BodyPublishers.ofString("May the wind always be at your back.")).build();333334ObjectSubscriber subscriber = new ObjectSubscriber();335HttpResponse<String> response = client.sendAsync(request,336BodyHandlers.fromSubscriber(subscriber, ObjectSubscriber::get)).join();337String text = response.body();338System.out.println(text);339assertEquals(response.statusCode(), 200);340assertTrue(text.length() != 0); // what else can be asserted!341}342343@Test(dataProvider = "uris")344void testObjectWithoutFinisher(String url) {345System.out.printf(now() + "testObjectWithoutFinisher(%s) starting%n", url);346HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();347HttpRequest request = HttpRequest.newBuilder(URI.create(url))348.POST(BodyPublishers.ofString("May the wind always be at your back.")).build();349350ObjectSubscriber subscriber = new ObjectSubscriber();351HttpResponse<Void> response = client.sendAsync(request,352BodyHandlers.fromSubscriber(subscriber)).join();353String text = subscriber.get();354System.out.println(text);355assertEquals(response.statusCode(), 200);356assertTrue(text.length() != 0); // what else can be asserted!357}358359@Test(dataProvider = "uris")360void testObjectWithFinisherBlocking(String url) throws Exception {361System.out.printf(now() + "testObjectWithFinisherBlocking(%s) starting%n", url);362HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();363HttpRequest request = HttpRequest.newBuilder(URI.create(url))364.POST(BodyPublishers.ofString("May the wind always be at your back.")).build();365366ObjectSubscriber subscriber = new ObjectSubscriber();367HttpResponse<String> response = client.send(request,368BodyHandlers.fromSubscriber(subscriber, ObjectSubscriber::get));369String text = response.body();370System.out.println(text);371assertEquals(response.statusCode(), 200);372assertTrue(text.length() != 0); // what else can be 
asserted!373}374375@Test(dataProvider = "uris")376void testObjectWithoutFinisherBlocking(String url) throws Exception {377System.out.printf(now() + "testObjectWithoutFinisherBlocking(%s) starting%n", url);378HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();379HttpRequest request = HttpRequest.newBuilder(URI.create(url))380.POST(BodyPublishers.ofString("May the wind always be at your back.")).build();381382ObjectSubscriber subscriber = new ObjectSubscriber();383HttpResponse<Void> response = client.send(request,384BodyHandlers.fromSubscriber(subscriber));385String text = subscriber.get();386System.out.println(text);387assertEquals(response.statusCode(), 200);388assertTrue(text.length() != 0); // what else can be asserted!389}390391392// -- mapping using convenience handlers393394@Test(dataProvider = "uris")395void mappingFromByteArray(String url) throws Exception {396System.out.printf(now() + "mappingFromByteArray(%s) starting%n", url);397HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();398HttpRequest request = HttpRequest.newBuilder(URI.create(url))399.POST(BodyPublishers.ofString("We're sucking diesel now!")).build();400401client.sendAsync(request, BodyHandlers.fromSubscriber(BodySubscribers.ofByteArray(),402bas -> new String(bas.getBody().toCompletableFuture().join(), UTF_8)))403.thenApply(FlowAdapterSubscriberTest::assert200ResponseCode)404.thenApply(HttpResponse::body)405.thenAccept(body -> assertEquals(body, "We're sucking diesel now!"))406.join();407}408409@Test(dataProvider = "uris")410void mappingFromInputStream(String url) throws Exception {411System.out.printf(now() + "mappingFromInputStream(%s) starting%n", url);412HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();413HttpRequest request = HttpRequest.newBuilder(URI.create(url))414.POST(BodyPublishers.ofString("May the wind always be at your back.")).build();415416client.sendAsync(request, 
BodyHandlers.fromSubscriber(BodySubscribers.ofInputStream(),417ins -> {418InputStream is = ins.getBody().toCompletableFuture().join();419return new String(uncheckedReadAllBytes(is), UTF_8); } ))420.thenApply(FlowAdapterSubscriberTest::assert200ResponseCode)421.thenApply(HttpResponse::body)422.thenAccept(body -> assertEquals(body, "May the wind always be at your back."))423.join();424}425426/** An abstract Subscriber that converts all received data into a String. */427static abstract class AbstractSubscriber implements Supplier<String> {428protected volatile Flow.Subscription subscription;429protected volatile ByteArrayOutputStream baos = new ByteArrayOutputStream();430protected volatile String text;431432public void onSubscribe(Flow.Subscription subscription) {433this.subscription = subscription;434subscription.request(Long.MAX_VALUE);435}436public void onError(Throwable throwable) {437throw new RuntimeException(throwable);438}439public void onComplete() {440text = new String(baos.toByteArray(), UTF_8);441}442@Override public String get() { return text; }443}444445static class ListSubscriber extends AbstractSubscriber446implements Flow.Subscriber<List<ByteBuffer>>, Supplier<String>447{448@Override public void onNext(List<ByteBuffer> item) {449for (ByteBuffer bb : item) {450byte[] ba = new byte[bb.remaining()];451bb.get(ba);452uncheckedWrite(baos, ba);453}454}455}456457static class CollectionSubscriber extends AbstractSubscriber458implements Flow.Subscriber<Collection<ByteBuffer>>, Supplier<String>459{460@Override public void onNext(Collection<ByteBuffer> item) {461for (ByteBuffer bb : item) {462byte[] ba = new byte[bb.remaining()];463bb.get(ba);464uncheckedWrite(baos, ba);465}466}467}468469static class IterableSubscriber extends AbstractSubscriber470implements Flow.Subscriber<Iterable<ByteBuffer>>, Supplier<String>471{472@Override public void onNext(Iterable<ByteBuffer> item) {473for (ByteBuffer bb : item) {474byte[] ba = new 
byte[bb.remaining()];475bb.get(ba);476uncheckedWrite(baos, ba);477}478}479}480481static class ObjectSubscriber extends AbstractSubscriber482implements Flow.Subscriber<Object>, Supplier<String>483{484@Override public void onNext(Object item) {485// What can anyone do with Object, cast or toString it ?486uncheckedWrite(baos, item.toString().getBytes(UTF_8));487}488}489490static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {491try {492baos.write(ba);493} catch (IOException e) {494throw new UncheckedIOException(e);495}496}497498static byte[] uncheckedReadAllBytes(InputStream is) {499try {500return is.readAllBytes();501} catch (IOException e) {502throw new UncheckedIOException(e);503}504}505506static final <T> HttpResponse<T> assert200ResponseCode(HttpResponse<T> response) {507assertEquals(response.statusCode(), 200);508return response;509}510511static String serverAuthority(HttpServer server) {512return InetAddress.getLoopbackAddress().getHostName() + ":"513+ server.getAddress().getPort();514}515516@BeforeTest517public void setup() throws Exception {518sslContext = new SimpleSSLContext().get();519if (sslContext == null)520throw new AssertionError("Unexpected null sslContext");521522InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);523httpTestServer = HttpServer.create(sa, 0);524httpTestServer.createContext("/http1/echo", new Http1EchoHandler());525httpURI = "http://" + serverAuthority(httpTestServer) + "/http1/echo";526527httpsTestServer = HttpsServer.create(sa, 0);528httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));529httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());530httpsURI = "https://" + serverAuthority(httpsTestServer) + "/https1/echo";531532http2TestServer = new Http2TestServer("localhost", false, 0);533http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");534http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo";535536https2TestServer = new 
Http2TestServer("localhost", true, sslContext);537https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");538https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo";539540httpTestServer.start();541httpsTestServer.start();542http2TestServer.start();543https2TestServer.start();544}545546@AfterTest547public void teardown() throws Exception {548httpTestServer.stop(0);549httpsTestServer.stop(0);550http2TestServer.stop();551https2TestServer.stop();552}553554static class Http1EchoHandler implements HttpHandler {555@Override556public void handle(HttpExchange t) throws IOException {557try (InputStream is = t.getRequestBody();558OutputStream os = t.getResponseBody()) {559byte[] bytes = is.readAllBytes();560t.sendResponseHeaders(200, bytes.length);561os.write(bytes);562}563}564}565566static class Http2EchoHandler implements Http2Handler {567@Override568public void handle(Http2TestExchange t) throws IOException {569try (InputStream is = t.getRequestBody();570OutputStream os = t.getResponseBody()) {571byte[] bytes = is.readAllBytes();572t.sendResponseHeaders(200, bytes.length);573os.write(bytes);574}575}576}577}578579580