Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/ConnectionPool.java
41171 views
/*1* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.IOException;28import java.lang.System.Logger.Level;29import java.net.InetSocketAddress;30import java.nio.ByteBuffer;31import java.time.Instant;32import java.time.temporal.ChronoUnit;33import java.util.ArrayList;34import java.util.Collections;35import java.util.HashMap;36import java.util.Iterator;37import java.util.LinkedList;38import java.util.List;39import java.util.ListIterator;40import java.util.Objects;41import java.util.Optional;42import java.util.concurrent.Flow;43import java.util.stream.Collectors;44import jdk.internal.net.http.common.FlowTube;45import jdk.internal.net.http.common.Logger;46import jdk.internal.net.http.common.Utils;4748/**49* Http 1.1 connection pool.50*/51final class ConnectionPool {5253static final long KEEP_ALIVE = Utils.getIntegerNetProperty(54"jdk.httpclient.keepalive.timeout", 1200); // seconds55static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(56"jdk.httpclient.connectionPoolSize", 0); // unbounded57final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);5859// Pools of idle connections6061private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;62private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;63private final ExpiryList expiryList;64private final String dbgTag; // used for debug65boolean stopped;6667/**68* Entries in connection pool are keyed by destination address and/or69* proxy address:70* case 1: plain TCP not via proxy (destination only)71* case 2: plain TCP via proxy (proxy only)72* case 3: SSL not via proxy (destination only)73* case 4: SSL over tunnel (destination and proxy)74*/75static class CacheKey {76final InetSocketAddress proxy;77final InetSocketAddress destination;7879CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {80this.proxy = proxy;81this.destination = destination;82}8384@Override85public boolean equals(Object obj) {86if (obj == null) {87return false;88}89if (getClass() != obj.getClass()) {90return false;91}92final CacheKey other = (CacheKey) obj;93if (!Objects.equals(this.proxy, other.proxy)) {94return false;95}96if (!Objects.equals(this.destination, other.destination)) {97return false;98}99return true;100}101102@Override103public int hashCode() {104return Objects.hash(proxy, destination);105}106}107108ConnectionPool(long clientId) {109this("ConnectionPool("+clientId+")");110}111112/**113* There should be one of these per HttpClient.114*/115private ConnectionPool(String tag) {116dbgTag = tag;117plainPool = new HashMap<>();118sslPool = new HashMap<>();119expiryList = new ExpiryList();120}121122final String dbgString() {123return dbgTag;124}125126synchronized void start() {127assert !stopped : "Already stopped";128}129130static CacheKey cacheKey(InetSocketAddress destination,131InetSocketAddress proxy)132{133return new CacheKey(destination, proxy);134}135136synchronized HttpConnection getConnection(boolean secure,137InetSocketAddress addr,138InetSocketAddress proxy) {139if (stopped) return null;140// for plain (unsecure) proxy connection the destination address is irrelevant.141addr = secure || proxy == null ? addr : null;142CacheKey key = new CacheKey(addr, proxy);143HttpConnection c = secure ? findConnection(key, sslPool)144: findConnection(key, plainPool);145//System.out.println ("getConnection returning: " + c);146assert c == null || c.isSecure() == secure;147return c;148}149150/**151* Returns the connection to the pool.152*/153void returnToPool(HttpConnection conn) {154returnToPool(conn, Instant.now(), KEEP_ALIVE);155}156157// Called also by whitebox tests158void returnToPool(HttpConnection conn, Instant now, long keepAlive) {159160assert (conn instanceof PlainHttpConnection) || conn.isSecure()161: "Attempting to return unsecure connection to SSL pool: "162+ conn.getClass();163164// Don't call registerCleanupTrigger while holding a lock,165// but register it before the connection is added to the pool,166// since we don't want to trigger the cleanup if the connection167// is not in the pool.168CleanupTrigger cleanup = registerCleanupTrigger(conn);169170// it's possible that cleanup may have been called.171HttpConnection toClose = null;172synchronized(this) {173if (cleanup.isDone()) {174return;175} else if (stopped) {176conn.close();177return;178}179if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {180toClose = expiryList.removeOldest();181if (toClose != null) removeFromPool(toClose);182}183if (conn instanceof PlainHttpConnection) {184putConnection(conn, plainPool);185} else {186assert conn.isSecure();187putConnection(conn, sslPool);188}189expiryList.add(conn, now, keepAlive);190}191if (toClose != null) {192if (debug.on()) {193debug.log("Maximum pool size reached: removing oldest connection %s",194toClose.dbgString());195}196close(toClose);197}198//System.out.println("Return to pool: " + conn);199}200201private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {202// Connect the connection flow to a pub/sub pair that will take the203// connection out of the pool and close it if anything happens204// while the connection is sitting in the pool.205CleanupTrigger cleanup = new CleanupTrigger(conn);206FlowTube flow = conn.getConnectionFlow();207if (debug.on()) debug.log("registering %s", cleanup);208flow.connectFlows(cleanup, cleanup);209return cleanup;210}211212private HttpConnection213findConnection(CacheKey key,214HashMap<CacheKey,LinkedList<HttpConnection>> pool) {215LinkedList<HttpConnection> l = pool.get(key);216if (l == null || l.isEmpty()) {217return null;218} else {219HttpConnection c = l.removeFirst();220expiryList.remove(c);221return c;222}223}224225/* called from cache cleaner only */226private boolean227removeFromPool(HttpConnection c,228HashMap<CacheKey,LinkedList<HttpConnection>> pool) {229//System.out.println("cacheCleaner removing: " + c);230assert Thread.holdsLock(this);231CacheKey k = c.cacheKey();232List<HttpConnection> l = pool.get(k);233if (l == null || l.isEmpty()) {234pool.remove(k);235return false;236}237return l.remove(c);238}239240private void241putConnection(HttpConnection c,242HashMap<CacheKey,LinkedList<HttpConnection>> pool) {243CacheKey key = c.cacheKey();244LinkedList<HttpConnection> l = pool.get(key);245if (l == null) {246l = new LinkedList<>();247pool.put(key, l);248}249l.add(c);250}251252/**253* Purge expired connection and return the number of milliseconds254* in which the next connection is scheduled to expire.255* If no connections are scheduled to be purged return 0.256* @return the delay in milliseconds in which the next connection will257* expire.258*/259long purgeExpiredConnectionsAndReturnNextDeadline() {260if (!expiryList.purgeMaybeRequired()) return 0;261return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());262}263264// Used for whitebox testing265long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {266long nextPurge = 0;267268// We may be in the process of adding new elements269// to the expiry list - but those elements will not270// have outlast their keep alive timer yet since we're271// just adding them.272if (!expiryList.purgeMaybeRequired()) return nextPurge;273274List<HttpConnection> closelist;275synchronized (this) {276closelist = expiryList.purgeUntil(now);277for (HttpConnection c : closelist) {278if (c instanceof PlainHttpConnection) {279boolean wasPresent = removeFromPool(c, plainPool);280assert wasPresent;281} else {282boolean wasPresent = removeFromPool(c, sslPool);283assert wasPresent;284}285}286nextPurge = now.until(287expiryList.nextExpiryDeadline().orElse(now),288ChronoUnit.MILLIS);289}290closelist.forEach(this::close);291return nextPurge;292}293294private void close(HttpConnection c) {295try {296c.close();297} catch (Throwable e) {} // ignore298}299300void stop() {301List<HttpConnection> closelist = Collections.emptyList();302try {303synchronized (this) {304stopped = true;305closelist = expiryList.stream()306.map(e -> e.connection)307.collect(Collectors.toList());308expiryList.clear();309plainPool.clear();310sslPool.clear();311}312} finally {313closelist.forEach(this::close);314}315}316317static final class ExpiryEntry {318final HttpConnection connection;319final Instant expiry; // absolute time in seconds of expiry time320ExpiryEntry(HttpConnection connection, Instant expiry) {321this.connection = connection;322this.expiry = expiry;323}324}325326/**327* Manages a LinkedList of sorted ExpiryEntry. The entry with the closer328* deadline is at the tail of the list, and the entry with the farther329* deadline is at the head. In the most common situation, new elements330* will need to be added at the head (or close to it), and expired elements331* will need to be purged from the tail.332*/333private static final class ExpiryList {334private final LinkedList<ExpiryEntry> list = new LinkedList<>();335private volatile boolean mayContainEntries;336337int size() { return list.size(); }338339// A loosely accurate boolean whose value is computed340// at the end of each operation performed on ExpiryList;341// Does not require synchronizing on the ConnectionPool.342boolean purgeMaybeRequired() {343return mayContainEntries;344}345346// Returns the next expiry deadline347// should only be called while holding a synchronization348// lock on the ConnectionPool349Optional<Instant> nextExpiryDeadline() {350if (list.isEmpty()) return Optional.empty();351else return Optional.of(list.getLast().expiry);352}353354// should only be called while holding a synchronization355// lock on the ConnectionPool356HttpConnection removeOldest() {357ExpiryEntry entry = list.pollLast();358return entry == null ? null : entry.connection;359}360361// should only be called while holding a synchronization362// lock on the ConnectionPool363void add(HttpConnection conn) {364add(conn, Instant.now(), KEEP_ALIVE);365}366367// Used by whitebox test.368void add(HttpConnection conn, Instant now, long keepAlive) {369Instant then = now.truncatedTo(ChronoUnit.SECONDS)370.plus(keepAlive, ChronoUnit.SECONDS);371372// Elements with the farther deadline are at the head of373// the list. It's more likely that the new element will374// have the farthest deadline, and will need to be inserted375// at the head of the list, so we're using an ascending376// list iterator to find the right insertion point.377ListIterator<ExpiryEntry> li = list.listIterator();378while (li.hasNext()) {379ExpiryEntry entry = li.next();380381if (then.isAfter(entry.expiry)) {382li.previous();383// insert here384li.add(new ExpiryEntry(conn, then));385mayContainEntries = true;386return;387}388}389// last (or first) element of list (the last element is390// the first when the list is empty)391list.add(new ExpiryEntry(conn, then));392mayContainEntries = true;393}394395// should only be called while holding a synchronization396// lock on the ConnectionPool397void remove(HttpConnection c) {398if (c == null || list.isEmpty()) return;399ListIterator<ExpiryEntry> li = list.listIterator();400while (li.hasNext()) {401ExpiryEntry e = li.next();402if (e.connection.equals(c)) {403li.remove();404mayContainEntries = !list.isEmpty();405return;406}407}408}409410// should only be called while holding a synchronization411// lock on the ConnectionPool.412// Purge all elements whose deadline is before now (now included).413List<HttpConnection> purgeUntil(Instant now) {414if (list.isEmpty()) return Collections.emptyList();415416List<HttpConnection> closelist = new ArrayList<>();417418// elements with the closest deadlines are at the tail419// of the queue, so we're going to use a descending iterator420// to remove them, and stop when we find the first element421// that has not expired yet.422Iterator<ExpiryEntry> li = list.descendingIterator();423while (li.hasNext()) {424ExpiryEntry entry = li.next();425// use !isAfter instead of isBefore in order to426// remove the entry if its expiry == now427if (!entry.expiry.isAfter(now)) {428li.remove();429HttpConnection c = entry.connection;430closelist.add(c);431} else break; // the list is sorted432}433mayContainEntries = !list.isEmpty();434return closelist;435}436437// should only be called while holding a synchronization438// lock on the ConnectionPool439java.util.stream.Stream<ExpiryEntry> stream() {440return list.stream();441}442443// should only be called while holding a synchronization444// lock on the ConnectionPool445void clear() {446list.clear();447mayContainEntries = false;448}449}450451// Remove a connection from the pool.452// should only be called while holding a synchronization453// lock on the ConnectionPool454private void removeFromPool(HttpConnection c) {455assert Thread.holdsLock(this);456if (c instanceof PlainHttpConnection) {457removeFromPool(c, plainPool);458} else {459assert c.isSecure() : "connection " + c + " is not secure!";460removeFromPool(c, sslPool);461}462}463464// Used by tests465synchronized boolean contains(HttpConnection c) {466final CacheKey key = c.cacheKey();467List<HttpConnection> list;468if ((list = plainPool.get(key)) != null) {469if (list.contains(c)) return true;470}471if ((list = sslPool.get(key)) != null) {472if (list.contains(c)) return true;473}474return false;475}476477void cleanup(HttpConnection c, Throwable error) {478if (debug.on())479debug.log("%s : ConnectionPool.cleanup(%s)",480String.valueOf(c.getConnectionFlow()), error);481synchronized(this) {482removeFromPool(c);483expiryList.remove(c);484}485c.close();486}487488/**489* An object that subscribes to the flow while the connection is in490* the pool. Anything that comes in will cause the connection to be closed491* and removed from the pool.492*/493private final class CleanupTrigger implements494FlowTube.TubeSubscriber, FlowTube.TubePublisher,495Flow.Subscription {496497private final HttpConnection connection;498private volatile boolean done;499500public CleanupTrigger(HttpConnection connection) {501this.connection = connection;502}503504public boolean isDone() { return done;}505506private void triggerCleanup(Throwable error) {507done = true;508cleanup(connection, error);509}510511@Override public void request(long n) {}512@Override public void cancel() {}513514@Override515public void onSubscribe(Flow.Subscription subscription) {516subscription.request(1);517}518@Override519public void onError(Throwable error) { triggerCleanup(error); }520@Override521public void onComplete() { triggerCleanup(null); }522@Override523public void onNext(List<ByteBuffer> item) {524triggerCleanup(new IOException("Data received while in pool"));525}526527@Override528public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {529subscriber.onSubscribe(this);530}531532@Override533public String toString() {534return "CleanupTrigger(" + connection.getConnectionFlow() + ")";535}536}537}538539540