Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/sun/net/www/protocol/https/HttpsURLConnection/TunnelProxy.java
41161 views
1
/*
2
* Copyright (c) 2005, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/*
25
*
26
*/
27
28
import java.net.*;
29
import java.io.*;
30
import java.nio.*;
31
import java.nio.channels.*;
32
import sun.net.www.MessageHeader;
33
import java.util.*;
34
35
public class TunnelProxy {
36
37
ServerSocketChannel schan;
38
int threads;
39
int cperthread;
40
Server[] servers;
41
42
/**
43
* Create a <code>TunnelProxy<code> instance with the specified callback object
44
* for handling requests. One thread is created to handle requests,
45
* and up to ten TCP connections will be handled simultaneously.
46
* @param cb the callback object which is invoked to handle each
47
* incoming request
48
*/
49
50
public TunnelProxy () throws IOException {
51
this (1, 10, 0);
52
}
53
54
/**
55
* Create a <code>TunnelProxy<code> instance with the specified number of
56
* threads and maximum number of connections per thread. This functions
57
* the same as the 4 arg constructor, where the port argument is set to zero.
58
* @param cb the callback object which is invoked to handle each
59
* incoming request
60
* @param threads the number of threads to create to handle requests
61
* in parallel
62
* @param cperthread the number of simultaneous TCP connections to
63
* handle per thread
64
*/
65
66
public TunnelProxy (int threads, int cperthread)
67
throws IOException {
68
this (threads, cperthread, 0);
69
}
70
71
/**
72
* Create a <code>TunnelProxy<code> instance with the specified number
73
* of threads and maximum number of connections per thread and running on
74
* the specified port. The specified number of threads are created to
75
* handle incoming requests, and each thread is allowed
76
* to handle a number of simultaneous TCP connections.
77
* @param cb the callback object which is invoked to handle
78
* each incoming request
79
* @param threads the number of threads to create to handle
80
* requests in parallel
81
* @param cperthread the number of simultaneous TCP connections
82
* to handle per thread
83
* @param port the port number to bind the server to. <code>Zero</code>
84
* means choose any free port.
85
*/
86
87
public TunnelProxy (int threads, int cperthread, int port)
88
throws IOException {
89
this(threads, cperthread, null, 0);
90
}
91
92
/**
93
* Create a <code>TunnelProxy<code> instance with the specified number
94
* of threads and maximum number of connections per thread and running on
95
* the specified port. The specified number of threads are created to
96
* handle incoming requests, and each thread is allowed
97
* to handle a number of simultaneous TCP connections.
98
* @param cb the callback object which is invoked to handle
99
* each incoming request
100
* @param threads the number of threads to create to handle
101
* requests in parallel
102
* @param cperthread the number of simultaneous TCP connections
103
* to handle per thread
104
* @param address the address to bind to. null means all addresses.
105
* @param port the port number to bind the server to. <code>Zero</code>
106
* means choose any free port.
107
*/
108
public TunnelProxy (int threads, int cperthread, InetAddress address, int port)
109
throws IOException {
110
schan = ServerSocketChannel.open ();
111
InetSocketAddress addr = new InetSocketAddress (address, port);
112
schan.socket().bind (addr);
113
this.threads = threads;
114
this.cperthread = cperthread;
115
servers = new Server [threads];
116
for (int i=0; i<threads; i++) {
117
servers[i] = new Server (schan, cperthread);
118
servers[i].start();
119
}
120
}
121
122
/** Tell all threads in the server to exit within 5 seconds.
123
* This is an abortive termination. Just prior to the thread exiting
124
* all channels in that thread waiting to be closed are forceably closed.
125
*/
126
127
public void terminate () {
128
for (int i=0; i<threads; i++) {
129
servers[i].terminate ();
130
}
131
}
132
133
/**
134
* return the local port number to which the server is bound.
135
* @return the local port number
136
*/
137
138
public int getLocalPort () {
139
return schan.socket().getLocalPort ();
140
}
141
142
static class Server extends Thread {
143
144
ServerSocketChannel schan;
145
Selector selector;
146
SelectionKey listenerKey;
147
SelectionKey key; /* the current key being processed */
148
ByteBuffer consumeBuffer;
149
int maxconn;
150
int nconn;
151
ClosedChannelList clist;
152
boolean shutdown;
153
Pipeline pipe1 = null;
154
Pipeline pipe2 = null;
155
156
Server (ServerSocketChannel schan, int maxconn) {
157
this.schan = schan;
158
this.maxconn = maxconn;
159
nconn = 0;
160
consumeBuffer = ByteBuffer.allocate (512);
161
clist = new ClosedChannelList ();
162
try {
163
selector = Selector.open ();
164
schan.configureBlocking (false);
165
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
166
} catch (IOException e) {
167
System.err.println ("Server could not start: " + e);
168
}
169
}
170
171
/* Stop the thread as soon as possible */
172
public synchronized void terminate () {
173
shutdown = true;
174
if (pipe1 != null) pipe1.terminate();
175
if (pipe2 != null) pipe2.terminate();
176
}
177
178
public void run () {
179
try {
180
while (true) {
181
selector.select (1000);
182
Set selected = selector.selectedKeys();
183
Iterator iter = selected.iterator();
184
while (iter.hasNext()) {
185
key = (SelectionKey)iter.next();
186
if (key.equals (listenerKey)) {
187
SocketChannel sock = schan.accept ();
188
if (sock == null) {
189
/* false notification */
190
iter.remove();
191
continue;
192
}
193
sock.configureBlocking (false);
194
sock.register (selector, SelectionKey.OP_READ);
195
nconn ++;
196
if (nconn == maxconn) {
197
/* deregister */
198
listenerKey.cancel ();
199
listenerKey = null;
200
}
201
} else {
202
if (key.isReadable()) {
203
boolean closed;
204
SocketChannel chan = (SocketChannel) key.channel();
205
if (key.attachment() != null) {
206
closed = consume (chan);
207
} else {
208
closed = read (chan, key);
209
}
210
if (closed) {
211
chan.close ();
212
key.cancel ();
213
if (nconn == maxconn) {
214
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
215
}
216
nconn --;
217
}
218
}
219
}
220
iter.remove();
221
}
222
clist.check();
223
if (shutdown) {
224
clist.terminate ();
225
return;
226
}
227
}
228
} catch (IOException e) {
229
System.out.println ("Server exception: " + e);
230
// TODO finish
231
}
232
}
233
234
/* read all the data off the channel without looking at it
235
* return true if connection closed
236
*/
237
boolean consume (SocketChannel chan) {
238
try {
239
consumeBuffer.clear ();
240
int c = chan.read (consumeBuffer);
241
if (c == -1)
242
return true;
243
} catch (IOException e) {
244
return true;
245
}
246
return false;
247
}
248
249
/* return true if the connection is closed, false otherwise */
250
251
private boolean read (SocketChannel chan, SelectionKey key) {
252
HttpTransaction msg;
253
boolean res;
254
try {
255
InputStream is = new BufferedInputStream (new NioInputStream (chan));
256
String requestline = readLine (is);
257
MessageHeader mhead = new MessageHeader (is);
258
String[] req = requestline.split (" ");
259
if (req.length < 2) {
260
/* invalid request line */
261
return false;
262
}
263
String cmd = req[0];
264
URI uri = null;
265
if (!("CONNECT".equalsIgnoreCase(cmd))) {
266
// we expect CONNECT command
267
return false;
268
}
269
try {
270
uri = new URI("http://" + req[1]);
271
} catch (URISyntaxException e) {
272
System.err.println ("Invalid URI: " + e);
273
res = true;
274
}
275
276
// CONNECT ack
277
OutputStream os = new BufferedOutputStream(new NioOutputStream(chan));
278
byte[] ack = "HTTP/1.1 200 Connection established\r\n\r\n".getBytes();
279
os.write(ack, 0, ack.length);
280
os.flush();
281
282
// tunnel anything else
283
tunnel(is, os, uri);
284
285
res = false;
286
} catch (IOException e) {
287
res = true;
288
}
289
return res;
290
}
291
292
private void tunnel(InputStream fromClient, OutputStream toClient, URI serverURI) throws IOException {
293
Socket sockToServer = new Socket(serverURI.getHost(), serverURI.getPort());
294
OutputStream toServer = sockToServer.getOutputStream();
295
InputStream fromServer = sockToServer.getInputStream();
296
297
pipe1 = new Pipeline(fromClient, toServer);
298
pipe2 = new Pipeline(fromServer, toClient);
299
// start pump
300
pipe1.start();
301
pipe2.start();
302
// wait them to end
303
try {
304
pipe1.join();
305
} catch (InterruptedException e) {
306
// No-op
307
} finally {
308
sockToServer.close();
309
}
310
}
311
312
private String readLine (InputStream is) throws IOException {
313
boolean done=false, readCR=false;
314
byte[] b = new byte [512];
315
int c, l = 0;
316
317
while (!done) {
318
c = is.read ();
319
if (c == '\n' && readCR) {
320
done = true;
321
} else {
322
if (c == '\r' && !readCR) {
323
readCR = true;
324
} else {
325
b[l++] = (byte)c;
326
}
327
}
328
}
329
return new String (b);
330
}
331
332
/** close the channel associated with the current key by:
333
* 1. shutdownOutput (send a FIN)
334
* 2. mark the key so that incoming data is to be consumed and discarded
335
* 3. After a period, close the socket
336
*/
337
338
synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {
339
SocketChannel ch = (SocketChannel)key.channel ();
340
ch.socket().shutdownOutput();
341
key.attach (this);
342
clist.add (key);
343
}
344
345
synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {
346
SocketChannel ch = (SocketChannel)key.channel ();
347
Socket s = ch.socket ();
348
s.setSoLinger (true, 0);
349
ch.close();
350
}
351
}
352
353
354
/**
355
* Implements blocking reading semantics on top of a non-blocking channel
356
*/
357
358
static class NioInputStream extends InputStream {
359
SocketChannel channel;
360
Selector selector;
361
ByteBuffer chanbuf;
362
SelectionKey key;
363
int available;
364
byte[] one;
365
boolean closed;
366
ByteBuffer markBuf; /* reads may be satisifed from this buffer */
367
boolean marked;
368
boolean reset;
369
int readlimit;
370
371
public NioInputStream (SocketChannel chan) throws IOException {
372
this.channel = chan;
373
selector = Selector.open();
374
chanbuf = ByteBuffer.allocate (1024);
375
key = chan.register (selector, SelectionKey.OP_READ);
376
available = 0;
377
one = new byte[1];
378
closed = marked = reset = false;
379
}
380
381
public synchronized int read (byte[] b) throws IOException {
382
return read (b, 0, b.length);
383
}
384
385
public synchronized int read () throws IOException {
386
return read (one, 0, 1);
387
}
388
389
public synchronized int read (byte[] b, int off, int srclen) throws IOException {
390
391
int canreturn, willreturn;
392
393
if (closed)
394
return -1;
395
396
if (reset) { /* satisfy from markBuf */
397
canreturn = markBuf.remaining ();
398
willreturn = canreturn>srclen ? srclen : canreturn;
399
markBuf.get(b, off, willreturn);
400
if (canreturn == willreturn) {
401
reset = false;
402
}
403
} else { /* satisfy from channel */
404
canreturn = available();
405
if (canreturn == 0) {
406
block ();
407
canreturn = available();
408
}
409
willreturn = canreturn>srclen ? srclen : canreturn;
410
chanbuf.get(b, off, willreturn);
411
available -= willreturn;
412
413
if (marked) { /* copy into markBuf */
414
try {
415
markBuf.put (b, off, willreturn);
416
} catch (BufferOverflowException e) {
417
marked = false;
418
}
419
}
420
}
421
return willreturn;
422
}
423
424
public synchronized int available () throws IOException {
425
if (closed)
426
throw new IOException ("Stream is closed");
427
428
if (reset)
429
return markBuf.remaining();
430
431
if (available > 0)
432
return available;
433
434
chanbuf.clear ();
435
available = channel.read (chanbuf);
436
if (available > 0)
437
chanbuf.flip();
438
else if (available == -1)
439
throw new IOException ("Stream is closed");
440
return available;
441
}
442
443
/**
444
* block() only called when available==0 and buf is empty
445
*/
446
private synchronized void block () throws IOException {
447
//assert available == 0;
448
int n = selector.select ();
449
//assert n == 1;
450
selector.selectedKeys().clear();
451
available ();
452
}
453
454
public void close () throws IOException {
455
if (closed)
456
return;
457
channel.close ();
458
closed = true;
459
}
460
461
public synchronized void mark (int readlimit) {
462
if (closed)
463
return;
464
this.readlimit = readlimit;
465
markBuf = ByteBuffer.allocate (readlimit);
466
marked = true;
467
reset = false;
468
}
469
470
public synchronized void reset () throws IOException {
471
if (closed )
472
return;
473
if (!marked)
474
throw new IOException ("Stream not marked");
475
marked = false;
476
reset = true;
477
markBuf.flip ();
478
}
479
}
480
481
static class NioOutputStream extends OutputStream {
482
SocketChannel channel;
483
ByteBuffer buf;
484
SelectionKey key;
485
Selector selector;
486
boolean closed;
487
byte[] one;
488
489
public NioOutputStream (SocketChannel channel) throws IOException {
490
this.channel = channel;
491
selector = Selector.open ();
492
key = channel.register (selector, SelectionKey.OP_WRITE);
493
closed = false;
494
one = new byte [1];
495
}
496
497
public synchronized void write (int b) throws IOException {
498
one[0] = (byte)b;
499
write (one, 0, 1);
500
}
501
502
public synchronized void write (byte[] b) throws IOException {
503
write (b, 0, b.length);
504
}
505
506
public synchronized void write (byte[] b, int off, int len) throws IOException {
507
if (closed)
508
throw new IOException ("stream is closed");
509
510
buf = ByteBuffer.allocate (len);
511
buf.put (b, off, len);
512
buf.flip ();
513
int n;
514
while ((n = channel.write (buf)) < len) {
515
len -= n;
516
if (len == 0)
517
return;
518
selector.select ();
519
selector.selectedKeys().clear ();
520
}
521
}
522
523
public void close () throws IOException {
524
if (closed)
525
return;
526
channel.close ();
527
closed = true;
528
}
529
}
530
531
/*
532
* Pipeline object :-
533
* 1) Will pump every byte from its input stream to output stream
534
* 2) Is an 'active object'
535
*/
536
static class Pipeline implements Runnable {
537
InputStream in;
538
OutputStream out;
539
Thread t;
540
541
public Pipeline(InputStream is, OutputStream os) {
542
in = is;
543
out = os;
544
}
545
546
public void start() {
547
t = new Thread(this);
548
t.start();
549
}
550
551
public void join() throws InterruptedException {
552
t.join();
553
}
554
555
public void terminate() {
556
t.interrupt();
557
}
558
559
public void run() {
560
byte[] buffer = new byte[10000];
561
try {
562
while (!Thread.interrupted()) {
563
int len;
564
while ((len = in.read(buffer)) != -1) {
565
out.write(buffer, 0, len);
566
out.flush();
567
}
568
}
569
} catch(IOException e) {
570
// No-op
571
} finally {
572
}
573
}
574
}
575
576
/**
577
* Utilities for synchronization. A condition is
578
* identified by a string name, and is initialized
579
* upon first use (ie. setCondition() or waitForCondition()). Threads
580
* are blocked until some thread calls (or has called) setCondition() for the same
581
* condition.
582
* <P>
583
* A rendezvous built on a condition is also provided for synchronizing
584
* N threads.
585
*/
586
587
private static HashMap conditions = new HashMap();
588
589
/*
590
* Modifiable boolean object
591
*/
592
private static class BValue {
593
boolean v;
594
}
595
596
/*
597
* Modifiable int object
598
*/
599
private static class IValue {
600
int v;
601
IValue (int i) {
602
v =i;
603
}
604
}
605
606
607
private static BValue getCond (String condition) {
608
synchronized (conditions) {
609
BValue cond = (BValue) conditions.get (condition);
610
if (cond == null) {
611
cond = new BValue();
612
conditions.put (condition, cond);
613
}
614
return cond;
615
}
616
}
617
618
/**
619
* Set the condition to true. Any threads that are currently blocked
620
* waiting on the condition, will be unblocked and allowed to continue.
621
* Threads that subsequently call waitForCondition() will not block.
622
* If the named condition did not exist prior to the call, then it is created
623
* first.
624
*/
625
626
public static void setCondition (String condition) {
627
BValue cond = getCond (condition);
628
synchronized (cond) {
629
if (cond.v) {
630
return;
631
}
632
cond.v = true;
633
cond.notifyAll();
634
}
635
}
636
637
/**
638
* If the named condition does not exist, then it is created and initialized
639
* to false. If the condition exists or has just been created and its value
640
* is false, then the thread blocks until another thread sets the condition.
641
* If the condition exists and is already set to true, then this call returns
642
* immediately without blocking.
643
*/
644
645
public static void waitForCondition (String condition) {
646
BValue cond = getCond (condition);
647
synchronized (cond) {
648
if (!cond.v) {
649
try {
650
cond.wait();
651
} catch (InterruptedException e) {}
652
}
653
}
654
}
655
656
/* conditions must be locked when accessing this */
657
static HashMap rv = new HashMap();
658
659
/**
660
* Force N threads to rendezvous (ie. wait for each other) before proceeding.
661
* The first thread(s) to call are blocked until the last
662
* thread makes the call. Then all threads continue.
663
* <p>
664
* All threads that call with the same condition name, must use the same value
665
* for N (or the results may be not be as expected).
666
* <P>
667
* Obviously, if fewer than N threads make the rendezvous then the result
668
* will be a hang.
669
*/
670
671
public static void rendezvous (String condition, int N) {
672
BValue cond;
673
IValue iv;
674
String name = "RV_"+condition;
675
676
/* get the condition */
677
678
synchronized (conditions) {
679
cond = (BValue)conditions.get (name);
680
if (cond == null) {
681
/* we are first caller */
682
if (N < 2) {
683
throw new RuntimeException ("rendezvous must be called with N >= 2");
684
}
685
cond = new BValue ();
686
conditions.put (name, cond);
687
iv = new IValue (N-1);
688
rv.put (name, iv);
689
} else {
690
/* already initialised, just decrement the counter */
691
iv = (IValue) rv.get (name);
692
iv.v --;
693
}
694
}
695
696
if (iv.v > 0) {
697
waitForCondition (name);
698
} else {
699
setCondition (name);
700
synchronized (conditions) {
701
clearCondition (name);
702
rv.remove (name);
703
}
704
}
705
}
706
707
/**
708
* If the named condition exists and is set then remove it, so it can
709
* be re-initialized and used again. If the condition does not exist, or
710
* exists but is not set, then the call returns without doing anything.
711
* Note, some higher level synchronization
712
* may be needed between clear and the other operations.
713
*/
714
715
public static void clearCondition(String condition) {
716
BValue cond;
717
synchronized (conditions) {
718
cond = (BValue) conditions.get (condition);
719
if (cond == null) {
720
return;
721
}
722
synchronized (cond) {
723
if (cond.v) {
724
conditions.remove (condition);
725
}
726
}
727
}
728
}
729
}
730
731