Path: blob/master/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java
41149 views
/*1* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.io.IOException;24import java.io.InputStream;25import java.io.OutputStream;26import java.net.InetAddress;27import java.net.InetSocketAddress;28import java.net.URI;29import java.nio.ByteBuffer;30import java.nio.MappedByteBuffer;31import java.util.Arrays;32import java.util.concurrent.Flow;33import java.util.concurrent.Flow.Publisher;34import java.util.concurrent.atomic.AtomicBoolean;35import java.util.concurrent.atomic.AtomicInteger;36import java.util.concurrent.atomic.AtomicLong;37import com.sun.net.httpserver.HttpExchange;38import com.sun.net.httpserver.HttpHandler;39import com.sun.net.httpserver.HttpServer;40import com.sun.net.httpserver.HttpsConfigurator;41import com.sun.net.httpserver.HttpsServer;42import java.net.http.HttpClient;43import java.net.http.HttpRequest;44import java.net.http.HttpResponse;45import jdk.test.lib.net.SimpleSSLContext;46import org.testng.annotations.AfterTest;47import org.testng.annotations.BeforeTest;48import org.testng.annotations.DataProvider;49import org.testng.annotations.Test;50import javax.net.ssl.SSLContext;51import static java.util.stream.Collectors.joining;52import static java.nio.charset.StandardCharsets.UTF_8;53import static java.net.http.HttpRequest.BodyPublishers.fromPublisher;54import static java.net.http.HttpResponse.BodyHandlers.ofString;55import static org.testng.Assert.assertEquals;56import static org.testng.Assert.assertThrows;57import static org.testng.Assert.assertTrue;58import static org.testng.Assert.fail;5960/*61* @test62* @summary Basic tests for Flow adapter Publishers63* @modules java.base/sun.net.www.http64* java.net.http/jdk.internal.net.http.common65* java.net.http/jdk.internal.net.http.frame66* java.net.http/jdk.internal.net.http.hpack67* java.logging68* jdk.httpserver69* @library /test/lib http2/server70* @build Http2TestServer71* @build jdk.test.lib.net.SimpleSSLContext72* @run testng/othervm FlowAdapterPublisherTest73*/7475public class FlowAdapterPublisherTest {7677SSLContext sslContext;78HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ]79HttpsServer httpsTestServer; // HTTPS/1.180Http2TestServer http2TestServer; // HTTP/2 ( h2c )81Http2TestServer https2TestServer; // HTTP/2 ( h2 )82String httpURI;83String httpsURI;84String http2URI;85String https2URI;8687@DataProvider(name = "uris")88public Object[][] variants() {89return new Object[][]{90{ httpURI },91{ httpsURI },92{ http2URI },93{ https2URI },94};95}9697static final Class<NullPointerException> NPE = NullPointerException.class;98static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;99100@Test101public void testAPIExceptions() {102assertThrows(NPE, () -> fromPublisher(null));103assertThrows(NPE, () -> fromPublisher(null, 1));104assertThrows(IAE, () -> fromPublisher(new BBPublisher(), 0));105assertThrows(IAE, () -> fromPublisher(new BBPublisher(), -1));106assertThrows(IAE, () -> fromPublisher(new BBPublisher(), Long.MIN_VALUE));107108Publisher publisher = fromPublisher(new BBPublisher());109assertThrows(NPE, () -> publisher.subscribe(null));110}111112// Flow.Publisher<ByteBuffer>113114@Test(dataProvider = "uris")115void testByteBufferPublisherUnknownLength(String url) {116String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",117"when the ", "rain gets ", "warmer." };118HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();119HttpRequest request = HttpRequest.newBuilder(URI.create(url))120.POST(fromPublisher(new BBPublisher(body))).build();121122HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();123String text = response.body();124System.out.println(text);125assertEquals(response.statusCode(), 200);126assertEquals(text, Arrays.stream(body).collect(joining()));127}128129@Test(dataProvider = "uris")130void testByteBufferPublisherFixedLength(String url) {131String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",132"when the ", "rain gets ", "warmer." };133int cl = Arrays.stream(body).mapToInt(String::length).sum();134HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();135HttpRequest request = HttpRequest.newBuilder(URI.create(url))136.POST(fromPublisher(new BBPublisher(body), cl)).build();137138HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();139String text = response.body();140System.out.println(text);141assertEquals(response.statusCode(), 200);142assertEquals(text, Arrays.stream(body).collect(joining()));143}144145// Flow.Publisher<MappedByteBuffer>146147@Test(dataProvider = "uris")148void testMappedByteBufferPublisherUnknownLength(String url) {149String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",150"Irish from ", "ruling the ", "world." };151HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();152HttpRequest request = HttpRequest.newBuilder(URI.create(url))153.POST(fromPublisher(new MBBPublisher(body))).build();154155HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();156String text = response.body();157System.out.println(text);158assertEquals(response.statusCode(), 200);159assertEquals(text, Arrays.stream(body).collect(joining()));160}161162@Test(dataProvider = "uris")163void testMappedByteBufferPublisherFixedLength(String url) {164String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",165"Irish from ", "ruling the ", "world." };166int cl = Arrays.stream(body).mapToInt(String::length).sum();167HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();168HttpRequest request = HttpRequest.newBuilder(URI.create(url))169.POST(fromPublisher(new MBBPublisher(body), cl)).build();170171HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();172String text = response.body();173System.out.println(text);174assertEquals(response.statusCode(), 200);175assertEquals(text, Arrays.stream(body).collect(joining()));176}177178// The following two tests depend on Exception detail messages, which is179// not ideal, but necessary to discern correct behavior. They should be180// updated if the exception message is updated.181182@Test(dataProvider = "uris")183void testPublishTooFew(String url) throws InterruptedException {184String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",185"when the ", "rain gets ", "warmer." };186int cl = Arrays.stream(body).mapToInt(String::length).sum() + 1; // length + 1187HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();188HttpRequest request = HttpRequest.newBuilder(URI.create(url))189.POST(fromPublisher(new BBPublisher(body), cl)).build();190191try {192HttpResponse<String> response = client.send(request, ofString(UTF_8));193fail("Unexpected response: " + response);194} catch (IOException expected) {195assertMessage(expected, "Too few bytes returned");196}197}198199@Test(dataProvider = "uris")200void testPublishTooMany(String url) throws InterruptedException {201String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",202"when the ", "rain gets ", "warmer." };203int cl = Arrays.stream(body).mapToInt(String::length).sum() - 1; // length - 1204HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();205HttpRequest request = HttpRequest.newBuilder(URI.create(url))206.POST(fromPublisher(new BBPublisher(body), cl)).build();207208try {209HttpResponse<String> response = client.send(request, ofString(UTF_8));210fail("Unexpected response: " + response);211} catch (IOException expected) {212assertMessage(expected, "Too many bytes in request body");213}214}215216private void assertMessage(Throwable t, String contains) {217if (!t.getMessage().contains(contains)) {218String error = "Exception message:[" + t.toString() + "] doesn't contain [" + contains + "]";219throw new AssertionError(error, t);220}221}222223static class BBPublisher extends AbstractPublisher224implements Flow.Publisher<ByteBuffer>225{226BBPublisher(String... bodyParts) { super(bodyParts); }227228@Override229public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {230this.subscriber = subscriber;231subscriber.onSubscribe(new InternalSubscription());232}233}234235static class MBBPublisher extends AbstractPublisher236implements Flow.Publisher<MappedByteBuffer>237{238MBBPublisher(String... bodyParts) { super(bodyParts); }239240@Override241public void subscribe(Flow.Subscriber<? super MappedByteBuffer> subscriber) {242this.subscriber = subscriber;243subscriber.onSubscribe(new InternalSubscription());244}245}246247static abstract class AbstractPublisher {248private final String[] bodyParts;249protected volatile Flow.Subscriber subscriber;250251AbstractPublisher(String... bodyParts) {252this.bodyParts = bodyParts;253}254255class InternalSubscription implements Flow.Subscription {256257private final AtomicLong demand = new AtomicLong();258private final AtomicBoolean cancelled = new AtomicBoolean();259private volatile int position;260261private static final int IDLE = 1;262private static final int PUSHING = 2;263private static final int AGAIN = 4;264private final AtomicInteger state = new AtomicInteger(IDLE);265266@Override267public void request(long n) {268if (n <= 0L) {269subscriber.onError(new IllegalArgumentException(270"non-positive subscription request"));271return;272}273if (cancelled.get()) {274return;275}276277while (true) {278long prev = demand.get(), d;279if ((d = prev + n) < prev) // saturate280d = Long.MAX_VALUE;281if (demand.compareAndSet(prev, d))282break;283}284285while (true) {286int s = state.get();287if (s == IDLE) {288if (state.compareAndSet(IDLE, PUSHING)) {289while (true) {290push();291if (state.compareAndSet(PUSHING, IDLE))292return;293else if (state.compareAndSet(AGAIN, PUSHING))294continue;295}296}297} else if (s == PUSHING) {298if (state.compareAndSet(PUSHING, AGAIN))299return;300} else if (s == AGAIN){301// do nothing, the pusher will already rerun302return;303} else {304throw new AssertionError("Unknown state:" + s);305}306}307}308309private void push() {310long prev;311while ((prev = demand.get()) > 0) {312if (!demand.compareAndSet(prev, prev -1))313continue;314315int index = position;316if (index < bodyParts.length) {317position++;318subscriber.onNext(ByteBuffer.wrap(bodyParts[index].getBytes(UTF_8)));319}320}321322if (position == bodyParts.length && !cancelled.get()) {323cancelled.set(true);324subscriber.onComplete();325}326}327328@Override329public void cancel() {330if (cancelled.compareAndExchange(false, true))331return; // already cancelled332}333}334}335336static String serverAuthority(HttpServer server) {337return InetAddress.getLoopbackAddress().getHostName() + ":"338+ server.getAddress().getPort();339}340341@BeforeTest342public void setup() throws Exception {343sslContext = new SimpleSSLContext().get();344if (sslContext == null)345throw new AssertionError("Unexpected null sslContext");346347InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(),0);348httpTestServer = HttpServer.create(sa, 0);349httpTestServer.createContext("/http1/echo", new Http1EchoHandler());350httpURI = "http://" + serverAuthority(httpTestServer) + "/http1/echo";351352httpsTestServer = HttpsServer.create(sa, 0);353httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));354httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());355httpsURI = "https://" + serverAuthority(httpsTestServer) + "/https1/echo";356357http2TestServer = new Http2TestServer("localhost", false, 0);358http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");359http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo";360361https2TestServer = new Http2TestServer("localhost", true, sslContext);362https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");363https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo";364365httpTestServer.start();366httpsTestServer.start();367http2TestServer.start();368https2TestServer.start();369}370371@AfterTest372public void teardown() throws Exception {373httpTestServer.stop(0);374httpsTestServer.stop(0);375http2TestServer.stop();376https2TestServer.stop();377}378379static class Http1EchoHandler implements HttpHandler {380@Override381public void handle(HttpExchange t) throws IOException {382try (InputStream is = t.getRequestBody();383OutputStream os = t.getResponseBody()) {384byte[] bytes = is.readAllBytes();385t.sendResponseHeaders(200, bytes.length);386os.write(bytes);387}388}389}390391static class Http2EchoHandler implements Http2Handler {392@Override393public void handle(Http2TestExchange t) throws IOException {394try (InputStream is = t.getRequestBody();395OutputStream os = t.getResponseBody()) {396byte[] bytes = is.readAllBytes();397t.sendResponseHeaders(200, bytes.length);398os.write(bytes);399}400}401}402}403404405