Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java
41171 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.IOException;28import java.net.ConnectException;29import java.net.InetSocketAddress;30import java.net.StandardSocketOptions;31import java.nio.channels.SelectableChannel;32import java.nio.channels.SelectionKey;33import java.nio.channels.SocketChannel;34import java.security.AccessController;35import java.security.PrivilegedActionException;36import java.security.PrivilegedExceptionAction;37import java.time.Duration;38import java.time.Instant;39import java.util.concurrent.CompletableFuture;40import java.util.function.Function;4142import jdk.internal.net.http.common.FlowTube;43import jdk.internal.net.http.common.Log;44import jdk.internal.net.http.common.MinimalFuture;45import jdk.internal.net.http.common.Utils;4647/**48* Plain raw TCP connection direct to destination.49* The connection operates in asynchronous non-blocking mode.50* All reads and writes are done non-blocking.51*/52class PlainHttpConnection extends HttpConnection {5354private final Object reading = new Object();55protected final SocketChannel chan;56private final SocketTube tube; // need SocketTube to call signalClosed().57private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);58private volatile boolean connected;59private boolean closed;60private volatile ConnectTimerEvent connectTimerEvent; // may be null61private volatile int unsuccessfulAttempts;6263// Indicates whether a connection attempt has succeeded or should be retried.64// If the attempt failed, and shouldn't be retried, there will be an exception65// instead.66private enum ConnectState { SUCCESS, RETRY }676869/**70* Returns a ConnectTimerEvent iff there is a connect timeout duration,71* otherwise null.72*/73private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,74CompletableFuture<?> cf) {75Duration duration = exchange.remainingConnectTimeout().orElse(null);76if (duration != null) {77ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);78return cte;79}80return null;81}8283final class ConnectTimerEvent extends TimeoutEvent {84private final CompletableFuture<?> cf;85private final Exchange<?> exchange;8687ConnectTimerEvent(Duration duration,88Exchange<?> exchange,89CompletableFuture<?> cf) {90super(duration);91this.exchange = exchange;92this.cf = cf;93}9495@Override96public void handle() {97if (debug.on()) {98debug.log("HTTP connect timed out");99}100ConnectException ce = new ConnectException("HTTP connect timed out");101exchange.multi.cancel(ce);102client().theExecutor().execute(() -> cf.completeExceptionally(ce));103}104105@Override106public String toString() {107return "ConnectTimerEvent, " + super.toString();108}109}110111final class ConnectEvent extends AsyncEvent {112private final CompletableFuture<ConnectState> cf;113private final Exchange<?> exchange;114115ConnectEvent(CompletableFuture<ConnectState> cf, Exchange<?> exchange) {116this.cf = cf;117this.exchange = exchange;118}119120@Override121public SelectableChannel channel() {122return chan;123}124125@Override126public int interestOps() {127return SelectionKey.OP_CONNECT;128}129130@Override131public void handle() {132try {133assert !connected : "Already connected";134assert !chan.isBlocking() : "Unexpected blocking channel";135if (debug.on())136debug.log("ConnectEvent: finishing connect");137boolean finished = chan.finishConnect();138if (debug.on())139debug.log("ConnectEvent: connect finished: %s, cancelled: %s, Local addr: %s",140finished, exchange.multi.requestCancelled(), chan.getLocalAddress());141assert finished || exchange.multi.requestCancelled() : "Expected channel to be connected";142// complete async since the event runs on the SelectorManager thread143cf.completeAsync(() -> ConnectState.SUCCESS, client().theExecutor());144} catch (Throwable e) {145if (canRetryConnect(e)) {146unsuccessfulAttempts++;147cf.completeAsync(() -> ConnectState.RETRY, client().theExecutor());148return;149}150Throwable t = Utils.toConnectException(e);151client().theExecutor().execute( () -> cf.completeExceptionally(t));152close();153}154}155156@Override157public void abort(IOException ioe) {158client().theExecutor().execute( () -> cf.completeExceptionally(ioe));159close();160}161}162163@SuppressWarnings("removal")164@Override165public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {166CompletableFuture<ConnectState> cf = new MinimalFuture<>();167try {168assert !connected : "Already connected";169assert !chan.isBlocking() : "Unexpected blocking channel";170boolean finished;171172if (connectTimerEvent == null) {173connectTimerEvent = newConnectTimer(exchange, cf);174if (connectTimerEvent != null) {175if (debug.on())176debug.log("registering connect timer: " + connectTimerEvent);177client().registerTimer(connectTimerEvent);178}179}180181PrivilegedExceptionAction<Boolean> pa =182() -> chan.connect(Utils.resolveAddress(address));183try {184finished = AccessController.doPrivileged(pa);185} catch (PrivilegedActionException e) {186throw e.getCause();187}188if (finished) {189if (debug.on()) debug.log("connect finished without blocking");190cf.complete(ConnectState.SUCCESS);191} else {192if (debug.on()) debug.log("registering connect event");193client().registerEvent(new ConnectEvent(cf, exchange));194}195cf = exchange.checkCancelled(cf, this);196} catch (Throwable throwable) {197cf.completeExceptionally(Utils.toConnectException(throwable));198try {199close();200} catch (Exception x) {201if (debug.on())202debug.log("Failed to close channel after unsuccessful connect");203}204}205return cf.handle((r,t) -> checkRetryConnect(r, t,exchange))206.thenCompose(Function.identity());207}208209/**210* On some platforms, a ConnectEvent may be raised and a ConnectionException211* may occur with the message "Connection timed out: no further information"212* before our actual connection timeout has expired. In this case, this213* method will be called with a {@code connect} state of {@code ConnectState.RETRY)}214* and we will retry once again.215* @param connect indicates whether the connection was successful or should be retried216* @param failed the failure if the connection failed217* @param exchange the exchange218* @return a completable future that will take care of retrying the connection if needed.219*/220private CompletableFuture<Void> checkRetryConnect(ConnectState connect, Throwable failed, Exchange<?> exchange) {221// first check if the connection failed222if (failed != null) return MinimalFuture.failedFuture(failed);223// then check if the connection should be retried224if (connect == ConnectState.RETRY) {225int attempts = unsuccessfulAttempts;226assert attempts <= 1;227if (debug.on())228debug.log("Retrying connect after %d attempts", attempts);229return connectAsync(exchange);230}231// Otherwise, the connection was successful;232assert connect == ConnectState.SUCCESS;233return MinimalFuture.completedFuture(null);234}235236private boolean canRetryConnect(Throwable e) {237if (!MultiExchange.RETRY_CONNECT) return false;238if (!(e instanceof ConnectException)) return false;239if (unsuccessfulAttempts > 0) return false;240ConnectTimerEvent timer = connectTimerEvent;241if (timer == null) return true;242return timer.deadline().isAfter(Instant.now());243}244245@Override246public CompletableFuture<Void> finishConnect() {247assert connected == false;248if (debug.on()) debug.log("finishConnect, setting connected=true");249connected = true;250if (connectTimerEvent != null)251client().cancelTimer(connectTimerEvent);252return MinimalFuture.completedFuture(null);253}254255@Override256SocketChannel channel() {257return chan;258}259260@Override261final FlowTube getConnectionFlow() {262return tube;263}264265PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {266super(addr, client);267try {268this.chan = SocketChannel.open();269chan.configureBlocking(false);270if (debug.on()) {271int bufsize = getSoReceiveBufferSize();272debug.log("Initial receive buffer size is: %d", bufsize);273bufsize = getSoSendBufferSize();274debug.log("Initial send buffer size is: %d", bufsize);275}276if (trySetReceiveBufferSize(client.getReceiveBufferSize())) {277if (debug.on()) {278int bufsize = getSoReceiveBufferSize();279debug.log("Receive buffer size configured: %d", bufsize);280}281}282if (trySetSendBufferSize(client.getSendBufferSize())) {283if (debug.on()) {284int bufsize = getSoSendBufferSize();285debug.log("Send buffer size configured: %d", bufsize);286}287}288chan.setOption(StandardSocketOptions.TCP_NODELAY, true);289// wrap the channel in a Tube for async reading and writing290tube = new SocketTube(client(), chan, Utils::getBuffer);291} catch (IOException e) {292throw new InternalError(e);293}294}295296private int getSoReceiveBufferSize() {297try {298return chan.getOption(StandardSocketOptions.SO_RCVBUF);299} catch (IOException x) {300if (debug.on())301debug.log("Failed to get initial receive buffer size on %s", chan);302}303return 0;304}305306private int getSoSendBufferSize() {307try {308return chan.getOption(StandardSocketOptions.SO_SNDBUF);309} catch (IOException x) {310if (debug.on())311debug.log("Failed to get initial receive buffer size on %s", chan);312}313return 0;314}315316private boolean trySetReceiveBufferSize(int bufsize) {317try {318if (bufsize > 0) {319chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);320return true;321}322} catch (IOException x) {323if (debug.on())324debug.log("Failed to set receive buffer size to %d on %s",325bufsize, chan);326}327return false;328}329330private boolean trySetSendBufferSize(int bufsize) {331try {332if (bufsize > 0) {333chan.setOption(StandardSocketOptions.SO_SNDBUF, bufsize);334return true;335}336} catch (IOException x) {337if (debug.on())338debug.log("Failed to set send buffer size to %d on %s",339bufsize, chan);340}341return false;342}343344@Override345HttpPublisher publisher() { return writePublisher; }346347348@Override349public String toString() {350return "PlainHttpConnection: " + super.toString();351}352353/**354* Closes this connection355*/356@Override357public void close() {358synchronized (this) {359if (closed) {360return;361}362closed = true;363}364try {365Log.logTrace("Closing: " + toString());366if (debug.on())367debug.log("Closing channel: " + client().debugInterestOps(chan));368if (connectTimerEvent != null)369client().cancelTimer(connectTimerEvent);370chan.close();371tube.signalClosed();372} catch (IOException e) {373Log.logTrace("Closing resulted in " + e);374}375}376377378@Override379ConnectionPool.CacheKey cacheKey() {380return new ConnectionPool.CacheKey(address, null);381}382383@Override384synchronized boolean connected() {385return connected;386}387388389@Override390boolean isSecure() {391return false;392}393394@Override395boolean isProxied() {396return false;397}398399@Override400InetSocketAddress proxy() {401return null;402}403}404405406