Path: blob/master/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
41159 views
/*1* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.io.FileDescriptor;28import java.io.IOException;29import java.net.InetAddress;30import java.net.Inet4Address;31import java.net.InetSocketAddress;32import java.net.ProtocolFamily;33import java.net.Socket;34import java.net.SocketAddress;35import java.net.SocketException;36import java.net.SocketOption;37import java.net.SocketTimeoutException;38import java.net.StandardSocketOptions;39import java.nio.ByteBuffer;40import java.nio.channels.AlreadyBoundException;41import java.nio.channels.AlreadyConnectedException;42import java.nio.channels.AsynchronousCloseException;43import java.nio.channels.ClosedChannelException;44import java.nio.channels.ConnectionPendingException;45import java.nio.channels.IllegalBlockingModeException;46import java.nio.channels.NoConnectionPendingException;47import java.nio.channels.NotYetConnectedException;48import java.nio.channels.SelectionKey;49import java.nio.channels.SocketChannel;50import java.nio.channels.spi.SelectorProvider;51import java.nio.file.Path;52import java.util.Collections;53import java.util.HashSet;54import java.util.Set;55import java.util.Objects;56import java.util.concurrent.locks.ReentrantLock;57import static java.net.StandardProtocolFamily.INET;58import static java.net.StandardProtocolFamily.INET6;59import static java.net.StandardProtocolFamily.UNIX;6061import sun.net.ConnectionResetException;62import sun.net.NetHooks;63import sun.net.ext.ExtendedSocketOptions;64import sun.net.util.SocketExceptions;6566/**67* An implementation of SocketChannels68*/6970class SocketChannelImpl71extends SocketChannel72implements SelChImpl73{74// Used to make native read and write calls75private static final NativeDispatcher nd = new SocketDispatcher();7677// The protocol family of the socket78private final ProtocolFamily family;7980// Our file descriptor object81private final FileDescriptor fd;82private final int fdVal;8384// Lock held by current reading or connecting thread85private final ReentrantLock readLock = new ReentrantLock();8687// Lock held by current writing or connecting thread88private final ReentrantLock writeLock = new ReentrantLock();8990// Lock held by any thread that modifies the state fields declared below91// DO NOT invoke a blocking I/O operation while holding this lock!92private final Object stateLock = new Object();9394// Input/Output closed95private volatile boolean isInputClosed;96private volatile boolean isOutputClosed;9798// Connection reset protected by readLock99private boolean connectionReset;100101// -- The following fields are protected by stateLock102103// set true when exclusive binding is on and SO_REUSEADDR is emulated104private boolean isReuseAddress;105106// State, increases monotonically107private static final int ST_UNCONNECTED = 0;108private static final int ST_CONNECTIONPENDING = 1;109private static final int ST_CONNECTED = 2;110private static final int ST_CLOSING = 3;111private static final int ST_CLOSED = 4;112private volatile int state; // need stateLock to change113114// IDs of native threads doing reads and writes, for signalling115private long readerThread;116private long writerThread;117118// Binding119private SocketAddress localAddress;120private SocketAddress remoteAddress;121122// Socket adaptor, created on demand123private Socket socket;124125// -- End of fields protected by stateLock126127SocketChannelImpl(SelectorProvider sp) throws IOException {128this(sp, Net.isIPv6Available() ? INET6 : INET);129}130131SocketChannelImpl(SelectorProvider sp, ProtocolFamily family) throws IOException {132super(sp);133Objects.requireNonNull(family, "'family' is null");134if ((family != INET) && (family != INET6) && (family != UNIX)) {135throw new UnsupportedOperationException("Protocol family not supported");136}137if (family == INET6 && !Net.isIPv6Available()) {138throw new UnsupportedOperationException("IPv6 not available");139}140141this.family = family;142if (family == UNIX) {143this.fd = UnixDomainSockets.socket();144} else {145this.fd = Net.socket(family, true);146}147this.fdVal = IOUtil.fdVal(fd);148}149150// Constructor for sockets obtained from server sockets151//152SocketChannelImpl(SelectorProvider sp,153ProtocolFamily family,154FileDescriptor fd,155SocketAddress remoteAddress)156throws IOException157{158super(sp);159this.family = family;160this.fd = fd;161this.fdVal = IOUtil.fdVal(fd);162synchronized (stateLock) {163if (family == UNIX) {164this.localAddress = UnixDomainSockets.localAddress(fd);165} else {166this.localAddress = Net.localAddress(fd);167}168this.remoteAddress = remoteAddress;169this.state = ST_CONNECTED;170}171}172173/**174* Returns true if this channel is to a INET or INET6 socket.175*/176boolean isNetSocket() {177return (family == INET) || (family == INET6);178}179180/**181* Returns true if this channel is to a UNIX socket.182*/183boolean isUnixSocket() {184return (family == UNIX);185}186187/**188* Checks that the channel is open.189*190* @throws ClosedChannelException if channel is closed (or closing)191*/192private void ensureOpen() throws ClosedChannelException {193if (!isOpen())194throw new ClosedChannelException();195}196197/**198* Checks that the channel is open and connected.199*200* @apiNote This method uses the "state" field to check if the channel is201* open. It should never be used in conjuncion with isOpen or ensureOpen202* as these methods check AbstractInterruptibleChannel's closed field - that203* field is set before implCloseSelectableChannel is called and so before204* the state is changed.205*206* @throws ClosedChannelException if channel is closed (or closing)207* @throws NotYetConnectedException if open and not connected208*/209private void ensureOpenAndConnected() throws ClosedChannelException {210int state = this.state;211if (state < ST_CONNECTED) {212throw new NotYetConnectedException();213} else if (state > ST_CONNECTED) {214throw new ClosedChannelException();215}216}217218@Override219public Socket socket() {220synchronized (stateLock) {221if (socket == null) {222if (isNetSocket()) {223socket = SocketAdaptor.create(this);224} else {225throw new UnsupportedOperationException("Not supported");226}227}228return socket;229}230}231232@Override233public SocketAddress getLocalAddress() throws IOException {234synchronized (stateLock) {235ensureOpen();236if (isUnixSocket()) {237return UnixDomainSockets.getRevealedLocalAddress(localAddress);238} else {239return Net.getRevealedLocalAddress(localAddress);240}241}242}243244@Override245public SocketAddress getRemoteAddress() throws IOException {246synchronized (stateLock) {247ensureOpen();248return remoteAddress;249}250}251252@Override253public <T> SocketChannel setOption(SocketOption<T> name, T value)254throws IOException255{256Objects.requireNonNull(name);257if (!supportedOptions().contains(name))258throw new UnsupportedOperationException("'" + name + "' not supported");259if (!name.type().isInstance(value))260throw new IllegalArgumentException("Invalid value '" + value + "'");261262synchronized (stateLock) {263ensureOpen();264265if (isNetSocket()) {266if (name == StandardSocketOptions.IP_TOS) {267// special handling for IP_TOS268Net.setSocketOption(fd, family, name, value);269return this;270}271if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {272// SO_REUSEADDR emulated when using exclusive bind273isReuseAddress = (Boolean) value;274return this;275}276}277278// no options that require special handling279Net.setSocketOption(fd, name, value);280return this;281}282}283284@Override285@SuppressWarnings("unchecked")286public <T> T getOption(SocketOption<T> name)287throws IOException288{289Objects.requireNonNull(name);290if (!supportedOptions().contains(name))291throw new UnsupportedOperationException("'" + name + "' not supported");292293synchronized (stateLock) {294ensureOpen();295296if (isNetSocket()) {297if (name == StandardSocketOptions.IP_TOS) {298// special handling for IP_TOS299return (T) Net.getSocketOption(fd, family, name);300}301if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {302// SO_REUSEADDR emulated when using exclusive bind303return (T) Boolean.valueOf(isReuseAddress);304}305}306307// no options that require special handling308return (T) Net.getSocketOption(fd, name);309}310}311312private static class DefaultOptionsHolder {313static final Set<SocketOption<?>> defaultInetOptions = defaultInetOptions();314static final Set<SocketOption<?>> defaultUnixOptions = defaultUnixOptions();315316private static Set<SocketOption<?>> defaultInetOptions() {317HashSet<SocketOption<?>> set = new HashSet<>();318set.add(StandardSocketOptions.SO_SNDBUF);319set.add(StandardSocketOptions.SO_RCVBUF);320set.add(StandardSocketOptions.SO_KEEPALIVE);321set.add(StandardSocketOptions.SO_REUSEADDR);322if (Net.isReusePortAvailable()) {323set.add(StandardSocketOptions.SO_REUSEPORT);324}325set.add(StandardSocketOptions.SO_LINGER);326set.add(StandardSocketOptions.TCP_NODELAY);327// additional options required by socket adaptor328set.add(StandardSocketOptions.IP_TOS);329set.add(ExtendedSocketOption.SO_OOBINLINE);330set.addAll(ExtendedSocketOptions.clientSocketOptions());331return Collections.unmodifiableSet(set);332}333334private static Set<SocketOption<?>> defaultUnixOptions() {335HashSet<SocketOption<?>> set = new HashSet<>();336set.add(StandardSocketOptions.SO_SNDBUF);337set.add(StandardSocketOptions.SO_RCVBUF);338set.add(StandardSocketOptions.SO_LINGER);339set.addAll(ExtendedSocketOptions.unixDomainSocketOptions());340return Collections.unmodifiableSet(set);341}342}343344@Override345public final Set<SocketOption<?>> supportedOptions() {346if (isUnixSocket()) {347return DefaultOptionsHolder.defaultUnixOptions;348} else {349return DefaultOptionsHolder.defaultInetOptions;350}351}352353/**354* Marks the beginning of a read operation that might block.355*356* @throws ClosedChannelException if blocking and the channel is closed357*/358private void beginRead(boolean blocking) throws ClosedChannelException {359if (blocking) {360// set hook for Thread.interrupt361begin();362363synchronized (stateLock) {364ensureOpen();365// record thread so it can be signalled if needed366readerThread = NativeThread.current();367}368}369}370371/**372* Marks the end of a read operation that may have blocked.373*374* @throws AsynchronousCloseException if the channel was closed due to this375* thread being interrupted on a blocking read operation.376*/377private void endRead(boolean blocking, boolean completed)378throws AsynchronousCloseException379{380if (blocking) {381synchronized (stateLock) {382readerThread = 0;383if (state == ST_CLOSING) {384tryFinishClose();385}386}387// remove hook for Thread.interrupt388end(completed);389}390}391392private void throwConnectionReset() throws SocketException {393throw new SocketException("Connection reset");394}395396@Override397public int read(ByteBuffer buf) throws IOException {398Objects.requireNonNull(buf);399400readLock.lock();401try {402ensureOpenAndConnected();403boolean blocking = isBlocking();404int n = 0;405try {406beginRead(blocking);407408// check if connection has been reset409if (connectionReset)410throwConnectionReset();411412// check if input is shutdown413if (isInputClosed)414return IOStatus.EOF;415416n = IOUtil.read(fd, buf, -1, nd);417if (blocking) {418while (IOStatus.okayToRetry(n) && isOpen()) {419park(Net.POLLIN);420n = IOUtil.read(fd, buf, -1, nd);421}422}423} catch (ConnectionResetException e) {424connectionReset = true;425throwConnectionReset();426} finally {427endRead(blocking, n > 0);428if (n <= 0 && isInputClosed)429return IOStatus.EOF;430}431return IOStatus.normalize(n);432} finally {433readLock.unlock();434}435}436437@Override438public long read(ByteBuffer[] dsts, int offset, int length)439throws IOException440{441Objects.checkFromIndexSize(offset, length, dsts.length);442443readLock.lock();444try {445ensureOpenAndConnected();446boolean blocking = isBlocking();447long n = 0;448try {449beginRead(blocking);450451// check if connection has been reset452if (connectionReset)453throwConnectionReset();454455// check if input is shutdown456if (isInputClosed)457return IOStatus.EOF;458459n = IOUtil.read(fd, dsts, offset, length, nd);460if (blocking) {461while (IOStatus.okayToRetry(n) && isOpen()) {462park(Net.POLLIN);463n = IOUtil.read(fd, dsts, offset, length, nd);464}465}466} catch (ConnectionResetException e) {467connectionReset = true;468throwConnectionReset();469} finally {470endRead(blocking, n > 0);471if (n <= 0 && isInputClosed)472return IOStatus.EOF;473}474return IOStatus.normalize(n);475} finally {476readLock.unlock();477}478}479480/**481* Marks the beginning of a write operation that might block.482*483* @throws ClosedChannelException if blocking and the channel is closed484*/485private void beginWrite(boolean blocking) throws ClosedChannelException {486if (blocking) {487// set hook for Thread.interrupt488begin();489490synchronized (stateLock) {491ensureOpen();492if (isOutputClosed)493throw new ClosedChannelException();494// record thread so it can be signalled if needed495writerThread = NativeThread.current();496}497}498}499500/**501* Marks the end of a write operation that may have blocked.502*503* @throws AsynchronousCloseException if the channel was closed due to this504* thread being interrupted on a blocking write operation.505*/506private void endWrite(boolean blocking, boolean completed)507throws AsynchronousCloseException508{509if (blocking) {510synchronized (stateLock) {511writerThread = 0;512if (state == ST_CLOSING) {513tryFinishClose();514}515}516// remove hook for Thread.interrupt517end(completed);518}519}520521@Override522public int write(ByteBuffer buf) throws IOException {523Objects.requireNonNull(buf);524writeLock.lock();525try {526ensureOpenAndConnected();527boolean blocking = isBlocking();528int n = 0;529try {530beginWrite(blocking);531n = IOUtil.write(fd, buf, -1, nd);532if (blocking) {533while (IOStatus.okayToRetry(n) && isOpen()) {534park(Net.POLLOUT);535n = IOUtil.write(fd, buf, -1, nd);536}537}538} finally {539endWrite(blocking, n > 0);540if (n <= 0 && isOutputClosed)541throw new AsynchronousCloseException();542}543return IOStatus.normalize(n);544} finally {545writeLock.unlock();546}547}548549@Override550public long write(ByteBuffer[] srcs, int offset, int length)551throws IOException552{553Objects.checkFromIndexSize(offset, length, srcs.length);554555writeLock.lock();556try {557ensureOpenAndConnected();558boolean blocking = isBlocking();559long n = 0;560try {561beginWrite(blocking);562n = IOUtil.write(fd, srcs, offset, length, nd);563if (blocking) {564while (IOStatus.okayToRetry(n) && isOpen()) {565park(Net.POLLOUT);566n = IOUtil.write(fd, srcs, offset, length, nd);567}568}569} finally {570endWrite(blocking, n > 0);571if (n <= 0 && isOutputClosed)572throw new AsynchronousCloseException();573}574return IOStatus.normalize(n);575} finally {576writeLock.unlock();577}578}579580/**581* Writes a byte of out of band data.582*/583int sendOutOfBandData(byte b) throws IOException {584writeLock.lock();585try {586ensureOpenAndConnected();587boolean blocking = isBlocking();588int n = 0;589try {590beginWrite(blocking);591if (blocking) {592do {593n = Net.sendOOB(fd, b);594} while (n == IOStatus.INTERRUPTED && isOpen());595} else {596n = Net.sendOOB(fd, b);597}598} finally {599endWrite(blocking, n > 0);600if (n <= 0 && isOutputClosed)601throw new AsynchronousCloseException();602}603return IOStatus.normalize(n);604} finally {605writeLock.unlock();606}607}608609@Override610protected void implConfigureBlocking(boolean block) throws IOException {611readLock.lock();612try {613writeLock.lock();614try {615lockedConfigureBlocking(block);616} finally {617writeLock.unlock();618}619} finally {620readLock.unlock();621}622}623624/**625* Adjusts the blocking mode. readLock or writeLock must already be held.626*/627private void lockedConfigureBlocking(boolean block) throws IOException {628assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();629synchronized (stateLock) {630ensureOpen();631IOUtil.configureBlocking(fd, block);632}633}634635/**636* Adjusts the blocking mode if the channel is open. readLock or writeLock637* must already be held.638*639* @return {@code true} if the blocking mode was adjusted, {@code false} if640* the blocking mode was not adjusted because the channel is closed641*/642private boolean tryLockedConfigureBlocking(boolean block) throws IOException {643assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();644synchronized (stateLock) {645if (isOpen()) {646IOUtil.configureBlocking(fd, block);647return true;648} else {649return false;650}651}652}653654/**655* Returns the local address, or null if not bound656*/657SocketAddress localAddress() {658synchronized (stateLock) {659return localAddress;660}661}662663/**664* Returns the remote address, or null if not connected665*/666SocketAddress remoteAddress() {667synchronized (stateLock) {668return remoteAddress;669}670}671672@Override673public SocketChannel bind(SocketAddress local) throws IOException {674readLock.lock();675try {676writeLock.lock();677try {678synchronized (stateLock) {679ensureOpen();680if (state == ST_CONNECTIONPENDING)681throw new ConnectionPendingException();682if (localAddress != null)683throw new AlreadyBoundException();684if (isUnixSocket()) {685localAddress = unixBind(local);686} else {687localAddress = netBind(local);688}689}690} finally {691writeLock.unlock();692}693} finally {694readLock.unlock();695}696return this;697}698699private SocketAddress unixBind(SocketAddress local) throws IOException {700UnixDomainSockets.checkPermission();701if (local == null) {702return UnixDomainSockets.UNNAMED;703} else {704Path path = UnixDomainSockets.checkAddress(local).getPath();705if (path.toString().isEmpty()) {706return UnixDomainSockets.UNNAMED;707} else {708// bind to non-empty path709UnixDomainSockets.bind(fd, path);710return UnixDomainSockets.localAddress(fd);711}712}713}714715private SocketAddress netBind(SocketAddress local) throws IOException {716InetSocketAddress isa;717if (local == null) {718isa = new InetSocketAddress(Net.anyLocalAddress(family), 0);719} else {720isa = Net.checkAddress(local, family);721}722@SuppressWarnings("removal")723SecurityManager sm = System.getSecurityManager();724if (sm != null) {725sm.checkListen(isa.getPort());726}727NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());728Net.bind(family, fd, isa.getAddress(), isa.getPort());729return Net.localAddress(fd);730}731732@Override733public boolean isConnected() {734return (state == ST_CONNECTED);735}736737@Override738public boolean isConnectionPending() {739return (state == ST_CONNECTIONPENDING);740}741742/**743* Marks the beginning of a connect operation that might block.744* @param blocking true if configured blocking745* @param isa the remote address746* @throws ClosedChannelException if the channel is closed747* @throws AlreadyConnectedException if already connected748* @throws ConnectionPendingException is a connection is pending749* @throws IOException if the pre-connect hook fails750*/751private void beginConnect(boolean blocking, SocketAddress sa)752throws IOException753{754if (blocking) {755// set hook for Thread.interrupt756begin();757}758synchronized (stateLock) {759ensureOpen();760int state = this.state;761if (state == ST_CONNECTED)762throw new AlreadyConnectedException();763if (state == ST_CONNECTIONPENDING)764throw new ConnectionPendingException();765assert state == ST_UNCONNECTED;766this.state = ST_CONNECTIONPENDING;767768if (isNetSocket() && (localAddress == null)) {769InetSocketAddress isa = (InetSocketAddress) sa;770NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());771}772remoteAddress = sa;773774if (blocking) {775// record thread so it can be signalled if needed776readerThread = NativeThread.current();777}778}779}780781/**782* Marks the end of a connect operation that may have blocked.783*784* @throws AsynchronousCloseException if the channel was closed due to this785* thread being interrupted on a blocking connect operation.786* @throws IOException if completed and unable to obtain the local address787*/788private void endConnect(boolean blocking, boolean completed)789throws IOException790{791endRead(blocking, completed);792793if (completed) {794synchronized (stateLock) {795if (state == ST_CONNECTIONPENDING) {796if (isUnixSocket()) {797localAddress = UnixDomainSockets.localAddress(fd);798} else {799localAddress = Net.localAddress(fd);800}801state = ST_CONNECTED;802}803}804}805}806807/**808* Checks the remote address to which this channel is to be connected.809*/810private SocketAddress checkRemote(SocketAddress sa) {811if (isUnixSocket()) {812UnixDomainSockets.checkPermission();813return UnixDomainSockets.checkAddress(sa);814} else {815InetSocketAddress isa = Net.checkAddress(sa, family);816@SuppressWarnings("removal")817SecurityManager sm = System.getSecurityManager();818if (sm != null) {819sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());820}821InetAddress address = isa.getAddress();822if (address.isAnyLocalAddress()) {823int port = isa.getPort();824if (address instanceof Inet4Address) {825return new InetSocketAddress(Net.inet4LoopbackAddress(), port);826} else {827assert family == INET6;828return new InetSocketAddress(Net.inet6LoopbackAddress(), port);829}830} else {831return isa;832}833}834}835836@Override837public boolean connect(SocketAddress remote) throws IOException {838SocketAddress sa = checkRemote(remote);839try {840readLock.lock();841try {842writeLock.lock();843try {844boolean blocking = isBlocking();845boolean connected = false;846try {847beginConnect(blocking, sa);848int n;849if (isUnixSocket()) {850n = UnixDomainSockets.connect(fd, sa);851} else {852n = Net.connect(family, fd, sa);853}854if (n > 0) {855connected = true;856} else if (blocking) {857assert IOStatus.okayToRetry(n);858boolean polled = false;859while (!polled && isOpen()) {860park(Net.POLLOUT);861polled = Net.pollConnectNow(fd);862}863connected = polled && isOpen();864}865} finally {866endConnect(blocking, connected);867}868return connected;869} finally {870writeLock.unlock();871}872} finally {873readLock.unlock();874}875} catch (IOException ioe) {876// connect failed, close the channel877close();878throw SocketExceptions.of(ioe, sa);879}880}881882/**883* Marks the beginning of a finishConnect operation that might block.884*885* @throws ClosedChannelException if the channel is closed886* @throws NoConnectionPendingException if no connection is pending887*/888private void beginFinishConnect(boolean blocking) throws ClosedChannelException {889if (blocking) {890// set hook for Thread.interrupt891begin();892}893synchronized (stateLock) {894ensureOpen();895if (state != ST_CONNECTIONPENDING)896throw new NoConnectionPendingException();897if (blocking) {898// record thread so it can be signalled if needed899readerThread = NativeThread.current();900}901}902}903904/**905* Marks the end of a finishConnect operation that may have blocked.906*907* @throws AsynchronousCloseException if the channel was closed due to this908* thread being interrupted on a blocking connect operation.909* @throws IOException if completed and unable to obtain the local address910*/911private void endFinishConnect(boolean blocking, boolean completed)912throws IOException913{914endRead(blocking, completed);915916if (completed) {917synchronized (stateLock) {918if (state == ST_CONNECTIONPENDING) {919if (isUnixSocket()) {920localAddress = UnixDomainSockets.localAddress(fd);921} else {922localAddress = Net.localAddress(fd);923}924state = ST_CONNECTED;925}926}927}928}929930@Override931public boolean finishConnect() throws IOException {932try {933readLock.lock();934try {935writeLock.lock();936try {937// no-op if already connected938if (isConnected())939return true;940941boolean blocking = isBlocking();942boolean connected = false;943try {944beginFinishConnect(blocking);945boolean polled = Net.pollConnectNow(fd);946if (blocking) {947while (!polled && isOpen()) {948park(Net.POLLOUT);949polled = Net.pollConnectNow(fd);950}951}952connected = polled && isOpen();953} finally {954endFinishConnect(blocking, connected);955}956assert (blocking && connected) ^ !blocking;957return connected;958} finally {959writeLock.unlock();960}961} finally {962readLock.unlock();963}964} catch (IOException ioe) {965// connect failed, close the channel966close();967throw SocketExceptions.of(ioe, remoteAddress);968}969}970971/**972* Closes the socket if there are no I/O operations in progress and the973* channel is not registered with a Selector.974*/975private boolean tryClose() throws IOException {976assert Thread.holdsLock(stateLock) && state == ST_CLOSING;977if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {978state = ST_CLOSED;979nd.close(fd);980return true;981} else {982return false;983}984}985986/**987* Invokes tryClose to attempt to close the socket.988*989* This method is used for deferred closing by I/O and Selector operations.990*/991private void tryFinishClose() {992try {993tryClose();994} catch (IOException ignore) { }995}996997/**998* Closes this channel when configured in blocking mode.999*1000* If there is an I/O operation in progress then the socket is pre-closed1001* and the I/O threads signalled, in which case the final close is deferred1002* until all I/O operations complete.1003*1004* Note that a channel configured blocking may be registered with a Selector1005* This arises when a key is canceled and the channel configured to blocking1006* mode before the key is flushed from the Selector.1007*/1008private void implCloseBlockingMode() throws IOException {1009synchronized (stateLock) {1010assert state < ST_CLOSING;1011state = ST_CLOSING;1012if (!tryClose()) {1013long reader = readerThread;1014long writer = writerThread;1015if (reader != 0 || writer != 0) {1016nd.preClose(fd);1017if (reader != 0)1018NativeThread.signal(reader);1019if (writer != 0)1020NativeThread.signal(writer);1021}1022}1023}1024}10251026/**1027* Closes this channel when configured in non-blocking mode.1028*1029* If the channel is registered with a Selector then the close is deferred1030* until the channel is flushed from all Selectors.1031*1032* If the socket is connected and the channel is registered with a Selector1033* then the socket is shutdown for writing so that the peer reads EOF. In1034* addition, if SO_LINGER is set to a non-zero value then it is disabled so1035* that the deferred close does not wait.1036*/1037private void implCloseNonBlockingMode() throws IOException {1038boolean connected;1039synchronized (stateLock) {1040assert state < ST_CLOSING;1041connected = (state == ST_CONNECTED);1042state = ST_CLOSING;1043}10441045// wait for any read/write operations to complete1046readLock.lock();1047readLock.unlock();1048writeLock.lock();1049writeLock.unlock();10501051// if the socket cannot be closed because it's registered with a Selector1052// then shutdown the socket for writing.1053synchronized (stateLock) {1054if (state == ST_CLOSING && !tryClose() && connected && isRegistered()) {1055try {1056SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER;1057int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt);1058if (interval != 0) {1059if (interval > 0) {1060// disable SO_LINGER1061Net.setSocketOption(fd, Net.UNSPEC, opt, -1);1062}1063Net.shutdown(fd, Net.SHUT_WR);1064}1065} catch (IOException ignore) { }1066}1067}1068}10691070/**1071* Invoked by implCloseChannel to close the channel.1072*/1073@Override1074protected void implCloseSelectableChannel() throws IOException {1075assert !isOpen();1076if (isBlocking()) {1077implCloseBlockingMode();1078} else {1079implCloseNonBlockingMode();1080}1081}10821083@Override1084public void kill() {1085synchronized (stateLock) {1086if (state == ST_CLOSING) {1087tryFinishClose();1088}1089}1090}10911092@Override1093public SocketChannel shutdownInput() throws IOException {1094synchronized (stateLock) {1095ensureOpen();1096if (!isConnected())1097throw new NotYetConnectedException();1098if (!isInputClosed) {1099Net.shutdown(fd, Net.SHUT_RD);1100long thread = readerThread;1101if (thread != 0)1102NativeThread.signal(thread);1103isInputClosed = true;1104}1105return this;1106}1107}11081109@Override1110public SocketChannel shutdownOutput() throws IOException {1111synchronized (stateLock) {1112ensureOpen();1113if (!isConnected())1114throw new NotYetConnectedException();1115if (!isOutputClosed) {1116Net.shutdown(fd, Net.SHUT_WR);1117long thread = writerThread;1118if (thread != 0)1119NativeThread.signal(thread);1120isOutputClosed = true;1121}1122return this;1123}1124}11251126boolean isInputOpen() {1127return !isInputClosed;1128}11291130boolean isOutputOpen() {1131return !isOutputClosed;1132}11331134/**1135* Waits for a connection attempt to finish with a timeout1136* @throws SocketTimeoutException if the connect timeout elapses1137*/1138private boolean finishTimedConnect(long nanos) throws IOException {1139long startNanos = System.nanoTime();1140boolean polled = Net.pollConnectNow(fd);1141while (!polled && isOpen()) {1142long remainingNanos = nanos - (System.nanoTime() - startNanos);1143if (remainingNanos <= 0) {1144throw new SocketTimeoutException("Connect timed out");1145}1146park(Net.POLLOUT, remainingNanos);1147polled = Net.pollConnectNow(fd);1148}1149return polled && isOpen();1150}11511152/**1153* Attempts to establish a connection to the given socket address with a1154* timeout. Closes the socket if connection cannot be established.1155*1156* @apiNote This method is for use by the socket adaptor.1157*1158* @throws IllegalBlockingModeException if the channel is non-blocking1159* @throws SocketTimeoutException if the read timeout elapses1160*/1161void blockingConnect(SocketAddress remote, long nanos) throws IOException {1162SocketAddress sa = checkRemote(remote);1163try {1164readLock.lock();1165try {1166writeLock.lock();1167try {1168if (!isBlocking())1169throw new IllegalBlockingModeException();1170boolean connected = false;1171try {1172beginConnect(true, sa);1173// change socket to non-blocking1174lockedConfigureBlocking(false);1175try {1176int n;1177if (isUnixSocket()) {1178n = UnixDomainSockets.connect(fd, sa);1179} else {1180n = Net.connect(family, fd, sa);1181}1182connected = (n > 0) ? true : finishTimedConnect(nanos);1183} finally {1184// restore socket to blocking mode (if channel is open)1185tryLockedConfigureBlocking(true);1186}1187} finally {1188endConnect(true, connected);1189}1190} finally {1191writeLock.unlock();1192}1193} finally {1194readLock.unlock();1195}1196} catch (IOException ioe) {1197// connect failed, close the channel1198close();1199throw SocketExceptions.of(ioe, sa);1200}1201}12021203/**1204* Attempts to read bytes from the socket into the given byte array.1205*/1206private int tryRead(byte[] b, int off, int len) throws IOException {1207ByteBuffer dst = Util.getTemporaryDirectBuffer(len);1208assert dst.position() == 0;1209try {1210int n = nd.read(fd, ((DirectBuffer)dst).address(), len);1211if (n > 0) {1212dst.get(b, off, n);1213}1214return n;1215} finally{1216Util.offerFirstTemporaryDirectBuffer(dst);1217}1218}12191220/**1221* Reads bytes from the socket into the given byte array with a timeout.1222* @throws SocketTimeoutException if the read timeout elapses1223*/1224private int timedRead(byte[] b, int off, int len, long nanos) throws IOException {1225long startNanos = System.nanoTime();1226int n = tryRead(b, off, len);1227while (n == IOStatus.UNAVAILABLE && isOpen()) {1228long remainingNanos = nanos - (System.nanoTime() - startNanos);1229if (remainingNanos <= 0) {1230throw new SocketTimeoutException("Read timed out");1231}1232park(Net.POLLIN, remainingNanos);1233n = tryRead(b, off, len);1234}1235return n;1236}12371238/**1239* Reads bytes from the socket into the given byte array.1240*1241* @apiNote This method is for use by the socket adaptor.1242*1243* @throws IllegalBlockingModeException if the channel is non-blocking1244* @throws SocketTimeoutException if the read timeout elapses1245*/1246int blockingRead(byte[] b, int off, int len, long nanos) throws IOException {1247Objects.checkFromIndexSize(off, len, b.length);1248if (len == 0) {1249// nothing to do1250return 0;1251}12521253readLock.lock();1254try {1255ensureOpenAndConnected();12561257// check that channel is configured blocking1258if (!isBlocking())1259throw new IllegalBlockingModeException();12601261int n = 0;1262try {1263beginRead(true);12641265// check if connection has been reset1266if (connectionReset)1267throwConnectionReset();12681269// check if input is shutdown1270if (isInputClosed)1271return IOStatus.EOF;12721273if (nanos > 0) {1274// change socket to non-blocking1275lockedConfigureBlocking(false);1276try {1277n = timedRead(b, off, len, nanos);1278} finally {1279// restore socket to blocking mode (if channel is open)1280tryLockedConfigureBlocking(true);1281}1282} else {1283// read, no timeout1284n = tryRead(b, off, len);1285while (IOStatus.okayToRetry(n) && isOpen()) {1286park(Net.POLLIN);1287n = tryRead(b, off, len);1288}1289}1290} catch (ConnectionResetException e) {1291connectionReset = true;1292throwConnectionReset();1293} finally {1294endRead(true, n > 0);1295if (n <= 0 && isInputClosed)1296return IOStatus.EOF;1297}1298assert n > 0 || n == -1;1299return n;1300} finally {1301readLock.unlock();1302}1303}13041305/**1306* Attempts to write a sequence of bytes to the socket from the given1307* byte array.1308*/1309private int tryWrite(byte[] b, int off, int len) throws IOException {1310ByteBuffer src = Util.getTemporaryDirectBuffer(len);1311assert src.position() == 0;1312try {1313src.put(b, off, len);1314return nd.write(fd, ((DirectBuffer)src).address(), len);1315} finally {1316Util.offerFirstTemporaryDirectBuffer(src);1317}1318}13191320/**1321* Writes a sequence of bytes to the socket from the given byte array.1322*1323* @apiNote This method is for use by the socket adaptor.1324*/1325void blockingWriteFully(byte[] b, int off, int len) throws IOException {1326Objects.checkFromIndexSize(off, len, b.length);1327if (len == 0) {1328// nothing to do1329return;1330}13311332writeLock.lock();1333try {1334ensureOpenAndConnected();13351336// check that channel is configured blocking1337if (!isBlocking())1338throw new IllegalBlockingModeException();13391340// loop until all bytes have been written1341int pos = off;1342int end = off + len;1343try {1344beginWrite(true);1345while (pos < end && isOpen()) {1346int size = end - pos;1347int n = tryWrite(b, pos, size);1348while (IOStatus.okayToRetry(n) && isOpen()) {1349park(Net.POLLOUT);1350n = tryWrite(b, pos, size);1351}1352if (n > 0) {1353pos += n;1354}1355}1356} finally {1357endWrite(true, pos >= end);1358}1359} finally {1360writeLock.unlock();1361}1362}13631364/**1365* Return the number of bytes in the socket input buffer.1366*/1367int available() throws IOException {1368synchronized (stateLock) {1369ensureOpenAndConnected();1370if (isInputClosed) {1371return 0;1372} else {1373return Net.available(fd);1374}1375}1376}13771378/**1379* Translates native poll revent ops into a ready operation ops1380*/1381public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {1382int intOps = ski.nioInterestOps();1383int oldOps = ski.nioReadyOps();1384int newOps = initialOps;13851386if ((ops & Net.POLLNVAL) != 0) {1387// This should only happen if this channel is pre-closed while a1388// selection operation is in progress1389// ## Throw an error if this channel has not been pre-closed1390return false;1391}13921393if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {1394newOps = intOps;1395ski.nioReadyOps(newOps);1396return (newOps & ~oldOps) != 0;1397}13981399boolean connected = isConnected();1400if (((ops & Net.POLLIN) != 0) &&1401((intOps & SelectionKey.OP_READ) != 0) && connected)1402newOps |= SelectionKey.OP_READ;14031404if (((ops & Net.POLLCONN) != 0) &&1405((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())1406newOps |= SelectionKey.OP_CONNECT;14071408if (((ops & Net.POLLOUT) != 0) &&1409((intOps & SelectionKey.OP_WRITE) != 0) && connected)1410newOps |= SelectionKey.OP_WRITE;14111412ski.nioReadyOps(newOps);1413return (newOps & ~oldOps) != 0;1414}14151416public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {1417return translateReadyOps(ops, ski.nioReadyOps(), ski);1418}14191420public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {1421return translateReadyOps(ops, 0, ski);1422}14231424/**1425* Translates an interest operation set into a native poll event set1426*/1427public int translateInterestOps(int ops) {1428int newOps = 0;1429if ((ops & SelectionKey.OP_READ) != 0)1430newOps |= Net.POLLIN;1431if ((ops & SelectionKey.OP_WRITE) != 0)1432newOps |= Net.POLLOUT;1433if ((ops & SelectionKey.OP_CONNECT) != 0)1434newOps |= Net.POLLCONN;1435return newOps;1436}14371438public FileDescriptor getFD() {1439return fd;1440}14411442public int getFDVal() {1443return fdVal;1444}14451446@Override1447public String toString() {1448StringBuilder sb = new StringBuilder();1449sb.append(this.getClass().getSuperclass().getName());1450sb.append('[');1451if (!isOpen())1452sb.append("closed");1453else {1454synchronized (stateLock) {1455switch (state) {1456case ST_UNCONNECTED:1457sb.append("unconnected");1458break;1459case ST_CONNECTIONPENDING:1460sb.append("connection-pending");1461break;1462case ST_CONNECTED:1463sb.append("connected");1464if (isInputClosed)1465sb.append(" ishut");1466if (isOutputClosed)1467sb.append(" oshut");1468break;1469}1470SocketAddress addr = localAddress();1471if (addr != null) {1472sb.append(" local=");1473if (isUnixSocket()) {1474sb.append(UnixDomainSockets.getRevealedLocalAddressAsString(addr));1475} else {1476sb.append(Net.getRevealedLocalAddressAsString(addr));1477}1478}1479if (remoteAddress() != null) {1480sb.append(" remote=");1481sb.append(remoteAddress().toString());1482}1483}1484}1485sb.append(']');1486return sb.toString();1487}1488}148914901491