Path: blob/master/test/jdk/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java
41153 views
/*
 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/* @test
 * @bug 6834246 6842687
 * @summary Stress test connections through the loopback interface
 * @run main StressLoopback
 * @run main/othervm -Djdk.net.useFastTcpLoopback StressLoopback
 * @key randomness
 */

import java.nio.ByteBuffer;
import java.net.*;
import java.nio.channels.*;
import java.util.Random;
import java.io.IOException;

/**
 * Stress test that opens a random number of asynchronous socket connections
 * to a local listener, pumps bytes through each of them for a fixed period,
 * and then checks that every receiving side read exactly as many bytes as
 * its sending side wrote.
 */
public class StressLoopback {
    static final Random rand = new Random();

    /** How long the sources are left writing before they are told to finish. */
    private static final long RUN_MILLIS = 20 * 1000;

    /**
     * Runs the stress test.
     *
     * @param args ignored
     * @throws RuntimeException if any connection read a different number of
     *         bytes than was written to it
     * @throws Exception if setting up the listener or a connection fails
     */
    public static void main(String[] args) throws Exception {
        // The test is about the loopback interface, so bind and connect using
        // the loopback address explicitly rather than whatever address the
        // local host name happens to resolve to.
        InetAddress loopback = InetAddress.getLoopbackAddress();

        // create sources and sinks
        int count = 2 + rand.nextInt(9);
        Source[] source = new Source[count];
        Sink[] sink = new Sink[count];

        // The listener is only needed while connections are being
        // established; closing it afterwards does not affect the channels
        // that were already accepted.
        try (AsynchronousServerSocketChannel listener =
                 AsynchronousServerSocketChannel.open()) {
            listener.bind(new InetSocketAddress(loopback, 0));
            int port = ((InetSocketAddress) listener.getLocalAddress()).getPort();
            SocketAddress remote = new InetSocketAddress(loopback, port);

            for (int i = 0; i < count; i++) {
                AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
                ch.connect(remote).get();
                source[i] = new Source(ch);
                sink[i] = new Sink(listener.accept().get());
            }
        }

        // start the sinks and sources
        for (int i = 0; i < count; i++) {
            sink[i].start();
            source[i].start();
        }

        // let the test run for a while
        Thread.sleep(RUN_MILLIS);

        // wait until everyone is done
        boolean failed = false;
        long total = 0L;
        for (int i = 0; i < count; i++) {
            long nwrote = source[i].finish();
            long nread = sink[i].finish();
            // Report each connection on its own merits; the overall verdict
            // is tracked separately so one bad pair does not label every
            // later pair as failed in the log.
            boolean matched = (nread == nwrote);
            if (!matched) {
                failed = true;
            }
            System.out.format("%d -> %d (%s)\n",
                nwrote, nread, matched ? "PASS" : "FAIL");
            total += nwrote;
        }
        if (failed) {
            throw new RuntimeException("Test failed - see log for details");
        }
        System.out.format("Total sent %d MB\n", total / (1024L * 1024L));
    }

    /**
     * Writes bytes to a channel until "done". When done the channel is closed.
     */
    static class Source {
        private final AsynchronousByteChannel channel;
        private final ByteBuffer sentBuffer;
        // Updated only from the write completion handler. A new write is
        // issued only from the completion of the previous one, so there is
        // never more than one write outstanding; volatile makes the value
        // visible to the thread that calls finish().
        private volatile long bytesSent;
        private volatile boolean finished;

        /**
         * Creates a source that writes to the given channel using a randomly
         * sized buffer (1024 to 11023 bytes) that is randomly either direct
         * or heap allocated.
         */
        Source(AsynchronousByteChannel channel) {
            this.channel = channel;
            int size = 1024 + rand.nextInt(10000);
            this.sentBuffer = (rand.nextBoolean()) ?
                ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
        }

        /**
         * Starts the write loop. Each completed write adds the number of
         * bytes written to the running total and, unless {@link #finish()}
         * has been called, re-issues a write of the whole buffer. The buffer
         * contents are never inspected, so the buffer is simply reset before
         * every write.
         */
        void start() {
            sentBuffer.position(0);
            sentBuffer.limit(sentBuffer.capacity());
            channel.write(sentBuffer, (Void) null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer nwrote, Void att) {
                    bytesSent += nwrote;
                    if (finished) {
                        closeUnchecked(channel);
                    } else {
                        sentBuffer.position(0);
                        sentBuffer.limit(sentBuffer.capacity());
                        channel.write(sentBuffer, (Void) null, this);
                    }
                }

                @Override
                public void failed(Throwable exc, Void att) {
                    exc.printStackTrace();
                    closeUnchecked(channel);
                }
            });
        }

        /**
         * Asks the write loop to stop, waits until the channel has been
         * closed, and returns the total number of bytes written.
         */
        long finish() {
            finished = true;
            waitUntilClosed(channel);
            return bytesSent;
        }
    }

    /**
     * Read bytes from a channel until EOF is received.
     */
    static class Sink {
        private final AsynchronousByteChannel channel;
        private final ByteBuffer readBuffer;
        // Updated only from the read completion handler; volatile makes the
        // value visible to the thread that calls finish().
        private volatile long bytesRead;

        /**
         * Creates a sink that reads from the given channel using a randomly
         * sized buffer (1024 to 11023 bytes) that is randomly either direct
         * or heap allocated.
         */
        Sink(AsynchronousByteChannel channel) {
            this.channel = channel;
            int size = 1024 + rand.nextInt(10000);
            this.readBuffer = (rand.nextBoolean()) ?
                ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
        }

        /**
         * Starts the read loop. Each completed read adds the number of bytes
         * read to the running total and issues the next read; a negative
         * result (end of stream) closes the channel instead.
         */
        void start() {
            channel.read(readBuffer, (Void) null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer nread, Void att) {
                    if (nread < 0) {
                        closeUnchecked(channel);
                    } else {
                        bytesRead += nread;
                        readBuffer.clear();
                        channel.read(readBuffer, (Void) null, this);
                    }
                }

                @Override
                public void failed(Throwable exc, Void att) {
                    exc.printStackTrace();
                    closeUnchecked(channel);
                }
            });
        }

        /**
         * Waits until the channel has been closed and returns the total
         * number of bytes read.
         */
        long finish() {
            waitUntilClosed(channel);
            return bytesRead;
        }
    }

    /**
     * Polls the given channel every 100 ms until it is no longer open.
     *
     * <p>The wait is not abandoned if the calling thread is interrupted, but
     * the interrupt is remembered and the thread's interrupt status is
     * restored before returning so that callers can still observe it.
     */
    static void waitUntilClosed(Channel c) {
        boolean interrupted = false;
        try {
            while (c.isOpen()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the given channel, ignoring any {@link IOException} thrown by
     * the close.
     */
    static void closeUnchecked(Channel c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // best effort: the callers have no way to act on a close failure
        }
    }
}