Path: blob/master/test/jdk/java/net/httpclient/BufferingSubscriberErrorCompleteTest.java
41152 views
/*1* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.nio.ByteBuffer;24import java.util.ArrayList;25import java.util.List;26import java.util.concurrent.CompletionStage;27import java.util.concurrent.ExecutorService;28import java.util.concurrent.Executors;29import java.util.concurrent.Flow.Subscription;30import java.util.concurrent.Phaser;31import java.util.concurrent.SubmissionPublisher;32import java.util.stream.IntStream;33import java.net.http.HttpResponse.BodySubscriber;34import org.testng.annotations.DataProvider;35import org.testng.annotations.Test;36import static java.lang.Long.MAX_VALUE;37import static java.lang.Long.MIN_VALUE;38import static java.nio.ByteBuffer.wrap;39import static java.net.http.HttpResponse.BodySubscribers.buffering;40import static org.testng.Assert.*;4142/*43* @test44* @summary Test for HttpResponse.BodySubscriber.buffering() onError/onComplete45* @run testng/othervm BufferingSubscriberErrorCompleteTest46*/4748public class BufferingSubscriberErrorCompleteTest {4950@DataProvider(name = "illegalDemand")51public Object[][] illegalDemand() {52return new Object[][]{53{0L}, {-1L}, {-5L}, {-100L}, {-101L}, {-100_001L}, {MIN_VALUE}54};55}5657@Test(dataProvider = "illegalDemand")58public void illegalRequest(long demand) throws Exception {59ExecutorService executor = Executors.newFixedThreadPool(1);60SubmissionPublisher<List<ByteBuffer>> publisher =61new SubmissionPublisher<>(executor, 1);6263Phaser gate = new Phaser(2); // single onSubscribe and onError64ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate);65BodySubscriber subscriber = buffering(exposingSubscriber, 1);66publisher.subscribe(subscriber);67gate.arriveAndAwaitAdvance();6869Subscription s = exposingSubscriber.subscription;70int previous = exposingSubscriber.onErrorInvocations;71s.request(demand);72gate.arriveAndAwaitAdvance();7374assertEquals(previous + 1, exposingSubscriber.onErrorInvocations);75assertTrue(exposingSubscriber.throwable instanceof IllegalArgumentException,76"Expected IAE, got:" + exposingSubscriber.throwable);7778furtherCancelsRequestsShouldBeNoOp(s);79assertEquals(exposingSubscriber.onErrorInvocations, 1);80executor.shutdown();81}828384@DataProvider(name = "bufferAndItemSizes")85public Object[][] bufferAndItemSizes() {86List<Object[]> values = new ArrayList<>();8788for (int bufferSize : new int[] { 1, 5, 10, 100, 1000 })89for (int items : new int[] { 0, 1, 2, 5, 9, 10, 11, 15, 29, 99 })90values.add(new Object[] { bufferSize, items });9192return values.stream().toArray(Object[][]::new);93}9495@Test(dataProvider = "bufferAndItemSizes")96public void onErrorFromPublisher(int bufferSize,97int numberOfItems)98throws Exception99{100ExecutorService executor = Executors.newFixedThreadPool(1);101SubmissionPublisher<List<ByteBuffer>> publisher =102new SubmissionPublisher<>(executor, 1);103104// onSubscribe + onError + this thread105Phaser gate = new Phaser(3);106ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate);107BodySubscriber subscriber = buffering(exposingSubscriber, bufferSize);108publisher.subscribe(subscriber);109110List<ByteBuffer> item = List.of(wrap(new byte[] { 1 }));111IntStream.range(0, numberOfItems).forEach(x -> publisher.submit(item));112Throwable t = new Throwable("a message from me to me");113publisher.closeExceptionally(t);114115gate.arriveAndAwaitAdvance();116117Subscription s = exposingSubscriber.subscription;118119assertEquals(exposingSubscriber.onErrorInvocations, 1);120assertEquals(exposingSubscriber.onCompleteInvocations, 0);121assertEquals(exposingSubscriber.throwable, t);122assertEquals(exposingSubscriber.throwable.getMessage(),123"a message from me to me");124125furtherCancelsRequestsShouldBeNoOp(s);126assertEquals(exposingSubscriber.onErrorInvocations, 1);127assertEquals(exposingSubscriber.onCompleteInvocations, 0);128executor.shutdown();129}130131@Test(dataProvider = "bufferAndItemSizes")132public void onCompleteFromPublisher(int bufferSize,133int numberOfItems)134throws Exception135{136ExecutorService executor = Executors.newFixedThreadPool(1);137SubmissionPublisher<List<ByteBuffer>> publisher =138new SubmissionPublisher<>(executor, 1);139140// onSubscribe + onComplete + this thread141Phaser gate = new Phaser(3);142ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate);143BodySubscriber subscriber = buffering(exposingSubscriber, bufferSize);144publisher.subscribe(subscriber);145146List<ByteBuffer> item = List.of(wrap(new byte[] { 1 }));147IntStream.range(0, numberOfItems).forEach(x -> publisher.submit(item));148publisher.close();149150gate.arriveAndAwaitAdvance();151152Subscription s = exposingSubscriber.subscription;153154assertEquals(exposingSubscriber.onErrorInvocations, 0);155assertEquals(exposingSubscriber.onCompleteInvocations, 1);156assertEquals(exposingSubscriber.throwable, null);157158furtherCancelsRequestsShouldBeNoOp(s);159assertEquals(exposingSubscriber.onErrorInvocations, 0);160assertEquals(exposingSubscriber.onCompleteInvocations, 1);161assertEquals(exposingSubscriber.throwable, null);162executor.shutdown();163}164165static class ExposingSubscriber implements BodySubscriber<Void> {166final Phaser gate;167volatile Subscription subscription;168volatile int onNextInvocations;169volatile int onErrorInvocations;170volatile int onCompleteInvocations;171volatile Throwable throwable;172173ExposingSubscriber(Phaser gate) {174this.gate = gate;175}176177@Override178public void onSubscribe(Subscription subscription) {179//out.println("onSubscribe " + subscription);180this.subscription = subscription;181subscription.request(MAX_VALUE);182gate.arrive();183}184185@Override186public void onNext(List<ByteBuffer> item) {187//out.println("onNext " + item);188onNextInvocations++;189}190191@Override192public void onError(Throwable throwable) {193//out.println("onError " + throwable);194this.throwable = throwable;195onErrorInvocations++;196gate.arrive();197}198199@Override200public void onComplete() {201//out.println("onComplete ");202onCompleteInvocations++;203gate.arrive();204}205206@Override207public CompletionStage<Void> getBody() {208throw new UnsupportedOperationException("getBody is unsupported");209}210}211212static void furtherCancelsRequestsShouldBeNoOp(Subscription s) {213s.cancel(); s.request(1);214s.cancel(); s.request(100); s.cancel();215s.cancel(); s.request(MAX_VALUE); s.cancel(); s.cancel();216s.cancel(); s.cancel(); s.cancel(); s.cancel();217s.request(MAX_VALUE); s.request(MAX_VALUE); s.request(MAX_VALUE);218s.request(-1); s.request(-100); s.request(MIN_VALUE);219}220}221222223