Path: blob/master/test/jdk/java/nio/channels/Selector/SelectorTest.java
41153 views
/*1* Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/* @test24* @summary Test selectors and socketchannels25* @library ..26* @key randomness27*/2829import java.io.*;30import java.net.*;31import java.nio.*;32import java.nio.channels.*;33import java.nio.channels.spi.SelectorProvider;34import java.util.*;353637public class SelectorTest {38private static List clientList = new LinkedList();39private static Random rnd = new Random();40public static int NUM_CLIENTS = 30;41public static int TEST_PORT = 31452;42static PrintStream log = System.err;43private static int FINISH_TIME = 30000;4445/*46* Usage note47*48* java SelectorTest [server] [client <host>] [<port>]49*50* No arguments runs both client and server in separate threads51* using the default port of 31452.52*53* client runs the client on this machine and connects to server54* at the given IP address.55*56* server runs the server on localhost.57*/58public static void main(String[] args) throws Exception {59if (args.length == 0) {60Server server = new Server(0);61server.start();62try {63Thread.sleep(1000);64} catch (InterruptedException e) { }65InetSocketAddress isa66= new InetSocketAddress(InetAddress.getLocalHost(), server.port());67Client client = new Client(isa);68client.start();69if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0)70throw new Exception("Failure");71log.println();7273} else if (args[0].equals("server")) {7475if (args.length > 1)76TEST_PORT = Integer.parseInt(args[1]);77Server server = new Server(TEST_PORT);78server.start();79if (server.finish(FINISH_TIME) == 0)80throw new Exception("Failure");81log.println();8283} else if (args[0].equals("client")) {8485if (args.length < 2) {86log.println("No host specified: terminating.");87return;88}89String ip = args[1];90if (args.length > 2)91TEST_PORT = Integer.parseInt(args[2]);92InetAddress ia = InetAddress.getByName(ip);93InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT);94Client client = new Client(isa);95client.start();96if (client.finish(FINISH_TIME) == 0)97throw new Exception("Failure");98log.println();99100} else {101System.out.println("Usage note:");102System.out.println("java SelectorTest [server] [client <host>] [<port>]");103System.out.println("No arguments runs both client and server in separate threads using the default port of 31452.");104System.out.println("client runs the client on this machine and connects to the server specified.");105System.out.println("server runs the server on localhost.");106}107}108109static class Client extends TestThread {110InetSocketAddress isa;111Client(InetSocketAddress isa) {112super("Client", SelectorTest.log);113this.isa = isa;114}115116public void go() throws Exception {117log.println("starting client...");118for (int i=0; i<NUM_CLIENTS; i++)119clientList.add(new RemoteEntity(i, isa, log));120121Collections.shuffle(clientList);122123log.println("created "+NUM_CLIENTS+" clients");124do {125for (Iterator i = clientList.iterator(); i.hasNext(); ) {126RemoteEntity re = (RemoteEntity) i.next();127if (re.cycle()) {128i.remove();129}130}131Collections.shuffle(clientList);132} while (clientList.size() > 0);133}134}135136static class Server extends TestThread {137private final ServerSocketChannel ssc;138private List socketList = new ArrayList();139private ServerSocket ss;140private int connectionsAccepted = 0;141private Selector pollSelector;142private Selector acceptSelector;143private Set pkeys;144private Set pskeys;145146Server(int port) throws IOException {147super("Server", SelectorTest.log);148this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port));149}150151int port() {152return ssc.socket().getLocalPort();153}154155public void go() throws Exception {156log.println("starting server...");157acceptSelector = SelectorProvider.provider().openSelector();158pollSelector = SelectorProvider.provider().openSelector();159pkeys = pollSelector.keys();160pskeys = pollSelector.selectedKeys();161Set readyKeys = acceptSelector.selectedKeys();162RequestHandler rh = new RequestHandler(pollSelector, log);163Thread requestThread = new Thread(rh);164165requestThread.start();166167ssc.configureBlocking(false);168SelectionKey acceptKey = ssc.register(acceptSelector,169SelectionKey.OP_ACCEPT);170while(connectionsAccepted < SelectorTest.NUM_CLIENTS) {171int keysAdded = acceptSelector.select(100);172if (keysAdded > 0) {173Iterator i = readyKeys.iterator();174while(i.hasNext()) {175SelectionKey sk = (SelectionKey)i.next();176i.remove();177ServerSocketChannel nextReady =178(ServerSocketChannel)sk.channel();179SocketChannel sc = nextReady.accept();180connectionsAccepted++;181if (sc != null) {182sc.configureBlocking(false);183synchronized (pkeys) {184sc.register(pollSelector, SelectionKey.OP_READ);185}186} else {187throw new RuntimeException(188"Socket does not support Channels");189}190}191}192}193acceptKey.cancel();194requestThread.join();195acceptSelector.close();196pollSelector.close();197}198}199}200201class RemoteEntity {202private static Random rnd = new Random();203int id;204ByteBuffer data;205int dataWrittenIndex;206int totalDataLength;207boolean initiated = false;208boolean connected = false;209boolean written = false;210boolean acked = false;211boolean closed = false;212private SocketChannel sc;213ByteBuffer ackBuffer;214PrintStream log;215InetSocketAddress server;216217RemoteEntity(int id, InetSocketAddress server, PrintStream log)218throws Exception219{220int connectFailures = 0;221this.id = id;222this.log = log;223this.server = server;224225sc = SocketChannel.open();226sc.configureBlocking(false);227228// Prepare the data buffer to write out from this entity229// Let's use both slow and fast buffers230if (rnd.nextBoolean())231data = ByteBuffer.allocateDirect(100);232else233data = ByteBuffer.allocate(100);234String number = Integer.toString(id);235if (number.length() == 1)236number = "0"+number;237String source = "Testing from " + number;238data.put(source.getBytes("8859_1"));239data.flip();240totalDataLength = source.length();241242// Allocate an ack buffer243ackBuffer = ByteBuffer.allocateDirect(10);244}245246private void reset() throws Exception {247sc.close();248sc = SocketChannel.open();249sc.configureBlocking(false);250}251252private void connect() throws Exception {253try {254connected = sc.connect(server);255initiated = true;256} catch (ConnectException e) {257initiated = false;258reset();259}260}261262private void finishConnect() throws Exception {263try {264connected = sc.finishConnect();265} catch (IOException e) {266initiated = false;267reset();268}269}270271int id() {272return id;273}274275boolean cycle() throws Exception {276if (!initiated)277connect();278else if (!connected)279finishConnect();280else if (!written)281writeCycle();282else if (!acked)283ackCycle();284else if (!closed)285close();286return closed;287}288289private void ackCycle() throws Exception {290//log.println("acking from "+id);291int bytesRead = sc.read(ackBuffer);292if (bytesRead > 0) {293acked = true;294}295}296297private void close() throws Exception {298sc.close();299closed = true;300}301302private void writeCycle() throws Exception {303log.println("writing from "+id);304int numBytesToWrite = rnd.nextInt(10)+1;305int newWriteTarget = dataWrittenIndex + numBytesToWrite;306if (newWriteTarget > totalDataLength)307newWriteTarget = totalDataLength;308data.limit(newWriteTarget);309int bytesWritten = sc.write(data);310if (bytesWritten > 0)311dataWrittenIndex += bytesWritten;312if (dataWrittenIndex == totalDataLength) {313written = true;314sc.socket().shutdownOutput();315}316}317318}319320321class RequestHandler implements Runnable {322private static Random rnd = new Random();323private Selector selector;324private int connectionsHandled = 0;325private HashMap dataBin = new HashMap();326PrintStream log;327328public RequestHandler(Selector selector, PrintStream log) {329this.selector = selector;330this.log = log;331}332333public void run() {334log.println("starting request handler...");335int connectionsAccepted = 0;336337Set nKeys = selector.keys();338Set readyKeys = selector.selectedKeys();339340try {341while(connectionsHandled < SelectorTest.NUM_CLIENTS) {342int numKeys = selector.select(100);343344// Process channels with data345synchronized (nKeys) {346if (readyKeys.size() > 0) {347Iterator i = readyKeys.iterator();348while(i.hasNext()) {349SelectionKey sk = (SelectionKey)i.next();350i.remove();351SocketChannel sc = (SocketChannel)sk.channel();352if (sc.isOpen())353read(sk, sc);354}355}356}357358// Give other threads a chance to run359if (numKeys == 0) {360try {361Thread.sleep(1);362} catch (Exception x) {}363}364}365} catch (Exception e) {366log.println("Unexpected error 1: "+e);367e.printStackTrace();368}369}370371private void read(SelectionKey sk, SocketChannel sc) throws Exception {372ByteBuffer bin = (ByteBuffer)dataBin.get(sc);373if (bin == null) {374if (rnd.nextBoolean())375bin = ByteBuffer.allocateDirect(100);376else377bin = ByteBuffer.allocate(100);378dataBin.put(sc, bin);379}380381int bytesRead = 0;382do {383bytesRead = sc.read(bin);384} while(bytesRead > 0);385386if (bytesRead == -1) {387sk.interestOps(0);388bin.flip();389int size = bin.limit();390byte[] data = new byte[size];391for(int j=0; j<size; j++)392data[j] = bin.get();393String message = new String(data, "8859_1");394connectionsHandled++;395acknowledge(sc);396log.println("Received >>>"+message + "<<<");397log.println("Handled: "+connectionsHandled);398}399}400401private void acknowledge(SocketChannel sc) throws Exception {402ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10);403String s = "ack";404ackBuffer.put(s.getBytes("8859_1"));405ackBuffer.flip();406int bytesWritten = 0;407while(bytesWritten == 0) {408bytesWritten += sc.write(ackBuffer);409}410sc.close();411}412}413414415