Path: blob/master/test/jdk/java/nio/channels/AsynchronousChannelGroup/Identity.java
41152 views
/*1* Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/* @test24* @bug 4607272 684268725* @summary Unit test for AsynchronousChannelGroup26* @key randomness27*/2829import java.nio.ByteBuffer;30import java.nio.channels.*;31import java.net.*;32import java.util.*;33import java.util.concurrent.*;34import java.util.concurrent.atomic.*;35import java.io.IOException;3637/**38* Tests that the completion handler is invoked by a thread with39* the expected identity.40*/4142public class Identity {43static final Random rand = new Random();44static final CountDownLatch done = new CountDownLatch(1);45static final AtomicBoolean failed = new AtomicBoolean(false);4647static void fail(String msg) {48failed.set(true);49done.countDown();50throw new RuntimeException(msg);51}5253// thread-local identifies the thread54private static final ThreadLocal<Integer> myGroup =55new ThreadLocal<Integer>() {56@Override protected Integer initialValue() {57return Integer.valueOf(-1);58}59};6061// creates a ThreadFactory that constructs groups with the given identity62static final ThreadFactory createThreadFactory(final int groupId) {63return new ThreadFactory() {64@Override65public Thread newThread(final Runnable r) {66Thread t = new Thread(new Runnable() {67public void run() {68myGroup.set(groupId);69r.run();70}});71t.setDaemon(true);72return t;73}74};75}7677public static void main(String[] args) throws Exception {78// create 3-10 channels, each in its own group79final int groupCount = 3 + rand.nextInt(8);80final AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount];81final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount];8283// create listener to accept connections84try (final AsynchronousServerSocketChannel listener =85AsynchronousServerSocketChannel.open()) {8687listener.bind(new InetSocketAddress(0));88listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {89public void completed(final AsynchronousSocketChannel ch, Void att) {90listener.accept((Void)null, this);91final ByteBuffer buf = ByteBuffer.allocate(100);92ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {93public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {94if (bytesRead < 0) {95try { ch.close(); } catch (IOException ignore) { }96} else {97buf.clear();98ch.read(buf, ch, this);99}100}101public void failed(Throwable exc, AsynchronousSocketChannel ch) {102try { ch.close(); } catch (IOException ignore) { }103}104});105}106public void failed(Throwable exc, Void att) {107}108});109int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();110SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);111112for (int i=0; i<groupCount; i++) {113ThreadFactory factory = createThreadFactory(i);114AsynchronousChannelGroup group;115if (rand.nextBoolean()) {116int nThreads = 1 + rand.nextInt(10);117group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);118} else {119ExecutorService pool = Executors.newCachedThreadPool(factory);120group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));121}122groups[i] = group;123124// create channel in group and connect it to the server125AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);126ch.connect(sa).get();127channels[i] = ch;128}129130// randomly write to each channel, ensuring that the completion handler131// is always invoked by a thread with the right identity.132final AtomicInteger writeCount = new AtomicInteger(100);133channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() {134public void completed(Integer bytesWritten, Integer groupId) {135if (bytesWritten != 1)136fail("Expected 1 byte to be written");137if (!myGroup.get().equals(groupId))138fail("Handler invoked by thread with the wrong identity");139if (writeCount.decrementAndGet() > 0) {140int id = rand.nextInt(groupCount);141channels[id].write(getBuffer(), id, this);142} else {143done.countDown();144}145}146public void failed(Throwable exc, Integer groupId) {147fail(exc.getMessage());148}149});150151// wait until done152done.await();153} finally {154// clean-up155for (AsynchronousSocketChannel ch: channels)156ch.close();157for (AsynchronousChannelGroup group: groups)158group.shutdownNow();159160if (failed.get())161throw new RuntimeException("Test failed - see log for details");162}163}164165static ByteBuffer getBuffer() {166ByteBuffer buf;167if (rand.nextBoolean()) {168buf = ByteBuffer.allocateDirect(1);169} else {170buf = ByteBuffer.allocate(1);171}172buf.put((byte)0);173buf.flip();174return buf;175}176}177178179