Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/java/nio/channels/AsynchronousSocketChannel/Basic.java
41153 views
1
/*
 * Copyright (c) 2008, 2020, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
23
24
/* @test
 * @bug 4607272 6842687 6878369 6944810 7023403
 * @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed)
 * @library /test/lib
 * @modules jdk.net
 * @key randomness intermittent
 * @build jdk.test.lib.RandomFactory jdk.test.lib.Utils
 * @run main/othervm/timeout=600 Basic -skipSlowConnectTest
 */
33
34
import java.io.Closeable;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import jdk.test.lib.RandomFactory;

import static java.net.StandardSocketOptions.*;
import static jdk.net.ExtendedSocketOptions.TCP_KEEPCOUNT;
import static jdk.net.ExtendedSocketOptions.TCP_KEEPIDLE;
import static jdk.net.ExtendedSocketOptions.TCP_KEEPINTERVAL;
49
50
public class Basic {
51
private static final Random RAND = RandomFactory.getRandom();
52
53
static boolean skipSlowConnectTest = false;
54
55
public static void main(String[] args) throws Exception {
56
for (String arg: args) {
57
switch (arg) {
58
case "-skipSlowConnectTest" :
59
skipSlowConnectTest = true;
60
break;
61
default:
62
throw new RuntimeException("Unrecognized argument: " + arg);
63
}
64
}
65
66
testBind();
67
testSocketOptions();
68
testConnect();
69
testCloseWhenPending();
70
testCancel();
71
testRead1();
72
testRead2();
73
testRead3();
74
testWrite1();
75
testWrite2();
76
// skip timeout tests until 7052549 is fixed
77
if (!System.getProperty("os.name").startsWith("Windows"))
78
testTimeout();
79
testShutdown();
80
}
81
82
static class Server implements Closeable {
83
private final ServerSocketChannel ssc;
84
private final InetSocketAddress address;
85
86
Server() throws IOException {
87
this(0);
88
}
89
90
Server(int recvBufSize) throws IOException {
91
ssc = ServerSocketChannel.open();
92
if (recvBufSize > 0) {
93
ssc.setOption(SO_RCVBUF, recvBufSize);
94
}
95
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
96
address = (InetSocketAddress)ssc.getLocalAddress();
97
}
98
99
InetSocketAddress address() {
100
return address;
101
}
102
103
SocketChannel accept() throws IOException {
104
return ssc.accept();
105
}
106
107
public void close() throws IOException {
108
ssc.close();
109
}
110
111
}
112
113
static void testBind() throws Exception {
114
System.out.println("-- bind --");
115
116
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
117
if (ch.getLocalAddress() != null)
118
throw new RuntimeException("Local address should be 'null'");
119
ch.bind(new InetSocketAddress(0));
120
121
// check local address after binding
122
InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
123
if (local.getPort() == 0)
124
throw new RuntimeException("Unexpected port");
125
if (!local.getAddress().isAnyLocalAddress())
126
throw new RuntimeException("Not bound to a wildcard address");
127
128
// try to re-bind
129
try {
130
ch.bind(new InetSocketAddress(0));
131
throw new RuntimeException("AlreadyBoundException expected");
132
} catch (AlreadyBoundException x) {
133
}
134
}
135
136
// check ClosedChannelException
137
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
138
ch.close();
139
try {
140
ch.bind(new InetSocketAddress(0));
141
throw new RuntimeException("ClosedChannelException expected");
142
} catch (ClosedChannelException x) {
143
}
144
}
145
146
static void testSocketOptions() throws Exception {
147
System.out.println("-- socket options --");
148
149
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
150
ch.setOption(SO_RCVBUF, 128*1024)
151
.setOption(SO_SNDBUF, 128*1024)
152
.setOption(SO_REUSEADDR, true);
153
154
// check SO_SNDBUF/SO_RCVBUF limits
155
int before, after;
156
before = ch.getOption(SO_SNDBUF);
157
after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
158
if (after < before)
159
throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
160
before = ch.getOption(SO_RCVBUF);
161
after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
162
if (after < before)
163
throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
164
165
ch.bind(new InetSocketAddress(0));
166
167
// default values
168
if (ch.getOption(SO_KEEPALIVE))
169
throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
170
if (ch.getOption(TCP_NODELAY))
171
throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
172
173
// set and check
174
if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
175
throw new RuntimeException("SO_KEEPALIVE did not change");
176
if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
177
throw new RuntimeException("SO_KEEPALIVE did not change");
178
179
// read others (can't check as actual value is implementation dependent)
180
ch.getOption(SO_RCVBUF);
181
ch.getOption(SO_SNDBUF);
182
183
Set<SocketOption<?>> options = ch.supportedOptions();
184
boolean reuseport = options.contains(SO_REUSEPORT);
185
if (reuseport) {
186
if (ch.getOption(SO_REUSEPORT))
187
throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
188
if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
189
throw new RuntimeException("SO_REUSEPORT did not change");
190
}
191
List<? extends SocketOption> extOptions = List.of(TCP_KEEPCOUNT,
192
TCP_KEEPIDLE, TCP_KEEPINTERVAL);
193
if (options.containsAll(extOptions)) {
194
ch.setOption(TCP_KEEPIDLE, 1234);
195
checkOption(ch, TCP_KEEPIDLE, 1234);
196
ch.setOption(TCP_KEEPINTERVAL, 123);
197
checkOption(ch, TCP_KEEPINTERVAL, 123);
198
ch.setOption(TCP_KEEPCOUNT, 7);
199
checkOption(ch, TCP_KEEPCOUNT, 7);
200
}
201
}
202
}
203
204
static void checkOption(AsynchronousSocketChannel sc, SocketOption name, Object expectedValue)
205
throws IOException {
206
Object value = sc.getOption(name);
207
if (!value.equals(expectedValue)) {
208
throw new RuntimeException("value not as expected");
209
}
210
}
211
static void testConnect() throws Exception {
212
System.out.println("-- connect --");
213
214
SocketAddress address;
215
216
try (Server server = new Server()) {
217
address = server.address();
218
219
// connect to server and check local/remote addresses
220
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
221
ch.connect(address).get();
222
// check local address
223
if (ch.getLocalAddress() == null)
224
throw new RuntimeException("Not bound to local address");
225
226
// check remote address
227
InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
228
if (remote.getPort() != server.address().getPort())
229
throw new RuntimeException("Connected to unexpected port");
230
if (!remote.getAddress().equals(server.address().getAddress()))
231
throw new RuntimeException("Connected to unexpected address");
232
233
// try to connect again
234
try {
235
ch.connect(server.address()).get();
236
throw new RuntimeException("AlreadyConnectedException expected");
237
} catch (AlreadyConnectedException x) {
238
}
239
240
// clean-up
241
server.accept().close();
242
}
243
244
// check that connect fails with ClosedChannelException
245
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
246
ch.close();
247
try {
248
ch.connect(server.address()).get();
249
throw new RuntimeException("ExecutionException expected");
250
} catch (ExecutionException x) {
251
if (!(x.getCause() instanceof ClosedChannelException))
252
throw new RuntimeException("Cause of ClosedChannelException expected",
253
x.getCause());
254
}
255
final AtomicReference<Throwable> connectException = new AtomicReference<>();
256
ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
257
public void completed(Void result, Void att) {
258
}
259
public void failed(Throwable exc, Void att) {
260
connectException.set(exc);
261
}
262
});
263
while (connectException.get() == null) {
264
Thread.sleep(100);
265
}
266
if (!(connectException.get() instanceof ClosedChannelException))
267
throw new RuntimeException("ClosedChannelException expected",
268
connectException.get());
269
}
270
271
// test that failure to connect closes the channel
272
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
273
try {
274
ch.connect(address).get();
275
} catch (ExecutionException x) {
276
// failed to establish connection
277
if (ch.isOpen())
278
throw new RuntimeException("Channel should be closed");
279
}
280
}
281
282
// repeat test by connecting to a (probably) non-existent host. This
283
// improves the chance that the connect will not fail immediately.
284
if (!skipSlowConnectTest) {
285
try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
286
try {
287
ch.connect(genSocketAddress()).get();
288
} catch (ExecutionException x) {
289
// failed to establish connection
290
if (ch.isOpen())
291
throw new RuntimeException("Channel should be closed");
292
}
293
}
294
}
295
}
296
297
static void testCloseWhenPending() throws Exception {
298
System.out.println("-- asynchronous close when connecting --");
299
300
AsynchronousSocketChannel ch;
301
302
// asynchronous close while connecting
303
ch = AsynchronousSocketChannel.open();
304
Future<Void> connectResult = ch.connect(genSocketAddress());
305
306
// give time to initiate the connect (SYN)
307
Thread.sleep(50);
308
309
// close
310
ch.close();
311
312
// check that exception is thrown in timely manner
313
try {
314
connectResult.get(5, TimeUnit.SECONDS);
315
} catch (TimeoutException x) {
316
throw new RuntimeException("AsynchronousCloseException not thrown");
317
} catch (ExecutionException x) {
318
// expected
319
}
320
321
System.out.println("-- asynchronous close when reading --");
322
323
try (Server server = new Server(1)) {
324
ch = AsynchronousSocketChannel.open();
325
ch.connect(server.address()).get();
326
327
ByteBuffer dst = ByteBuffer.allocateDirect(100);
328
Future<Integer> result = ch.read(dst);
329
330
// attempt a second read - should fail with ReadPendingException
331
ByteBuffer buf = ByteBuffer.allocateDirect(100);
332
try {
333
ch.read(buf);
334
throw new RuntimeException("ReadPendingException expected");
335
} catch (ReadPendingException x) {
336
}
337
338
// close channel (should cause initial read to complete)
339
SocketChannel peer = server.accept();
340
ch.close();
341
peer.close();
342
343
// check that AsynchronousCloseException is thrown
344
try {
345
result.get();
346
throw new RuntimeException("Should not read");
347
} catch (ExecutionException x) {
348
if (!(x.getCause() instanceof AsynchronousCloseException))
349
throw new RuntimeException(x);
350
}
351
352
System.out.println("-- asynchronous close when writing --");
353
354
ch = AsynchronousSocketChannel.open();
355
ch.connect(server.address()).get();
356
peer = server.accept();
357
peer.setOption(SO_RCVBUF, 1);
358
359
final AtomicReference<Throwable> writeException =
360
new AtomicReference<Throwable>();
361
362
// write bytes to fill socket buffer
363
final AtomicInteger numCompleted = new AtomicInteger();
364
ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
365
public void completed(Integer result, AsynchronousSocketChannel ch) {
366
System.out.println("completed write to async channel: " + result);
367
numCompleted.incrementAndGet();
368
ch.write(genBuffer(), ch, this);
369
System.out.println("started another write to async channel: " + result);
370
}
371
public void failed(Throwable x, AsynchronousSocketChannel ch) {
372
System.out.println("failed write to async channel");
373
writeException.set(x);
374
}
375
});
376
377
// give time for socket buffer to fill up -
378
// take pauses until the handler is no longer being invoked
379
// because all writes are being pended which guarantees that
380
// the internal channel state indicates it is writing
381
int prevNumCompleted = numCompleted.get();
382
do {
383
Thread.sleep((long)(1000 * jdk.test.lib.Utils.TIMEOUT_FACTOR));
384
System.out.println("check if buffer is filled up");
385
if (numCompleted.get() == prevNumCompleted) {
386
break;
387
}
388
prevNumCompleted = numCompleted.get();
389
} while (true);
390
391
// attempt a concurrent write -
392
// should fail with WritePendingException
393
try {
394
System.out.println("concurrent write to async channel");
395
ch.write(genBuffer());
396
System.out.format("prevNumCompleted: %d, numCompleted: %d%n",
397
prevNumCompleted, numCompleted.get());
398
throw new RuntimeException("WritePendingException expected");
399
} catch (WritePendingException x) {
400
}
401
402
// close channel - should cause initial write to complete
403
System.out.println("closing async channel...");
404
ch.close();
405
System.out.println("closed async channel");
406
peer.close();
407
408
// wait for exception
409
while (writeException.get() == null) {
410
Thread.sleep(100);
411
}
412
if (!(writeException.get() instanceof AsynchronousCloseException))
413
throw new RuntimeException("AsynchronousCloseException expected",
414
writeException.get());
415
}
416
}
417
418
static void testCancel() throws Exception {
419
System.out.println("-- cancel --");
420
421
try (Server server = new Server()) {
422
for (int i=0; i<2; i++) {
423
boolean mayInterruptIfRunning = (i == 0) ? false : true;
424
425
// establish loopback connection
426
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
427
ch.connect(server.address()).get();
428
SocketChannel peer = server.accept();
429
430
// start read operation
431
ByteBuffer buf = ByteBuffer.allocate(1);
432
Future<Integer> res = ch.read(buf);
433
434
// cancel operation
435
boolean cancelled = res.cancel(mayInterruptIfRunning);
436
437
// check post-conditions
438
if (!res.isDone())
439
throw new RuntimeException("isDone should return true");
440
if (res.isCancelled() != cancelled)
441
throw new RuntimeException("isCancelled not consistent");
442
try {
443
res.get();
444
throw new RuntimeException("CancellationException expected");
445
} catch (CancellationException x) {
446
}
447
try {
448
res.get(1, TimeUnit.SECONDS);
449
throw new RuntimeException("CancellationException expected");
450
} catch (CancellationException x) {
451
}
452
453
// check that the cancel doesn't impact writing to the channel
454
if (!mayInterruptIfRunning) {
455
buf = ByteBuffer.wrap("a".getBytes());
456
ch.write(buf).get();
457
}
458
459
ch.close();
460
peer.close();
461
}
462
}
463
}
464
465
static void testRead1() throws Exception {
466
System.out.println("-- read (1) --");
467
468
try (Server server = new Server()) {
469
final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
470
ch.connect(server.address()).get();
471
472
// read with 0 bytes remaining should complete immediately
473
ByteBuffer buf = ByteBuffer.allocate(1);
474
buf.put((byte)0);
475
int n = ch.read(buf).get();
476
if (n != 0)
477
throw new RuntimeException("0 expected");
478
479
// write bytes and close connection
480
ByteBuffer src = genBuffer();
481
try (SocketChannel sc = server.accept()) {
482
sc.setOption(SO_SNDBUF, src.remaining());
483
while (src.hasRemaining())
484
sc.write(src);
485
}
486
487
// reads should complete immediately
488
final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
489
final CountDownLatch latch = new CountDownLatch(1);
490
ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
491
public void completed(Integer result, Void att) {
492
int n = result;
493
if (n > 0) {
494
ch.read(dst, (Void)null, this);
495
} else {
496
latch.countDown();
497
}
498
}
499
public void failed(Throwable exc, Void att) {
500
}
501
});
502
503
latch.await();
504
505
// check buffers
506
src.flip();
507
dst.flip();
508
if (!src.equals(dst)) {
509
throw new RuntimeException("Contents differ");
510
}
511
512
// close channel
513
ch.close();
514
515
// check read fails with ClosedChannelException
516
try {
517
ch.read(dst).get();
518
throw new RuntimeException("ExecutionException expected");
519
} catch (ExecutionException x) {
520
if (!(x.getCause() instanceof ClosedChannelException))
521
throw new RuntimeException("Cause of ClosedChannelException expected",
522
x.getCause());
523
}
524
}
525
}
526
527
static void testRead2() throws Exception {
528
System.out.println("-- read (2) --");
529
530
try (Server server = new Server()) {
531
final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
532
ch.connect(server.address()).get();
533
SocketChannel sc = server.accept();
534
535
ByteBuffer src = genBuffer();
536
537
// read until the buffer is full
538
final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
539
final CountDownLatch latch = new CountDownLatch(1);
540
ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
541
public void completed(Integer result, Void att) {
542
if (dst.hasRemaining()) {
543
ch.read(dst, (Void)null, this);
544
} else {
545
latch.countDown();
546
}
547
}
548
public void failed(Throwable exc, Void att) {
549
}
550
});
551
552
// trickle the writing
553
do {
554
int rem = src.remaining();
555
int size = (rem <= 100) ? rem : 50 + RAND.nextInt(rem - 100);
556
ByteBuffer buf = ByteBuffer.allocate(size);
557
for (int i=0; i<size; i++)
558
buf.put(src.get());
559
buf.flip();
560
Thread.sleep(50 + RAND.nextInt(1500));
561
while (buf.hasRemaining())
562
sc.write(buf);
563
} while (src.hasRemaining());
564
565
// wait until ascynrhonous reading has completed
566
latch.await();
567
568
// check buffers
569
src.flip();
570
dst.flip();
571
if (!src.equals(dst)) {
572
throw new RuntimeException("Contents differ");
573
}
574
575
sc.close();
576
ch.close();
577
}
578
}
579
580
// exercise scattering read
581
static void testRead3() throws Exception {
582
System.out.println("-- read (3) --");
583
584
try (Server server = new Server()) {
585
final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
586
ch.connect(server.address()).get();
587
SocketChannel sc = server.accept();
588
589
ByteBuffer[] dsts = new ByteBuffer[3];
590
for (int i=0; i<dsts.length; i++) {
591
dsts[i] = ByteBuffer.allocateDirect(100);
592
}
593
594
// scattering read that completes ascynhronously
595
final CountDownLatch l1 = new CountDownLatch(1);
596
ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
597
new CompletionHandler<Long,Void>() {
598
public void completed(Long result, Void att) {
599
long n = result;
600
if (n <= 0)
601
throw new RuntimeException("No bytes read");
602
l1.countDown();
603
}
604
public void failed(Throwable exc, Void att) {
605
}
606
});
607
608
// write some bytes
609
sc.write(genBuffer());
610
611
// read should now complete
612
l1.await();
613
614
// write more bytes
615
sc.write(genBuffer());
616
617
// read should complete immediately
618
for (int i=0; i<dsts.length; i++) {
619
dsts[i].rewind();
620
}
621
622
final CountDownLatch l2 = new CountDownLatch(1);
623
ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
624
new CompletionHandler<Long,Void>() {
625
public void completed(Long result, Void att) {
626
long n = result;
627
if (n <= 0)
628
throw new RuntimeException("No bytes read");
629
l2.countDown();
630
}
631
public void failed(Throwable exc, Void att) {
632
}
633
});
634
l2.await();
635
636
ch.close();
637
sc.close();
638
}
639
}
640
641
static void testWrite1() throws Exception {
642
System.out.println("-- write (1) --");
643
644
try (Server server = new Server()) {
645
final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
646
ch.connect(server.address()).get();
647
SocketChannel sc = server.accept();
648
649
// write with 0 bytes remaining should complete immediately
650
ByteBuffer buf = ByteBuffer.allocate(1);
651
buf.put((byte)0);
652
int n = ch.write(buf).get();
653
if (n != 0)
654
throw new RuntimeException("0 expected");
655
656
// write all bytes and close connection when done
657
final ByteBuffer src = genBuffer();
658
ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
659
public void completed(Integer result, Void att) {
660
if (src.hasRemaining()) {
661
ch.write(src, (Void)null, this);
662
} else {
663
try {
664
ch.close();
665
} catch (IOException ignore) { }
666
}
667
}
668
public void failed(Throwable exc, Void att) {
669
}
670
});
671
672
// read to EOF or buffer full
673
ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
674
do {
675
n = sc.read(dst);
676
} while (n > 0);
677
sc.close();
678
679
// check buffers
680
src.flip();
681
dst.flip();
682
if (!src.equals(dst)) {
683
throw new RuntimeException("Contents differ");
684
}
685
686
// check write fails with ClosedChannelException
687
try {
688
ch.read(dst).get();
689
throw new RuntimeException("ExecutionException expected");
690
} catch (ExecutionException x) {
691
if (!(x.getCause() instanceof ClosedChannelException))
692
throw new RuntimeException("Cause of ClosedChannelException expected",
693
x.getCause());
694
}
695
}
696
}
697
698
// exercise gathering write
699
static void testWrite2() throws Exception {
700
System.out.println("-- write (2) --");
701
702
try (Server server = new Server()) {
703
final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
704
ch.connect(server.address()).get();
705
SocketChannel sc = server.accept();
706
707
// number of bytes written
708
final AtomicLong bytesWritten = new AtomicLong(0);
709
710
// write buffers (should complete immediately)
711
ByteBuffer[] srcs = genBuffers(1);
712
final CountDownLatch l1 = new CountDownLatch(1);
713
ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
714
new CompletionHandler<Long,Void>() {
715
public void completed(Long result, Void att) {
716
long n = result;
717
if (n <= 0)
718
throw new RuntimeException("No bytes read");
719
bytesWritten.addAndGet(n);
720
l1.countDown();
721
}
722
public void failed(Throwable exc, Void att) {
723
}
724
});
725
l1.await();
726
727
// set to true to signal that no more buffers should be written
728
final AtomicBoolean continueWriting = new AtomicBoolean(true);
729
730
// write until socket buffer is full so as to create the conditions
731
// for when a write does not complete immediately
732
srcs = genBuffers(1);
733
ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
734
new CompletionHandler<Long,Void>() {
735
public void completed(Long result, Void att) {
736
long n = result;
737
if (n <= 0)
738
throw new RuntimeException("No bytes written");
739
bytesWritten.addAndGet(n);
740
if (continueWriting.get()) {
741
ByteBuffer[] srcs = genBuffers(8);
742
ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
743
(Void)null, this);
744
}
745
}
746
public void failed(Throwable exc, Void att) {
747
}
748
});
749
750
// give time for socket buffer to fill up.
751
Thread.sleep(5*1000);
752
753
// signal handler to stop further writing
754
continueWriting.set(false);
755
756
// read until done
757
ByteBuffer buf = ByteBuffer.allocateDirect(4096);
758
long total = 0L;
759
do {
760
int n = sc.read(buf);
761
if (n <= 0)
762
throw new RuntimeException("No bytes read");
763
buf.rewind();
764
total += n;
765
} while (total < bytesWritten.get());
766
767
ch.close();
768
sc.close();
769
}
770
}
771
772
static void testShutdown() throws Exception {
773
System.out.println("-- shutdown --");
774
775
try (Server server = new Server();
776
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
777
{
778
ch.connect(server.address()).get();
779
try (SocketChannel peer = server.accept()) {
780
ByteBuffer buf = ByteBuffer.allocateDirect(1000);
781
int n;
782
783
// check read
784
ch.shutdownInput();
785
n = ch.read(buf).get();
786
if (n != -1)
787
throw new RuntimeException("-1 expected");
788
// check full with full buffer
789
buf.put(new byte[100]);
790
n = ch.read(buf).get();
791
if (n != -1)
792
throw new RuntimeException("-1 expected");
793
794
// check write
795
ch.shutdownOutput();
796
try {
797
ch.write(buf).get();
798
throw new RuntimeException("ClosedChannelException expected");
799
} catch (ExecutionException x) {
800
if (!(x.getCause() instanceof ClosedChannelException))
801
throw new RuntimeException("ClosedChannelException expected",
802
x.getCause());
803
}
804
}
805
}
806
}
807
808
static void testTimeout() throws Exception {
809
System.out.println("-- timeouts --");
810
testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
811
testTimeout(-1L, TimeUnit.SECONDS);
812
testTimeout(0L, TimeUnit.SECONDS);
813
testTimeout(2L, TimeUnit.SECONDS);
814
}
815
816
static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
817
System.out.printf("---- timeout: %d ms%n", unit.toMillis(timeout));
818
try (Server server = new Server()) {
819
AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
820
ch.connect(server.address()).get();
821
822
ByteBuffer dst = ByteBuffer.allocate(512);
823
824
final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
825
826
// this read should timeout if value is > 0
827
ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
828
public void completed(Integer result, Void att) {
829
readException.set(new RuntimeException("Should not complete"));
830
}
831
public void failed(Throwable exc, Void att) {
832
readException.set(exc);
833
}
834
});
835
if (timeout > 0L) {
836
// wait for exception
837
while (readException.get() == null) {
838
Thread.sleep(100);
839
}
840
if (!(readException.get() instanceof InterruptedByTimeoutException))
841
throw new RuntimeException("InterruptedByTimeoutException expected",
842
readException.get());
843
844
// after a timeout then further reading should throw unspecified runtime exception
845
boolean exceptionThrown = false;
846
try {
847
ch.read(dst);
848
} catch (RuntimeException x) {
849
exceptionThrown = true;
850
}
851
if (!exceptionThrown)
852
throw new RuntimeException("RuntimeException expected after timeout.");
853
} else {
854
Thread.sleep(1000);
855
Throwable exc = readException.get();
856
if (exc != null)
857
throw new RuntimeException(exc);
858
}
859
860
final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
861
862
// write bytes to fill socket buffer
863
ch.write(genBuffer(), timeout, unit, ch,
864
new CompletionHandler<Integer,AsynchronousSocketChannel>()
865
{
866
public void completed(Integer result, AsynchronousSocketChannel ch) {
867
ch.write(genBuffer(), timeout, unit, ch, this);
868
}
869
public void failed(Throwable exc, AsynchronousSocketChannel ch) {
870
writeException.set(exc);
871
}
872
});
873
if (timeout > 0) {
874
// wait for exception
875
while (writeException.get() == null) {
876
Thread.sleep(100);
877
}
878
if (!(writeException.get() instanceof InterruptedByTimeoutException))
879
throw new RuntimeException("InterruptedByTimeoutException expected",
880
writeException.get());
881
882
// after a timeout then further writing should throw unspecified runtime exception
883
boolean exceptionThrown = false;
884
try {
885
ch.write(genBuffer());
886
} catch (RuntimeException x) {
887
exceptionThrown = true;
888
}
889
if (!exceptionThrown)
890
throw new RuntimeException("RuntimeException expected after timeout.");
891
} else {
892
Thread.sleep(1000);
893
Throwable exc = writeException.get();
894
if (exc != null)
895
throw new RuntimeException(exc);
896
}
897
898
// clean-up
899
server.accept().close();
900
ch.close();
901
}
902
}
903
904
// returns ByteBuffer with random bytes
905
static ByteBuffer genBuffer() {
906
int size = 1024 + RAND.nextInt(16000);
907
byte[] buf = new byte[size];
908
RAND.nextBytes(buf);
909
boolean useDirect = RAND.nextBoolean();
910
if (useDirect) {
911
ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
912
bb.put(buf);
913
bb.flip();
914
return bb;
915
} else {
916
return ByteBuffer.wrap(buf);
917
}
918
}
919
920
// return ByteBuffer[] with random bytes
921
static ByteBuffer[] genBuffers(int max) {
922
int len = 1;
923
if (max > 1)
924
len += RAND.nextInt(max);
925
ByteBuffer[] bufs = new ByteBuffer[len];
926
for (int i=0; i<len; i++)
927
bufs[i] = genBuffer();
928
return bufs;
929
}
930
931
// return random SocketAddress
932
static SocketAddress genSocketAddress() {
933
StringBuilder sb = new StringBuilder("10.");
934
sb.append(RAND.nextInt(256));
935
sb.append('.');
936
sb.append(RAND.nextInt(256));
937
sb.append('.');
938
sb.append(RAND.nextInt(256));
939
InetAddress rh;
940
try {
941
rh = InetAddress.getByName(sb.toString());
942
} catch (UnknownHostException x) {
943
throw new InternalError("Should not happen");
944
}
945
return new InetSocketAddress(rh, RAND.nextInt(65535)+1);
946
}
947
}
948
949