// Path: blob/master/test/jdk/java/net/httpclient/BufferingSubscriberTest.java
// 41149 views
/*1* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.nio.ByteBuffer;24import java.util.List;25import java.util.Random;26import java.util.concurrent.CompletableFuture;27import java.util.concurrent.CompletionStage;28import java.util.concurrent.Executor;29import java.util.concurrent.ExecutorService;30import java.util.concurrent.Executors;31import java.util.concurrent.Flow;32import java.util.concurrent.Flow.Subscription;33import java.util.concurrent.SubmissionPublisher;34import java.util.function.BiConsumer;35import java.net.http.HttpResponse.BodyHandler;36import java.net.http.HttpResponse.BodyHandlers;37import java.net.http.HttpResponse.BodySubscriber;38import java.net.http.HttpResponse.BodySubscribers;39import jdk.test.lib.RandomFactory;40import org.testng.annotations.DataProvider;41import org.testng.annotations.Test;42import static java.lang.Long.MAX_VALUE;43import static java.lang.Long.min;44import static java.lang.System.out;45import static 
java.util.concurrent.CompletableFuture.delayedExecutor;46import static java.util.concurrent.TimeUnit.MILLISECONDS;47import static org.testng.Assert.*;4849/*50* @test51* @bug 818428552* @summary Direct test for HttpResponse.BodySubscriber.buffering() API53* @key randomness54* @library /test/lib55* @build jdk.test.lib.RandomFactory56* @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest57*/5859public class BufferingSubscriberTest {6061// If we compute that a test will take less that 10s62// we judge it acceptable63static final long LOWER_THRESHOLD = 10_000; // 10 sec.64// If we compute that a test will take more than 20 sec65// we judge it problematic: we will try to adjust the66// buffer sizes, and if we can't we will print a warning67static final long UPPER_THRESHOLD = 20_000; // 20 sec.6869static final Random random = RandomFactory.getRandom();70static final long start = System.nanoTime();71static final String START = "start";72static final String END = "end ";73static long elapsed() { return (System.nanoTime() - start)/1000_000;}74static void printStamp(String what, String fmt, Object... args) {75long elapsed = elapsed();76long sec = elapsed/1000;77long ms = elapsed % 1000;78String time = sec > 0 ? 
sec + "sec " : "";79time = time + ms + "ms";80out.println(what + "\t ["+time+"]\t "+ String.format(fmt,args));81}82@DataProvider(name = "negatives")83public Object[][] negatives() {84return new Object[][] { { 0 }, { -1 }, { -1000 } };85}8687@Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)88public void subscriberThrowsIAE(int bufferSize) {89printStamp(START, "subscriberThrowsIAE(%d)", bufferSize);90try {91BodySubscriber<?> bp = BodySubscribers.ofByteArray();92BodySubscribers.buffering(bp, bufferSize);93} finally {94printStamp(END, "subscriberThrowsIAE(%d)", bufferSize);95}96}9798@Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)99public void handlerThrowsIAE(int bufferSize) {100printStamp(START, "handlerThrowsIAE(%d)", bufferSize);101try {102BodyHandler<?> bp = BodyHandlers.ofByteArray();103BodyHandlers.buffering(bp, bufferSize);104} finally {105printStamp(END, "handlerThrowsIAE(%d)", bufferSize);106}107}108109// ---110111@DataProvider(name = "config")112public Object[][] config() {113return new Object[][] {114// iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize115{ 1, 0, 1, 1, 2, 1 },116{ 1, 5, 1, 100, 2, 1 },117{ 1, 0, 1, 10, 1000, 1 },118{ 1, 10, 1, 10, 1000, 1 },119{ 1, 0, 1, 1000, 1000, 10 },120{ 1, 0, 10, 1000, 1000, 50 },121{ 1, 0, 1000, 10 , 1000, 50 },122{ 1, 100, 1, 1000 * 4, 1000, 50 },123{ 100, 0, 1000, 1, 2, 1 },124{ 3, 0, 4, 5006, 1000, 50 },125{ 20, 0, 100, 4888, 1000, 100 },126{ 16, 10, 1000, 50 , 1000, 100 },127{ 16, 10, 1000, 50 , 657, 657 },128};129}130131@Test(dataProvider = "config")132public void test(int iterations,133int delayMillis,134int numBuffers,135int bufferSize,136int maxBufferSize,137int minbufferSize) {138for (long perRequestAmount : new long[] { 1L, MAX_VALUE })139test(iterations,140delayMillis,141numBuffers,142bufferSize,143maxBufferSize,144minbufferSize,145perRequestAmount);146}147148volatile boolean 
onNextThrew;149150BiConsumer<Flow.Subscriber<?>, ? super Throwable> onNextThrowHandler =151(sub, ex) -> {152onNextThrew = true;153System.out.println("onNext threw " + ex);154ex.printStackTrace();155};156157public void test(int iterations,158int delayMillis,159int numBuffers,160int bufferSize,161int maxBufferSize,162int minBufferSize,163long requestAmount) {164ExecutorService executor = Executors.newFixedThreadPool(1);165try {166out.printf("Iterations %d\n", iterations);167for (int i=0; i<iterations; i++ ) {168printStamp(START, "Iteration %d", i);169try {170SubmissionPublisher<List<ByteBuffer>> publisher =171new SubmissionPublisher<>(executor,1721, // lock-step with the publisher, for now173onNextThrowHandler);174CompletableFuture<?> cf = sink(publisher,175delayMillis,176numBuffers * bufferSize,177requestAmount,178maxBufferSize,179minBufferSize);180source(publisher, numBuffers, bufferSize);181publisher.close();182cf.join();183} finally {184printStamp(END, "Iteration %d\n", i);185}186}187188assertFalse(onNextThrew, "Unexpected onNextThrew, check output");189190out.println("OK");191} finally {192executor.shutdown();193}194}195196static long accumulatedDataSize(List<ByteBuffer> bufs) {197return bufs.stream().mapToLong(ByteBuffer::remaining).sum();198}199200/** Returns a new BB with its contents set to monotonically increasing201* values, staring at the given start index and wrapping every 100. 
*/202static ByteBuffer allocateBuffer(int size, int startIdx) {203ByteBuffer b = ByteBuffer.allocate(size);204for (int i=0; i<size; i++)205b.put((byte)((startIdx + i) % 100));206b.position(0);207return b;208}209210static class TestSubscriber implements BodySubscriber<Integer> {211final int delayMillis;212final int bufferSize;213final int expectedTotalSize;214final long requestAmount;215final CompletableFuture<Integer> completion;216final Executor delayedExecutor;217volatile Flow.Subscription subscription;218219TestSubscriber(int bufferSize,220int delayMillis,221int expectedTotalSize,222long requestAmount) {223this.bufferSize = bufferSize;224this.completion = new CompletableFuture<>();225this.delayMillis = delayMillis;226this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);227this.expectedTotalSize = expectedTotalSize;228this.requestAmount = requestAmount;229}230231/**232* Example of a factory method which would decorate a buffering233* subscriber to create a new subscriber dependent on buffering capability.234* <p>235* The integer type parameter simulates the body just by counting the236* number of bytes in the body.237*/238static BodySubscriber<Integer> createSubscriber(int bufferSize,239int delay,240int expectedTotalSize,241long requestAmount) {242TestSubscriber s = new TestSubscriber(bufferSize,243delay,244expectedTotalSize,245requestAmount);246return BodySubscribers.buffering(s, bufferSize);247}248249private void requestMore() {250subscription.request(requestAmount);251}252253@Override254public void onSubscribe(Subscription subscription) {255assertNull(this.subscription);256this.subscription = subscription;257if (delayMillis > 0)258delayedExecutor.execute(this::requestMore);259else260requestMore();261}262263volatile int wrongSizes;264volatile int totalBytesReceived;265volatile int onNextInvocations;266volatile long lastSeenSize = -1;267volatile boolean noMoreOnNext; // false268volatile int index; // 0269volatile long count;270271@Override272public 
void onNext(List<ByteBuffer> items) {273try {274long sz = accumulatedDataSize(items);275boolean printStamp = delayMillis > 0276&& requestAmount < Long.MAX_VALUE277&& count % 20 == 0;278if (printStamp) {279printStamp("stamp", "count=%d sz=%d accumulated=%d",280count, sz, (totalBytesReceived + sz));281}282count++;283onNextInvocations++;284assertNotEquals(sz, 0L, "Unexpected empty buffers");285items.stream().forEach(b -> assertEquals(b.position(), 0));286assertFalse(noMoreOnNext);287288if (sz != bufferSize) {289String msg = sz + ", should be less than bufferSize, " + bufferSize;290assertTrue(sz < bufferSize, msg);291assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);292noMoreOnNext = true;293wrongSizes++;294printStamp("onNext",295"Possibly received last buffer: sz=%d, accumulated=%d, total=%d",296sz, totalBytesReceived, totalBytesReceived + sz);297} else {298assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");299}300lastSeenSize = sz;301302// Ensure expected contents303for (ByteBuffer b : items) {304while (b.hasRemaining()) {305assertEquals(b.get(), (byte) (index % 100));306index++;307}308}309310totalBytesReceived += sz;311assertEquals(totalBytesReceived, index);312if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize))313delayedExecutor.execute(this::requestMore);314else315requestMore();316} catch (Throwable t) {317completion.completeExceptionally(t);318}319}320321@Override322public void onError(Throwable throwable) {323completion.completeExceptionally(throwable);324}325326@Override327public void onComplete() {328if (wrongSizes > 1) { // allow just the final item to be smaller329String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";330completion.completeExceptionally(new Throwable(msg));331}332if (totalBytesReceived != expectedTotalSize) {333String msg = "Wrong number of bytes. 
[" + this + "]";334completion.completeExceptionally(new Throwable(msg));335} else {336completion.complete(totalBytesReceived);337}338}339340@Override341public CompletionStage<Integer> getBody() {342return completion;343}344345@Override346public String toString() {347StringBuilder sb = new StringBuilder();348sb.append(super.toString());349sb.append(", bufferSize=").append(bufferSize);350sb.append(", onNextInvocations=").append(onNextInvocations);351sb.append(", totalBytesReceived=").append(totalBytesReceived);352sb.append(", expectedTotalSize=").append(expectedTotalSize);353sb.append(", requestAmount=").append(requestAmount);354sb.append(", lastSeenSize=").append(lastSeenSize);355sb.append(", wrongSizes=").append(wrongSizes);356sb.append(", index=").append(index);357return sb.toString();358}359}360361/**362* Publishes data, through the given publisher, using the main thread.363*364* Note: The executor supplied when creating the SubmissionPublisher provides365* the threads for executing the Subscribers.366*367* @param publisher the publisher368* @param numBuffers the number of buffers to send ( before splitting in two )369* @param bufferSize the total size of the data to send ( before splitting in two )370*/371static void source(SubmissionPublisher<List<ByteBuffer>> publisher,372int numBuffers,373int bufferSize) {374printStamp("source","Publishing %d buffers of size %d each", numBuffers, bufferSize);375int index = 0;376for (int i=0; i<numBuffers; i++) {377int chunkSize = random.nextInt(bufferSize);378ByteBuffer buf1 = allocateBuffer(chunkSize, index);379index += chunkSize;380ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);381index += bufferSize - chunkSize;382publisher.submit(List.of(buf1, buf2));383}384printStamp("source", "complete");385}386387/**388* Creates and subscribes Subscribers that receive data from the given389* publisher.390*391* @param publisher the publisher392* @param delayMillis time, in milliseconds, to delay the Subscription393* 
requesting more bytes ( for simulating slow consumption )394* @param expectedTotalSize the total number of bytes expected to be received395* by the subscribers396* @return a CompletableFuture which completes when the subscription is complete397*/398static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,399int delayMillis,400int expectedTotalSize,401long requestAmount,402int maxBufferSize,403int minBufferSize) {404int bufferSize = chooseBufferSize(maxBufferSize,405minBufferSize,406delayMillis,407expectedTotalSize,408requestAmount);409assert bufferSize > 0;410assert bufferSize >= minBufferSize;411assert bufferSize <= maxBufferSize;412BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,413delayMillis,414expectedTotalSize,415requestAmount);416publisher.subscribe(sub);417printStamp("sink","Subscriber reads data with buffer size: %d", bufferSize);418out.printf("Subscription delay is %d msec\n", delayMillis);419long delay = (((long)delayMillis * expectedTotalSize) / bufferSize) / requestAmount;420out.printf("Minimum total delay is %d sec %d ms\n", delay / 1000, delay % 1000);421out.printf("Request amount is %d items\n", requestAmount);422return sub.getBody().toCompletableFuture();423}424425static int chooseBufferSize(int maxBufferSize,426int minBufferSize,427int delaysMillis,428int expectedTotalSize,429long requestAmount) {430assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;431int bufferSize = maxBufferSize == minBufferSize ? 
maxBufferSize :432(random.nextInt(maxBufferSize - minBufferSize)433+ minBufferSize);434if (requestAmount == Long.MAX_VALUE) return bufferSize;435long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)436/ requestAmount;437long maxDelay = (((long)delaysMillis * expectedTotalSize) / minBufferSize)438/ requestAmount;439// if the maximum delay is < 10s just take a random number between min and max.440if (maxDelay <= LOWER_THRESHOLD) {441return bufferSize;442}443// if minimum delay is greater than 20s then print a warning and use max buffer.444if (minDelay >= UPPER_THRESHOLD) {445System.out.println("Warning: minimum delay is "446+ minDelay/1000 + "sec " + minDelay%1000 + "ms");447System.err.println("Warning: minimum delay is "448+ minDelay/1000 + "sec " + minDelay%1000 + "ms");449return maxBufferSize;450}451// maxDelay could be anything, but minDelay is below the UPPER_THRESHOLD452// try to pick up a buffer size that keeps the delay below the453// UPPER_THRESHOLD454while (minBufferSize < maxBufferSize) {455bufferSize = random.nextInt(maxBufferSize - minBufferSize)456+ minBufferSize;457long delay = (((long)delaysMillis * expectedTotalSize) / bufferSize)458/ requestAmount;459if (delay < UPPER_THRESHOLD) return bufferSize;460minBufferSize++;461}462return minBufferSize;463}464465// ---466467/* Main entry point for standalone testing of the main functional test. */468public static void main(String... args) {469BufferingSubscriberTest t = new BufferingSubscriberTest();470for (Object[] objs : t.config())471t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);472}473}474475476