Path: blob/master/test/jdk/java/nio/channels/SocketChannel/AdaptorStreams.java
41154 views
/*1* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/* @test24* @bug 8222774 443013925* @run testng AdaptorStreams26* @summary Exercise socket adaptor input/output streams27*/2829import java.io.Closeable;30import java.io.IOException;31import java.io.InputStream;32import java.io.OutputStream;33import java.net.InetAddress;34import java.net.InetSocketAddress;35import java.net.ServerSocket;36import java.net.Socket;37import java.net.SocketTimeoutException;38import java.nio.channels.IllegalBlockingModeException;39import java.nio.channels.SocketChannel;40import java.util.concurrent.ExecutorService;41import java.util.concurrent.Executors;42import java.util.concurrent.Future;43import java.util.concurrent.ScheduledExecutorService;44import java.util.concurrent.TimeUnit;4546import org.testng.annotations.Test;47import static org.testng.Assert.*;4849@Test50public class AdaptorStreams {5152/**53* Test read when bytes are available54*/55public void testRead1() throws Exception {56withConnection((sc, peer) -> {57peer.getOutputStream().write(99);58int n = sc.socket().getInputStream().read();59assertEquals(n, 99);60});61}6263/**64* Test read blocking before bytes are available65*/66public void testRead2() throws Exception {67withConnection((sc, peer) -> {68scheduleWrite(peer.getOutputStream(), 99, 1000);69int n = sc.socket().getInputStream().read();70assertEquals(n, 99);71});72}7374/**75* Test read when peer has closed connection76*/77public void testRead3() throws Exception {78withConnection((sc, peer) -> {79peer.close();80int n = sc.socket().getInputStream().read();81assertEquals(n, -1);82});83}8485/**86* Test read blocking before peer closes connection87*/88public void testRead4() throws Exception {89withConnection((sc, peer) -> {90scheduleClose(peer, 1000);91int n = sc.socket().getInputStream().read();92assertEquals(n, -1);93});94}9596/**97* Test async close of socket when thread blocked in read98*/99public void testRead5() throws Exception {100withConnection((sc, peer) -> {101scheduleClose(sc, 2000);102InputStream in = sc.socket().getInputStream();103expectThrows(IOException.class, () -> in.read());104});105}106107/**108* Test interrupt status set before read109*/110public void testRead6() throws Exception {111withConnection((sc, peer) -> {112Socket s = sc.socket();113Thread.currentThread().interrupt();114try {115InputStream in = s.getInputStream();116expectThrows(IOException.class, () -> in.read());117} finally {118Thread.interrupted(); // clear interrupt119}120assertTrue(s.isClosed());121});122}123124/**125* Test interrupt of thread blocked in read126*/127public void testRead7() throws Exception {128withConnection((sc, peer) -> {129Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);130Socket s = sc.socket();131try {132InputStream in = s.getInputStream();133expectThrows(IOException.class, () -> in.read());134} finally {135interrupter.cancel(true);136Thread.interrupted(); // clear interrupt137}138assertTrue(s.isClosed());139});140}141142/**143* Test read when channel is configured non-blocking144*/145public void testRead8() throws Exception {146withConnection((sc, peer) -> {147sc.configureBlocking(false);148InputStream in = sc.socket().getInputStream();149expectThrows(IllegalBlockingModeException.class, () -> in.read());150});151}152153/**154* Test timed read when bytes are available155*/156public void testTimedRead1() throws Exception {157withConnection((sc, peer) -> {158peer.getOutputStream().write(99);159Socket s = sc.socket();160s.setSoTimeout(1000);161int n = s.getInputStream().read();162assertEquals(n, 99);163});164}165166/**167* Test timed read blocking before bytes are available168*/169public void testTimedRead2() throws Exception {170withConnection((sc, peer) -> {171scheduleWrite(peer.getOutputStream(), 99, 1000);172Socket s = sc.socket();173s.setSoTimeout(5000);174int n = s.getInputStream().read();175assertEquals(n, 99);176});177}178179/**180* Test timed read when the read times out181*/182public void testTimedRead3() throws Exception {183withConnection((sc, peer) -> {184Socket s = sc.socket();185s.setSoTimeout(1000);186InputStream in = s.getInputStream();187expectThrows(SocketTimeoutException.class, () -> in.read());188});189}190191/**192* Test async close of socket when thread blocked in timed read193*/194public void testTimedRead4() throws Exception {195withConnection((sc, peer) -> {196scheduleClose(sc, 2000);197Socket s = sc.socket();198s.setSoTimeout(60*1000);199InputStream in = s.getInputStream();200expectThrows(IOException.class, () -> in.read());201});202}203204/**205* Test interrupt status set before timed read206*/207public void testTimedRead5() throws Exception {208withConnection((sc, peer) -> {209Socket s = sc.socket();210Thread.currentThread().interrupt();211try {212s.setSoTimeout(60*1000);213InputStream in = s.getInputStream();214expectThrows(IOException.class, () -> in.read());215} finally {216Thread.interrupted(); // clear interrupt217}218assertTrue(s.isClosed());219});220}221222/**223* Test interrupt of thread blocked in timed read224*/225public void testTimedRead6() throws Exception {226withConnection((sc, peer) -> {227Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);228Socket s = sc.socket();229try {230s.setSoTimeout(60*1000);231InputStream in = s.getInputStream();232expectThrows(IOException.class, () -> in.read());233assertTrue(s.isClosed());234} finally {235interrupter.cancel(true);236Thread.interrupted(); // clear interrupt237}238assertTrue(s.isClosed());239});240}241242/**243* Test async close of socket when thread blocked in write244*/245public void testWrite1() throws Exception {246withConnection((sc, peer) -> {247scheduleClose(sc, 2000);248expectThrows(IOException.class, () -> {249OutputStream out = sc.socket().getOutputStream();250byte[] data = new byte[64*1000];251while (true) {252out.write(data);253}254});255});256}257258/**259* Test interrupt status set before write260*/261public void testWrite2() throws Exception {262withConnection((sc, peer) -> {263Socket s = sc.socket();264Thread.currentThread().interrupt();265try {266OutputStream out = s.getOutputStream();267expectThrows(IOException.class, () -> out.write(99));268} finally {269Thread.interrupted(); // clear interrupt270}271assertTrue(s.isClosed());272});273}274275/**276* Test interrupt of thread blocked in write277*/278public void testWrite3() throws Exception {279withConnection((sc, peer) -> {280Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);281Socket s = sc.socket();282try {283expectThrows(IOException.class, () -> {284OutputStream out = sc.socket().getOutputStream();285byte[] data = new byte[64*1000];286while (true) {287out.write(data);288}289});290} finally {291interrupter.cancel(true);292Thread.interrupted(); // clear interrupt293}294assertTrue(s.isClosed());295});296}297298/**299* Test write when channel is configured non-blocking300*/301public void testWrite4() throws Exception {302withConnection((sc, peer) -> {303sc.configureBlocking(false);304OutputStream out = sc.socket().getOutputStream();305expectThrows(IllegalBlockingModeException.class, () -> out.write(99));306});307}308309/**310* Test read when there are bytes available and another thread is blocked311* in write312*/313public void testConcurrentReadWrite1() throws Exception {314withConnection((sc, peer) -> {315Socket s = sc.socket();316317// block thread in write318execute(() -> {319var data = new byte[64*1024];320OutputStream out = s.getOutputStream();321for (;;) {322out.write(data);323}324});325Thread.sleep(1000); // give writer time to block326327// test read when bytes are available328peer.getOutputStream().write(99);329int n = s.getInputStream().read();330assertEquals(n, 99);331});332}333334/**335* Test read blocking when another thread is blocked in write336*/337public void testConcurrentReadWrite2() throws Exception {338withConnection((sc, peer) -> {339Socket s = sc.socket();340341// block thread in write342execute(() -> {343var data = new byte[64*1024];344OutputStream out = s.getOutputStream();345for (;;) {346out.write(data);347}348});349Thread.sleep(1000); // give writer time to block350351// test read blocking until bytes are available352scheduleWrite(peer.getOutputStream(), 99, 500);353int n = s.getInputStream().read();354assertEquals(n, 99);355});356}357358/**359* Test writing when another thread is blocked in read360*/361public void testConcurrentReadWrite3() throws Exception {362withConnection((sc, peer) -> {363Socket s = sc.socket();364365// block thread in read366execute(() -> {367s.getInputStream().read();368});369Thread.sleep(100); // give reader time to block370371// test write372s.getOutputStream().write(99);373int n = peer.getInputStream().read();374assertEquals(n, 99);375});376}377378/**379* Test timed read when there are bytes available and another thread is380* blocked in write381*/382public void testConcurrentTimedReadWrite1() throws Exception {383withConnection((sc, peer) -> {384Socket s = sc.socket();385386// block thread in write387execute(() -> {388var data = new byte[64*1024];389OutputStream out = s.getOutputStream();390for (;;) {391out.write(data);392}393});394Thread.sleep(1000); // give writer time to block395396// test read when bytes are available397peer.getOutputStream().write(99);398s.setSoTimeout(60*1000);399int n = s.getInputStream().read();400assertEquals(n, 99);401});402}403404/**405* Test timed read blocking when another thread is blocked in write406*/407public void testConcurrentTimedReadWrite2() throws Exception {408withConnection((sc, peer) -> {409Socket s = sc.socket();410411// block thread in write412execute(() -> {413var data = new byte[64*1024];414OutputStream out = s.getOutputStream();415for (;;) {416out.write(data);417}418});419Thread.sleep(1000); // give writer time to block420421// test read blocking until bytes are available422scheduleWrite(peer.getOutputStream(), 99, 500);423s.setSoTimeout(60*1000);424int n = s.getInputStream().read();425assertEquals(n, 99);426});427}428429/**430* Test writing when another thread is blocked in read431*/432public void testConcurrentTimedReadWrite3() throws Exception {433withConnection((sc, peer) -> {434Socket s = sc.socket();435436// block thread in read437execute(() -> {438s.setSoTimeout(60*1000);439s.getInputStream().read();440});441Thread.sleep(100); // give reader time to block442443// test write444s.getOutputStream().write(99);445int n = peer.getInputStream().read();446assertEquals(n, 99);447});448}449450// -- test infrastructure --451452interface ThrowingTask {453void run() throws Exception;454}455456interface ThrowingBiConsumer<T, U> {457void accept(T t, U u) throws Exception;458}459460/**461* Invokes the consumer with a connected pair of socket channel and socket462*/463static void withConnection(ThrowingBiConsumer<SocketChannel, Socket> consumer)464throws Exception465{466var loopback = InetAddress.getLoopbackAddress();467try (ServerSocket ss = new ServerSocket()) {468ss.bind(new InetSocketAddress(loopback, 0));469try (SocketChannel sc = SocketChannel.open(ss.getLocalSocketAddress())) {470try (Socket peer = ss.accept()) {471consumer.accept(sc, peer);472}473}474}475}476477static Future<?> scheduleWrite(OutputStream out, byte[] data, long delay) {478return schedule(() -> {479try {480out.write(data);481} catch (IOException ioe) { }482}, delay);483}484485static Future<?> scheduleWrite(OutputStream out, int b, long delay) {486return scheduleWrite(out, new byte[] { (byte)b }, delay);487}488489static Future<?> scheduleClose(Closeable c, long delay) {490return schedule(() -> {491try {492c.close();493} catch (IOException ioe) { }494}, delay);495}496497static Future<?> scheduleInterrupt(Thread t, long delay) {498return schedule(() -> t.interrupt(), delay);499}500501static Future<?> schedule(Runnable task, long delay) {502ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();503try {504return executor.schedule(task, delay, TimeUnit.MILLISECONDS);505} finally {506executor.shutdown();507}508}509510static Future<?> execute(ThrowingTask task) {511ExecutorService pool = Executors.newFixedThreadPool(1);512try {513return pool.submit(() -> {514task.run();515return null;516});517} finally {518pool.shutdown();519}520}521}522523524