Path: blob/master/test/jdk/java/net/httpclient/CancelledResponse.java
41149 views
/*1* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.net.http.HttpClient;24import java.net.http.HttpClient.Version;25import java.net.http.HttpHeaders;26import java.net.http.HttpRequest;27import java.net.http.HttpResponse;28import jdk.test.lib.net.SimpleSSLContext;2930import javax.net.ServerSocketFactory;31import javax.net.ssl.SSLContext;32import javax.net.ssl.SSLException;33import javax.net.ssl.SSLServerSocketFactory;34import java.io.IOException;35import java.net.SocketException;36import java.net.URI;37import java.nio.ByteBuffer;38import java.util.List;39import java.util.concurrent.CompletableFuture;40import java.util.concurrent.CompletionException;41import java.util.concurrent.CompletionStage;42import java.util.concurrent.ExecutionException;43import java.util.concurrent.Flow;44import java.util.concurrent.atomic.AtomicBoolean;45import java.util.concurrent.atomic.AtomicInteger;4647import java.net.http.HttpResponse.BodyHandler;48import java.net.http.HttpResponse.BodySubscriber;4950import static java.lang.String.format;51import static java.lang.System.out;52import static java.nio.charset.StandardCharsets.ISO_8859_1;5354/**55* @test56* @bug 808711257* @library /test/lib58* @modules java.net.http/jdk.internal.net.http.common59* @build jdk.test.lib.net.SimpleSSLContext60* @build MockServer ReferenceTracker61* @run main/othervm CancelledResponse62* @run main/othervm CancelledResponse SSL63*/6465/**66* Similar test to SplitResponse except that the client will cancel the response67* before receiving it fully.68*/69public class CancelledResponse {7071static String response(String body, boolean serverKeepalive) {72StringBuilder sb = new StringBuilder();73sb.append("HTTP/1.1 200 OK\r\n");74if (!serverKeepalive)75sb.append("Connection: Close\r\n");7677sb.append("Content-length: ")78.append(body.getBytes(ISO_8859_1).length)79.append("\r\n");80sb.append("\r\n");81sb.append(body);82return sb.toString();83}8485static final String responses[] = {86"Lorem ipsum dolor sit amet consectetur adipiscing elit,",87"sed do eiusmod tempor quis nostrud exercitation ullamco laboris nisi ut",88"aliquip ex ea commodo consequat."89};9091static final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;92final ServerSocketFactory factory;93final SSLContext context;94final boolean useSSL;95CancelledResponse(boolean useSSL) throws IOException {96this.useSSL = useSSL;97context = new SimpleSSLContext().get();98SSLContext.setDefault(context);99factory = useSSL ? SSLServerSocketFactory.getDefault()100: ServerSocketFactory.getDefault();101}102103public HttpClient newHttpClient() {104HttpClient client;105if (useSSL) {106client = HttpClient.newBuilder()107.sslContext(context)108.build();109} else {110client = HttpClient.newHttpClient();111}112return TRACKER.track(client);113}114115public static void main(String[] args) throws Exception {116boolean useSSL = false;117if (args != null && args.length == 1) {118useSSL = "SSL".equals(args[0]);119}120CancelledResponse sp = new CancelledResponse(useSSL);121122Throwable failed = null;123try {124for (Version version : Version.values()) {125for (boolean serverKeepalive : new boolean[]{true, false}) {126// Note: the mock server doesn't support Keep-Alive, but127// pretending that it might exercises code paths in and out of128// the connection pool, and retry logic129for (boolean async : new boolean[]{true, false}) {130sp.test(version, serverKeepalive, async);131}132}133}134} catch (Exception | Error t) {135failed = t;136throw t;137} finally {138Thread.sleep(100);139AssertionError trackFailed = TRACKER.check(500);140if (trackFailed != null) {141if (failed != null) {142failed.addSuppressed(trackFailed);143if (failed instanceof Error) throw (Error) failed;144if (failed instanceof Exception) throw (Exception) failed;145}146throw trackFailed;147}148}149}150151static class CancelException extends IOException {152}153154// @Test155void test(Version version, boolean serverKeepalive, boolean async)156throws Exception157{158out.println(format("*** version %s, serverKeepAlive: %s, async: %s ***",159version, serverKeepalive, async));160MockServer server = new MockServer(0, factory);161URI uri = new URI(server.getURL());162out.println("server is: " + uri);163server.start();164165HttpClient client = newHttpClient();166HttpRequest request = HttpRequest.newBuilder(uri).version(version).build();167try {168for (int i = 0; i < responses.length; i++) {169HttpResponse<String> r = null;170CompletableFuture<HttpResponse<String>> cf1;171CancelException expected = null;172AtomicBoolean cancelled = new AtomicBoolean();173174out.println("----- iteration " + i + " -----");175String body = responses[i];176Thread t = sendSplitResponse(response(body, serverKeepalive), server, cancelled);177178try {179if (async) {180out.println("send async: " + request);181cf1 = client.sendAsync(request, ofString(body, cancelled));182r = cf1.get();183} else { // sync184out.println("send sync: " + request);185r = client.send(request, ofString(body, cancelled));186}187} catch (CancelException c1) {188System.out.println("Got expected exception: " + c1);189expected = c1;190} catch (IOException | ExecutionException | CompletionException c2) {191Throwable c = c2;192while (c != null && !(c instanceof CancelException)) {193c = c.getCause();194}195if (c instanceof CancelException) {196System.out.println("Got expected exception: " + c);197expected = (CancelException)c;198} else throw c2;199}200if (r != null) {201if (r.statusCode() != 200)202throw new RuntimeException("Failed");203204String rxbody = r.body();205out.println("received " + rxbody);206if (!rxbody.equals(body))207throw new RuntimeException(format("Expected:%s, got:%s", body, rxbody));208}209t.join();210conn.close();211if (expected == null) {212throw new RuntimeException("Expected exception not raised for "213+ i + " cancelled=" + cancelled.get());214}215}216} finally {217server.close();218}219System.out.println("OK");220}221222static class CancellingSubscriber implements BodySubscriber<String> {223private final String expected;224private final CompletableFuture<String> result;225private Flow.Subscription subscription;226final AtomicInteger index = new AtomicInteger();227final AtomicBoolean cancelled;228CancellingSubscriber(String expected, AtomicBoolean cancelled) {229this.cancelled = cancelled;230this.expected = expected;231result = new CompletableFuture<>();232}233234@Override235public CompletionStage<String> getBody() {236return result;237}238239@Override240public void onSubscribe(Flow.Subscription subscription) {241this.subscription = subscription;242subscription.request(1);243}244245@Override246public void onNext(List<ByteBuffer> item) {247//if (result.isDone())248for (ByteBuffer b : item) {249while (b.hasRemaining() && !result.isDone()) {250int i = index.getAndIncrement();251char at = expected.charAt(i);252byte[] data = new byte[b.remaining()];253b.get(data); // we know that the server writes 1 char254String s = new String(data);255char c = s.charAt(0);256if (c != at) {257Throwable x = new IllegalStateException("char at "258+ i + " is '" + c + "' expected '"259+ at + "' for \"" + expected +"\"");260out.println("unexpected char received, cancelling");261subscription.cancel();262result.completeExceptionally(x);263return;264}265}266}267if (index.get() > 0 && !result.isDone()) {268// we should complete the result here, but let's269// see if we get something back...270out.println("Cancelling subscription after reading " + index.get());271cancelled.set(true);272subscription.cancel();273result.completeExceptionally(new CancelException());274return;275}276if (!result.isDone()) {277out.println("requesting 1 more");278subscription.request(1);279}280}281282@Override283public void onError(Throwable throwable) {284result.completeExceptionally(throwable);285}286287@Override288public void onComplete() {289int len = index.get();290if (len == expected.length()) {291result.complete(expected);292} else {293Throwable x = new IllegalStateException("received only "294+ len + " chars, expected " + expected.length()295+ " for \"" + expected +"\"");296result.completeExceptionally(x);297}298}299}300301static class CancellingHandler implements BodyHandler<String> {302final String expected;303final AtomicBoolean cancelled;304CancellingHandler(String expected, AtomicBoolean cancelled) {305this.expected = expected;306this.cancelled = cancelled;307}308@Override309public BodySubscriber<String> apply(HttpResponse.ResponseInfo rinfo) {310assert !cancelled.get();311return new CancellingSubscriber(expected, cancelled);312}313}314315BodyHandler<String> ofString(String expected, AtomicBoolean cancelled) {316return new CancellingHandler(expected, cancelled);317}318319// required for cleanup320volatile MockServer.Connection conn;321322// Sends the response, mostly, one byte at a time with a small delay323// between bytes, to encourage that each byte is read in a separate read324Thread sendSplitResponse(String s, MockServer server, AtomicBoolean cancelled) {325System.out.println("Sending: ");326Thread t = new Thread(() -> {327System.out.println("Waiting for server to receive headers");328conn = server.activity();329System.out.println("Start sending response");330int sent = 0;331try {332int len = s.length();333out.println("sending " + s);334for (int i = 0; i < len; i++) {335String onechar = s.substring(i, i + 1);336conn.send(onechar);337sent++;338Thread.sleep(10);339}340out.println("sent " + s);341} catch (SSLException | SocketException | RuntimeException x) {342// if SSL then we might get a "Broken Pipe", or a343// RuntimeException wrapping an InvalidAlgorithmParameterException344// (probably if the channel is closed during the handshake),345// otherwise we get a "Socket closed".346boolean expected = cancelled.get();347if (sent > 0 && expected) {348System.out.println("Connection closed by peer as expected: " + x);349return;350} else {351System.out.println("Unexpected exception (sent="352+ sent + ", cancelled=" + expected + "): " + x);353if (x instanceof RuntimeException) throw (RuntimeException) x;354throw new RuntimeException(x);355}356} catch (IOException | InterruptedException e) {357throw new RuntimeException(e);358}359});360t.setDaemon(true);361t.start();362return t;363}364}365366367