Path: blob/master/src/java.base/share/classes/sun/nio/ch/AsynchronousSocketChannelImpl.java
41159 views
/*1* Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.ByteBuffer;28import java.nio.channels.*;29import java.net.SocketOption;30import java.net.StandardSocketOptions;31import java.net.SocketAddress;32import java.net.InetSocketAddress;33import java.io.IOException;34import java.io.FileDescriptor;35import java.util.Set;36import java.util.HashSet;37import java.util.Collections;38import java.util.concurrent.*;39import java.util.concurrent.locks.*;40import sun.net.NetHooks;41import sun.net.ext.ExtendedSocketOptions;4243/**44* Base implementation of AsynchronousSocketChannel45*/4647abstract class AsynchronousSocketChannelImpl48extends AsynchronousSocketChannel49implements Cancellable, Groupable50{51protected final FileDescriptor fd;5253// protects state, localAddress, and remoteAddress54protected final Object stateLock = new Object();5556protected volatile InetSocketAddress localAddress;57protected volatile InetSocketAddress remoteAddress;5859// State, increases monotonically60static final int ST_UNINITIALIZED = -1;61static final int ST_UNCONNECTED = 0;62static final int ST_PENDING = 1;63static final int ST_CONNECTED = 2;64protected volatile int state = ST_UNINITIALIZED;6566// reading state67private final Object readLock = new Object();68private boolean reading;69private boolean readShutdown;70private boolean readKilled; // further reading disallowed due to timeout7172// writing state73private final Object writeLock = new Object();74private boolean writing;75private boolean writeShutdown;76private boolean writeKilled; // further writing disallowed due to timeout7778// close support79private final ReadWriteLock closeLock = new ReentrantReadWriteLock();80private volatile boolean closed;8182// set true when exclusive binding is on and SO_REUSEADDR is emulated83private boolean isReuseAddress;8485AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)86throws IOException87{88super(group.provider());89this.fd = Net.socket(true);90this.state = ST_UNCONNECTED;91}9293// Constructor for sockets obtained from AsynchronousServerSocketChannelImpl94AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,95FileDescriptor fd,96InetSocketAddress remote)97throws IOException98{99super(group.provider());100this.fd = fd;101this.state = ST_CONNECTED;102this.localAddress = Net.localAddress(fd);103this.remoteAddress = remote;104}105106@Override107public final boolean isOpen() {108return !closed;109}110111/**112* Marks beginning of access to file descriptor/handle113*/114final void begin() throws IOException {115closeLock.readLock().lock();116if (!isOpen())117throw new ClosedChannelException();118}119120/**121* Marks end of access to file descriptor/handle122*/123final void end() {124closeLock.readLock().unlock();125}126127/**128* Invoked to close socket and release other resources.129*/130abstract void implClose() throws IOException;131132@Override133public final void close() throws IOException {134// synchronize with any threads initiating asynchronous operations135closeLock.writeLock().lock();136try {137if (closed)138return; // already closed139closed = true;140} finally {141closeLock.writeLock().unlock();142}143implClose();144}145146final void enableReading(boolean killed) {147synchronized (readLock) {148reading = false;149if (killed)150readKilled = true;151}152}153154final void enableReading() {155enableReading(false);156}157158final void enableWriting(boolean killed) {159synchronized (writeLock) {160writing = false;161if (killed)162writeKilled = true;163}164}165166final void enableWriting() {167enableWriting(false);168}169170final void killReading() {171synchronized (readLock) {172readKilled = true;173}174}175176final void killWriting() {177synchronized (writeLock) {178writeKilled = true;179}180}181182final void killConnect() {183// when a connect is cancelled then the connection may have been184// established so prevent reading or writing.185killReading();186killWriting();187}188189/**190* Invoked by connect to initiate the connect operation.191*/192abstract <A> Future<Void> implConnect(SocketAddress remote,193A attachment,194CompletionHandler<Void,? super A> handler);195196@Override197public final Future<Void> connect(SocketAddress remote) {198return implConnect(remote, null, null);199}200201@Override202public final <A> void connect(SocketAddress remote,203A attachment,204CompletionHandler<Void,? super A> handler)205{206if (handler == null)207throw new NullPointerException("'handler' is null");208implConnect(remote, attachment, handler);209}210211/**212* Invoked by read to initiate the I/O operation.213*/214abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,215ByteBuffer dst,216ByteBuffer[] dsts,217long timeout,218TimeUnit unit,219A attachment,220CompletionHandler<V,? super A> handler);221222@SuppressWarnings("unchecked")223private <V extends Number,A> Future<V> read(boolean isScatteringRead,224ByteBuffer dst,225ByteBuffer[] dsts,226long timeout,227TimeUnit unit,228A att,229CompletionHandler<V,? super A> handler)230{231if (!isOpen()) {232Throwable e = new ClosedChannelException();233if (handler == null)234return CompletedFuture.withFailure(e);235Invoker.invoke(this, handler, att, null, e);236return null;237}238239if (remoteAddress == null)240throw new NotYetConnectedException();241242boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();243boolean shutdown = false;244245// check and update state246synchronized (readLock) {247if (readKilled)248throw new IllegalStateException("Reading not allowed due to timeout or cancellation");249if (reading)250throw new ReadPendingException();251if (readShutdown) {252shutdown = true;253} else {254if (hasSpaceToRead) {255reading = true;256}257}258}259260// immediately complete with -1 if shutdown for read261// immediately complete with 0 if no space remaining262if (shutdown || !hasSpaceToRead) {263Number result;264if (isScatteringRead) {265result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);266} else {267result = (shutdown) ? -1 : 0;268}269if (handler == null)270return CompletedFuture.withResult((V)result);271Invoker.invoke(this, handler, att, (V)result, null);272return null;273}274275return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);276}277278@Override279public final Future<Integer> read(ByteBuffer dst) {280if (dst.isReadOnly())281throw new IllegalArgumentException("Read-only buffer");282return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);283}284285@Override286public final <A> void read(ByteBuffer dst,287long timeout,288TimeUnit unit,289A attachment,290CompletionHandler<Integer,? super A> handler)291{292if (handler == null)293throw new NullPointerException("'handler' is null");294if (dst.isReadOnly())295throw new IllegalArgumentException("Read-only buffer");296read(false, dst, null, timeout, unit, attachment, handler);297}298299@Override300public final <A> void read(ByteBuffer[] dsts,301int offset,302int length,303long timeout,304TimeUnit unit,305A attachment,306CompletionHandler<Long,? super A> handler)307{308if (handler == null)309throw new NullPointerException("'handler' is null");310if ((offset < 0) || (length < 0) || (offset > dsts.length - length))311throw new IndexOutOfBoundsException();312ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);313for (int i=0; i<bufs.length; i++) {314if (bufs[i].isReadOnly())315throw new IllegalArgumentException("Read-only buffer");316}317read(true, null, bufs, timeout, unit, attachment, handler);318}319320/**321* Invoked by write to initiate the I/O operation.322*/323abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,324ByteBuffer src,325ByteBuffer[] srcs,326long timeout,327TimeUnit unit,328A attachment,329CompletionHandler<V,? super A> handler);330331@SuppressWarnings("unchecked")332private <V extends Number,A> Future<V> write(boolean isGatheringWrite,333ByteBuffer src,334ByteBuffer[] srcs,335long timeout,336TimeUnit unit,337A att,338CompletionHandler<V,? super A> handler)339{340boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();341342boolean closed = false;343if (isOpen()) {344if (remoteAddress == null)345throw new NotYetConnectedException();346// check and update state347synchronized (writeLock) {348if (writeKilled)349throw new IllegalStateException("Writing not allowed due to timeout or cancellation");350if (writing)351throw new WritePendingException();352if (writeShutdown) {353closed = true;354} else {355if (hasDataToWrite)356writing = true;357}358}359} else {360closed = true;361}362363// channel is closed or shutdown for write364if (closed) {365Throwable e = new ClosedChannelException();366if (handler == null)367return CompletedFuture.withFailure(e);368Invoker.invoke(this, handler, att, null, e);369return null;370}371372// nothing to write so complete immediately373if (!hasDataToWrite) {374Number result = (isGatheringWrite) ? (Number)0L : (Number)0;375if (handler == null)376return CompletedFuture.withResult((V)result);377Invoker.invoke(this, handler, att, (V)result, null);378return null;379}380381return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);382}383384@Override385public final Future<Integer> write(ByteBuffer src) {386return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);387}388389@Override390public final <A> void write(ByteBuffer src,391long timeout,392TimeUnit unit,393A attachment,394CompletionHandler<Integer,? super A> handler)395{396if (handler == null)397throw new NullPointerException("'handler' is null");398write(false, src, null, timeout, unit, attachment, handler);399}400401@Override402public final <A> void write(ByteBuffer[] srcs,403int offset,404int length,405long timeout,406TimeUnit unit,407A attachment,408CompletionHandler<Long,? super A> handler)409{410if (handler == null)411throw new NullPointerException("'handler' is null");412if ((offset < 0) || (length < 0) || (offset > srcs.length - length))413throw new IndexOutOfBoundsException();414srcs = Util.subsequence(srcs, offset, length);415write(true, null, srcs, timeout, unit, attachment, handler);416}417418@Override419public final AsynchronousSocketChannel bind(SocketAddress local)420throws IOException421{422try {423begin();424synchronized (stateLock) {425if (state == ST_PENDING)426throw new ConnectionPendingException();427if (localAddress != null)428throw new AlreadyBoundException();429InetSocketAddress isa = (local == null) ?430new InetSocketAddress(0) : Net.checkAddress(local);431@SuppressWarnings("removal")432SecurityManager sm = System.getSecurityManager();433if (sm != null) {434sm.checkListen(isa.getPort());435}436NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());437Net.bind(fd, isa.getAddress(), isa.getPort());438localAddress = Net.localAddress(fd);439}440} finally {441end();442}443return this;444}445446@Override447public final SocketAddress getLocalAddress() throws IOException {448if (!isOpen())449throw new ClosedChannelException();450return Net.getRevealedLocalAddress(localAddress);451}452453@Override454public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)455throws IOException456{457if (name == null)458throw new NullPointerException();459if (!supportedOptions().contains(name))460throw new UnsupportedOperationException("'" + name + "' not supported");461462try {463begin();464if (writeShutdown)465throw new IOException("Connection has been shutdown for writing");466if (name == StandardSocketOptions.SO_REUSEADDR &&467Net.useExclusiveBind())468{469// SO_REUSEADDR emulated when using exclusive bind470isReuseAddress = (Boolean)value;471} else {472Net.setSocketOption(fd, Net.UNSPEC, name, value);473}474return this;475} finally {476end();477}478}479480@Override481@SuppressWarnings("unchecked")482public final <T> T getOption(SocketOption<T> name) throws IOException {483if (name == null)484throw new NullPointerException();485if (!supportedOptions().contains(name))486throw new UnsupportedOperationException("'" + name + "' not supported");487488try {489begin();490if (name == StandardSocketOptions.SO_REUSEADDR &&491Net.useExclusiveBind())492{493// SO_REUSEADDR emulated when using exclusive bind494return (T)Boolean.valueOf(isReuseAddress);495}496return (T) Net.getSocketOption(fd, Net.UNSPEC, name);497} finally {498end();499}500}501502private static class DefaultOptionsHolder {503static final Set<SocketOption<?>> defaultOptions = defaultOptions();504505private static Set<SocketOption<?>> defaultOptions() {506HashSet<SocketOption<?>> set = new HashSet<>(5);507set.add(StandardSocketOptions.SO_SNDBUF);508set.add(StandardSocketOptions.SO_RCVBUF);509set.add(StandardSocketOptions.SO_KEEPALIVE);510set.add(StandardSocketOptions.SO_REUSEADDR);511if (Net.isReusePortAvailable()) {512set.add(StandardSocketOptions.SO_REUSEPORT);513}514set.add(StandardSocketOptions.TCP_NODELAY);515set.addAll(ExtendedSocketOptions.clientSocketOptions());516return Collections.unmodifiableSet(set);517}518}519520@Override521public final Set<SocketOption<?>> supportedOptions() {522return DefaultOptionsHolder.defaultOptions;523}524525@Override526public final SocketAddress getRemoteAddress() throws IOException {527if (!isOpen())528throw new ClosedChannelException();529return remoteAddress;530}531532@Override533public final AsynchronousSocketChannel shutdownInput() throws IOException {534try {535begin();536if (remoteAddress == null)537throw new NotYetConnectedException();538synchronized (readLock) {539if (!readShutdown) {540Net.shutdown(fd, Net.SHUT_RD);541readShutdown = true;542}543}544} finally {545end();546}547return this;548}549550@Override551public final AsynchronousSocketChannel shutdownOutput() throws IOException {552try {553begin();554if (remoteAddress == null)555throw new NotYetConnectedException();556synchronized (writeLock) {557if (!writeShutdown) {558Net.shutdown(fd, Net.SHUT_WR);559writeShutdown = true;560}561}562} finally {563end();564}565return this;566}567568@Override569public final String toString() {570StringBuilder sb = new StringBuilder();571sb.append(this.getClass().getName());572sb.append('[');573synchronized (stateLock) {574if (!isOpen()) {575sb.append("closed");576} else {577switch (state) {578case ST_UNCONNECTED:579sb.append("unconnected");580break;581case ST_PENDING:582sb.append("connection-pending");583break;584case ST_CONNECTED:585sb.append("connected");586if (readShutdown)587sb.append(" ishut");588if (writeShutdown)589sb.append(" oshut");590break;591}592if (localAddress != null) {593sb.append(" local=");594sb.append(595Net.getRevealedLocalAddressAsString(localAddress));596}597if (remoteAddress != null) {598sb.append(" remote=");599sb.append(remoteAddress.toString());600}601}602}603sb.append(']');604return sb.toString();605}606}607608609