Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/java/nio/channels/SocketChannel/AdaptorStreams.java
41154 views
/*
 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
23
/* @test
 * @bug 8222774 4430139
 * @run testng AdaptorStreams
 * @summary Exercise socket adaptor input/output streams
 */
29
30
import java.io.Closeable;
31
import java.io.IOException;
32
import java.io.InputStream;
33
import java.io.OutputStream;
34
import java.net.InetAddress;
35
import java.net.InetSocketAddress;
36
import java.net.ServerSocket;
37
import java.net.Socket;
38
import java.net.SocketTimeoutException;
39
import java.nio.channels.IllegalBlockingModeException;
40
import java.nio.channels.SocketChannel;
41
import java.util.concurrent.ExecutorService;
42
import java.util.concurrent.Executors;
43
import java.util.concurrent.Future;
44
import java.util.concurrent.ScheduledExecutorService;
45
import java.util.concurrent.TimeUnit;
46
47
import org.testng.annotations.Test;
48
import static org.testng.Assert.*;
49
50
@Test
51
public class AdaptorStreams {
52
53
/**
54
* Test read when bytes are available
55
*/
56
public void testRead1() throws Exception {
57
withConnection((sc, peer) -> {
58
peer.getOutputStream().write(99);
59
int n = sc.socket().getInputStream().read();
60
assertEquals(n, 99);
61
});
62
}
63
64
/**
65
* Test read blocking before bytes are available
66
*/
67
public void testRead2() throws Exception {
68
withConnection((sc, peer) -> {
69
scheduleWrite(peer.getOutputStream(), 99, 1000);
70
int n = sc.socket().getInputStream().read();
71
assertEquals(n, 99);
72
});
73
}
74
75
/**
76
* Test read when peer has closed connection
77
*/
78
public void testRead3() throws Exception {
79
withConnection((sc, peer) -> {
80
peer.close();
81
int n = sc.socket().getInputStream().read();
82
assertEquals(n, -1);
83
});
84
}
85
86
/**
87
* Test read blocking before peer closes connection
88
*/
89
public void testRead4() throws Exception {
90
withConnection((sc, peer) -> {
91
scheduleClose(peer, 1000);
92
int n = sc.socket().getInputStream().read();
93
assertEquals(n, -1);
94
});
95
}
96
97
/**
98
* Test async close of socket when thread blocked in read
99
*/
100
public void testRead5() throws Exception {
101
withConnection((sc, peer) -> {
102
scheduleClose(sc, 2000);
103
InputStream in = sc.socket().getInputStream();
104
expectThrows(IOException.class, () -> in.read());
105
});
106
}
107
108
/**
109
* Test interrupt status set before read
110
*/
111
public void testRead6() throws Exception {
112
withConnection((sc, peer) -> {
113
Socket s = sc.socket();
114
Thread.currentThread().interrupt();
115
try {
116
InputStream in = s.getInputStream();
117
expectThrows(IOException.class, () -> in.read());
118
} finally {
119
Thread.interrupted(); // clear interrupt
120
}
121
assertTrue(s.isClosed());
122
});
123
}
124
125
/**
126
* Test interrupt of thread blocked in read
127
*/
128
public void testRead7() throws Exception {
129
withConnection((sc, peer) -> {
130
Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
131
Socket s = sc.socket();
132
try {
133
InputStream in = s.getInputStream();
134
expectThrows(IOException.class, () -> in.read());
135
} finally {
136
interrupter.cancel(true);
137
Thread.interrupted(); // clear interrupt
138
}
139
assertTrue(s.isClosed());
140
});
141
}
142
143
/**
144
* Test read when channel is configured non-blocking
145
*/
146
public void testRead8() throws Exception {
147
withConnection((sc, peer) -> {
148
sc.configureBlocking(false);
149
InputStream in = sc.socket().getInputStream();
150
expectThrows(IllegalBlockingModeException.class, () -> in.read());
151
});
152
}
153
154
/**
155
* Test timed read when bytes are available
156
*/
157
public void testTimedRead1() throws Exception {
158
withConnection((sc, peer) -> {
159
peer.getOutputStream().write(99);
160
Socket s = sc.socket();
161
s.setSoTimeout(1000);
162
int n = s.getInputStream().read();
163
assertEquals(n, 99);
164
});
165
}
166
167
/**
168
* Test timed read blocking before bytes are available
169
*/
170
public void testTimedRead2() throws Exception {
171
withConnection((sc, peer) -> {
172
scheduleWrite(peer.getOutputStream(), 99, 1000);
173
Socket s = sc.socket();
174
s.setSoTimeout(5000);
175
int n = s.getInputStream().read();
176
assertEquals(n, 99);
177
});
178
}
179
180
/**
181
* Test timed read when the read times out
182
*/
183
public void testTimedRead3() throws Exception {
184
withConnection((sc, peer) -> {
185
Socket s = sc.socket();
186
s.setSoTimeout(1000);
187
InputStream in = s.getInputStream();
188
expectThrows(SocketTimeoutException.class, () -> in.read());
189
});
190
}
191
192
/**
193
* Test async close of socket when thread blocked in timed read
194
*/
195
public void testTimedRead4() throws Exception {
196
withConnection((sc, peer) -> {
197
scheduleClose(sc, 2000);
198
Socket s = sc.socket();
199
s.setSoTimeout(60*1000);
200
InputStream in = s.getInputStream();
201
expectThrows(IOException.class, () -> in.read());
202
});
203
}
204
205
/**
206
* Test interrupt status set before timed read
207
*/
208
public void testTimedRead5() throws Exception {
209
withConnection((sc, peer) -> {
210
Socket s = sc.socket();
211
Thread.currentThread().interrupt();
212
try {
213
s.setSoTimeout(60*1000);
214
InputStream in = s.getInputStream();
215
expectThrows(IOException.class, () -> in.read());
216
} finally {
217
Thread.interrupted(); // clear interrupt
218
}
219
assertTrue(s.isClosed());
220
});
221
}
222
223
/**
224
* Test interrupt of thread blocked in timed read
225
*/
226
public void testTimedRead6() throws Exception {
227
withConnection((sc, peer) -> {
228
Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
229
Socket s = sc.socket();
230
try {
231
s.setSoTimeout(60*1000);
232
InputStream in = s.getInputStream();
233
expectThrows(IOException.class, () -> in.read());
234
assertTrue(s.isClosed());
235
} finally {
236
interrupter.cancel(true);
237
Thread.interrupted(); // clear interrupt
238
}
239
assertTrue(s.isClosed());
240
});
241
}
242
243
/**
244
* Test async close of socket when thread blocked in write
245
*/
246
public void testWrite1() throws Exception {
247
withConnection((sc, peer) -> {
248
scheduleClose(sc, 2000);
249
expectThrows(IOException.class, () -> {
250
OutputStream out = sc.socket().getOutputStream();
251
byte[] data = new byte[64*1000];
252
while (true) {
253
out.write(data);
254
}
255
});
256
});
257
}
258
259
/**
260
* Test interrupt status set before write
261
*/
262
public void testWrite2() throws Exception {
263
withConnection((sc, peer) -> {
264
Socket s = sc.socket();
265
Thread.currentThread().interrupt();
266
try {
267
OutputStream out = s.getOutputStream();
268
expectThrows(IOException.class, () -> out.write(99));
269
} finally {
270
Thread.interrupted(); // clear interrupt
271
}
272
assertTrue(s.isClosed());
273
});
274
}
275
276
/**
277
* Test interrupt of thread blocked in write
278
*/
279
public void testWrite3() throws Exception {
280
withConnection((sc, peer) -> {
281
Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
282
Socket s = sc.socket();
283
try {
284
expectThrows(IOException.class, () -> {
285
OutputStream out = sc.socket().getOutputStream();
286
byte[] data = new byte[64*1000];
287
while (true) {
288
out.write(data);
289
}
290
});
291
} finally {
292
interrupter.cancel(true);
293
Thread.interrupted(); // clear interrupt
294
}
295
assertTrue(s.isClosed());
296
});
297
}
298
299
/**
300
* Test write when channel is configured non-blocking
301
*/
302
public void testWrite4() throws Exception {
303
withConnection((sc, peer) -> {
304
sc.configureBlocking(false);
305
OutputStream out = sc.socket().getOutputStream();
306
expectThrows(IllegalBlockingModeException.class, () -> out.write(99));
307
});
308
}
309
310
/**
311
* Test read when there are bytes available and another thread is blocked
312
* in write
313
*/
314
public void testConcurrentReadWrite1() throws Exception {
315
withConnection((sc, peer) -> {
316
Socket s = sc.socket();
317
318
// block thread in write
319
execute(() -> {
320
var data = new byte[64*1024];
321
OutputStream out = s.getOutputStream();
322
for (;;) {
323
out.write(data);
324
}
325
});
326
Thread.sleep(1000); // give writer time to block
327
328
// test read when bytes are available
329
peer.getOutputStream().write(99);
330
int n = s.getInputStream().read();
331
assertEquals(n, 99);
332
});
333
}
334
335
/**
336
* Test read blocking when another thread is blocked in write
337
*/
338
public void testConcurrentReadWrite2() throws Exception {
339
withConnection((sc, peer) -> {
340
Socket s = sc.socket();
341
342
// block thread in write
343
execute(() -> {
344
var data = new byte[64*1024];
345
OutputStream out = s.getOutputStream();
346
for (;;) {
347
out.write(data);
348
}
349
});
350
Thread.sleep(1000); // give writer time to block
351
352
// test read blocking until bytes are available
353
scheduleWrite(peer.getOutputStream(), 99, 500);
354
int n = s.getInputStream().read();
355
assertEquals(n, 99);
356
});
357
}
358
359
/**
360
* Test writing when another thread is blocked in read
361
*/
362
public void testConcurrentReadWrite3() throws Exception {
363
withConnection((sc, peer) -> {
364
Socket s = sc.socket();
365
366
// block thread in read
367
execute(() -> {
368
s.getInputStream().read();
369
});
370
Thread.sleep(100); // give reader time to block
371
372
// test write
373
s.getOutputStream().write(99);
374
int n = peer.getInputStream().read();
375
assertEquals(n, 99);
376
});
377
}
378
379
/**
380
* Test timed read when there are bytes available and another thread is
381
* blocked in write
382
*/
383
public void testConcurrentTimedReadWrite1() throws Exception {
384
withConnection((sc, peer) -> {
385
Socket s = sc.socket();
386
387
// block thread in write
388
execute(() -> {
389
var data = new byte[64*1024];
390
OutputStream out = s.getOutputStream();
391
for (;;) {
392
out.write(data);
393
}
394
});
395
Thread.sleep(1000); // give writer time to block
396
397
// test read when bytes are available
398
peer.getOutputStream().write(99);
399
s.setSoTimeout(60*1000);
400
int n = s.getInputStream().read();
401
assertEquals(n, 99);
402
});
403
}
404
405
/**
406
* Test timed read blocking when another thread is blocked in write
407
*/
408
public void testConcurrentTimedReadWrite2() throws Exception {
409
withConnection((sc, peer) -> {
410
Socket s = sc.socket();
411
412
// block thread in write
413
execute(() -> {
414
var data = new byte[64*1024];
415
OutputStream out = s.getOutputStream();
416
for (;;) {
417
out.write(data);
418
}
419
});
420
Thread.sleep(1000); // give writer time to block
421
422
// test read blocking until bytes are available
423
scheduleWrite(peer.getOutputStream(), 99, 500);
424
s.setSoTimeout(60*1000);
425
int n = s.getInputStream().read();
426
assertEquals(n, 99);
427
});
428
}
429
430
/**
431
* Test writing when another thread is blocked in read
432
*/
433
public void testConcurrentTimedReadWrite3() throws Exception {
434
withConnection((sc, peer) -> {
435
Socket s = sc.socket();
436
437
// block thread in read
438
execute(() -> {
439
s.setSoTimeout(60*1000);
440
s.getInputStream().read();
441
});
442
Thread.sleep(100); // give reader time to block
443
444
// test write
445
s.getOutputStream().write(99);
446
int n = peer.getInputStream().read();
447
assertEquals(n, 99);
448
});
449
}
450
451
// -- test infrastructure --
452
453
interface ThrowingTask {
454
void run() throws Exception;
455
}
456
457
interface ThrowingBiConsumer<T, U> {
458
void accept(T t, U u) throws Exception;
459
}
460
461
/**
462
* Invokes the consumer with a connected pair of socket channel and socket
463
*/
464
static void withConnection(ThrowingBiConsumer<SocketChannel, Socket> consumer)
465
throws Exception
466
{
467
var loopback = InetAddress.getLoopbackAddress();
468
try (ServerSocket ss = new ServerSocket()) {
469
ss.bind(new InetSocketAddress(loopback, 0));
470
try (SocketChannel sc = SocketChannel.open(ss.getLocalSocketAddress())) {
471
try (Socket peer = ss.accept()) {
472
consumer.accept(sc, peer);
473
}
474
}
475
}
476
}
477
478
static Future<?> scheduleWrite(OutputStream out, byte[] data, long delay) {
479
return schedule(() -> {
480
try {
481
out.write(data);
482
} catch (IOException ioe) { }
483
}, delay);
484
}
485
486
static Future<?> scheduleWrite(OutputStream out, int b, long delay) {
487
return scheduleWrite(out, new byte[] { (byte)b }, delay);
488
}
489
490
static Future<?> scheduleClose(Closeable c, long delay) {
491
return schedule(() -> {
492
try {
493
c.close();
494
} catch (IOException ioe) { }
495
}, delay);
496
}
497
498
static Future<?> scheduleInterrupt(Thread t, long delay) {
499
return schedule(() -> t.interrupt(), delay);
500
}
501
502
static Future<?> schedule(Runnable task, long delay) {
503
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
504
try {
505
return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
506
} finally {
507
executor.shutdown();
508
}
509
}
510
511
static Future<?> execute(ThrowingTask task) {
512
ExecutorService pool = Executors.newFixedThreadPool(1);
513
try {
514
return pool.submit(() -> {
515
task.run();
516
return null;
517
});
518
} finally {
519
pool.shutdown();
520
}
521
}
522
}
523
524