Path: blob/master/test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
41153 views
/*1* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/* @test24* @summary Unit test for Selector.select/selectNow(Consumer)25* @bug 8199433 820878026* @run testng SelectWithConsumer27*/2829/* @test30* @requires (os.family == "windows")31* @run testng/othervm -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.WindowsSelectorProvider SelectWithConsumer32*/3334import java.io.Closeable;35import java.io.IOException;36import java.net.InetSocketAddress;37import java.nio.ByteBuffer;38import java.nio.channels.ClosedSelectorException;39import java.nio.channels.Pipe;40import java.nio.channels.SelectionKey;41import java.nio.channels.Selector;42import java.nio.channels.ServerSocketChannel;43import java.nio.channels.SocketChannel;44import java.nio.channels.WritableByteChannel;45import java.util.concurrent.Executors;46import java.util.concurrent.ScheduledExecutorService;47import java.util.concurrent.TimeUnit;48import java.util.concurrent.atomic.AtomicInteger;49import static java.util.concurrent.TimeUnit.*;5051import org.testng.annotations.AfterTest;52import org.testng.annotations.Test;53import static org.testng.Assert.*;5455@Test56public class SelectWithConsumer {5758/**59* Invoke the select methods that take an action and check that the60* accumulated ready ops notified to the action matches the expected ops.61*/62void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {63var callerThread = Thread.currentThread();64var sel = key.selector();65var interestOps = key.interestOps();66var notifiedOps = new AtomicInteger();6768if (expectedOps == 0) {69// ensure select(Consumer) does not block indefinitely70sel.wakeup();71} else {72// ensure that the channel is ready for all expected operations73sel.select();74while ((key.readyOps() & interestOps) != expectedOps) {75Thread.sleep(100);76sel.select();77}78}7980// select(Consumer)81notifiedOps.set(0);82int n = sel.select(k -> {83assertTrue(Thread.currentThread() == callerThread);84assertTrue(k == key);85int readyOps = key.readyOps();86assertTrue((readyOps & interestOps) != 0);87assertTrue((readyOps & notifiedOps.get()) == 0);88notifiedOps.set(notifiedOps.get() | readyOps);89});90assertTrue((n == 1) ^ (expectedOps == 0));91assertTrue(notifiedOps.get() == expectedOps);9293// select(Consumer, timeout)94notifiedOps.set(0);95n = sel.select(k -> {96assertTrue(Thread.currentThread() == callerThread);97assertTrue(k == key);98int readyOps = key.readyOps();99assertTrue((readyOps & interestOps) != 0);100assertTrue((readyOps & notifiedOps.get()) == 0);101notifiedOps.set(notifiedOps.get() | readyOps);102}, 1000);103assertTrue((n == 1) ^ (expectedOps == 0));104assertTrue(notifiedOps.get() == expectedOps);105106// selectNow(Consumer)107notifiedOps.set(0);108n = sel.selectNow(k -> {109assertTrue(Thread.currentThread() == callerThread);110assertTrue(k == key);111int readyOps = key.readyOps();112assertTrue((readyOps & interestOps) != 0);113assertTrue((readyOps & notifiedOps.get()) == 0);114notifiedOps.set(notifiedOps.get() | readyOps);115});116assertTrue((n == 1) ^ (expectedOps == 0));117assertTrue(notifiedOps.get() == expectedOps);118}119120/**121* Test that an action is performed when a channel is ready for reading.122*/123public void testReadable() throws Exception {124Pipe p = Pipe.open();125try (Selector sel = Selector.open()) {126Pipe.SinkChannel sink = p.sink();127Pipe.SourceChannel source = p.source();128source.configureBlocking(false);129SelectionKey key = source.register(sel, SelectionKey.OP_READ);130131// write to sink to ensure source is readable132scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);133134// test that action is invoked135testActionInvoked(key, SelectionKey.OP_READ);136} finally {137closePipe(p);138}139}140141/**142* Test that an action is performed when a channel is ready for writing.143*/144public void testWritable() throws Exception {145Pipe p = Pipe.open();146try (Selector sel = Selector.open()) {147Pipe.SourceChannel source = p.source();148Pipe.SinkChannel sink = p.sink();149sink.configureBlocking(false);150SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);151152// test that action is invoked153testActionInvoked(key, SelectionKey.OP_WRITE);154} finally {155closePipe(p);156}157}158159/**160* Test that an action is performed when a channel is ready for both161* reading and writing.162*/163public void testReadableAndWriteable() throws Exception {164ServerSocketChannel ssc = null;165SocketChannel sc = null;166SocketChannel peer = null;167try (Selector sel = Selector.open()) {168ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));169sc = SocketChannel.open(ssc.getLocalAddress());170sc.configureBlocking(false);171SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |172SelectionKey.OP_WRITE));173174// accept connection and write data so the source is readable175peer = ssc.accept();176peer.write(messageBuffer());177178// test that action is invoked179testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));180} finally {181if (ssc != null) ssc.close();182if (sc != null) sc.close();183if (peer != null) peer.close();184}185}186187/**188* Test that the action is called for two selected channels189*/190public void testTwoChannels() throws Exception {191Pipe p = Pipe.open();192try (Selector sel = Selector.open()) {193Pipe.SourceChannel source = p.source();194Pipe.SinkChannel sink = p.sink();195source.configureBlocking(false);196sink.configureBlocking(false);197SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);198SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);199200// write to sink to ensure that the source is readable201sink.write(messageBuffer());202203// wait for key1 to be readable204sel.select();205assertTrue(key2.isWritable());206while (!key1.isReadable()) {207Thread.sleep(20);208sel.select();209}210211var counter = new AtomicInteger();212213// select(Consumer)214counter.set(0);215int n = sel.select(k -> {216assertTrue(k == key1 || k == key2);217counter.incrementAndGet();218});219assertTrue(n == 2);220assertTrue(counter.get() == 2);221222// select(Consumer, timeout)223counter.set(0);224n = sel.select(k -> {225assertTrue(k == key1 || k == key2);226counter.incrementAndGet();227}, 1000);228assertTrue(n == 2);229assertTrue(counter.get() == 2);230231// selectNow(Consumer)232counter.set(0);233n = sel.selectNow(k -> {234assertTrue(k == key1 || k == key2);235counter.incrementAndGet();236});237assertTrue(n == 2);238assertTrue(counter.get() == 2);239} finally {240closePipe(p);241}242}243244/**245* Test calling select twice, the action should be invoked each time246*/247public void testRepeatedSelect1() throws Exception {248Pipe p = Pipe.open();249try (Selector sel = Selector.open()) {250Pipe.SourceChannel source = p.source();251Pipe.SinkChannel sink = p.sink();252source.configureBlocking(false);253SelectionKey key = source.register(sel, SelectionKey.OP_READ);254255// write to sink to ensure that the source is readable256sink.write(messageBuffer());257258// test that action is invoked259testActionInvoked(key, SelectionKey.OP_READ);260testActionInvoked(key, SelectionKey.OP_READ);261262} finally {263closePipe(p);264}265}266267/**268* Test calling select twice. An I/O operation is performed after the269* first select so the channel will not be selected by the second select.270*/271public void testRepeatedSelect2() throws Exception {272Pipe p = Pipe.open();273try (Selector sel = Selector.open()) {274Pipe.SourceChannel source = p.source();275Pipe.SinkChannel sink = p.sink();276source.configureBlocking(false);277SelectionKey key = source.register(sel, SelectionKey.OP_READ);278279// write to sink to ensure that the source is readable280sink.write(messageBuffer());281282// test that action is invoked283testActionInvoked(key, SelectionKey.OP_READ);284285// read all bytes286int n;287ByteBuffer bb = ByteBuffer.allocate(100);288do {289n = source.read(bb);290bb.clear();291} while (n > 0);292293// test that action is not invoked294testActionInvoked(key, 0);295} finally {296closePipe(p);297}298}299300/**301* Test timeout302*/303public void testTimeout() throws Exception {304Pipe p = Pipe.open();305try (Selector sel = Selector.open()) {306Pipe.SourceChannel source = p.source();307Pipe.SinkChannel sink = p.sink();308source.configureBlocking(false);309source.register(sel, SelectionKey.OP_READ);310long start = System.currentTimeMillis();311int n = sel.select(k -> assertTrue(false), 1000L);312long duration = System.currentTimeMillis() - start;313assertTrue(n == 0);314assertTrue(duration > 500, "select took " + duration + " ms");315} finally {316closePipe(p);317}318}319320/**321* Test wakeup prior to select322*/323public void testWakeupBeforeSelect() throws Exception {324// select(Consumer)325try (Selector sel = Selector.open()) {326sel.wakeup();327int n = sel.select(k -> assertTrue(false));328assertTrue(n == 0);329}330331// select(Consumer, timeout)332try (Selector sel = Selector.open()) {333sel.wakeup();334long start = System.currentTimeMillis();335int n = sel.select(k -> assertTrue(false), 60*1000);336long duration = System.currentTimeMillis() - start;337assertTrue(n == 0);338assertTrue(duration < 5000, "select took " + duration + " ms");339}340}341342/**343* Test wakeup during select344*/345public void testWakeupDuringSelect() throws Exception {346// select(Consumer)347try (Selector sel = Selector.open()) {348scheduleWakeup(sel, 1, SECONDS);349int n = sel.select(k -> assertTrue(false));350assertTrue(n == 0);351}352353// select(Consumer, timeout)354try (Selector sel = Selector.open()) {355scheduleWakeup(sel, 1, SECONDS);356long start = System.currentTimeMillis();357int n = sel.select(k -> assertTrue(false), 60*1000);358long duration = System.currentTimeMillis() - start;359assertTrue(n == 0);360assertTrue(duration > 500 && duration < 10*1000,361"select took " + duration + " ms");362}363}364365/**366* Test invoking select with interrupt status set367*/368public void testInterruptBeforeSelect() throws Exception {369// select(Consumer)370try (Selector sel = Selector.open()) {371Thread.currentThread().interrupt();372int n = sel.select(k -> assertTrue(false));373assertTrue(n == 0);374assertTrue(Thread.currentThread().isInterrupted());375assertTrue(sel.isOpen());376} finally {377Thread.currentThread().interrupted(); // clear interrupt status378}379380// select(Consumer, timeout)381try (Selector sel = Selector.open()) {382Thread.currentThread().interrupt();383long start = System.currentTimeMillis();384int n = sel.select(k -> assertTrue(false), 60*1000);385long duration = System.currentTimeMillis() - start;386assertTrue(n == 0);387assertTrue(duration < 5000, "select took " + duration + " ms");388assertTrue(Thread.currentThread().isInterrupted());389assertTrue(sel.isOpen());390} finally {391Thread.currentThread().interrupted(); // clear interrupt status392}393}394395/**396* Test interrupt thread during select397*/398public void testInterruptDuringSelect() throws Exception {399// select(Consumer)400try (Selector sel = Selector.open()) {401scheduleInterrupt(Thread.currentThread(), 1, SECONDS);402int n = sel.select(k -> assertTrue(false));403assertTrue(n == 0);404assertTrue(Thread.currentThread().isInterrupted());405assertTrue(sel.isOpen());406} finally {407Thread.currentThread().interrupted(); // clear interrupt status408}409410// select(Consumer, timeout)411try (Selector sel = Selector.open()) {412scheduleInterrupt(Thread.currentThread(), 1, SECONDS);413long start = System.currentTimeMillis();414int n = sel.select(k -> assertTrue(false), 60*1000);415long duration = System.currentTimeMillis() - start;416assertTrue(n == 0);417assertTrue(Thread.currentThread().isInterrupted());418assertTrue(sel.isOpen());419} finally {420Thread.currentThread().interrupted(); // clear interrupt status421}422}423424/**425* Test invoking select on a closed selector426*/427@Test(expectedExceptions = ClosedSelectorException.class)428public void testClosedSelector1() throws Exception {429Selector sel = Selector.open();430sel.close();431sel.select(k -> assertTrue(false));432}433@Test(expectedExceptions = ClosedSelectorException.class)434public void testClosedSelector2() throws Exception {435Selector sel = Selector.open();436sel.close();437sel.select(k -> assertTrue(false), 1000);438}439@Test(expectedExceptions = ClosedSelectorException.class)440public void testClosedSelector3() throws Exception {441Selector sel = Selector.open();442sel.close();443sel.selectNow(k -> assertTrue(false));444}445446/**447* Test closing selector while in a selection operation448*/449public void testCloseDuringSelect() throws Exception {450// select(Consumer)451try (Selector sel = Selector.open()) {452scheduleClose(sel, 3, SECONDS);453int n = sel.select(k -> assertTrue(false));454assertTrue(n == 0);455assertFalse(sel.isOpen());456}457458// select(Consumer, timeout)459try (Selector sel = Selector.open()) {460long before = System.nanoTime();461scheduleClose(sel, 3, SECONDS);462long start = System.nanoTime();463int n = sel.select(k -> assertTrue(false), 60*1000);464long after = System.nanoTime();465long selectDuration = (after - start) / 1000000;466long scheduleDuration = (start - before) / 1000000;467assertTrue(n == 0);468assertTrue(selectDuration > 2000 && selectDuration < 10*1000,469"select took " + selectDuration + " ms schedule took " +470scheduleDuration + " ms");471assertFalse(sel.isOpen());472}473}474475/**476* Test action closing selector477*/478@Test(expectedExceptions = ClosedSelectorException.class)479public void testActionClosingSelector() throws Exception {480Pipe p = Pipe.open();481try (Selector sel = Selector.open()) {482Pipe.SourceChannel source = p.source();483Pipe.SinkChannel sink = p.sink();484source.configureBlocking(false);485SelectionKey key = source.register(sel, SelectionKey.OP_READ);486487// write to sink to ensure that the source is readable488sink.write(messageBuffer());489490// should relay ClosedSelectorException491sel.select(k -> {492assertTrue(k == key);493try {494sel.close();495} catch (IOException ioe) { }496});497} finally {498closePipe(p);499}500}501502/**503* Test that the action is invoked while synchronized on the selector and504* its selected-key set.505*/506public void testLocks() throws Exception {507Pipe p = Pipe.open();508try (Selector sel = Selector.open()) {509Pipe.SourceChannel source = p.source();510Pipe.SinkChannel sink = p.sink();511source.configureBlocking(false);512SelectionKey key = source.register(sel, SelectionKey.OP_READ);513514// write to sink to ensure that the source is readable515sink.write(messageBuffer());516517// select(Consumer)518sel.select(k -> {519assertTrue(k == key);520assertTrue(Thread.holdsLock(sel));521assertFalse(Thread.holdsLock(sel.keys()));522assertTrue(Thread.holdsLock(sel.selectedKeys()));523});524525// select(Consumer, timeout)526sel.select(k -> {527assertTrue(k == key);528assertTrue(Thread.holdsLock(sel));529assertFalse(Thread.holdsLock(sel.keys()));530assertTrue(Thread.holdsLock(sel.selectedKeys()));531}, 1000L);532533// selectNow(Consumer)534sel.selectNow(k -> {535assertTrue(k == key);536assertTrue(Thread.holdsLock(sel));537assertFalse(Thread.holdsLock(sel.keys()));538assertTrue(Thread.holdsLock(sel.selectedKeys()));539});540} finally {541closePipe(p);542}543}544545/**546* Test that selection operations remove cancelled keys from the selector's547* key and selected-key sets.548*/549public void testCancel() throws Exception {550Pipe p = Pipe.open();551try (Selector sel = Selector.open()) {552Pipe.SinkChannel sink = p.sink();553Pipe.SourceChannel source = p.source();554555// write to sink to ensure that the source is readable556sink.write(messageBuffer());557558source.configureBlocking(false);559SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);560// make sure pipe source is readable before we do following checks.561// this is sometime necessary on windows where pipe is implemented562// as a pair of connected socket, so there is no guarantee that written563// bytes on sink side is immediately available on source side.564sel.select();565566sink.configureBlocking(false);567SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);568sel.selectNow();569570assertTrue(sel.keys().contains(key1));571assertTrue(sel.keys().contains(key2));572assertTrue(sel.selectedKeys().contains(key1));573assertTrue(sel.selectedKeys().contains(key2));574575// cancel key1576key1.cancel();577int n = sel.selectNow(k -> assertTrue(k == key2));578assertTrue(n == 1);579assertFalse(sel.keys().contains(key1));580assertTrue(sel.keys().contains(key2));581assertFalse(sel.selectedKeys().contains(key1));582assertTrue(sel.selectedKeys().contains(key2));583584// cancel key2585key2.cancel();586n = sel.selectNow(k -> assertTrue(false));587assertTrue(n == 0);588assertFalse(sel.keys().contains(key1));589assertFalse(sel.keys().contains(key2));590assertFalse(sel.selectedKeys().contains(key1));591assertFalse(sel.selectedKeys().contains(key2));592} finally {593closePipe(p);594}595}596597/**598* Test an action invoking select()599*/600public void testReentrantSelect1() throws Exception {601Pipe p = Pipe.open();602try (Selector sel = Selector.open()) {603Pipe.SinkChannel sink = p.sink();604Pipe.SourceChannel source = p.source();605source.configureBlocking(false);606source.register(sel, SelectionKey.OP_READ);607608// write to sink to ensure that the source is readable609scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);610611int n = sel.select(k -> {612try {613sel.select();614assertTrue(false);615} catch (IOException ioe) {616throw new RuntimeException(ioe);617} catch (IllegalStateException expected) {618}619});620assertTrue(n == 1);621} finally {622closePipe(p);623}624}625626/**627* Test an action invoking selectNow()628*/629public void testReentrantSelect2() throws Exception {630Pipe p = Pipe.open();631try (Selector sel = Selector.open()) {632Pipe.SinkChannel sink = p.sink();633Pipe.SourceChannel source = p.source();634635// write to sink to ensure that the source is readable636scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);637638source.configureBlocking(false);639source.register(sel, SelectionKey.OP_READ);640int n = sel.select(k -> {641try {642sel.selectNow();643assertTrue(false);644} catch (IOException ioe) {645throw new RuntimeException(ioe);646} catch (IllegalStateException expected) {647}648});649assertTrue(n == 1);650} finally {651closePipe(p);652}653}654655/**656* Test an action invoking select(Consumer)657*/658public void testReentrantSelect3() throws Exception {659Pipe p = Pipe.open();660try (Selector sel = Selector.open()) {661Pipe.SinkChannel sink = p.sink();662Pipe.SourceChannel source = p.source();663664// write to sink to ensure that the source is readable665scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);666667source.configureBlocking(false);668source.register(sel, SelectionKey.OP_READ);669int n = sel.select(k -> {670try {671sel.select(x -> assertTrue(false));672assertTrue(false);673} catch (IOException ioe) {674throw new RuntimeException(ioe);675} catch (IllegalStateException expected) {676}677});678assertTrue(n == 1);679} finally {680closePipe(p);681}682}683684/**685* Negative timeout686*/687@Test(expectedExceptions = IllegalArgumentException.class)688public void testNegativeTimeout() throws Exception {689try (Selector sel = Selector.open()) {690sel.select(k -> { }, -1L);691}692}693694/**695* Null action696*/697@Test(expectedExceptions = NullPointerException.class)698public void testNull1() throws Exception {699try (Selector sel = Selector.open()) {700sel.select(null);701}702}703@Test(expectedExceptions = NullPointerException.class)704public void testNull2() throws Exception {705try (Selector sel = Selector.open()) {706sel.select(null, 1000);707}708}709@Test(expectedExceptions = NullPointerException.class)710public void testNull3() throws Exception {711try (Selector sel = Selector.open()) {712sel.selectNow(null);713}714}715716717// -- support methods ---718719private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);720721@AfterTest722void shutdownThreadPool() {723POOL.shutdown();724}725726void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {727POOL.schedule(() -> sel.wakeup(), delay, unit);728}729730void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {731POOL.schedule(() -> t.interrupt(), delay, unit);732}733734void scheduleClose(Closeable c, long delay, TimeUnit unit) {735POOL.schedule(() -> {736try {737c.close();738} catch (IOException ioe) {739ioe.printStackTrace();740}741}, delay, unit);742}743744void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {745POOL.schedule(() -> {746try {747sink.write(buf);748} catch (IOException ioe) {749ioe.printStackTrace();750}751}, delay, unit);752}753754static void closePipe(Pipe p) {755try { p.sink().close(); } catch (IOException ignore) { }756try { p.source().close(); } catch (IOException ignore) { }757}758759static ByteBuffer messageBuffer() {760try {761return ByteBuffer.wrap("message".getBytes("UTF-8"));762} catch (Exception e) {763throw new RuntimeException(e);764}765}766}767768769