Path: blob/master/test/jdk/java/net/httpclient/CustomRequestPublisher.java
41149 views
/*1* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/*24* @test25* @summary Checks correct handling of Publishers that call onComplete without demand26* @modules java.base/sun.net.www.http27* java.net.http/jdk.internal.net.http.common28* java.net.http/jdk.internal.net.http.frame29* java.net.http/jdk.internal.net.http.hpack30* java.logging31* jdk.httpserver32* @library /test/lib http2/server33* @build Http2TestServer34* @build jdk.test.lib.net.SimpleSSLContext35* @run testng/othervm CustomRequestPublisher36*/3738import com.sun.net.httpserver.HttpExchange;39import com.sun.net.httpserver.HttpHandler;40import com.sun.net.httpserver.HttpServer;41import com.sun.net.httpserver.HttpsConfigurator;42import com.sun.net.httpserver.HttpsServer;43import java.io.IOException;44import java.io.InputStream;45import java.io.OutputStream;46import java.net.InetAddress;47import java.net.InetSocketAddress;48import java.net.URI;49import java.nio.ByteBuffer;50import java.util.Arrays;51import java.util.Optional;52import java.util.concurrent.CompletableFuture;53import java.util.concurrent.Flow;54import java.util.concurrent.atomic.AtomicBoolean;55import java.util.concurrent.atomic.AtomicInteger;56import java.util.concurrent.atomic.AtomicLong;57import java.util.function.Supplier;58import java.util.stream.Collectors;59import javax.net.ssl.SSLContext;60import javax.net.ssl.SSLSession;61import java.net.http.HttpClient;62import java.net.http.HttpRequest;63import java.net.http.HttpResponse;64import jdk.test.lib.net.SimpleSSLContext;65import org.testng.annotations.AfterTest;66import org.testng.annotations.BeforeTest;67import org.testng.annotations.DataProvider;68import org.testng.annotations.Test;69import static java.lang.System.out;70import static java.net.http.HttpClient.Version.HTTP_1_1;71import static java.net.http.HttpClient.Version.HTTP_2;72import static java.nio.charset.StandardCharsets.US_ASCII;73import static java.net.http.HttpResponse.BodyHandlers.ofString;74import static org.testng.Assert.assertEquals;75import static org.testng.Assert.assertTrue;76import static org.testng.Assert.fail;7778public class CustomRequestPublisher {7980SSLContext sslContext;81HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ]82HttpsServer httpsTestServer; // HTTPS/1.183Http2TestServer http2TestServer; // HTTP/2 ( h2c )84Http2TestServer https2TestServer; // HTTP/2 ( h2 )85String httpURI;86String httpsURI;87String http2URI;88String https2URI;8990@DataProvider(name = "variants")91public Object[][] variants() {92Supplier<BodyPublisher> fixedSupplier = () -> new FixedLengthBodyPublisher();93Supplier<BodyPublisher> unknownSupplier = () -> new UnknownLengthBodyPublisher();9495return new Object[][]{96{ httpURI, fixedSupplier, false },97{ httpURI, unknownSupplier, false },98{ httpsURI, fixedSupplier, false },99{ httpsURI, unknownSupplier, false },100{ http2URI, fixedSupplier, false },101{ http2URI, unknownSupplier, false },102{ https2URI, fixedSupplier, false,},103{ https2URI, unknownSupplier, false },104105{ httpURI, fixedSupplier, true },106{ httpURI, unknownSupplier, true },107{ httpsURI, fixedSupplier, true },108{ httpsURI, unknownSupplier, true },109{ http2URI, fixedSupplier, true },110{ http2URI, unknownSupplier, true },111{ https2URI, fixedSupplier, true,},112{ https2URI, unknownSupplier, true },113};114}115116static final int ITERATION_COUNT = 10;117118/** Asserts HTTP Version, and SSLSession presence when applicable. */119static void assertVersionAndSession(HttpResponse response, String uri) {120if (uri.contains("http2") || uri.contains("https2"))121assertEquals(response.version(), HTTP_2);122else if (uri.contains("http1") || uri.contains("https1"))123assertEquals(response.version(), HTTP_1_1);124else125fail("Unknown HTTP version in test for: " + uri);126127Optional<SSLSession> ssl = response.sslSession();128if (uri.contains("https")) {129assertTrue(ssl.isPresent(),130"Expected optional containing SSLSession but got:" + ssl);131try {132ssl.get().invalidate();133fail("SSLSession is not immutable: " + ssl.get());134} catch (UnsupportedOperationException expected) { }135} else {136assertTrue(!ssl.isPresent(), "UNEXPECTED non-empty optional:" + ssl);137}138}139140@Test(dataProvider = "variants")141void test(String uri, Supplier<BodyPublisher> bpSupplier, boolean sameClient)142throws Exception143{144HttpClient client = null;145for (int i=0; i< ITERATION_COUNT; i++) {146if (!sameClient || client == null)147client = HttpClient.newBuilder().sslContext(sslContext).build();148149BodyPublisher bodyPublisher = bpSupplier.get();150HttpRequest request = HttpRequest.newBuilder(URI.create(uri))151.POST(bodyPublisher)152.build();153154HttpResponse<String> resp = client.send(request, ofString());155156out.println("Got response: " + resp);157out.println("Got body: " + resp.body());158assertTrue(resp.statusCode() == 200,159"Expected 200, got:" + resp.statusCode());160assertEquals(resp.body(), bodyPublisher.bodyAsString());161162assertVersionAndSession(resp, uri);163}164}165166@Test(dataProvider = "variants")167void testAsync(String uri, Supplier<BodyPublisher> bpSupplier, boolean sameClient)168throws Exception169{170HttpClient client = null;171for (int i=0; i< ITERATION_COUNT; i++) {172if (!sameClient || client == null)173client = HttpClient.newBuilder().sslContext(sslContext).build();174175BodyPublisher bodyPublisher = bpSupplier.get();176HttpRequest request = HttpRequest.newBuilder(URI.create(uri))177.POST(bodyPublisher)178.build();179180CompletableFuture<HttpResponse<String>> cf = client.sendAsync(request, ofString());181HttpResponse<String> resp = cf.get();182183out.println("Got response: " + resp);184out.println("Got body: " + resp.body());185assertTrue(resp.statusCode() == 200,186"Expected 200, got:" + resp.statusCode());187assertEquals(resp.body(), bodyPublisher.bodyAsString());188189assertVersionAndSession(resp, uri);190}191}192193/** A Publisher that returns an UNKNOWN content length. */194static class UnknownLengthBodyPublisher extends BodyPublisher {195@Override196public long contentLength() {197return -1; // unknown198}199}200201/** A Publisher that returns a FIXED content length. */202static class FixedLengthBodyPublisher extends BodyPublisher {203final int LENGTH = Arrays.stream(BODY)204.mapToInt(s-> s.getBytes(US_ASCII).length)205.sum();206@Override207public long contentLength() {208return LENGTH;209}210}211212/**213* A Publisher that ( quite correctly ) invokes onComplete, after the last214* item has been published, even without any outstanding demand.215*/216static abstract class BodyPublisher implements HttpRequest.BodyPublisher {217218String[] BODY = new String[]219{ "Say ", "Hello ", "To ", "My ", "Little ", "Friend" };220221protected volatile Flow.Subscriber subscriber;222223@Override224public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {225this.subscriber = subscriber;226subscriber.onSubscribe(new InternalSubscription());227}228229@Override230public abstract long contentLength();231232String bodyAsString() {233return Arrays.stream(BODY).collect(Collectors.joining());234}235236class InternalSubscription implements Flow.Subscription {237238private final AtomicLong demand = new AtomicLong();239private final AtomicBoolean cancelled = new AtomicBoolean();240private volatile int position;241242private static final int IDLE = 1;243private static final int PUSHING = 2;244private static final int AGAIN = 4;245private final AtomicInteger state = new AtomicInteger(IDLE);246247@Override248public void request(long n) {249if (n <= 0L) {250subscriber.onError(new IllegalArgumentException(251"non-positive subscription request"));252return;253}254if (cancelled.get()) {255return;256}257258while (true) {259long prev = demand.get(), d;260if ((d = prev + n) < prev) // saturate261d = Long.MAX_VALUE;262if (demand.compareAndSet(prev, d))263break;264}265266while (true) {267int s = state.get();268if (s == IDLE) {269if (state.compareAndSet(IDLE, PUSHING)) {270while (true) {271push();272if (state.compareAndSet(PUSHING, IDLE))273return;274else if (state.compareAndSet(AGAIN, PUSHING))275continue;276}277}278} else if (s == PUSHING) {279if (state.compareAndSet(PUSHING, AGAIN))280return;281} else if (s == AGAIN){282// do nothing, the pusher will already rerun283return;284} else {285throw new AssertionError("Unknown state:" + s);286}287}288}289290private void push() {291long prev;292while ((prev = demand.get()) > 0) {293if (!demand.compareAndSet(prev, prev -1))294continue;295296int index = position;297if (index < BODY.length) {298position++;299subscriber.onNext(ByteBuffer.wrap(BODY[index].getBytes(US_ASCII)));300}301}302303if (position == BODY.length && !cancelled.get()) {304cancelled.set(true);305subscriber.onComplete(); // NOTE: onComplete without demand306}307}308309@Override310public void cancel() {311if (cancelled.compareAndExchange(false, true))312return; // already cancelled313}314}315}316317static String serverAuthority(HttpServer server) {318return InetAddress.getLoopbackAddress().getHostName() + ":"319+ server.getAddress().getPort();320}321322@BeforeTest323public void setup() throws Exception {324sslContext = new SimpleSSLContext().get();325if (sslContext == null)326throw new AssertionError("Unexpected null sslContext");327328InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);329httpTestServer = HttpServer.create(sa, 0);330httpTestServer.createContext("/http1/echo", new Http1EchoHandler());331httpURI = "http://" + serverAuthority(httpTestServer) + "/http1/echo";332333httpsTestServer = HttpsServer.create(sa, 0);334httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));335httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());336httpsURI = "https://" + serverAuthority(httpsTestServer) + "/https1/echo";337338http2TestServer = new Http2TestServer("localhost", false, 0);339http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");340http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo";341342https2TestServer = new Http2TestServer("localhost", true, sslContext);343https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");344https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo";345346httpTestServer.start();347httpsTestServer.start();348http2TestServer.start();349https2TestServer.start();350}351352@AfterTest353public void teardown() throws Exception {354httpTestServer.stop(0);355httpsTestServer.stop(0);356http2TestServer.stop();357https2TestServer.stop();358}359360static class Http1EchoHandler implements HttpHandler {361@Override362public void handle(HttpExchange t) throws IOException {363try (InputStream is = t.getRequestBody();364OutputStream os = t.getResponseBody()) {365byte[] bytes = is.readAllBytes();366t.sendResponseHeaders(200, bytes.length);367os.write(bytes);368}369}370}371372static class Http2EchoHandler implements Http2Handler {373@Override374public void handle(Http2TestExchange t) throws IOException {375try (InputStream is = t.getRequestBody();376OutputStream os = t.getResponseBody()) {377byte[] bytes = is.readAllBytes();378t.sendResponseHeaders(200, bytes.length);379os.write(bytes);380}381}382}383}384385386