Path: blob/master/test/jdk/java/nio/channels/AsynchronousSocketChannel/Basic.java
41153 views
/*1* Copyright (c) 2008, 2020, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/* @test24* @bug 4607272 6842687 6878369 6944810 702340325* @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed)26* @library /test/lib27* @modules jdk.net28* @key randomness intermittent29* @build jdk.test.lib.RandomFactory jdk.test.lib.Utils30* @run main/othervm/timeout=600 Basic -skipSlowConnectTest31*/3233import java.io.Closeable;34import java.io.IOException;35import java.net.*;36import static java.net.StandardSocketOptions.*;37import java.nio.ByteBuffer;38import java.nio.channels.*;39import java.util.Random;40import java.util.Set;41import java.util.concurrent.*;42import java.util.concurrent.atomic.*;43import jdk.test.lib.RandomFactory;44import java.util.List;45import static jdk.net.ExtendedSocketOptions.TCP_KEEPCOUNT;46import static jdk.net.ExtendedSocketOptions.TCP_KEEPIDLE;47import static jdk.net.ExtendedSocketOptions.TCP_KEEPINTERVAL;4849public class Basic {50private static final Random RAND = RandomFactory.getRandom();5152static boolean skipSlowConnectTest = false;5354public static void main(String[] args) throws Exception {55for (String arg: args) {56switch (arg) {57case "-skipSlowConnectTest" :58skipSlowConnectTest = true;59break;60default:61throw new RuntimeException("Unrecognized argument: " + arg);62}63}6465testBind();66testSocketOptions();67testConnect();68testCloseWhenPending();69testCancel();70testRead1();71testRead2();72testRead3();73testWrite1();74testWrite2();75// skip timeout tests until 7052549 is fixed76if (!System.getProperty("os.name").startsWith("Windows"))77testTimeout();78testShutdown();79}8081static class Server implements Closeable {82private final ServerSocketChannel ssc;83private final InetSocketAddress address;8485Server() throws IOException {86this(0);87}8889Server(int recvBufSize) throws IOException {90ssc = ServerSocketChannel.open();91if (recvBufSize > 0) {92ssc.setOption(SO_RCVBUF, recvBufSize);93}94ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));95address = (InetSocketAddress)ssc.getLocalAddress();96}9798InetSocketAddress address() {99return address;100}101102SocketChannel accept() throws IOException {103return ssc.accept();104}105106public void close() throws IOException {107ssc.close();108}109110}111112static void testBind() throws Exception {113System.out.println("-- bind --");114115try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {116if (ch.getLocalAddress() != null)117throw new RuntimeException("Local address should be 'null'");118ch.bind(new InetSocketAddress(0));119120// check local address after binding121InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();122if (local.getPort() == 0)123throw new RuntimeException("Unexpected port");124if (!local.getAddress().isAnyLocalAddress())125throw new RuntimeException("Not bound to a wildcard address");126127// try to re-bind128try {129ch.bind(new InetSocketAddress(0));130throw new RuntimeException("AlreadyBoundException expected");131} catch (AlreadyBoundException x) {132}133}134135// check ClosedChannelException136AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();137ch.close();138try {139ch.bind(new InetSocketAddress(0));140throw new RuntimeException("ClosedChannelException expected");141} catch (ClosedChannelException x) {142}143}144145static void testSocketOptions() throws Exception {146System.out.println("-- socket options --");147148try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {149ch.setOption(SO_RCVBUF, 128*1024)150.setOption(SO_SNDBUF, 128*1024)151.setOption(SO_REUSEADDR, true);152153// check SO_SNDBUF/SO_RCVBUF limits154int before, after;155before = ch.getOption(SO_SNDBUF);156after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);157if (after < before)158throw new RuntimeException("setOption caused SO_SNDBUF to decrease");159before = ch.getOption(SO_RCVBUF);160after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);161if (after < before)162throw new RuntimeException("setOption caused SO_RCVBUF to decrease");163164ch.bind(new InetSocketAddress(0));165166// default values167if (ch.getOption(SO_KEEPALIVE))168throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");169if (ch.getOption(TCP_NODELAY))170throw new RuntimeException("Default of TCP_NODELAY should be 'false'");171172// set and check173if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))174throw new RuntimeException("SO_KEEPALIVE did not change");175if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))176throw new RuntimeException("SO_KEEPALIVE did not change");177178// read others (can't check as actual value is implementation dependent)179ch.getOption(SO_RCVBUF);180ch.getOption(SO_SNDBUF);181182Set<SocketOption<?>> options = ch.supportedOptions();183boolean reuseport = options.contains(SO_REUSEPORT);184if (reuseport) {185if (ch.getOption(SO_REUSEPORT))186throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");187if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))188throw new RuntimeException("SO_REUSEPORT did not change");189}190List<? extends SocketOption> extOptions = List.of(TCP_KEEPCOUNT,191TCP_KEEPIDLE, TCP_KEEPINTERVAL);192if (options.containsAll(extOptions)) {193ch.setOption(TCP_KEEPIDLE, 1234);194checkOption(ch, TCP_KEEPIDLE, 1234);195ch.setOption(TCP_KEEPINTERVAL, 123);196checkOption(ch, TCP_KEEPINTERVAL, 123);197ch.setOption(TCP_KEEPCOUNT, 7);198checkOption(ch, TCP_KEEPCOUNT, 7);199}200}201}202203static void checkOption(AsynchronousSocketChannel sc, SocketOption name, Object expectedValue)204throws IOException {205Object value = sc.getOption(name);206if (!value.equals(expectedValue)) {207throw new RuntimeException("value not as expected");208}209}210static void testConnect() throws Exception {211System.out.println("-- connect --");212213SocketAddress address;214215try (Server server = new Server()) {216address = server.address();217218// connect to server and check local/remote addresses219try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {220ch.connect(address).get();221// check local address222if (ch.getLocalAddress() == null)223throw new RuntimeException("Not bound to local address");224225// check remote address226InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();227if (remote.getPort() != server.address().getPort())228throw new RuntimeException("Connected to unexpected port");229if (!remote.getAddress().equals(server.address().getAddress()))230throw new RuntimeException("Connected to unexpected address");231232// try to connect again233try {234ch.connect(server.address()).get();235throw new RuntimeException("AlreadyConnectedException expected");236} catch (AlreadyConnectedException x) {237}238239// clean-up240server.accept().close();241}242243// check that connect fails with ClosedChannelException244AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();245ch.close();246try {247ch.connect(server.address()).get();248throw new RuntimeException("ExecutionException expected");249} catch (ExecutionException x) {250if (!(x.getCause() instanceof ClosedChannelException))251throw new RuntimeException("Cause of ClosedChannelException expected",252x.getCause());253}254final AtomicReference<Throwable> connectException = new AtomicReference<>();255ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {256public void completed(Void result, Void att) {257}258public void failed(Throwable exc, Void att) {259connectException.set(exc);260}261});262while (connectException.get() == null) {263Thread.sleep(100);264}265if (!(connectException.get() instanceof ClosedChannelException))266throw new RuntimeException("ClosedChannelException expected",267connectException.get());268}269270// test that failure to connect closes the channel271try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {272try {273ch.connect(address).get();274} catch (ExecutionException x) {275// failed to establish connection276if (ch.isOpen())277throw new RuntimeException("Channel should be closed");278}279}280281// repeat test by connecting to a (probably) non-existent host. This282// improves the chance that the connect will not fail immediately.283if (!skipSlowConnectTest) {284try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {285try {286ch.connect(genSocketAddress()).get();287} catch (ExecutionException x) {288// failed to establish connection289if (ch.isOpen())290throw new RuntimeException("Channel should be closed");291}292}293}294}295296static void testCloseWhenPending() throws Exception {297System.out.println("-- asynchronous close when connecting --");298299AsynchronousSocketChannel ch;300301// asynchronous close while connecting302ch = AsynchronousSocketChannel.open();303Future<Void> connectResult = ch.connect(genSocketAddress());304305// give time to initiate the connect (SYN)306Thread.sleep(50);307308// close309ch.close();310311// check that exception is thrown in timely manner312try {313connectResult.get(5, TimeUnit.SECONDS);314} catch (TimeoutException x) {315throw new RuntimeException("AsynchronousCloseException not thrown");316} catch (ExecutionException x) {317// expected318}319320System.out.println("-- asynchronous close when reading --");321322try (Server server = new Server(1)) {323ch = AsynchronousSocketChannel.open();324ch.connect(server.address()).get();325326ByteBuffer dst = ByteBuffer.allocateDirect(100);327Future<Integer> result = ch.read(dst);328329// attempt a second read - should fail with ReadPendingException330ByteBuffer buf = ByteBuffer.allocateDirect(100);331try {332ch.read(buf);333throw new RuntimeException("ReadPendingException expected");334} catch (ReadPendingException x) {335}336337// close channel (should cause initial read to complete)338SocketChannel peer = server.accept();339ch.close();340peer.close();341342// check that AsynchronousCloseException is thrown343try {344result.get();345throw new RuntimeException("Should not read");346} catch (ExecutionException x) {347if (!(x.getCause() instanceof AsynchronousCloseException))348throw new RuntimeException(x);349}350351System.out.println("-- asynchronous close when writing --");352353ch = AsynchronousSocketChannel.open();354ch.connect(server.address()).get();355peer = server.accept();356peer.setOption(SO_RCVBUF, 1);357358final AtomicReference<Throwable> writeException =359new AtomicReference<Throwable>();360361// write bytes to fill socket buffer362final AtomicInteger numCompleted = new AtomicInteger();363ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {364public void completed(Integer result, AsynchronousSocketChannel ch) {365System.out.println("completed write to async channel: " + result);366numCompleted.incrementAndGet();367ch.write(genBuffer(), ch, this);368System.out.println("started another write to async channel: " + result);369}370public void failed(Throwable x, AsynchronousSocketChannel ch) {371System.out.println("failed write to async channel");372writeException.set(x);373}374});375376// give time for socket buffer to fill up -377// take pauses until the handler is no longer being invoked378// because all writes are being pended which guarantees that379// the internal channel state indicates it is writing380int prevNumCompleted = numCompleted.get();381do {382Thread.sleep((long)(1000 * jdk.test.lib.Utils.TIMEOUT_FACTOR));383System.out.println("check if buffer is filled up");384if (numCompleted.get() == prevNumCompleted) {385break;386}387prevNumCompleted = numCompleted.get();388} while (true);389390// attempt a concurrent write -391// should fail with WritePendingException392try {393System.out.println("concurrent write to async channel");394ch.write(genBuffer());395System.out.format("prevNumCompleted: %d, numCompleted: %d%n",396prevNumCompleted, numCompleted.get());397throw new RuntimeException("WritePendingException expected");398} catch (WritePendingException x) {399}400401// close channel - should cause initial write to complete402System.out.println("closing async channel...");403ch.close();404System.out.println("closed async channel");405peer.close();406407// wait for exception408while (writeException.get() == null) {409Thread.sleep(100);410}411if (!(writeException.get() instanceof AsynchronousCloseException))412throw new RuntimeException("AsynchronousCloseException expected",413writeException.get());414}415}416417static void testCancel() throws Exception {418System.out.println("-- cancel --");419420try (Server server = new Server()) {421for (int i=0; i<2; i++) {422boolean mayInterruptIfRunning = (i == 0) ? false : true;423424// establish loopback connection425AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();426ch.connect(server.address()).get();427SocketChannel peer = server.accept();428429// start read operation430ByteBuffer buf = ByteBuffer.allocate(1);431Future<Integer> res = ch.read(buf);432433// cancel operation434boolean cancelled = res.cancel(mayInterruptIfRunning);435436// check post-conditions437if (!res.isDone())438throw new RuntimeException("isDone should return true");439if (res.isCancelled() != cancelled)440throw new RuntimeException("isCancelled not consistent");441try {442res.get();443throw new RuntimeException("CancellationException expected");444} catch (CancellationException x) {445}446try {447res.get(1, TimeUnit.SECONDS);448throw new RuntimeException("CancellationException expected");449} catch (CancellationException x) {450}451452// check that the cancel doesn't impact writing to the channel453if (!mayInterruptIfRunning) {454buf = ByteBuffer.wrap("a".getBytes());455ch.write(buf).get();456}457458ch.close();459peer.close();460}461}462}463464static void testRead1() throws Exception {465System.out.println("-- read (1) --");466467try (Server server = new Server()) {468final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();469ch.connect(server.address()).get();470471// read with 0 bytes remaining should complete immediately472ByteBuffer buf = ByteBuffer.allocate(1);473buf.put((byte)0);474int n = ch.read(buf).get();475if (n != 0)476throw new RuntimeException("0 expected");477478// write bytes and close connection479ByteBuffer src = genBuffer();480try (SocketChannel sc = server.accept()) {481sc.setOption(SO_SNDBUF, src.remaining());482while (src.hasRemaining())483sc.write(src);484}485486// reads should complete immediately487final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);488final CountDownLatch latch = new CountDownLatch(1);489ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {490public void completed(Integer result, Void att) {491int n = result;492if (n > 0) {493ch.read(dst, (Void)null, this);494} else {495latch.countDown();496}497}498public void failed(Throwable exc, Void att) {499}500});501502latch.await();503504// check buffers505src.flip();506dst.flip();507if (!src.equals(dst)) {508throw new RuntimeException("Contents differ");509}510511// close channel512ch.close();513514// check read fails with ClosedChannelException515try {516ch.read(dst).get();517throw new RuntimeException("ExecutionException expected");518} catch (ExecutionException x) {519if (!(x.getCause() instanceof ClosedChannelException))520throw new RuntimeException("Cause of ClosedChannelException expected",521x.getCause());522}523}524}525526static void testRead2() throws Exception {527System.out.println("-- read (2) --");528529try (Server server = new Server()) {530final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();531ch.connect(server.address()).get();532SocketChannel sc = server.accept();533534ByteBuffer src = genBuffer();535536// read until the buffer is full537final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());538final CountDownLatch latch = new CountDownLatch(1);539ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {540public void completed(Integer result, Void att) {541if (dst.hasRemaining()) {542ch.read(dst, (Void)null, this);543} else {544latch.countDown();545}546}547public void failed(Throwable exc, Void att) {548}549});550551// trickle the writing552do {553int rem = src.remaining();554int size = (rem <= 100) ? rem : 50 + RAND.nextInt(rem - 100);555ByteBuffer buf = ByteBuffer.allocate(size);556for (int i=0; i<size; i++)557buf.put(src.get());558buf.flip();559Thread.sleep(50 + RAND.nextInt(1500));560while (buf.hasRemaining())561sc.write(buf);562} while (src.hasRemaining());563564// wait until ascynrhonous reading has completed565latch.await();566567// check buffers568src.flip();569dst.flip();570if (!src.equals(dst)) {571throw new RuntimeException("Contents differ");572}573574sc.close();575ch.close();576}577}578579// exercise scattering read580static void testRead3() throws Exception {581System.out.println("-- read (3) --");582583try (Server server = new Server()) {584final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();585ch.connect(server.address()).get();586SocketChannel sc = server.accept();587588ByteBuffer[] dsts = new ByteBuffer[3];589for (int i=0; i<dsts.length; i++) {590dsts[i] = ByteBuffer.allocateDirect(100);591}592593// scattering read that completes ascynhronously594final CountDownLatch l1 = new CountDownLatch(1);595ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,596new CompletionHandler<Long,Void>() {597public void completed(Long result, Void att) {598long n = result;599if (n <= 0)600throw new RuntimeException("No bytes read");601l1.countDown();602}603public void failed(Throwable exc, Void att) {604}605});606607// write some bytes608sc.write(genBuffer());609610// read should now complete611l1.await();612613// write more bytes614sc.write(genBuffer());615616// read should complete immediately617for (int i=0; i<dsts.length; i++) {618dsts[i].rewind();619}620621final CountDownLatch l2 = new CountDownLatch(1);622ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,623new CompletionHandler<Long,Void>() {624public void completed(Long result, Void att) {625long n = result;626if (n <= 0)627throw new RuntimeException("No bytes read");628l2.countDown();629}630public void failed(Throwable exc, Void att) {631}632});633l2.await();634635ch.close();636sc.close();637}638}639640static void testWrite1() throws Exception {641System.out.println("-- write (1) --");642643try (Server server = new Server()) {644final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();645ch.connect(server.address()).get();646SocketChannel sc = server.accept();647648// write with 0 bytes remaining should complete immediately649ByteBuffer buf = ByteBuffer.allocate(1);650buf.put((byte)0);651int n = ch.write(buf).get();652if (n != 0)653throw new RuntimeException("0 expected");654655// write all bytes and close connection when done656final ByteBuffer src = genBuffer();657ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {658public void completed(Integer result, Void att) {659if (src.hasRemaining()) {660ch.write(src, (Void)null, this);661} else {662try {663ch.close();664} catch (IOException ignore) { }665}666}667public void failed(Throwable exc, Void att) {668}669});670671// read to EOF or buffer full672ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);673do {674n = sc.read(dst);675} while (n > 0);676sc.close();677678// check buffers679src.flip();680dst.flip();681if (!src.equals(dst)) {682throw new RuntimeException("Contents differ");683}684685// check write fails with ClosedChannelException686try {687ch.read(dst).get();688throw new RuntimeException("ExecutionException expected");689} catch (ExecutionException x) {690if (!(x.getCause() instanceof ClosedChannelException))691throw new RuntimeException("Cause of ClosedChannelException expected",692x.getCause());693}694}695}696697// exercise gathering write698static void testWrite2() throws Exception {699System.out.println("-- write (2) --");700701try (Server server = new Server()) {702final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();703ch.connect(server.address()).get();704SocketChannel sc = server.accept();705706// number of bytes written707final AtomicLong bytesWritten = new AtomicLong(0);708709// write buffers (should complete immediately)710ByteBuffer[] srcs = genBuffers(1);711final CountDownLatch l1 = new CountDownLatch(1);712ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,713new CompletionHandler<Long,Void>() {714public void completed(Long result, Void att) {715long n = result;716if (n <= 0)717throw new RuntimeException("No bytes read");718bytesWritten.addAndGet(n);719l1.countDown();720}721public void failed(Throwable exc, Void att) {722}723});724l1.await();725726// set to true to signal that no more buffers should be written727final AtomicBoolean continueWriting = new AtomicBoolean(true);728729// write until socket buffer is full so as to create the conditions730// for when a write does not complete immediately731srcs = genBuffers(1);732ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,733new CompletionHandler<Long,Void>() {734public void completed(Long result, Void att) {735long n = result;736if (n <= 0)737throw new RuntimeException("No bytes written");738bytesWritten.addAndGet(n);739if (continueWriting.get()) {740ByteBuffer[] srcs = genBuffers(8);741ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,742(Void)null, this);743}744}745public void failed(Throwable exc, Void att) {746}747});748749// give time for socket buffer to fill up.750Thread.sleep(5*1000);751752// signal handler to stop further writing753continueWriting.set(false);754755// read until done756ByteBuffer buf = ByteBuffer.allocateDirect(4096);757long total = 0L;758do {759int n = sc.read(buf);760if (n <= 0)761throw new RuntimeException("No bytes read");762buf.rewind();763total += n;764} while (total < bytesWritten.get());765766ch.close();767sc.close();768}769}770771static void testShutdown() throws Exception {772System.out.println("-- shutdown --");773774try (Server server = new Server();775AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())776{777ch.connect(server.address()).get();778try (SocketChannel peer = server.accept()) {779ByteBuffer buf = ByteBuffer.allocateDirect(1000);780int n;781782// check read783ch.shutdownInput();784n = ch.read(buf).get();785if (n != -1)786throw new RuntimeException("-1 expected");787// check full with full buffer788buf.put(new byte[100]);789n = ch.read(buf).get();790if (n != -1)791throw new RuntimeException("-1 expected");792793// check write794ch.shutdownOutput();795try {796ch.write(buf).get();797throw new RuntimeException("ClosedChannelException expected");798} catch (ExecutionException x) {799if (!(x.getCause() instanceof ClosedChannelException))800throw new RuntimeException("ClosedChannelException expected",801x.getCause());802}803}804}805}806807static void testTimeout() throws Exception {808System.out.println("-- timeouts --");809testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);810testTimeout(-1L, TimeUnit.SECONDS);811testTimeout(0L, TimeUnit.SECONDS);812testTimeout(2L, TimeUnit.SECONDS);813}814815static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {816System.out.printf("---- timeout: %d ms%n", unit.toMillis(timeout));817try (Server server = new Server()) {818AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();819ch.connect(server.address()).get();820821ByteBuffer dst = ByteBuffer.allocate(512);822823final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();824825// this read should timeout if value is > 0826ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {827public void completed(Integer result, Void att) {828readException.set(new RuntimeException("Should not complete"));829}830public void failed(Throwable exc, Void att) {831readException.set(exc);832}833});834if (timeout > 0L) {835// wait for exception836while (readException.get() == null) {837Thread.sleep(100);838}839if (!(readException.get() instanceof InterruptedByTimeoutException))840throw new RuntimeException("InterruptedByTimeoutException expected",841readException.get());842843// after a timeout then further reading should throw unspecified runtime exception844boolean exceptionThrown = false;845try {846ch.read(dst);847} catch (RuntimeException x) {848exceptionThrown = true;849}850if (!exceptionThrown)851throw new RuntimeException("RuntimeException expected after timeout.");852} else {853Thread.sleep(1000);854Throwable exc = readException.get();855if (exc != null)856throw new RuntimeException(exc);857}858859final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();860861// write bytes to fill socket buffer862ch.write(genBuffer(), timeout, unit, ch,863new CompletionHandler<Integer,AsynchronousSocketChannel>()864{865public void completed(Integer result, AsynchronousSocketChannel ch) {866ch.write(genBuffer(), timeout, unit, ch, this);867}868public void failed(Throwable exc, AsynchronousSocketChannel ch) {869writeException.set(exc);870}871});872if (timeout > 0) {873// wait for exception874while (writeException.get() == null) {875Thread.sleep(100);876}877if (!(writeException.get() instanceof InterruptedByTimeoutException))878throw new RuntimeException("InterruptedByTimeoutException expected",879writeException.get());880881// after a timeout then further writing should throw unspecified runtime exception882boolean exceptionThrown = false;883try {884ch.write(genBuffer());885} catch (RuntimeException x) {886exceptionThrown = true;887}888if (!exceptionThrown)889throw new RuntimeException("RuntimeException expected after timeout.");890} else {891Thread.sleep(1000);892Throwable exc = writeException.get();893if (exc != null)894throw new RuntimeException(exc);895}896897// clean-up898server.accept().close();899ch.close();900}901}902903// returns ByteBuffer with random bytes904static ByteBuffer genBuffer() {905int size = 1024 + RAND.nextInt(16000);906byte[] buf = new byte[size];907RAND.nextBytes(buf);908boolean useDirect = RAND.nextBoolean();909if (useDirect) {910ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);911bb.put(buf);912bb.flip();913return bb;914} else {915return ByteBuffer.wrap(buf);916}917}918919// return ByteBuffer[] with random bytes920static ByteBuffer[] genBuffers(int max) {921int len = 1;922if (max > 1)923len += RAND.nextInt(max);924ByteBuffer[] bufs = new ByteBuffer[len];925for (int i=0; i<len; i++)926bufs[i] = genBuffer();927return bufs;928}929930// return random SocketAddress931static SocketAddress genSocketAddress() {932StringBuilder sb = new StringBuilder("10.");933sb.append(RAND.nextInt(256));934sb.append('.');935sb.append(RAND.nextInt(256));936sb.append('.');937sb.append(RAND.nextInt(256));938InetAddress rh;939try {940rh = InetAddress.getByName(sb.toString());941} catch (UnknownHostException x) {942throw new InternalError("Should not happen");943}944return new InetSocketAddress(rh, RAND.nextInt(65535)+1);945}946}947948949