Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/java/nio/channels/AsyncCloseAndInterrupt.java
41149 views
1
/*
2
* Copyright (c) 2002, 2020, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/* @test
25
* @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919
26
* 8151582 8068693 8153209
27
* @run main/othervm AsyncCloseAndInterrupt
28
* @key intermittent
29
* @summary Comprehensive test of asynchronous closing and interruption
30
* @author Mark Reinhold
31
*/
32
33
import java.io.*;
34
import java.net.*;
35
import java.nio.channels.*;
36
import java.nio.ByteBuffer;
37
import java.util.ArrayList;
38
import java.util.List;
39
import java.util.concurrent.ExecutorService;
40
import java.util.concurrent.Executors;
41
import java.util.concurrent.ThreadFactory;
42
import java.util.concurrent.Callable;
43
import java.util.concurrent.Future;
44
import java.util.concurrent.TimeUnit;
45
46
public class AsyncCloseAndInterrupt {
47
48
static PrintStream log = System.err;
49
50
static void sleep(int ms) {
51
try {
52
Thread.sleep(ms);
53
} catch (InterruptedException x) { }
54
}
55
56
// Wildcard address localized to this machine -- Windows doesn't allow
57
// connecting to a server socket that was previously bound to a true
58
// wildcard, namely new InetSocketAddress((InetAddress)null, 0).
59
//
60
private static InetSocketAddress wildcardAddress;
61
62
63
// Server socket that blindly accepts all connections
64
65
static ServerSocketChannel acceptor;
66
67
private static void initAcceptor() throws IOException {
68
acceptor = ServerSocketChannel.open();
69
acceptor.socket().bind(wildcardAddress);
70
71
Thread th = new Thread("Acceptor") {
72
public void run() {
73
try {
74
for (;;) {
75
SocketChannel sc = acceptor.accept();
76
}
77
} catch (IOException x) {
78
x.printStackTrace();
79
}
80
}
81
};
82
83
th.setDaemon(true);
84
th.start();
85
}
86
87
88
// Server socket that refuses all connections
89
90
static ServerSocketChannel refuser;
91
92
private static void initRefuser() throws IOException {
93
refuser = ServerSocketChannel.open();
94
refuser.bind(wildcardAddress, 1); // use minimum backlog
95
}
96
97
// Dead pipe source and sink
98
99
static Pipe.SourceChannel deadSource;
100
static Pipe.SinkChannel deadSink;
101
102
private static void initPipes() throws IOException {
103
if (deadSource != null)
104
deadSource.close();
105
deadSource = Pipe.open().source();
106
if (deadSink != null)
107
deadSink.close();
108
deadSink = Pipe.open().sink();
109
}
110
111
112
// Files
113
114
private static File fifoFile = null; // File that blocks on reads and writes
115
private static File diskFile = null; // Disk file
116
117
private static void initFile() throws Exception {
118
119
diskFile = File.createTempFile("aci", ".tmp");
120
diskFile.deleteOnExit();
121
FileChannel fc = new FileOutputStream(diskFile).getChannel();
122
buffer.clear();
123
if (fc.write(buffer) != buffer.capacity())
124
throw new RuntimeException("Cannot create disk file");
125
fc.close();
126
127
if (TestUtil.onWindows()) {
128
log.println("WARNING: Cannot completely test FileChannels on Windows");
129
return;
130
}
131
fifoFile = new File("x.fifo");
132
if (fifoFile.exists()) {
133
if (!fifoFile.delete())
134
throw new IOException("Cannot delete existing fifo " + fifoFile);
135
}
136
Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
137
if (p.waitFor() != 0)
138
throw new IOException("Error creating fifo");
139
new RandomAccessFile(fifoFile, "rw").close();
140
141
}
142
143
144
// Channel factories
145
146
static abstract class ChannelFactory {
147
private final String name;
148
ChannelFactory(String name) {
149
this.name = name;
150
}
151
public String toString() {
152
return name;
153
}
154
abstract InterruptibleChannel create() throws IOException;
155
}
156
157
static ChannelFactory socketChannelFactory
158
= new ChannelFactory("SocketChannel") {
159
InterruptibleChannel create() throws IOException {
160
return SocketChannel.open();
161
}
162
};
163
164
static ChannelFactory connectedSocketChannelFactory
165
= new ChannelFactory("SocketChannel") {
166
InterruptibleChannel create() throws IOException {
167
SocketAddress sa = acceptor.socket().getLocalSocketAddress();
168
return SocketChannel.open(sa);
169
}
170
};
171
172
static ChannelFactory serverSocketChannelFactory
173
= new ChannelFactory("ServerSocketChannel") {
174
InterruptibleChannel create() throws IOException {
175
ServerSocketChannel ssc = ServerSocketChannel.open();
176
ssc.socket().bind(wildcardAddress);
177
return ssc;
178
}
179
};
180
181
static ChannelFactory datagramChannelFactory
182
= new ChannelFactory("DatagramChannel") {
183
InterruptibleChannel create() throws IOException {
184
DatagramChannel dc = DatagramChannel.open();
185
InetAddress lb = InetAddress.getLoopbackAddress();
186
dc.bind(new InetSocketAddress(lb, 0));
187
dc.connect(new InetSocketAddress(lb, 80));
188
return dc;
189
}
190
};
191
192
static ChannelFactory pipeSourceChannelFactory
193
= new ChannelFactory("Pipe.SourceChannel") {
194
InterruptibleChannel create() throws IOException {
195
// ## arrange to close sink
196
return Pipe.open().source();
197
}
198
};
199
200
static ChannelFactory pipeSinkChannelFactory
201
= new ChannelFactory("Pipe.SinkChannel") {
202
InterruptibleChannel create() throws IOException {
203
// ## arrange to close source
204
return Pipe.open().sink();
205
}
206
};
207
208
static ChannelFactory fifoFileChannelFactory
209
= new ChannelFactory("FileChannel") {
210
InterruptibleChannel create() throws IOException {
211
return new RandomAccessFile(fifoFile, "rw").getChannel();
212
}
213
};
214
215
static ChannelFactory diskFileChannelFactory
216
= new ChannelFactory("FileChannel") {
217
InterruptibleChannel create() throws IOException {
218
return new RandomAccessFile(diskFile, "rw").getChannel();
219
}
220
};
221
222
223
// I/O operations
224
225
static abstract class Op {
226
private final String name;
227
protected Op(String name) {
228
this.name = name;
229
}
230
abstract void doIO(InterruptibleChannel ich) throws IOException;
231
void setup() throws IOException { }
232
public String toString() { return name; }
233
}
234
235
static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
236
237
static ByteBuffer[] buffers = new ByteBuffer[] {
238
ByteBuffer.allocateDirect(1 << 19),
239
ByteBuffer.allocateDirect(1 << 19)
240
};
241
242
static void clearBuffers() {
243
buffers[0].clear();
244
buffers[1].clear();
245
}
246
247
static void show(Channel ch) {
248
log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
249
if (ch.isOpen() && (ch instanceof SocketChannel)) {
250
SocketChannel sc = (SocketChannel)ch;
251
if (sc.socket().isInputShutdown())
252
log.print(", input shutdown");
253
if (sc.socket().isOutputShutdown())
254
log.print(", output shutdown");
255
}
256
log.println();
257
}
258
259
static final Op READ = new Op("read") {
260
void doIO(InterruptibleChannel ich) throws IOException {
261
ReadableByteChannel rbc = (ReadableByteChannel)ich;
262
buffer.clear();
263
int n = rbc.read(buffer);
264
log.println("Read returned " + n);
265
show(rbc);
266
if (rbc.isOpen()
267
&& (n == -1)
268
&& (rbc instanceof SocketChannel)
269
&& ((SocketChannel)rbc).socket().isInputShutdown()) {
270
return;
271
}
272
throw new RuntimeException("Read succeeded");
273
}
274
};
275
276
static final Op READV = new Op("readv") {
277
void doIO(InterruptibleChannel ich) throws IOException {
278
ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
279
clearBuffers();
280
int n = (int)sbc.read(buffers);
281
log.println("Read returned " + n);
282
show(sbc);
283
if (sbc.isOpen()
284
&& (n == -1)
285
&& (sbc instanceof SocketChannel)
286
&& ((SocketChannel)sbc).socket().isInputShutdown()) {
287
return;
288
}
289
throw new RuntimeException("Read succeeded");
290
}
291
};
292
293
static final Op RECEIVE = new Op("receive") {
294
void doIO(InterruptibleChannel ich) throws IOException {
295
DatagramChannel dc = (DatagramChannel)ich;
296
buffer.clear();
297
dc.receive(buffer);
298
show(dc);
299
throw new RuntimeException("Read succeeded");
300
}
301
};
302
303
static final Op WRITE = new Op("write") {
304
void doIO(InterruptibleChannel ich) throws IOException {
305
306
WritableByteChannel wbc = (WritableByteChannel)ich;
307
308
SocketChannel sc = null;
309
if (wbc instanceof SocketChannel)
310
sc = (SocketChannel)wbc;
311
312
int n = 0;
313
for (;;) {
314
buffer.clear();
315
int d = wbc.write(buffer);
316
n += d;
317
if (!wbc.isOpen())
318
break;
319
if ((sc != null) && sc.socket().isOutputShutdown())
320
break;
321
}
322
log.println("Wrote " + n + " bytes");
323
show(wbc);
324
}
325
};
326
327
static final Op WRITEV = new Op("writev") {
328
void doIO(InterruptibleChannel ich) throws IOException {
329
330
GatheringByteChannel gbc = (GatheringByteChannel)ich;
331
332
SocketChannel sc = null;
333
if (gbc instanceof SocketChannel)
334
sc = (SocketChannel)gbc;
335
336
int n = 0;
337
for (;;) {
338
clearBuffers();
339
int d = (int)gbc.write(buffers);
340
n += d;
341
if (!gbc.isOpen())
342
break;
343
if ((sc != null) && sc.socket().isOutputShutdown())
344
break;
345
}
346
log.println("Wrote " + n + " bytes");
347
show(gbc);
348
349
}
350
};
351
352
static final Op CONNECT = new Op("connect") {
353
void setup() {
354
waitPump("connect waiting for pumping refuser ...");
355
}
356
void doIO(InterruptibleChannel ich) throws IOException {
357
SocketChannel sc = (SocketChannel)ich;
358
if (sc.connect(refuser.socket().getLocalSocketAddress()))
359
throw new RuntimeException("Connection succeeded");
360
throw new RuntimeException("Connection did not block");
361
}
362
};
363
364
static final Op FINISH_CONNECT = new Op("finishConnect") {
365
void setup() {
366
waitPump("finishConnect waiting for pumping refuser ...");
367
}
368
void doIO(InterruptibleChannel ich) throws IOException {
369
SocketChannel sc = (SocketChannel)ich;
370
sc.configureBlocking(false);
371
SocketAddress sa = refuser.socket().getLocalSocketAddress();
372
if (sc.connect(sa))
373
throw new RuntimeException("Connection succeeded");
374
sc.configureBlocking(true);
375
if (sc.finishConnect())
376
throw new RuntimeException("Connection succeeded");
377
throw new RuntimeException("Connection did not block");
378
}
379
};
380
381
static final Op ACCEPT = new Op("accept") {
382
void doIO(InterruptibleChannel ich) throws IOException {
383
ServerSocketChannel ssc = (ServerSocketChannel)ich;
384
ssc.accept();
385
throw new RuntimeException("Accept succeeded");
386
}
387
};
388
389
// Use only with diskFileChannelFactory
390
static final Op TRANSFER_TO = new Op("transferTo") {
391
void doIO(InterruptibleChannel ich) throws IOException {
392
FileChannel fc = (FileChannel)ich;
393
long n = fc.transferTo(0, fc.size(), deadSink);
394
log.println("Transferred " + n + " bytes");
395
show(fc);
396
}
397
};
398
399
// Use only with diskFileChannelFactory
400
static final Op TRANSFER_FROM = new Op("transferFrom") {
401
void doIO(InterruptibleChannel ich) throws IOException {
402
FileChannel fc = (FileChannel)ich;
403
long n = fc.transferFrom(deadSource, 0, 1 << 20);
404
log.println("Transferred " + n + " bytes");
405
show(fc);
406
}
407
};
408
409
410
411
// Test modes
412
413
static final int TEST_PREINTR = 0; // Interrupt thread before I/O
414
static final int TEST_INTR = 1; // Interrupt thread during I/O
415
static final int TEST_CLOSE = 2; // Close channel during I/O
416
static final int TEST_SHUTI = 3; // Shutdown input during I/O
417
static final int TEST_SHUTO = 4; // Shutdown output during I/O
418
419
static final String[] testName = new String[] {
420
"pre-interrupt", "interrupt", "close",
421
"shutdown-input", "shutdown-output"
422
};
423
424
425
static class Tester extends TestThread {
426
427
private InterruptibleChannel ch;
428
private Op op;
429
private int test;
430
volatile boolean ready = false;
431
432
protected Tester(ChannelFactory cf, InterruptibleChannel ch,
433
Op op, int test)
434
{
435
super(cf + "/" + op + "/" + testName[test]);
436
this.ch = ch;
437
this.op = op;
438
this.test = test;
439
}
440
441
@SuppressWarnings("fallthrough")
442
private void caught(Channel ch, IOException x) {
443
String xn = x.getClass().getName();
444
switch (test) {
445
446
case TEST_PREINTR:
447
case TEST_INTR:
448
if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
449
throw new RuntimeException("Wrong exception thrown: " + x);
450
break;
451
452
case TEST_CLOSE:
453
case TEST_SHUTO:
454
if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
455
throw new RuntimeException("Wrong exception thrown: " + x);
456
break;
457
458
case TEST_SHUTI:
459
if (TestUtil.onWindows())
460
break;
461
// FALL THROUGH
462
463
default:
464
throw new Error(x);
465
}
466
467
if (ch.isOpen()) {
468
if (test == TEST_SHUTO) {
469
SocketChannel sc = (SocketChannel)ch;
470
if (!sc.socket().isOutputShutdown())
471
throw new RuntimeException("Output not shutdown");
472
} else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
473
// Let this case pass -- CBIE applies to other channel
474
} else {
475
throw new RuntimeException("Channel still open");
476
}
477
}
478
479
log.println("Thrown as expected: " + x);
480
}
481
482
final void go() throws Exception {
483
if (test == TEST_PREINTR)
484
Thread.currentThread().interrupt();
485
ready = true;
486
try {
487
op.doIO(ch);
488
} catch (ClosedByInterruptException x) {
489
caught(ch, x);
490
} catch (AsynchronousCloseException x) {
491
caught(ch, x);
492
} finally {
493
ch.close();
494
}
495
}
496
497
}
498
499
private static volatile boolean pumpDone = false;
500
private static volatile boolean pumpReady = false;
501
502
private static void waitPump(String msg){
503
log.println(msg);
504
while (!pumpReady){
505
sleep(200);
506
}
507
log.println(msg + " done");
508
}
509
510
// Create a pump thread dedicated to saturate refuser's connection backlog
511
private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {
512
513
Callable<Integer> pumpTask = new Callable<Integer>() {
514
515
@Override
516
public Integer call() throws IOException {
517
// Can't reliably saturate connection backlog on Windows Server editions
518
assert !TestUtil.onWindows();
519
log.println("Start pumping refuser ...");
520
List<SocketChannel> refuserClients = new ArrayList<>();
521
522
// Saturate the refuser's connection backlog so that further connection
523
// attempts will be blocked
524
pumpReady = false;
525
while (!pumpDone) {
526
SocketChannel sc = SocketChannel.open();
527
sc.configureBlocking(false);
528
boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());
529
530
// Assume that the connection backlog is saturated if a
531
// client cannot connect to the refuser within 50 milliseconds
532
long start = System.currentTimeMillis();
533
while (!pumpReady && !connected
534
&& (System.currentTimeMillis() - start < 50)) {
535
connected = sc.finishConnect();
536
}
537
538
if (connected) {
539
// Retain so that finalizer doesn't close
540
refuserClients.add(sc);
541
} else {
542
sc.close();
543
pumpReady = true;
544
}
545
}
546
547
for (SocketChannel sc : refuserClients) {
548
sc.close();
549
}
550
refuser.close();
551
552
log.println("Stop pumping refuser ...");
553
return refuserClients.size();
554
}
555
};
556
557
return pumperExecutor.submit(pumpTask);
558
}
559
560
// Test
561
static void test(ChannelFactory cf, Op op, int test) throws Exception {
562
test(cf, op, test, true);
563
}
564
565
static void test(ChannelFactory cf, Op op, int test, boolean extraSleep)
566
throws Exception
567
{
568
log.println();
569
initPipes();
570
InterruptibleChannel ch = cf.create();
571
Tester t = new Tester(cf, ch, op, test);
572
log.println(t);
573
op.setup();
574
t.start();
575
do {
576
sleep(50);
577
} while (!t.ready);
578
579
if (extraSleep) {
580
sleep(100);
581
}
582
583
switch (test) {
584
585
case TEST_INTR:
586
t.interrupt();
587
break;
588
589
case TEST_CLOSE:
590
ch.close();
591
break;
592
593
case TEST_SHUTI:
594
if (TestUtil.onWindows()) {
595
log.println("WARNING: Asynchronous shutdown not working on Windows");
596
ch.close();
597
} else {
598
((SocketChannel)ch).socket().shutdownInput();
599
}
600
break;
601
602
case TEST_SHUTO:
603
if (TestUtil.onWindows()) {
604
log.println("WARNING: Asynchronous shutdown not working on Windows");
605
ch.close();
606
} else {
607
((SocketChannel)ch).socket().shutdownOutput();
608
}
609
break;
610
611
default:
612
break;
613
}
614
615
t.finishAndThrow(10000);
616
}
617
618
static void test(ChannelFactory cf, Op op) throws Exception {
619
test(cf, op, true);
620
}
621
622
static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception {
623
// Test INTR cases before PREINTER cases since sometimes
624
// interrupted threads can't load classes
625
test(cf, op, TEST_INTR, extraSleep);
626
test(cf, op, TEST_PREINTR, extraSleep);
627
628
// Bugs, see FileChannelImpl for details
629
if (op == TRANSFER_FROM) {
630
log.println("WARNING: transferFrom/close not tested");
631
return;
632
}
633
if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
634
log.println("WARNING: transferTo/close not tested");
635
return;
636
}
637
638
test(cf, op, TEST_CLOSE, extraSleep);
639
}
640
641
static void test(ChannelFactory cf)
642
throws Exception
643
{
644
InterruptibleChannel ch = cf.create(); // Sample channel
645
ch.close();
646
647
if (ch instanceof ReadableByteChannel) {
648
test(cf, READ);
649
if (ch instanceof SocketChannel)
650
test(cf, READ, TEST_SHUTI);
651
}
652
653
if (ch instanceof ScatteringByteChannel) {
654
test(cf, READV);
655
if (ch instanceof SocketChannel)
656
test(cf, READV, TEST_SHUTI);
657
}
658
659
if (ch instanceof DatagramChannel) {
660
test(cf, RECEIVE);
661
662
// Return here: We can't effectively test writes since, if they
663
// block, they do so only for a fleeting moment unless the network
664
// interface is overloaded.
665
return;
666
667
}
668
669
if (ch instanceof WritableByteChannel) {
670
test(cf, WRITE);
671
if (ch instanceof SocketChannel)
672
test(cf, WRITE, TEST_SHUTO);
673
}
674
675
if (ch instanceof GatheringByteChannel) {
676
test(cf, WRITEV);
677
if (ch instanceof SocketChannel)
678
test(cf, WRITEV, TEST_SHUTO);
679
}
680
681
}
682
683
public static void main(String[] args) throws Exception {
684
685
wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
686
initAcceptor();
687
if (!TestUtil.onWindows())
688
initRefuser();
689
initPipes();
690
initFile();
691
692
if (TestUtil.onWindows()) {
693
log.println("WARNING: Cannot test FileChannel transfer operations"
694
+ " on Windows");
695
} else {
696
test(diskFileChannelFactory, TRANSFER_TO);
697
test(diskFileChannelFactory, TRANSFER_FROM);
698
}
699
if (fifoFile != null)
700
test(fifoFileChannelFactory);
701
702
// Testing positional file reads and writes is impractical: It requires
703
// access to a large file soft-mounted via NFS, and even then isn't
704
// completely guaranteed to work.
705
//
706
// Testing map is impractical and arguably unnecessary: It's
707
// unclear under what conditions mmap(2) will actually block.
708
709
test(connectedSocketChannelFactory);
710
711
if (TestUtil.onWindows()) {
712
log.println("WARNING Cannot reliably test connect/finishConnect"
713
+ " operations on this platform");
714
} else {
715
// Only the following tests need refuser's connection backlog
716
// to be saturated
717
ExecutorService pumperExecutor =
718
Executors.newSingleThreadExecutor(
719
new ThreadFactory() {
720
721
@Override
722
public Thread newThread(Runnable r) {
723
Thread t = new Thread(r);
724
t.setDaemon(true);
725
t.setName("Pumper");
726
return t;
727
}
728
});
729
730
pumpDone = false;
731
try {
732
Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);
733
waitPump("\nWait for initial Pump");
734
735
test(socketChannelFactory, CONNECT, false);
736
test(socketChannelFactory, FINISH_CONNECT, false);
737
738
pumpDone = true;
739
Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);
740
log.println("Pump " + newConn + " connections.");
741
} finally {
742
pumperExecutor.shutdown();
743
}
744
}
745
746
test(serverSocketChannelFactory, ACCEPT);
747
test(datagramChannelFactory);
748
test(pipeSourceChannelFactory);
749
test(pipeSinkChannelFactory);
750
}
751
}
752
753