Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/sun/net/www/protocol/https/TestHttpsServer.java
41159 views
1
/*
2
* Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
import java.net.*;
25
import java.io.*;
26
import java.nio.*;
27
import java.nio.channels.*;
28
import sun.net.www.MessageHeader;
29
import java.util.*;
30
import javax.net.ssl.*;
31
import javax.net.ssl.SSLEngineResult.*;
32
import java.security.*;
33
34
/**
35
* This class implements a simple HTTPS server. It uses multiple threads to
36
* handle connections in parallel, and will spin off a new thread to handle
37
* each request. (this is easier to implement with SSLEngine)
38
* <p>
39
* It must be instantiated with a {@link HttpCallback} object to which
40
* requests are given and must be handled.
41
* <p>
42
* Simple synchronization between the client(s) and server can be done
43
* using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
44
* {@link #rendezvous(String,int)} methods.
45
*
46
* NOTE NOTE NOTE NOTE NOTE NOTE NOTE
47
*
48
* If you make a change in here, please don't forget to make the
49
* corresponding change in the J2SE equivalent.
50
*
51
* NOTE NOTE NOTE NOTE NOTE NOTE NOTE
52
*/
53
54
public class TestHttpsServer {
55
56
ServerSocketChannel schan;
57
int threads;
58
int cperthread;
59
HttpCallback cb;
60
Server[] servers;
61
62
// ssl related fields
63
static SSLContext sslCtx;
64
65
/**
 * Create a <code>TestHttpsServer</code> instance with the specified callback object
 * for handling requests. One thread is created to handle requests,
 * and up to ten TCP connections will be handled simultaneously.
 * The port argument passed on is zero, i.e. any free port.
 * @param cb the callback object which is invoked to handle each
 *  incoming request
 * @throws IOException if the server socket cannot be opened or bound
 */
public TestHttpsServer(HttpCallback cb) throws IOException {
    this(cb, 1, 10, 0);
}
76
77
/**
 * Create a <code>TestHttpsServer</code> instance with the specified number of
 * threads and maximum number of connections per thread. This functions
 * the same as the 4 arg constructor, where the port argument is set to zero.
 * @param cb the callback object which is invoked to handle each
 *     incoming request
 * @param threads the number of threads to create to handle requests
 *     in parallel
 * @param cperthread the number of simultaneous TCP connections to
 *     handle per thread
 * @throws IOException if the server socket cannot be opened or bound
 */
public TestHttpsServer(HttpCallback cb, int threads, int cperthread)
        throws IOException {
    this(cb, threads, cperthread, 0);
}
93
94
/**
 * Create a <code>TestHttpsServer</code> instance with the specified number
 * of threads and maximum number of connections per thread and running on
 * the specified port. The specified number of threads are created to
 * handle incoming requests, and each thread is allowed
 * to handle a number of simultaneous TCP connections.
 * The bind address passed on is {@code null}.
 * @param cb the callback object which is invoked to handle
 *  each incoming request
 * @param threads the number of threads to create to handle
 *  requests in parallel
 * @param cperthread the number of simultaneous TCP connections
 *  to handle per thread
 * @param port the port number to bind the server to. <code>Zero</code>
 *  means choose any free port.
 * @throws IOException if the server socket cannot be opened or bound
 */
public TestHttpsServer(HttpCallback cb, int threads, int cperthread, int port)
        throws IOException {
    this(cb, threads, cperthread, null, port);
}
113
114
/**
115
* Create a <code>TestHttpsServer<code> instance with the specified number
116
* of threads and maximum number of connections per thread and running on
117
* the specified port. The specified number of threads are created to
118
* handle incoming requests, and each thread is allowed
119
* to handle a number of simultaneous TCP connections.
120
* @param cb the callback object which is invoked to handle
121
* each incoming request
122
* @param threads the number of threads to create to handle
123
* requests in parallel
124
* @param cperthread the number of simultaneous TCP connections
125
* to handle per thread
126
* @param address the InetAddress to bind to. {@code Null} means the
127
* wildcard address.
128
* @param port the port number to bind the server to. {@code Zero}
129
* means choose any free port.
130
*/
131
132
public TestHttpsServer(HttpCallback cb, int threads, int cperthread, InetAddress address, int port)
133
throws IOException {
134
schan = ServerSocketChannel.open();
135
InetSocketAddress addr = new InetSocketAddress(address, port);
136
schan.socket().bind(addr);
137
this.threads = threads;
138
this.cb = cb;
139
this.cperthread = cperthread;
140
141
try {
142
// create and initialize a SSLContext
143
KeyStore ks = KeyStore.getInstance("JKS");
144
KeyStore ts = KeyStore.getInstance("JKS");
145
char[] passphrase = "passphrase".toCharArray();
146
147
ks.load(new FileInputStream(System.getProperty("javax.net.ssl.keyStore")), passphrase);
148
ts.load(new FileInputStream(System.getProperty("javax.net.ssl.trustStore")), passphrase);
149
150
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
151
kmf.init(ks, passphrase);
152
153
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
154
tmf.init(ts);
155
156
sslCtx = SSLContext.getInstance("TLS");
157
158
sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
159
160
servers = new Server[threads];
161
for (int i=0; i<threads; i++) {
162
servers[i] = new Server(cb, schan, cperthread);
163
servers[i].start();
164
}
165
} catch (Exception ex) {
166
throw new RuntimeException("test failed. cause: "+ex.getMessage());
167
}
168
}
169
170
/** Tell all threads in the server to exit within 5 seconds.
171
* This is an abortive termination. Just prior to the thread exiting
172
* all channels in that thread waiting to be closed are forceably closed.
173
*/
174
175
public void terminate() {
176
for (int i=0; i<threads; i++) {
177
servers[i].terminate ();
178
}
179
}
180
181
/**
182
* return the local port number to which the server is bound.
183
* @return the local port number
184
*/
185
186
public int getLocalPort () {
187
return schan.socket().getLocalPort ();
188
}
189
190
public String getAuthority() {
191
InetAddress address = schan.socket().getInetAddress();
192
String hostaddr = address.getHostAddress();
193
if (address.isAnyLocalAddress()) hostaddr = "localhost";
194
if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]";
195
return hostaddr + ":" + getLocalPort();
196
}
197
198
static class Server extends Thread {
199
200
ServerSocketChannel schan;
201
Selector selector;
202
SelectionKey listenerKey;
203
SelectionKey key; /* the current key being processed */
204
HttpCallback cb;
205
ByteBuffer consumeBuffer;
206
int maxconn;
207
int nconn;
208
ClosedChannelList clist;
209
boolean shutdown;
210
211
Server(HttpCallback cb, ServerSocketChannel schan, int maxconn) {
212
this.schan = schan;
213
this.maxconn = maxconn;
214
this.cb = cb;
215
nconn = 0;
216
consumeBuffer = ByteBuffer.allocate(512);
217
clist = new ClosedChannelList();
218
try {
219
selector = Selector.open();
220
schan.configureBlocking(false);
221
listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT);
222
} catch (IOException e) {
223
System.err.println("Server could not start: " + e);
224
}
225
}
226
227
/* Stop the thread as soon as possible */
228
public synchronized void terminate() {
229
shutdown = true;
230
}
231
232
public void run() {
233
try {
234
while (true) {
235
selector.select(1000);
236
Set selected = selector.selectedKeys();
237
Iterator iter = selected.iterator();
238
while (iter.hasNext()) {
239
key = (SelectionKey)iter.next();
240
if (key.equals (listenerKey)) {
241
SocketChannel sock = schan.accept();
242
if (sock == null) {
243
/* false notification */
244
iter.remove();
245
continue;
246
}
247
sock.configureBlocking(true);
248
SSLEngine sslEng = sslCtx.createSSLEngine();
249
sslEng.setUseClientMode(false);
250
new ServerWorker(cb, sock, sslEng).start();
251
nconn ++;
252
if (nconn == maxconn) {
253
/* deregister */
254
listenerKey.cancel();
255
listenerKey = null;
256
}
257
} else {
258
if (key.isReadable()) {
259
boolean closed = false;
260
SocketChannel chan = (SocketChannel)key.channel();
261
if (key.attachment() != null) {
262
closed = consume(chan);
263
}
264
265
if (closed) {
266
chan.close();
267
key.cancel();
268
if (nconn == maxconn) {
269
listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT);
270
}
271
nconn --;
272
}
273
}
274
}
275
iter.remove();
276
}
277
clist.check();
278
279
synchronized (this) {
280
if (shutdown) {
281
clist.terminate();
282
return;
283
}
284
}
285
}
286
} catch (IOException e) {
287
System.out.println("Server exception: " + e);
288
// TODO finish
289
}
290
}
291
292
/* read all the data off the channel without looking at it
293
* return true if connection closed
294
*/
295
boolean consume(SocketChannel chan) {
296
try {
297
consumeBuffer.clear();
298
int c = chan.read(consumeBuffer);
299
if (c == -1)
300
return true;
301
} catch (IOException e) {
302
return true;
303
}
304
return false;
305
}
306
}
307
308
static class ServerWorker extends Thread {
309
private ByteBuffer inNetBB;
310
private ByteBuffer outNetBB;
311
private ByteBuffer inAppBB;
312
private ByteBuffer outAppBB;
313
314
SSLEngine sslEng;
315
SocketChannel schan;
316
HttpCallback cb;
317
HandshakeStatus currentHSStatus;
318
boolean initialHSComplete;
319
boolean handshakeStarted;
320
/*
321
* All inbound data goes through this buffer.
322
*
323
* It might be nice to use a cache of ByteBuffers so we're
324
* not alloc/dealloc'ing all over the place.
325
*/
326
327
/*
328
* Application buffers, also used for handshaking
329
*/
330
private int appBBSize;
331
332
ServerWorker(HttpCallback cb, SocketChannel schan, SSLEngine sslEng) {
333
this.sslEng = sslEng;
334
this.schan = schan;
335
this.cb = cb;
336
currentHSStatus = HandshakeStatus.NEED_UNWRAP;
337
initialHSComplete = false;
338
int netBBSize = sslEng.getSession().getPacketBufferSize();
339
inNetBB = ByteBuffer.allocate(netBBSize);
340
outNetBB = ByteBuffer.allocate(netBBSize);
341
appBBSize = sslEng.getSession().getApplicationBufferSize();
342
inAppBB = ByteBuffer.allocate(appBBSize);
343
outAppBB = ByteBuffer.allocate(appBBSize);
344
}
345
346
public SSLEngine getSSLEngine() {
347
return sslEng;
348
}
349
350
public ByteBuffer outNetBB() {
351
return outNetBB;
352
}
353
354
public ByteBuffer outAppBB() {
355
return outAppBB;
356
}
357
358
public void run () {
359
try {
360
SSLEngineResult result;
361
362
while (!initialHSComplete) {
363
364
switch (currentHSStatus) {
365
366
case NEED_UNWRAP:
367
int bytes = schan.read(inNetBB);
368
if (!handshakeStarted && bytes > 0) {
369
handshakeStarted = true;
370
int byte0 = inNetBB.get(0);
371
if (byte0 != 0x16) {
372
// first byte of a TLS connection is supposed to be
373
// 0x16. If not it may be a plain text connection.
374
//
375
// Sometime a rogue client may try to open a plain
376
// connection with our server. Calling this method
377
// gives a chance to the test logic to ignore such
378
// rogue connections.
379
//
380
if (cb.dropPlainTextConnections()) {
381
try { schan.close(); } catch (IOException x) { };
382
return;
383
}
384
// else sslEng.unwrap will throw later on...
385
}
386
}
387
388
needIO:
389
while (currentHSStatus == HandshakeStatus.NEED_UNWRAP) {
390
/*
391
* Don't need to resize requestBB, since no app data should
392
* be generated here.
393
*/
394
inNetBB.flip();
395
result = sslEng.unwrap(inNetBB, inAppBB);
396
inNetBB.compact();
397
currentHSStatus = result.getHandshakeStatus();
398
399
switch (result.getStatus()) {
400
401
case OK:
402
switch (currentHSStatus) {
403
case NOT_HANDSHAKING:
404
throw new IOException(
405
"Not handshaking during initial handshake");
406
407
case NEED_TASK:
408
Runnable task;
409
while ((task = sslEng.getDelegatedTask()) != null) {
410
task.run();
411
currentHSStatus = sslEng.getHandshakeStatus();
412
}
413
break;
414
}
415
416
break;
417
418
case BUFFER_UNDERFLOW:
419
break needIO;
420
421
default: // BUFFER_OVERFLOW/CLOSED:
422
throw new IOException("Received" + result.getStatus() +
423
"during initial handshaking");
424
}
425
}
426
427
/*
428
* Just transitioned from read to write.
429
*/
430
if (currentHSStatus != HandshakeStatus.NEED_WRAP) {
431
break;
432
}
433
434
// Fall through and fill the write buffer.
435
436
case NEED_WRAP:
437
/*
438
* The flush above guarantees the out buffer to be empty
439
*/
440
outNetBB.clear();
441
result = sslEng.wrap(inAppBB, outNetBB);
442
outNetBB.flip();
443
schan.write (outNetBB);
444
outNetBB.compact();
445
currentHSStatus = result.getHandshakeStatus();
446
447
switch (result.getStatus()) {
448
case OK:
449
450
if (currentHSStatus == HandshakeStatus.NEED_TASK) {
451
Runnable task;
452
while ((task = sslEng.getDelegatedTask()) != null) {
453
task.run();
454
currentHSStatus = sslEng.getHandshakeStatus();
455
}
456
}
457
458
break;
459
460
default: // BUFFER_OVERFLOW/BUFFER_UNDERFLOW/CLOSED:
461
throw new IOException("Received" + result.getStatus() +
462
"during initial handshaking");
463
}
464
break;
465
466
case FINISHED:
467
initialHSComplete = true;
468
break;
469
default: // NOT_HANDSHAKING/NEED_TASK
470
throw new RuntimeException("Invalid Handshaking State" +
471
currentHSStatus);
472
} // switch
473
}
474
// read the application data; using non-blocking mode
475
schan.configureBlocking(false);
476
read(schan, sslEng);
477
} catch (Exception ex) {
478
throw new RuntimeException(ex);
479
}
480
}
481
482
/* return true if the connection is closed, false otherwise */
483
484
private boolean read(SocketChannel chan, SSLEngine sslEng) {
485
HttpTransaction msg;
486
boolean res;
487
try {
488
InputStream is = new BufferedInputStream(new NioInputStream(chan, sslEng, inNetBB, inAppBB));
489
String requestline = readLine(is);
490
MessageHeader mhead = new MessageHeader(is);
491
String clen = mhead.findValue("Content-Length");
492
String trferenc = mhead.findValue("Transfer-Encoding");
493
String data = null;
494
if (trferenc != null && trferenc.equals("chunked"))
495
data = new String(readChunkedData(is));
496
else if (clen != null)
497
data = new String(readNormalData(is, Integer.parseInt(clen)));
498
String[] req = requestline.split(" ");
499
if (req.length < 2) {
500
/* invalid request line */
501
return false;
502
}
503
String cmd = req[0];
504
URI uri = null;
505
try {
506
uri = new URI(req[1]);
507
msg = new HttpTransaction(this, cmd, uri, mhead, data, null, chan);
508
cb.request(msg);
509
} catch (URISyntaxException e) {
510
System.err.println ("Invalid URI: " + e);
511
msg = new HttpTransaction(this, cmd, null, null, null, null, chan);
512
msg.sendResponse(501, "Whatever");
513
}
514
res = false;
515
} catch (IOException e) {
516
res = true;
517
}
518
return res;
519
}
520
521
byte[] readNormalData(InputStream is, int len) throws IOException {
522
byte[] buf = new byte[len];
523
int c, off=0, remain=len;
524
while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
525
remain -= c;
526
off += c;
527
}
528
return buf;
529
}
530
531
private void readCRLF(InputStream is) throws IOException {
532
int cr = is.read();
533
int lf = is.read();
534
535
if (((cr & 0xff) != 0x0d) ||
536
((lf & 0xff) != 0x0a)) {
537
throw new IOException(
538
"Expected <CR><LF>: got '" + cr + "/" + lf + "'");
539
}
540
}
541
542
byte[] readChunkedData(InputStream is) throws IOException {
543
LinkedList l = new LinkedList();
544
int total = 0;
545
for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
546
l.add(readNormalData(is, len));
547
total += len;
548
readCRLF(is); // CRLF at end of chunk
549
}
550
readCRLF(is); // CRLF at end of Chunked Stream.
551
byte[] buf = new byte[total];
552
Iterator i = l.iterator();
553
int x = 0;
554
while (i.hasNext()) {
555
byte[] b = (byte[])i.next();
556
System.arraycopy(b, 0, buf, x, b.length);
557
x += b.length;
558
}
559
return buf;
560
}
561
562
private int readChunkLen(InputStream is) throws IOException {
563
int c, len=0;
564
boolean done=false, readCR=false;
565
while (!done) {
566
c = is.read();
567
if (c == '\n' && readCR) {
568
done = true;
569
} else {
570
if (c == '\r' && !readCR) {
571
readCR = true;
572
} else {
573
int x=0;
574
if (c >= 'a' && c <= 'f') {
575
x = c - 'a' + 10;
576
} else if (c >= 'A' && c <= 'F') {
577
x = c - 'A' + 10;
578
} else if (c >= '0' && c <= '9') {
579
x = c - '0';
580
}
581
len = len * 16 + x;
582
}
583
}
584
}
585
return len;
586
}
587
588
private String readLine(InputStream is) throws IOException {
589
boolean done=false, readCR=false;
590
byte[] b = new byte[512];
591
int c, l = 0;
592
593
while (!done) {
594
c = is.read();
595
if (c == '\n' && readCR) {
596
done = true;
597
} else {
598
if (c == '\r' && !readCR) {
599
readCR = true;
600
} else {
601
b[l++] = (byte)c;
602
}
603
}
604
}
605
return new String(b);
606
}
607
608
/** close the channel associated with the current key by:
609
* 1. shutdownOutput (send a FIN)
610
* 2. mark the key so that incoming data is to be consumed and discarded
611
* 3. After a period, close the socket
612
*/
613
614
synchronized void orderlyCloseChannel(SocketChannel ch) throws IOException {
615
ch.socket().shutdownOutput();
616
}
617
618
synchronized void abortiveCloseChannel(SocketChannel ch) throws IOException {
619
Socket s = ch.socket();
620
s.setSoLinger(true, 0);
621
ch.close();
622
}
623
}
624
625
626
/**
627
* Implements blocking reading semantics on top of a non-blocking channel
628
*/
629
630
static class NioInputStream extends InputStream {
631
SSLEngine sslEng;
632
SocketChannel channel;
633
Selector selector;
634
ByteBuffer inNetBB;
635
ByteBuffer inAppBB;
636
SelectionKey key;
637
int available;
638
byte[] one;
639
boolean closed;
640
ByteBuffer markBuf; /* reads may be satisifed from this buffer */
641
boolean marked;
642
boolean reset;
643
int readlimit;
644
645
public NioInputStream(SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException {
646
this.sslEng = sslEng;
647
this.channel = chan;
648
selector = Selector.open();
649
this.inNetBB = inNetBB;
650
this.inAppBB = inAppBB;
651
key = chan.register(selector, SelectionKey.OP_READ);
652
available = 0;
653
one = new byte[1];
654
closed = marked = reset = false;
655
}
656
657
public synchronized int read(byte[] b) throws IOException {
658
return read(b, 0, b.length);
659
}
660
661
public synchronized int read() throws IOException {
662
return read(one, 0, 1);
663
}
664
665
public synchronized int read(byte[] b, int off, int srclen) throws IOException {
666
667
int canreturn, willreturn;
668
669
if (closed)
670
return -1;
671
672
if (reset) { /* satisfy from markBuf */
673
canreturn = markBuf.remaining();
674
willreturn = canreturn > srclen ? srclen : canreturn;
675
markBuf.get(b, off, willreturn);
676
if (canreturn == willreturn) {
677
reset = false;
678
}
679
} else { /* satisfy from channel */
680
canreturn = available();
681
if (canreturn == 0) {
682
block();
683
canreturn = available();
684
}
685
willreturn = canreturn > srclen ? srclen : canreturn;
686
inAppBB.get(b, off, willreturn);
687
available -= willreturn;
688
689
if (marked) { /* copy into markBuf */
690
try {
691
markBuf.put(b, off, willreturn);
692
} catch (BufferOverflowException e) {
693
marked = false;
694
}
695
}
696
}
697
return willreturn;
698
}
699
700
public synchronized int available() throws IOException {
701
if (closed)
702
throw new IOException("Stream is closed");
703
704
if (reset)
705
return markBuf.remaining();
706
707
if (available > 0)
708
return available;
709
710
inAppBB.clear();
711
int bytes = channel.read(inNetBB);
712
713
int needed = sslEng.getSession().getApplicationBufferSize();
714
if (needed > inAppBB.remaining()) {
715
inAppBB = ByteBuffer.allocate(needed);
716
}
717
inNetBB.flip();
718
SSLEngineResult result = sslEng.unwrap(inNetBB, inAppBB);
719
inNetBB.compact();
720
available = result.bytesProduced();
721
722
if (available > 0)
723
inAppBB.flip();
724
else if (available == -1)
725
throw new IOException("Stream is closed");
726
return available;
727
}
728
729
/**
730
* block() only called when available==0 and buf is empty
731
*/
732
private synchronized void block() throws IOException {
733
//assert available == 0;
734
int n = selector.select();
735
//assert n == 1;
736
selector.selectedKeys().clear();
737
available();
738
}
739
740
public void close() throws IOException {
741
if (closed)
742
return;
743
channel.close();
744
closed = true;
745
}
746
747
public synchronized void mark(int readlimit) {
748
if (closed)
749
return;
750
this.readlimit = readlimit;
751
markBuf = ByteBuffer.allocate(readlimit);
752
marked = true;
753
reset = false;
754
}
755
756
public synchronized void reset() throws IOException {
757
if (closed )
758
return;
759
if (!marked)
760
throw new IOException("Stream not marked");
761
marked = false;
762
reset = true;
763
markBuf.flip();
764
}
765
}
766
767
static class NioOutputStream extends OutputStream {
768
SSLEngine sslEng;
769
SocketChannel channel;
770
ByteBuffer outNetBB;
771
ByteBuffer outAppBB;
772
SelectionKey key;
773
Selector selector;
774
boolean closed;
775
byte[] one;
776
777
public NioOutputStream(SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException {
778
this.sslEng = sslEng;
779
this.channel = channel;
780
this.outNetBB = outNetBB;
781
this.outAppBB = outAppBB;
782
selector = Selector.open();
783
key = channel.register(selector, SelectionKey.OP_WRITE);
784
closed = false;
785
one = new byte[1];
786
}
787
788
public synchronized void write(int b) throws IOException {
789
one[0] = (byte)b;
790
write(one, 0, 1);
791
}
792
793
public synchronized void write(byte[] b) throws IOException {
794
write(b, 0, b.length);
795
}
796
797
public synchronized void write(byte[] b, int off, int len) throws IOException {
798
if (closed)
799
throw new IOException("stream is closed");
800
801
outAppBB = ByteBuffer.allocate(len);
802
outAppBB.put(b, off, len);
803
outAppBB.flip();
804
int n;
805
outNetBB.clear();
806
int needed = sslEng.getSession().getPacketBufferSize();
807
if (outNetBB.capacity() < needed) {
808
outNetBB = ByteBuffer.allocate(needed);
809
}
810
SSLEngineResult ret = sslEng.wrap(outAppBB, outNetBB);
811
outNetBB.flip();
812
int newLen = ret.bytesProduced();
813
while ((n = channel.write (outNetBB)) < newLen) {
814
newLen -= n;
815
if (newLen == 0)
816
return;
817
selector.select();
818
selector.selectedKeys().clear();
819
}
820
}
821
822
public void close() throws IOException {
823
if (closed)
824
return;
825
channel.close();
826
closed = true;
827
}
828
}
829
830
/**
831
* Utilities for synchronization. A condition is
832
* identified by a string name, and is initialized
833
* upon first use (ie. setCondition() or waitForCondition()). Threads
834
* are blocked until some thread calls (or has called) setCondition() for the same
835
* condition.
836
* <P>
837
* A rendezvous built on a condition is also provided for synchronizing
838
* N threads.
839
*/
840
841
private static HashMap conditions = new HashMap();
842
843
/*
844
* Modifiable boolean object
845
*/
846
private static class BValue {
847
boolean v;
848
}
849
850
/*
851
* Modifiable int object
852
*/
853
private static class IValue {
854
int v;
855
IValue(int i) {
856
v =i;
857
}
858
}
859
860
861
private static BValue getCond(String condition) {
862
synchronized (conditions) {
863
BValue cond = (BValue) conditions.get(condition);
864
if (cond == null) {
865
cond = new BValue();
866
conditions.put(condition, cond);
867
}
868
return cond;
869
}
870
}
871
872
/**
873
* Set the condition to true. Any threads that are currently blocked
874
* waiting on the condition, will be unblocked and allowed to continue.
875
* Threads that subsequently call waitForCondition() will not block.
876
* If the named condition did not exist prior to the call, then it is created
877
* first.
878
*/
879
880
public static void setCondition(String condition) {
881
BValue cond = getCond(condition);
882
synchronized (cond) {
883
if (cond.v) {
884
return;
885
}
886
cond.v = true;
887
cond.notifyAll();
888
}
889
}
890
891
/**
892
* If the named condition does not exist, then it is created and initialized
893
* to false. If the condition exists or has just been created and its value
894
* is false, then the thread blocks until another thread sets the condition.
895
* If the condition exists and is already set to true, then this call returns
896
* immediately without blocking.
897
*/
898
899
public static void waitForCondition(String condition) {
900
BValue cond = getCond(condition);
901
synchronized (cond) {
902
if (!cond.v) {
903
try {
904
cond.wait();
905
} catch (InterruptedException e) {}
906
}
907
}
908
}
909
910
/* conditions must be locked when accessing this */
911
static HashMap rv = new HashMap();
912
913
/**
914
* Force N threads to rendezvous (ie. wait for each other) before proceeding.
915
* The first thread(s) to call are blocked until the last
916
* thread makes the call. Then all threads continue.
917
* <p>
918
* All threads that call with the same condition name, must use the same value
919
* for N (or the results may be not be as expected).
920
* <P>
921
* Obviously, if fewer than N threads make the rendezvous then the result
922
* will be a hang.
923
*/
924
925
public static void rendezvous(String condition, int N) {
926
BValue cond;
927
IValue iv;
928
String name = "RV_"+condition;
929
930
/* get the condition */
931
932
synchronized (conditions) {
933
cond = (BValue)conditions.get(name);
934
if (cond == null) {
935
/* we are first caller */
936
if (N < 2) {
937
throw new RuntimeException("rendezvous must be called with N >= 2");
938
}
939
cond = new BValue();
940
conditions.put(name, cond);
941
iv = new IValue(N-1);
942
rv.put(name, iv);
943
} else {
944
/* already initialised, just decrement the counter */
945
iv = (IValue) rv.get(name);
946
iv.v--;
947
}
948
}
949
950
if (iv.v > 0) {
951
waitForCondition(name);
952
} else {
953
setCondition(name);
954
synchronized (conditions) {
955
clearCondition(name);
956
rv.remove(name);
957
}
958
}
959
}
960
961
/**
962
* If the named condition exists and is set then remove it, so it can
963
* be re-initialized and used again. If the condition does not exist, or
964
* exists but is not set, then the call returns without doing anything.
965
* Note, some higher level synchronization
966
* may be needed between clear and the other operations.
967
*/
968
969
public static void clearCondition(String condition) {
970
BValue cond;
971
synchronized (conditions) {
972
cond = (BValue) conditions.get(condition);
973
if (cond == null) {
974
return;
975
}
976
synchronized (cond) {
977
if (cond.v) {
978
conditions.remove(condition);
979
}
980
}
981
}
982
}
983
}
984
985