Path: blob/master/test/jdk/java/nio/channels/AsyncCloseAndInterrupt.java
41149 views
/*1* Copyright (c) 2002, 2020, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/* @test24* @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 714291925* 8151582 8068693 815320926* @run main/othervm AsyncCloseAndInterrupt27* @key intermittent28* @summary Comprehensive test of asynchronous closing and interruption29* @author Mark Reinhold30*/3132import java.io.*;33import java.net.*;34import java.nio.channels.*;35import java.nio.ByteBuffer;36import java.util.ArrayList;37import java.util.List;38import java.util.concurrent.ExecutorService;39import java.util.concurrent.Executors;40import java.util.concurrent.ThreadFactory;41import java.util.concurrent.Callable;42import java.util.concurrent.Future;43import java.util.concurrent.TimeUnit;4445public class AsyncCloseAndInterrupt {4647static PrintStream log = System.err;4849static void sleep(int ms) {50try {51Thread.sleep(ms);52} catch (InterruptedException x) { }53}5455// Wildcard address localized to this machine -- Windoze doesn't allow56// connecting to a server socket that was previously bound to a true57// wildcard, namely new InetSocketAddress((InetAddress)null, 0).58//59private static InetSocketAddress wildcardAddress;606162// Server socket that blindly accepts all connections6364static ServerSocketChannel acceptor;6566private static void initAcceptor() throws IOException {67acceptor = ServerSocketChannel.open();68acceptor.socket().bind(wildcardAddress);6970Thread th = new Thread("Acceptor") {71public void run() {72try {73for (;;) {74SocketChannel sc = acceptor.accept();75}76} catch (IOException x) {77x.printStackTrace();78}79}80};8182th.setDaemon(true);83th.start();84}858687// Server socket that refuses all connections8889static ServerSocketChannel refuser;9091private static void initRefuser() throws IOException {92refuser = ServerSocketChannel.open();93refuser.bind(wildcardAddress, 1); // use minimum backlog94}9596// Dead pipe source and sink9798static Pipe.SourceChannel deadSource;99static Pipe.SinkChannel deadSink;100101private static void initPipes() throws IOException {102if (deadSource != null)103deadSource.close();104deadSource = Pipe.open().source();105if (deadSink != null)106deadSink.close();107deadSink = Pipe.open().sink();108}109110111// Files112113private static File fifoFile = null; // File that blocks on reads and writes114private static File diskFile = null; // Disk file115116private static void initFile() throws Exception {117118diskFile = File.createTempFile("aci", ".tmp");119diskFile.deleteOnExit();120FileChannel fc = new FileOutputStream(diskFile).getChannel();121buffer.clear();122if (fc.write(buffer) != buffer.capacity())123throw new RuntimeException("Cannot create disk file");124fc.close();125126if (TestUtil.onWindows()) {127log.println("WARNING: Cannot completely test FileChannels on Windows");128return;129}130fifoFile = new File("x.fifo");131if (fifoFile.exists()) {132if (!fifoFile.delete())133throw new IOException("Cannot delete existing fifo " + fifoFile);134}135Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);136if (p.waitFor() != 0)137throw new IOException("Error creating fifo");138new RandomAccessFile(fifoFile, "rw").close();139140}141142143// Channel factories144145static abstract class ChannelFactory {146private final String name;147ChannelFactory(String name) {148this.name = name;149}150public String toString() {151return name;152}153abstract InterruptibleChannel create() throws IOException;154}155156static ChannelFactory socketChannelFactory157= new ChannelFactory("SocketChannel") {158InterruptibleChannel create() throws IOException {159return SocketChannel.open();160}161};162163static ChannelFactory connectedSocketChannelFactory164= new ChannelFactory("SocketChannel") {165InterruptibleChannel create() throws IOException {166SocketAddress sa = acceptor.socket().getLocalSocketAddress();167return SocketChannel.open(sa);168}169};170171static ChannelFactory serverSocketChannelFactory172= new ChannelFactory("ServerSocketChannel") {173InterruptibleChannel create() throws IOException {174ServerSocketChannel ssc = ServerSocketChannel.open();175ssc.socket().bind(wildcardAddress);176return ssc;177}178};179180static ChannelFactory datagramChannelFactory181= new ChannelFactory("DatagramChannel") {182InterruptibleChannel create() throws IOException {183DatagramChannel dc = DatagramChannel.open();184InetAddress lb = InetAddress.getLoopbackAddress();185dc.bind(new InetSocketAddress(lb, 0));186dc.connect(new InetSocketAddress(lb, 80));187return dc;188}189};190191static ChannelFactory pipeSourceChannelFactory192= new ChannelFactory("Pipe.SourceChannel") {193InterruptibleChannel create() throws IOException {194// ## arrange to close sink195return Pipe.open().source();196}197};198199static ChannelFactory pipeSinkChannelFactory200= new ChannelFactory("Pipe.SinkChannel") {201InterruptibleChannel create() throws IOException {202// ## arrange to close source203return Pipe.open().sink();204}205};206207static ChannelFactory fifoFileChannelFactory208= new ChannelFactory("FileChannel") {209InterruptibleChannel create() throws IOException {210return new RandomAccessFile(fifoFile, "rw").getChannel();211}212};213214static ChannelFactory diskFileChannelFactory215= new ChannelFactory("FileChannel") {216InterruptibleChannel create() throws IOException {217return new RandomAccessFile(diskFile, "rw").getChannel();218}219};220221222// I/O operations223224static abstract class Op {225private final String name;226protected Op(String name) {227this.name = name;228}229abstract void doIO(InterruptibleChannel ich) throws IOException;230void setup() throws IOException { }231public String toString() { return name; }232}233234static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);235236static ByteBuffer[] buffers = new ByteBuffer[] {237ByteBuffer.allocateDirect(1 << 19),238ByteBuffer.allocateDirect(1 << 19)239};240241static void clearBuffers() {242buffers[0].clear();243buffers[1].clear();244}245246static void show(Channel ch) {247log.print("Channel " + (ch.isOpen() ? "open" : "closed"));248if (ch.isOpen() && (ch instanceof SocketChannel)) {249SocketChannel sc = (SocketChannel)ch;250if (sc.socket().isInputShutdown())251log.print(", input shutdown");252if (sc.socket().isOutputShutdown())253log.print(", output shutdown");254}255log.println();256}257258static final Op READ = new Op("read") {259void doIO(InterruptibleChannel ich) throws IOException {260ReadableByteChannel rbc = (ReadableByteChannel)ich;261buffer.clear();262int n = rbc.read(buffer);263log.println("Read returned " + n);264show(rbc);265if (rbc.isOpen()266&& (n == -1)267&& (rbc instanceof SocketChannel)268&& ((SocketChannel)rbc).socket().isInputShutdown()) {269return;270}271throw new RuntimeException("Read succeeded");272}273};274275static final Op READV = new Op("readv") {276void doIO(InterruptibleChannel ich) throws IOException {277ScatteringByteChannel sbc = (ScatteringByteChannel)ich;278clearBuffers();279int n = (int)sbc.read(buffers);280log.println("Read returned " + n);281show(sbc);282if (sbc.isOpen()283&& (n == -1)284&& (sbc instanceof SocketChannel)285&& ((SocketChannel)sbc).socket().isInputShutdown()) {286return;287}288throw new RuntimeException("Read succeeded");289}290};291292static final Op RECEIVE = new Op("receive") {293void doIO(InterruptibleChannel ich) throws IOException {294DatagramChannel dc = (DatagramChannel)ich;295buffer.clear();296dc.receive(buffer);297show(dc);298throw new RuntimeException("Read succeeded");299}300};301302static final Op WRITE = new Op("write") {303void doIO(InterruptibleChannel ich) throws IOException {304305WritableByteChannel wbc = (WritableByteChannel)ich;306307SocketChannel sc = null;308if (wbc instanceof SocketChannel)309sc = (SocketChannel)wbc;310311int n = 0;312for (;;) {313buffer.clear();314int d = wbc.write(buffer);315n += d;316if (!wbc.isOpen())317break;318if ((sc != null) && sc.socket().isOutputShutdown())319break;320}321log.println("Wrote " + n + " bytes");322show(wbc);323}324};325326static final Op WRITEV = new Op("writev") {327void doIO(InterruptibleChannel ich) throws IOException {328329GatheringByteChannel gbc = (GatheringByteChannel)ich;330331SocketChannel sc = null;332if (gbc instanceof SocketChannel)333sc = (SocketChannel)gbc;334335int n = 0;336for (;;) {337clearBuffers();338int d = (int)gbc.write(buffers);339n += d;340if (!gbc.isOpen())341break;342if ((sc != null) && sc.socket().isOutputShutdown())343break;344}345log.println("Wrote " + n + " bytes");346show(gbc);347348}349};350351static final Op CONNECT = new Op("connect") {352void setup() {353waitPump("connect waiting for pumping refuser ...");354}355void doIO(InterruptibleChannel ich) throws IOException {356SocketChannel sc = (SocketChannel)ich;357if (sc.connect(refuser.socket().getLocalSocketAddress()))358throw new RuntimeException("Connection succeeded");359throw new RuntimeException("Connection did not block");360}361};362363static final Op FINISH_CONNECT = new Op("finishConnect") {364void setup() {365waitPump("finishConnect waiting for pumping refuser ...");366}367void doIO(InterruptibleChannel ich) throws IOException {368SocketChannel sc = (SocketChannel)ich;369sc.configureBlocking(false);370SocketAddress sa = refuser.socket().getLocalSocketAddress();371if (sc.connect(sa))372throw new RuntimeException("Connection succeeded");373sc.configureBlocking(true);374if (sc.finishConnect())375throw new RuntimeException("Connection succeeded");376throw new RuntimeException("Connection did not block");377}378};379380static final Op ACCEPT = new Op("accept") {381void doIO(InterruptibleChannel ich) throws IOException {382ServerSocketChannel ssc = (ServerSocketChannel)ich;383ssc.accept();384throw new RuntimeException("Accept succeeded");385}386};387388// Use only with diskFileChannelFactory389static final Op TRANSFER_TO = new Op("transferTo") {390void doIO(InterruptibleChannel ich) throws IOException {391FileChannel fc = (FileChannel)ich;392long n = fc.transferTo(0, fc.size(), deadSink);393log.println("Transferred " + n + " bytes");394show(fc);395}396};397398// Use only with diskFileChannelFactory399static final Op TRANSFER_FROM = new Op("transferFrom") {400void doIO(InterruptibleChannel ich) throws IOException {401FileChannel fc = (FileChannel)ich;402long n = fc.transferFrom(deadSource, 0, 1 << 20);403log.println("Transferred " + n + " bytes");404show(fc);405}406};407408409410// Test modes411412static final int TEST_PREINTR = 0; // Interrupt thread before I/O413static final int TEST_INTR = 1; // Interrupt thread during I/O414static final int TEST_CLOSE = 2; // Close channel during I/O415static final int TEST_SHUTI = 3; // Shutdown input during I/O416static final int TEST_SHUTO = 4; // Shutdown output during I/O417418static final String[] testName = new String[] {419"pre-interrupt", "interrupt", "close",420"shutdown-input", "shutdown-output"421};422423424static class Tester extends TestThread {425426private InterruptibleChannel ch;427private Op op;428private int test;429volatile boolean ready = false;430431protected Tester(ChannelFactory cf, InterruptibleChannel ch,432Op op, int test)433{434super(cf + "/" + op + "/" + testName[test]);435this.ch = ch;436this.op = op;437this.test = test;438}439440@SuppressWarnings("fallthrough")441private void caught(Channel ch, IOException x) {442String xn = x.getClass().getName();443switch (test) {444445case TEST_PREINTR:446case TEST_INTR:447if (!xn.equals("java.nio.channels.ClosedByInterruptException"))448throw new RuntimeException("Wrong exception thrown: " + x);449break;450451case TEST_CLOSE:452case TEST_SHUTO:453if (!xn.equals("java.nio.channels.AsynchronousCloseException"))454throw new RuntimeException("Wrong exception thrown: " + x);455break;456457case TEST_SHUTI:458if (TestUtil.onWindows())459break;460// FALL THROUGH461462default:463throw new Error(x);464}465466if (ch.isOpen()) {467if (test == TEST_SHUTO) {468SocketChannel sc = (SocketChannel)ch;469if (!sc.socket().isOutputShutdown())470throw new RuntimeException("Output not shutdown");471} else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {472// Let this case pass -- CBIE applies to other channel473} else {474throw new RuntimeException("Channel still open");475}476}477478log.println("Thrown as expected: " + x);479}480481final void go() throws Exception {482if (test == TEST_PREINTR)483Thread.currentThread().interrupt();484ready = true;485try {486op.doIO(ch);487} catch (ClosedByInterruptException x) {488caught(ch, x);489} catch (AsynchronousCloseException x) {490caught(ch, x);491} finally {492ch.close();493}494}495496}497498private static volatile boolean pumpDone = false;499private static volatile boolean pumpReady = false;500501private static void waitPump(String msg){502log.println(msg);503while (!pumpReady){504sleep(200);505}506log.println(msg + " done");507}508509// Create a pump thread dedicated to saturate refuser's connection backlog510private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {511512Callable<Integer> pumpTask = new Callable<Integer>() {513514@Override515public Integer call() throws IOException {516// Can't reliably saturate connection backlog on Windows Server editions517assert !TestUtil.onWindows();518log.println("Start pumping refuser ...");519List<SocketChannel> refuserClients = new ArrayList<>();520521// Saturate the refuser's connection backlog so that further connection522// attempts will be blocked523pumpReady = false;524while (!pumpDone) {525SocketChannel sc = SocketChannel.open();526sc.configureBlocking(false);527boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());528529// Assume that the connection backlog is saturated if a530// client cannot connect to the refuser within 50 milliseconds531long start = System.currentTimeMillis();532while (!pumpReady && !connected533&& (System.currentTimeMillis() - start < 50)) {534connected = sc.finishConnect();535}536537if (connected) {538// Retain so that finalizer doesn't close539refuserClients.add(sc);540} else {541sc.close();542pumpReady = true;543}544}545546for (SocketChannel sc : refuserClients) {547sc.close();548}549refuser.close();550551log.println("Stop pumping refuser ...");552return refuserClients.size();553}554};555556return pumperExecutor.submit(pumpTask);557}558559// Test560static void test(ChannelFactory cf, Op op, int test) throws Exception {561test(cf, op, test, true);562}563564static void test(ChannelFactory cf, Op op, int test, boolean extraSleep)565throws Exception566{567log.println();568initPipes();569InterruptibleChannel ch = cf.create();570Tester t = new Tester(cf, ch, op, test);571log.println(t);572op.setup();573t.start();574do {575sleep(50);576} while (!t.ready);577578if (extraSleep) {579sleep(100);580}581582switch (test) {583584case TEST_INTR:585t.interrupt();586break;587588case TEST_CLOSE:589ch.close();590break;591592case TEST_SHUTI:593if (TestUtil.onWindows()) {594log.println("WARNING: Asynchronous shutdown not working on Windows");595ch.close();596} else {597((SocketChannel)ch).socket().shutdownInput();598}599break;600601case TEST_SHUTO:602if (TestUtil.onWindows()) {603log.println("WARNING: Asynchronous shutdown not working on Windows");604ch.close();605} else {606((SocketChannel)ch).socket().shutdownOutput();607}608break;609610default:611break;612}613614t.finishAndThrow(10000);615}616617static void test(ChannelFactory cf, Op op) throws Exception {618test(cf, op, true);619}620621static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception {622// Test INTR cases before PREINTER cases since sometimes623// interrupted threads can't load classes624test(cf, op, TEST_INTR, extraSleep);625test(cf, op, TEST_PREINTR, extraSleep);626627// Bugs, see FileChannelImpl for details628if (op == TRANSFER_FROM) {629log.println("WARNING: transferFrom/close not tested");630return;631}632if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {633log.println("WARNING: transferTo/close not tested");634return;635}636637test(cf, op, TEST_CLOSE, extraSleep);638}639640static void test(ChannelFactory cf)641throws Exception642{643InterruptibleChannel ch = cf.create(); // Sample channel644ch.close();645646if (ch instanceof ReadableByteChannel) {647test(cf, READ);648if (ch instanceof SocketChannel)649test(cf, READ, TEST_SHUTI);650}651652if (ch instanceof ScatteringByteChannel) {653test(cf, READV);654if (ch instanceof SocketChannel)655test(cf, READV, TEST_SHUTI);656}657658if (ch instanceof DatagramChannel) {659test(cf, RECEIVE);660661// Return here: We can't effectively test writes since, if they662// block, they do so only for a fleeting moment unless the network663// interface is overloaded.664return;665666}667668if (ch instanceof WritableByteChannel) {669test(cf, WRITE);670if (ch instanceof SocketChannel)671test(cf, WRITE, TEST_SHUTO);672}673674if (ch instanceof GatheringByteChannel) {675test(cf, WRITEV);676if (ch instanceof SocketChannel)677test(cf, WRITEV, TEST_SHUTO);678}679680}681682public static void main(String[] args) throws Exception {683684wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);685initAcceptor();686if (!TestUtil.onWindows())687initRefuser();688initPipes();689initFile();690691if (TestUtil.onWindows()) {692log.println("WARNING: Cannot test FileChannel transfer operations"693+ " on Windows");694} else {695test(diskFileChannelFactory, TRANSFER_TO);696test(diskFileChannelFactory, TRANSFER_FROM);697}698if (fifoFile != null)699test(fifoFileChannelFactory);700701// Testing positional file reads and writes is impractical: It requires702// access to a large file soft-mounted via NFS, and even then isn't703// completely guaranteed to work.704//705// Testing map is impractical and arguably unnecessary: It's706// unclear under what conditions mmap(2) will actually block.707708test(connectedSocketChannelFactory);709710if (TestUtil.onWindows()) {711log.println("WARNING Cannot reliably test connect/finishConnect"712+ " operations on this platform");713} else {714// Only the following tests need refuser's connection backlog715// to be saturated716ExecutorService pumperExecutor =717Executors.newSingleThreadExecutor(718new ThreadFactory() {719720@Override721public Thread newThread(Runnable r) {722Thread t = new Thread(r);723t.setDaemon(true);724t.setName("Pumper");725return t;726}727});728729pumpDone = false;730try {731Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);732waitPump("\nWait for initial Pump");733734test(socketChannelFactory, CONNECT, false);735test(socketChannelFactory, FINISH_CONNECT, false);736737pumpDone = true;738Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);739log.println("Pump " + newConn + " connections.");740} finally {741pumperExecutor.shutdown();742}743}744745test(serverSocketChannelFactory, ACCEPT);746test(datagramChannelFactory);747test(pipeSourceChannelFactory);748test(pipeSinkChannelFactory);749}750}751752753