Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
41159 views
1
/*
2
* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.io.FileDescriptor;
29
import java.io.IOException;
30
import java.net.InetAddress;
31
import java.net.Inet4Address;
32
import java.net.InetSocketAddress;
33
import java.net.ProtocolFamily;
34
import java.net.Socket;
35
import java.net.SocketAddress;
36
import java.net.SocketException;
37
import java.net.SocketOption;
38
import java.net.SocketTimeoutException;
39
import java.net.StandardSocketOptions;
40
import java.nio.ByteBuffer;
41
import java.nio.channels.AlreadyBoundException;
42
import java.nio.channels.AlreadyConnectedException;
43
import java.nio.channels.AsynchronousCloseException;
44
import java.nio.channels.ClosedChannelException;
45
import java.nio.channels.ConnectionPendingException;
46
import java.nio.channels.IllegalBlockingModeException;
47
import java.nio.channels.NoConnectionPendingException;
48
import java.nio.channels.NotYetConnectedException;
49
import java.nio.channels.SelectionKey;
50
import java.nio.channels.SocketChannel;
51
import java.nio.channels.spi.SelectorProvider;
52
import java.nio.file.Path;
53
import java.util.Collections;
54
import java.util.HashSet;
55
import java.util.Set;
56
import java.util.Objects;
57
import java.util.concurrent.locks.ReentrantLock;
58
import static java.net.StandardProtocolFamily.INET;
59
import static java.net.StandardProtocolFamily.INET6;
60
import static java.net.StandardProtocolFamily.UNIX;
61
62
import sun.net.ConnectionResetException;
63
import sun.net.NetHooks;
64
import sun.net.ext.ExtendedSocketOptions;
65
import sun.net.util.SocketExceptions;
66
67
/**
68
* An implementation of SocketChannels
69
*/
70
71
class SocketChannelImpl
72
extends SocketChannel
73
implements SelChImpl
74
{
75
// Used to make native read and write calls
76
private static final NativeDispatcher nd = new SocketDispatcher();
77
78
// The protocol family of the socket
79
private final ProtocolFamily family;
80
81
// Our file descriptor object
82
private final FileDescriptor fd;
83
private final int fdVal;
84
85
// Lock held by current reading or connecting thread
86
private final ReentrantLock readLock = new ReentrantLock();
87
88
// Lock held by current writing or connecting thread
89
private final ReentrantLock writeLock = new ReentrantLock();
90
91
// Lock held by any thread that modifies the state fields declared below
92
// DO NOT invoke a blocking I/O operation while holding this lock!
93
private final Object stateLock = new Object();
94
95
// Input/Output closed
96
private volatile boolean isInputClosed;
97
private volatile boolean isOutputClosed;
98
99
// Connection reset protected by readLock
100
private boolean connectionReset;
101
102
// -- The following fields are protected by stateLock
103
104
// set true when exclusive binding is on and SO_REUSEADDR is emulated
105
private boolean isReuseAddress;
106
107
// State, increases monotonically
108
private static final int ST_UNCONNECTED = 0;
109
private static final int ST_CONNECTIONPENDING = 1;
110
private static final int ST_CONNECTED = 2;
111
private static final int ST_CLOSING = 3;
112
private static final int ST_CLOSED = 4;
113
private volatile int state; // need stateLock to change
114
115
// IDs of native threads doing reads and writes, for signalling
116
private long readerThread;
117
private long writerThread;
118
119
// Binding
120
private SocketAddress localAddress;
121
private SocketAddress remoteAddress;
122
123
// Socket adaptor, created on demand
124
private Socket socket;
125
126
// -- End of fields protected by stateLock
127
128
/**
 * Creates a channel for the default protocol family: INET6 when
 * Net.isIPv6Available() reports true, INET otherwise.
 */
SocketChannelImpl(SelectorProvider sp) throws IOException {
    this(sp, !Net.isIPv6Available() ? INET : INET6);
}
131
132
SocketChannelImpl(SelectorProvider sp, ProtocolFamily family) throws IOException {
133
super(sp);
134
Objects.requireNonNull(family, "'family' is null");
135
if ((family != INET) && (family != INET6) && (family != UNIX)) {
136
throw new UnsupportedOperationException("Protocol family not supported");
137
}
138
if (family == INET6 && !Net.isIPv6Available()) {
139
throw new UnsupportedOperationException("IPv6 not available");
140
}
141
142
this.family = family;
143
if (family == UNIX) {
144
this.fd = UnixDomainSockets.socket();
145
} else {
146
this.fd = Net.socket(family, true);
147
}
148
this.fdVal = IOUtil.fdVal(fd);
149
}
150
151
/**
 * Constructor for sockets obtained from server sockets. The channel wraps
 * the given file descriptor and starts out in the connected state.
 */
SocketChannelImpl(SelectorProvider sp,
                  ProtocolFamily family,
                  FileDescriptor fd,
                  SocketAddress remoteAddress)
    throws IOException
{
    super(sp);
    this.family = family;
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    synchronized (stateLock) {
        // look up the local address with the helper that matches the family
        this.localAddress = (family == UNIX)
                ? UnixDomainSockets.localAddress(fd)
                : Net.localAddress(fd);
        this.remoteAddress = remoteAddress;
        this.state = ST_CONNECTED;
    }
}
173
174
/**
175
* Returns true if this channel is to a INET or INET6 socket.
176
*/
177
boolean isNetSocket() {
178
return (family == INET) || (family == INET6);
179
}
180
181
/**
182
* Returns true if this channel is to a UNIX socket.
183
*/
184
boolean isUnixSocket() {
185
return (family == UNIX);
186
}
187
188
/**
189
* Checks that the channel is open.
190
*
191
* @throws ClosedChannelException if channel is closed (or closing)
192
*/
193
private void ensureOpen() throws ClosedChannelException {
194
if (!isOpen())
195
throw new ClosedChannelException();
196
}
197
198
/**
199
* Checks that the channel is open and connected.
200
*
201
* @apiNote This method uses the "state" field to check if the channel is
202
* open. It should never be used in conjuncion with isOpen or ensureOpen
203
* as these methods check AbstractInterruptibleChannel's closed field - that
204
* field is set before implCloseSelectableChannel is called and so before
205
* the state is changed.
206
*
207
* @throws ClosedChannelException if channel is closed (or closing)
208
* @throws NotYetConnectedException if open and not connected
209
*/
210
private void ensureOpenAndConnected() throws ClosedChannelException {
211
int state = this.state;
212
if (state < ST_CONNECTED) {
213
throw new NotYetConnectedException();
214
} else if (state > ST_CONNECTED) {
215
throw new ClosedChannelException();
216
}
217
}
218
219
@Override
220
public Socket socket() {
221
synchronized (stateLock) {
222
if (socket == null) {
223
if (isNetSocket()) {
224
socket = SocketAdaptor.create(this);
225
} else {
226
throw new UnsupportedOperationException("Not supported");
227
}
228
}
229
return socket;
230
}
231
}
232
233
@Override
234
public SocketAddress getLocalAddress() throws IOException {
235
synchronized (stateLock) {
236
ensureOpen();
237
if (isUnixSocket()) {
238
return UnixDomainSockets.getRevealedLocalAddress(localAddress);
239
} else {
240
return Net.getRevealedLocalAddress(localAddress);
241
}
242
}
243
}
244
245
@Override
246
public SocketAddress getRemoteAddress() throws IOException {
247
synchronized (stateLock) {
248
ensureOpen();
249
return remoteAddress;
250
}
251
}
252
253
@Override
254
public <T> SocketChannel setOption(SocketOption<T> name, T value)
255
throws IOException
256
{
257
Objects.requireNonNull(name);
258
if (!supportedOptions().contains(name))
259
throw new UnsupportedOperationException("'" + name + "' not supported");
260
if (!name.type().isInstance(value))
261
throw new IllegalArgumentException("Invalid value '" + value + "'");
262
263
synchronized (stateLock) {
264
ensureOpen();
265
266
if (isNetSocket()) {
267
if (name == StandardSocketOptions.IP_TOS) {
268
// special handling for IP_TOS
269
Net.setSocketOption(fd, family, name, value);
270
return this;
271
}
272
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
273
// SO_REUSEADDR emulated when using exclusive bind
274
isReuseAddress = (Boolean) value;
275
return this;
276
}
277
}
278
279
// no options that require special handling
280
Net.setSocketOption(fd, name, value);
281
return this;
282
}
283
}
284
285
@Override
286
@SuppressWarnings("unchecked")
287
public <T> T getOption(SocketOption<T> name)
288
throws IOException
289
{
290
Objects.requireNonNull(name);
291
if (!supportedOptions().contains(name))
292
throw new UnsupportedOperationException("'" + name + "' not supported");
293
294
synchronized (stateLock) {
295
ensureOpen();
296
297
if (isNetSocket()) {
298
if (name == StandardSocketOptions.IP_TOS) {
299
// special handling for IP_TOS
300
return (T) Net.getSocketOption(fd, family, name);
301
}
302
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
303
// SO_REUSEADDR emulated when using exclusive bind
304
return (T) Boolean.valueOf(isReuseAddress);
305
}
306
}
307
308
// no options that require special handling
309
return (T) Net.getSocketOption(fd, name);
310
}
311
}
312
313
private static class DefaultOptionsHolder {
314
static final Set<SocketOption<?>> defaultInetOptions = defaultInetOptions();
315
static final Set<SocketOption<?>> defaultUnixOptions = defaultUnixOptions();
316
317
private static Set<SocketOption<?>> defaultInetOptions() {
318
HashSet<SocketOption<?>> set = new HashSet<>();
319
set.add(StandardSocketOptions.SO_SNDBUF);
320
set.add(StandardSocketOptions.SO_RCVBUF);
321
set.add(StandardSocketOptions.SO_KEEPALIVE);
322
set.add(StandardSocketOptions.SO_REUSEADDR);
323
if (Net.isReusePortAvailable()) {
324
set.add(StandardSocketOptions.SO_REUSEPORT);
325
}
326
set.add(StandardSocketOptions.SO_LINGER);
327
set.add(StandardSocketOptions.TCP_NODELAY);
328
// additional options required by socket adaptor
329
set.add(StandardSocketOptions.IP_TOS);
330
set.add(ExtendedSocketOption.SO_OOBINLINE);
331
set.addAll(ExtendedSocketOptions.clientSocketOptions());
332
return Collections.unmodifiableSet(set);
333
}
334
335
private static Set<SocketOption<?>> defaultUnixOptions() {
336
HashSet<SocketOption<?>> set = new HashSet<>();
337
set.add(StandardSocketOptions.SO_SNDBUF);
338
set.add(StandardSocketOptions.SO_RCVBUF);
339
set.add(StandardSocketOptions.SO_LINGER);
340
set.addAll(ExtendedSocketOptions.unixDomainSocketOptions());
341
return Collections.unmodifiableSet(set);
342
}
343
}
344
345
@Override
346
public final Set<SocketOption<?>> supportedOptions() {
347
if (isUnixSocket()) {
348
return DefaultOptionsHolder.defaultUnixOptions;
349
} else {
350
return DefaultOptionsHolder.defaultInetOptions;
351
}
352
}
353
354
/**
355
* Marks the beginning of a read operation that might block.
356
*
357
* @throws ClosedChannelException if blocking and the channel is closed
358
*/
359
private void beginRead(boolean blocking) throws ClosedChannelException {
360
if (blocking) {
361
// set hook for Thread.interrupt
362
begin();
363
364
synchronized (stateLock) {
365
ensureOpen();
366
// record thread so it can be signalled if needed
367
readerThread = NativeThread.current();
368
}
369
}
370
}
371
372
/**
373
* Marks the end of a read operation that may have blocked.
374
*
375
* @throws AsynchronousCloseException if the channel was closed due to this
376
* thread being interrupted on a blocking read operation.
377
*/
378
private void endRead(boolean blocking, boolean completed)
379
throws AsynchronousCloseException
380
{
381
if (blocking) {
382
synchronized (stateLock) {
383
readerThread = 0;
384
if (state == ST_CLOSING) {
385
tryFinishClose();
386
}
387
}
388
// remove hook for Thread.interrupt
389
end(completed);
390
}
391
}
392
393
private void throwConnectionReset() throws SocketException {
394
throw new SocketException("Connection reset");
395
}
396
397
@Override
398
public int read(ByteBuffer buf) throws IOException {
399
Objects.requireNonNull(buf);
400
401
readLock.lock();
402
try {
403
ensureOpenAndConnected();
404
boolean blocking = isBlocking();
405
int n = 0;
406
try {
407
beginRead(blocking);
408
409
// check if connection has been reset
410
if (connectionReset)
411
throwConnectionReset();
412
413
// check if input is shutdown
414
if (isInputClosed)
415
return IOStatus.EOF;
416
417
n = IOUtil.read(fd, buf, -1, nd);
418
if (blocking) {
419
while (IOStatus.okayToRetry(n) && isOpen()) {
420
park(Net.POLLIN);
421
n = IOUtil.read(fd, buf, -1, nd);
422
}
423
}
424
} catch (ConnectionResetException e) {
425
connectionReset = true;
426
throwConnectionReset();
427
} finally {
428
endRead(blocking, n > 0);
429
if (n <= 0 && isInputClosed)
430
return IOStatus.EOF;
431
}
432
return IOStatus.normalize(n);
433
} finally {
434
readLock.unlock();
435
}
436
}
437
438
@Override
439
public long read(ByteBuffer[] dsts, int offset, int length)
440
throws IOException
441
{
442
Objects.checkFromIndexSize(offset, length, dsts.length);
443
444
readLock.lock();
445
try {
446
ensureOpenAndConnected();
447
boolean blocking = isBlocking();
448
long n = 0;
449
try {
450
beginRead(blocking);
451
452
// check if connection has been reset
453
if (connectionReset)
454
throwConnectionReset();
455
456
// check if input is shutdown
457
if (isInputClosed)
458
return IOStatus.EOF;
459
460
n = IOUtil.read(fd, dsts, offset, length, nd);
461
if (blocking) {
462
while (IOStatus.okayToRetry(n) && isOpen()) {
463
park(Net.POLLIN);
464
n = IOUtil.read(fd, dsts, offset, length, nd);
465
}
466
}
467
} catch (ConnectionResetException e) {
468
connectionReset = true;
469
throwConnectionReset();
470
} finally {
471
endRead(blocking, n > 0);
472
if (n <= 0 && isInputClosed)
473
return IOStatus.EOF;
474
}
475
return IOStatus.normalize(n);
476
} finally {
477
readLock.unlock();
478
}
479
}
480
481
/**
482
* Marks the beginning of a write operation that might block.
483
*
484
* @throws ClosedChannelException if blocking and the channel is closed
485
*/
486
private void beginWrite(boolean blocking) throws ClosedChannelException {
487
if (blocking) {
488
// set hook for Thread.interrupt
489
begin();
490
491
synchronized (stateLock) {
492
ensureOpen();
493
if (isOutputClosed)
494
throw new ClosedChannelException();
495
// record thread so it can be signalled if needed
496
writerThread = NativeThread.current();
497
}
498
}
499
}
500
501
/**
502
* Marks the end of a write operation that may have blocked.
503
*
504
* @throws AsynchronousCloseException if the channel was closed due to this
505
* thread being interrupted on a blocking write operation.
506
*/
507
private void endWrite(boolean blocking, boolean completed)
508
throws AsynchronousCloseException
509
{
510
if (blocking) {
511
synchronized (stateLock) {
512
writerThread = 0;
513
if (state == ST_CLOSING) {
514
tryFinishClose();
515
}
516
}
517
// remove hook for Thread.interrupt
518
end(completed);
519
}
520
}
521
522
@Override
523
public int write(ByteBuffer buf) throws IOException {
524
Objects.requireNonNull(buf);
525
writeLock.lock();
526
try {
527
ensureOpenAndConnected();
528
boolean blocking = isBlocking();
529
int n = 0;
530
try {
531
beginWrite(blocking);
532
n = IOUtil.write(fd, buf, -1, nd);
533
if (blocking) {
534
while (IOStatus.okayToRetry(n) && isOpen()) {
535
park(Net.POLLOUT);
536
n = IOUtil.write(fd, buf, -1, nd);
537
}
538
}
539
} finally {
540
endWrite(blocking, n > 0);
541
if (n <= 0 && isOutputClosed)
542
throw new AsynchronousCloseException();
543
}
544
return IOStatus.normalize(n);
545
} finally {
546
writeLock.unlock();
547
}
548
}
549
550
@Override
551
public long write(ByteBuffer[] srcs, int offset, int length)
552
throws IOException
553
{
554
Objects.checkFromIndexSize(offset, length, srcs.length);
555
556
writeLock.lock();
557
try {
558
ensureOpenAndConnected();
559
boolean blocking = isBlocking();
560
long n = 0;
561
try {
562
beginWrite(blocking);
563
n = IOUtil.write(fd, srcs, offset, length, nd);
564
if (blocking) {
565
while (IOStatus.okayToRetry(n) && isOpen()) {
566
park(Net.POLLOUT);
567
n = IOUtil.write(fd, srcs, offset, length, nd);
568
}
569
}
570
} finally {
571
endWrite(blocking, n > 0);
572
if (n <= 0 && isOutputClosed)
573
throw new AsynchronousCloseException();
574
}
575
return IOStatus.normalize(n);
576
} finally {
577
writeLock.unlock();
578
}
579
}
580
581
/**
582
* Writes a byte of out of band data.
583
*/
584
int sendOutOfBandData(byte b) throws IOException {
585
writeLock.lock();
586
try {
587
ensureOpenAndConnected();
588
boolean blocking = isBlocking();
589
int n = 0;
590
try {
591
beginWrite(blocking);
592
if (blocking) {
593
do {
594
n = Net.sendOOB(fd, b);
595
} while (n == IOStatus.INTERRUPTED && isOpen());
596
} else {
597
n = Net.sendOOB(fd, b);
598
}
599
} finally {
600
endWrite(blocking, n > 0);
601
if (n <= 0 && isOutputClosed)
602
throw new AsynchronousCloseException();
603
}
604
return IOStatus.normalize(n);
605
} finally {
606
writeLock.unlock();
607
}
608
}
609
610
@Override
611
protected void implConfigureBlocking(boolean block) throws IOException {
612
readLock.lock();
613
try {
614
writeLock.lock();
615
try {
616
lockedConfigureBlocking(block);
617
} finally {
618
writeLock.unlock();
619
}
620
} finally {
621
readLock.unlock();
622
}
623
}
624
625
/**
626
* Adjusts the blocking mode. readLock or writeLock must already be held.
627
*/
628
private void lockedConfigureBlocking(boolean block) throws IOException {
629
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
630
synchronized (stateLock) {
631
ensureOpen();
632
IOUtil.configureBlocking(fd, block);
633
}
634
}
635
636
/**
637
* Adjusts the blocking mode if the channel is open. readLock or writeLock
638
* must already be held.
639
*
640
* @return {@code true} if the blocking mode was adjusted, {@code false} if
641
* the blocking mode was not adjusted because the channel is closed
642
*/
643
private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
644
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
645
synchronized (stateLock) {
646
if (isOpen()) {
647
IOUtil.configureBlocking(fd, block);
648
return true;
649
} else {
650
return false;
651
}
652
}
653
}
654
655
/**
656
* Returns the local address, or null if not bound
657
*/
658
SocketAddress localAddress() {
659
synchronized (stateLock) {
660
return localAddress;
661
}
662
}
663
664
/**
665
* Returns the remote address, or null if not connected
666
*/
667
SocketAddress remoteAddress() {
668
synchronized (stateLock) {
669
return remoteAddress;
670
}
671
}
672
673
@Override
674
public SocketChannel bind(SocketAddress local) throws IOException {
675
readLock.lock();
676
try {
677
writeLock.lock();
678
try {
679
synchronized (stateLock) {
680
ensureOpen();
681
if (state == ST_CONNECTIONPENDING)
682
throw new ConnectionPendingException();
683
if (localAddress != null)
684
throw new AlreadyBoundException();
685
if (isUnixSocket()) {
686
localAddress = unixBind(local);
687
} else {
688
localAddress = netBind(local);
689
}
690
}
691
} finally {
692
writeLock.unlock();
693
}
694
} finally {
695
readLock.unlock();
696
}
697
return this;
698
}
699
700
private SocketAddress unixBind(SocketAddress local) throws IOException {
701
UnixDomainSockets.checkPermission();
702
if (local == null) {
703
return UnixDomainSockets.UNNAMED;
704
} else {
705
Path path = UnixDomainSockets.checkAddress(local).getPath();
706
if (path.toString().isEmpty()) {
707
return UnixDomainSockets.UNNAMED;
708
} else {
709
// bind to non-empty path
710
UnixDomainSockets.bind(fd, path);
711
return UnixDomainSockets.localAddress(fd);
712
}
713
}
714
}
715
716
private SocketAddress netBind(SocketAddress local) throws IOException {
717
InetSocketAddress isa;
718
if (local == null) {
719
isa = new InetSocketAddress(Net.anyLocalAddress(family), 0);
720
} else {
721
isa = Net.checkAddress(local, family);
722
}
723
@SuppressWarnings("removal")
724
SecurityManager sm = System.getSecurityManager();
725
if (sm != null) {
726
sm.checkListen(isa.getPort());
727
}
728
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
729
Net.bind(family, fd, isa.getAddress(), isa.getPort());
730
return Net.localAddress(fd);
731
}
732
733
@Override
734
public boolean isConnected() {
735
return (state == ST_CONNECTED);
736
}
737
738
@Override
739
public boolean isConnectionPending() {
740
return (state == ST_CONNECTIONPENDING);
741
}
742
743
/**
744
* Marks the beginning of a connect operation that might block.
745
* @param blocking true if configured blocking
746
* @param isa the remote address
747
* @throws ClosedChannelException if the channel is closed
748
* @throws AlreadyConnectedException if already connected
749
* @throws ConnectionPendingException is a connection is pending
750
* @throws IOException if the pre-connect hook fails
751
*/
752
private void beginConnect(boolean blocking, SocketAddress sa)
753
throws IOException
754
{
755
if (blocking) {
756
// set hook for Thread.interrupt
757
begin();
758
}
759
synchronized (stateLock) {
760
ensureOpen();
761
int state = this.state;
762
if (state == ST_CONNECTED)
763
throw new AlreadyConnectedException();
764
if (state == ST_CONNECTIONPENDING)
765
throw new ConnectionPendingException();
766
assert state == ST_UNCONNECTED;
767
this.state = ST_CONNECTIONPENDING;
768
769
if (isNetSocket() && (localAddress == null)) {
770
InetSocketAddress isa = (InetSocketAddress) sa;
771
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
772
}
773
remoteAddress = sa;
774
775
if (blocking) {
776
// record thread so it can be signalled if needed
777
readerThread = NativeThread.current();
778
}
779
}
780
}
781
782
/**
783
* Marks the end of a connect operation that may have blocked.
784
*
785
* @throws AsynchronousCloseException if the channel was closed due to this
786
* thread being interrupted on a blocking connect operation.
787
* @throws IOException if completed and unable to obtain the local address
788
*/
789
private void endConnect(boolean blocking, boolean completed)
790
throws IOException
791
{
792
endRead(blocking, completed);
793
794
if (completed) {
795
synchronized (stateLock) {
796
if (state == ST_CONNECTIONPENDING) {
797
if (isUnixSocket()) {
798
localAddress = UnixDomainSockets.localAddress(fd);
799
} else {
800
localAddress = Net.localAddress(fd);
801
}
802
state = ST_CONNECTED;
803
}
804
}
805
}
806
}
807
808
/**
809
* Checks the remote address to which this channel is to be connected.
810
*/
811
private SocketAddress checkRemote(SocketAddress sa) {
812
if (isUnixSocket()) {
813
UnixDomainSockets.checkPermission();
814
return UnixDomainSockets.checkAddress(sa);
815
} else {
816
InetSocketAddress isa = Net.checkAddress(sa, family);
817
@SuppressWarnings("removal")
818
SecurityManager sm = System.getSecurityManager();
819
if (sm != null) {
820
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
821
}
822
InetAddress address = isa.getAddress();
823
if (address.isAnyLocalAddress()) {
824
int port = isa.getPort();
825
if (address instanceof Inet4Address) {
826
return new InetSocketAddress(Net.inet4LoopbackAddress(), port);
827
} else {
828
assert family == INET6;
829
return new InetSocketAddress(Net.inet6LoopbackAddress(), port);
830
}
831
} else {
832
return isa;
833
}
834
}
835
}
836
837
@Override
838
public boolean connect(SocketAddress remote) throws IOException {
839
SocketAddress sa = checkRemote(remote);
840
try {
841
readLock.lock();
842
try {
843
writeLock.lock();
844
try {
845
boolean blocking = isBlocking();
846
boolean connected = false;
847
try {
848
beginConnect(blocking, sa);
849
int n;
850
if (isUnixSocket()) {
851
n = UnixDomainSockets.connect(fd, sa);
852
} else {
853
n = Net.connect(family, fd, sa);
854
}
855
if (n > 0) {
856
connected = true;
857
} else if (blocking) {
858
assert IOStatus.okayToRetry(n);
859
boolean polled = false;
860
while (!polled && isOpen()) {
861
park(Net.POLLOUT);
862
polled = Net.pollConnectNow(fd);
863
}
864
connected = polled && isOpen();
865
}
866
} finally {
867
endConnect(blocking, connected);
868
}
869
return connected;
870
} finally {
871
writeLock.unlock();
872
}
873
} finally {
874
readLock.unlock();
875
}
876
} catch (IOException ioe) {
877
// connect failed, close the channel
878
close();
879
throw SocketExceptions.of(ioe, sa);
880
}
881
}
882
883
/**
884
* Marks the beginning of a finishConnect operation that might block.
885
*
886
* @throws ClosedChannelException if the channel is closed
887
* @throws NoConnectionPendingException if no connection is pending
888
*/
889
private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
890
if (blocking) {
891
// set hook for Thread.interrupt
892
begin();
893
}
894
synchronized (stateLock) {
895
ensureOpen();
896
if (state != ST_CONNECTIONPENDING)
897
throw new NoConnectionPendingException();
898
if (blocking) {
899
// record thread so it can be signalled if needed
900
readerThread = NativeThread.current();
901
}
902
}
903
}
904
905
/**
906
* Marks the end of a finishConnect operation that may have blocked.
907
*
908
* @throws AsynchronousCloseException if the channel was closed due to this
909
* thread being interrupted on a blocking connect operation.
910
* @throws IOException if completed and unable to obtain the local address
911
*/
912
private void endFinishConnect(boolean blocking, boolean completed)
913
throws IOException
914
{
915
endRead(blocking, completed);
916
917
if (completed) {
918
synchronized (stateLock) {
919
if (state == ST_CONNECTIONPENDING) {
920
if (isUnixSocket()) {
921
localAddress = UnixDomainSockets.localAddress(fd);
922
} else {
923
localAddress = Net.localAddress(fd);
924
}
925
state = ST_CONNECTED;
926
}
927
}
928
}
929
}
930
931
@Override
932
public boolean finishConnect() throws IOException {
933
try {
934
readLock.lock();
935
try {
936
writeLock.lock();
937
try {
938
// no-op if already connected
939
if (isConnected())
940
return true;
941
942
boolean blocking = isBlocking();
943
boolean connected = false;
944
try {
945
beginFinishConnect(blocking);
946
boolean polled = Net.pollConnectNow(fd);
947
if (blocking) {
948
while (!polled && isOpen()) {
949
park(Net.POLLOUT);
950
polled = Net.pollConnectNow(fd);
951
}
952
}
953
connected = polled && isOpen();
954
} finally {
955
endFinishConnect(blocking, connected);
956
}
957
assert (blocking && connected) ^ !blocking;
958
return connected;
959
} finally {
960
writeLock.unlock();
961
}
962
} finally {
963
readLock.unlock();
964
}
965
} catch (IOException ioe) {
966
// connect failed, close the channel
967
close();
968
throw SocketExceptions.of(ioe, remoteAddress);
969
}
970
}
971
972
/**
973
* Closes the socket if there are no I/O operations in progress and the
974
* channel is not registered with a Selector.
975
*/
976
private boolean tryClose() throws IOException {
977
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
978
if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
979
state = ST_CLOSED;
980
nd.close(fd);
981
return true;
982
} else {
983
return false;
984
}
985
}
986
987
/**
988
* Invokes tryClose to attempt to close the socket.
989
*
990
* This method is used for deferred closing by I/O and Selector operations.
991
*/
992
private void tryFinishClose() {
993
try {
994
tryClose();
995
} catch (IOException ignore) { }
996
}
997
998
/**
999
* Closes this channel when configured in blocking mode.
1000
*
1001
* If there is an I/O operation in progress then the socket is pre-closed
1002
* and the I/O threads signalled, in which case the final close is deferred
1003
* until all I/O operations complete.
1004
*
1005
* Note that a channel configured blocking may be registered with a Selector
1006
* This arises when a key is canceled and the channel configured to blocking
1007
* mode before the key is flushed from the Selector.
1008
*/
1009
private void implCloseBlockingMode() throws IOException {
1010
synchronized (stateLock) {
1011
assert state < ST_CLOSING;
1012
state = ST_CLOSING;
1013
if (!tryClose()) {
1014
long reader = readerThread;
1015
long writer = writerThread;
1016
if (reader != 0 || writer != 0) {
1017
nd.preClose(fd);
1018
if (reader != 0)
1019
NativeThread.signal(reader);
1020
if (writer != 0)
1021
NativeThread.signal(writer);
1022
}
1023
}
1024
}
1025
}
1026
1027
/**
1028
* Closes this channel when configured in non-blocking mode.
1029
*
1030
* If the channel is registered with a Selector then the close is deferred
1031
* until the channel is flushed from all Selectors.
1032
*
1033
* If the socket is connected and the channel is registered with a Selector
1034
* then the socket is shutdown for writing so that the peer reads EOF. In
1035
* addition, if SO_LINGER is set to a non-zero value then it is disabled so
1036
* that the deferred close does not wait.
1037
*/
1038
private void implCloseNonBlockingMode() throws IOException {
1039
boolean connected;
1040
synchronized (stateLock) {
1041
assert state < ST_CLOSING;
1042
connected = (state == ST_CONNECTED);
1043
state = ST_CLOSING;
1044
}
1045
1046
// wait for any read/write operations to complete
1047
readLock.lock();
1048
readLock.unlock();
1049
writeLock.lock();
1050
writeLock.unlock();
1051
1052
// if the socket cannot be closed because it's registered with a Selector
1053
// then shutdown the socket for writing.
1054
synchronized (stateLock) {
1055
if (state == ST_CLOSING && !tryClose() && connected && isRegistered()) {
1056
try {
1057
SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER;
1058
int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt);
1059
if (interval != 0) {
1060
if (interval > 0) {
1061
// disable SO_LINGER
1062
Net.setSocketOption(fd, Net.UNSPEC, opt, -1);
1063
}
1064
Net.shutdown(fd, Net.SHUT_WR);
1065
}
1066
} catch (IOException ignore) { }
1067
}
1068
}
1069
}
1070
1071
/**
1072
* Invoked by implCloseChannel to close the channel.
1073
*/
1074
@Override
1075
protected void implCloseSelectableChannel() throws IOException {
1076
assert !isOpen();
1077
if (isBlocking()) {
1078
implCloseBlockingMode();
1079
} else {
1080
implCloseNonBlockingMode();
1081
}
1082
}
1083
1084
@Override
1085
public void kill() {
1086
synchronized (stateLock) {
1087
if (state == ST_CLOSING) {
1088
tryFinishClose();
1089
}
1090
}
1091
}
1092
1093
@Override
1094
public SocketChannel shutdownInput() throws IOException {
1095
synchronized (stateLock) {
1096
ensureOpen();
1097
if (!isConnected())
1098
throw new NotYetConnectedException();
1099
if (!isInputClosed) {
1100
Net.shutdown(fd, Net.SHUT_RD);
1101
long thread = readerThread;
1102
if (thread != 0)
1103
NativeThread.signal(thread);
1104
isInputClosed = true;
1105
}
1106
return this;
1107
}
1108
}
1109
1110
@Override
1111
public SocketChannel shutdownOutput() throws IOException {
1112
synchronized (stateLock) {
1113
ensureOpen();
1114
if (!isConnected())
1115
throw new NotYetConnectedException();
1116
if (!isOutputClosed) {
1117
Net.shutdown(fd, Net.SHUT_WR);
1118
long thread = writerThread;
1119
if (thread != 0)
1120
NativeThread.signal(thread);
1121
isOutputClosed = true;
1122
}
1123
return this;
1124
}
1125
}
1126
1127
boolean isInputOpen() {
1128
return !isInputClosed;
1129
}
1130
1131
boolean isOutputOpen() {
1132
return !isOutputClosed;
1133
}
1134
1135
/**
1136
* Waits for a connection attempt to finish with a timeout
1137
* @throws SocketTimeoutException if the connect timeout elapses
1138
*/
1139
private boolean finishTimedConnect(long nanos) throws IOException {
1140
long startNanos = System.nanoTime();
1141
boolean polled = Net.pollConnectNow(fd);
1142
while (!polled && isOpen()) {
1143
long remainingNanos = nanos - (System.nanoTime() - startNanos);
1144
if (remainingNanos <= 0) {
1145
throw new SocketTimeoutException("Connect timed out");
1146
}
1147
park(Net.POLLOUT, remainingNanos);
1148
polled = Net.pollConnectNow(fd);
1149
}
1150
return polled && isOpen();
1151
}
1152
1153
/**
1154
* Attempts to establish a connection to the given socket address with a
1155
* timeout. Closes the socket if connection cannot be established.
1156
*
1157
* @apiNote This method is for use by the socket adaptor.
1158
*
1159
* @throws IllegalBlockingModeException if the channel is non-blocking
1160
* @throws SocketTimeoutException if the read timeout elapses
1161
*/
1162
void blockingConnect(SocketAddress remote, long nanos) throws IOException {
1163
SocketAddress sa = checkRemote(remote);
1164
try {
1165
readLock.lock();
1166
try {
1167
writeLock.lock();
1168
try {
1169
if (!isBlocking())
1170
throw new IllegalBlockingModeException();
1171
boolean connected = false;
1172
try {
1173
beginConnect(true, sa);
1174
// change socket to non-blocking
1175
lockedConfigureBlocking(false);
1176
try {
1177
int n;
1178
if (isUnixSocket()) {
1179
n = UnixDomainSockets.connect(fd, sa);
1180
} else {
1181
n = Net.connect(family, fd, sa);
1182
}
1183
connected = (n > 0) ? true : finishTimedConnect(nanos);
1184
} finally {
1185
// restore socket to blocking mode (if channel is open)
1186
tryLockedConfigureBlocking(true);
1187
}
1188
} finally {
1189
endConnect(true, connected);
1190
}
1191
} finally {
1192
writeLock.unlock();
1193
}
1194
} finally {
1195
readLock.unlock();
1196
}
1197
} catch (IOException ioe) {
1198
// connect failed, close the channel
1199
close();
1200
throw SocketExceptions.of(ioe, sa);
1201
}
1202
}
1203
1204
/**
1205
* Attempts to read bytes from the socket into the given byte array.
1206
*/
1207
private int tryRead(byte[] b, int off, int len) throws IOException {
1208
ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
1209
assert dst.position() == 0;
1210
try {
1211
int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
1212
if (n > 0) {
1213
dst.get(b, off, n);
1214
}
1215
return n;
1216
} finally{
1217
Util.offerFirstTemporaryDirectBuffer(dst);
1218
}
1219
}
1220
1221
/**
1222
* Reads bytes from the socket into the given byte array with a timeout.
1223
* @throws SocketTimeoutException if the read timeout elapses
1224
*/
1225
private int timedRead(byte[] b, int off, int len, long nanos) throws IOException {
1226
long startNanos = System.nanoTime();
1227
int n = tryRead(b, off, len);
1228
while (n == IOStatus.UNAVAILABLE && isOpen()) {
1229
long remainingNanos = nanos - (System.nanoTime() - startNanos);
1230
if (remainingNanos <= 0) {
1231
throw new SocketTimeoutException("Read timed out");
1232
}
1233
park(Net.POLLIN, remainingNanos);
1234
n = tryRead(b, off, len);
1235
}
1236
return n;
1237
}
1238
1239
/**
1240
* Reads bytes from the socket into the given byte array.
1241
*
1242
* @apiNote This method is for use by the socket adaptor.
1243
*
1244
* @throws IllegalBlockingModeException if the channel is non-blocking
1245
* @throws SocketTimeoutException if the read timeout elapses
1246
*/
1247
int blockingRead(byte[] b, int off, int len, long nanos) throws IOException {
1248
Objects.checkFromIndexSize(off, len, b.length);
1249
if (len == 0) {
1250
// nothing to do
1251
return 0;
1252
}
1253
1254
readLock.lock();
1255
try {
1256
ensureOpenAndConnected();
1257
1258
// check that channel is configured blocking
1259
if (!isBlocking())
1260
throw new IllegalBlockingModeException();
1261
1262
int n = 0;
1263
try {
1264
beginRead(true);
1265
1266
// check if connection has been reset
1267
if (connectionReset)
1268
throwConnectionReset();
1269
1270
// check if input is shutdown
1271
if (isInputClosed)
1272
return IOStatus.EOF;
1273
1274
if (nanos > 0) {
1275
// change socket to non-blocking
1276
lockedConfigureBlocking(false);
1277
try {
1278
n = timedRead(b, off, len, nanos);
1279
} finally {
1280
// restore socket to blocking mode (if channel is open)
1281
tryLockedConfigureBlocking(true);
1282
}
1283
} else {
1284
// read, no timeout
1285
n = tryRead(b, off, len);
1286
while (IOStatus.okayToRetry(n) && isOpen()) {
1287
park(Net.POLLIN);
1288
n = tryRead(b, off, len);
1289
}
1290
}
1291
} catch (ConnectionResetException e) {
1292
connectionReset = true;
1293
throwConnectionReset();
1294
} finally {
1295
endRead(true, n > 0);
1296
if (n <= 0 && isInputClosed)
1297
return IOStatus.EOF;
1298
}
1299
assert n > 0 || n == -1;
1300
return n;
1301
} finally {
1302
readLock.unlock();
1303
}
1304
}
1305
1306
/**
1307
* Attempts to write a sequence of bytes to the socket from the given
1308
* byte array.
1309
*/
1310
private int tryWrite(byte[] b, int off, int len) throws IOException {
1311
ByteBuffer src = Util.getTemporaryDirectBuffer(len);
1312
assert src.position() == 0;
1313
try {
1314
src.put(b, off, len);
1315
return nd.write(fd, ((DirectBuffer)src).address(), len);
1316
} finally {
1317
Util.offerFirstTemporaryDirectBuffer(src);
1318
}
1319
}
1320
1321
/**
1322
* Writes a sequence of bytes to the socket from the given byte array.
1323
*
1324
* @apiNote This method is for use by the socket adaptor.
1325
*/
1326
void blockingWriteFully(byte[] b, int off, int len) throws IOException {
1327
Objects.checkFromIndexSize(off, len, b.length);
1328
if (len == 0) {
1329
// nothing to do
1330
return;
1331
}
1332
1333
writeLock.lock();
1334
try {
1335
ensureOpenAndConnected();
1336
1337
// check that channel is configured blocking
1338
if (!isBlocking())
1339
throw new IllegalBlockingModeException();
1340
1341
// loop until all bytes have been written
1342
int pos = off;
1343
int end = off + len;
1344
try {
1345
beginWrite(true);
1346
while (pos < end && isOpen()) {
1347
int size = end - pos;
1348
int n = tryWrite(b, pos, size);
1349
while (IOStatus.okayToRetry(n) && isOpen()) {
1350
park(Net.POLLOUT);
1351
n = tryWrite(b, pos, size);
1352
}
1353
if (n > 0) {
1354
pos += n;
1355
}
1356
}
1357
} finally {
1358
endWrite(true, pos >= end);
1359
}
1360
} finally {
1361
writeLock.unlock();
1362
}
1363
}
1364
1365
/**
1366
* Return the number of bytes in the socket input buffer.
1367
*/
1368
int available() throws IOException {
1369
synchronized (stateLock) {
1370
ensureOpenAndConnected();
1371
if (isInputClosed) {
1372
return 0;
1373
} else {
1374
return Net.available(fd);
1375
}
1376
}
1377
}
1378
1379
/**
1380
* Translates native poll revent ops into a ready operation ops
1381
*/
1382
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
1383
int intOps = ski.nioInterestOps();
1384
int oldOps = ski.nioReadyOps();
1385
int newOps = initialOps;
1386
1387
if ((ops & Net.POLLNVAL) != 0) {
1388
// This should only happen if this channel is pre-closed while a
1389
// selection operation is in progress
1390
// ## Throw an error if this channel has not been pre-closed
1391
return false;
1392
}
1393
1394
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
1395
newOps = intOps;
1396
ski.nioReadyOps(newOps);
1397
return (newOps & ~oldOps) != 0;
1398
}
1399
1400
boolean connected = isConnected();
1401
if (((ops & Net.POLLIN) != 0) &&
1402
((intOps & SelectionKey.OP_READ) != 0) && connected)
1403
newOps |= SelectionKey.OP_READ;
1404
1405
if (((ops & Net.POLLCONN) != 0) &&
1406
((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
1407
newOps |= SelectionKey.OP_CONNECT;
1408
1409
if (((ops & Net.POLLOUT) != 0) &&
1410
((intOps & SelectionKey.OP_WRITE) != 0) && connected)
1411
newOps |= SelectionKey.OP_WRITE;
1412
1413
ski.nioReadyOps(newOps);
1414
return (newOps & ~oldOps) != 0;
1415
}
1416
1417
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
1418
return translateReadyOps(ops, ski.nioReadyOps(), ski);
1419
}
1420
1421
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
1422
return translateReadyOps(ops, 0, ski);
1423
}
1424
1425
/**
1426
* Translates an interest operation set into a native poll event set
1427
*/
1428
public int translateInterestOps(int ops) {
1429
int newOps = 0;
1430
if ((ops & SelectionKey.OP_READ) != 0)
1431
newOps |= Net.POLLIN;
1432
if ((ops & SelectionKey.OP_WRITE) != 0)
1433
newOps |= Net.POLLOUT;
1434
if ((ops & SelectionKey.OP_CONNECT) != 0)
1435
newOps |= Net.POLLCONN;
1436
return newOps;
1437
}
1438
1439
public FileDescriptor getFD() {
1440
return fd;
1441
}
1442
1443
public int getFDVal() {
1444
return fdVal;
1445
}
1446
1447
@Override
1448
public String toString() {
1449
StringBuilder sb = new StringBuilder();
1450
sb.append(this.getClass().getSuperclass().getName());
1451
sb.append('[');
1452
if (!isOpen())
1453
sb.append("closed");
1454
else {
1455
synchronized (stateLock) {
1456
switch (state) {
1457
case ST_UNCONNECTED:
1458
sb.append("unconnected");
1459
break;
1460
case ST_CONNECTIONPENDING:
1461
sb.append("connection-pending");
1462
break;
1463
case ST_CONNECTED:
1464
sb.append("connected");
1465
if (isInputClosed)
1466
sb.append(" ishut");
1467
if (isOutputClosed)
1468
sb.append(" oshut");
1469
break;
1470
}
1471
SocketAddress addr = localAddress();
1472
if (addr != null) {
1473
sb.append(" local=");
1474
if (isUnixSocket()) {
1475
sb.append(UnixDomainSockets.getRevealedLocalAddressAsString(addr));
1476
} else {
1477
sb.append(Net.getRevealedLocalAddressAsString(addr));
1478
}
1479
}
1480
if (remoteAddress() != null) {
1481
sb.append(" remote=");
1482
sb.append(remoteAddress().toString());
1483
}
1484
}
1485
}
1486
sb.append(']');
1487
return sb.toString();
1488
}
1489
}
1490
1491