Path: blob/master/src/java.base/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java
41159 views
/*1* Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.Channel;28import java.nio.channels.AsynchronousChannelGroup;29import java.nio.channels.spi.AsynchronousChannelProvider;30import java.io.IOException;31import java.io.FileDescriptor;32import java.util.Queue;33import java.util.concurrent.*;34import java.util.concurrent.atomic.AtomicInteger;35import java.util.concurrent.atomic.AtomicBoolean;36import java.security.PrivilegedAction;37import java.security.AccessController;38import java.security.AccessControlContext;39import sun.security.action.GetIntegerAction;4041/**42* Base implementation of AsynchronousChannelGroup43*/4445abstract class AsynchronousChannelGroupImpl46extends AsynchronousChannelGroup implements Executor47{48// number of internal threads handling I/O events when using an unbounded49// thread pool. Internal threads do not dispatch to completion handlers.50@SuppressWarnings("removal")51private static final int internalThreadCount = AccessController.doPrivileged(52new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));5354// associated thread pool55private final ThreadPool pool;5657// number of tasks running (including internal)58private final AtomicInteger threadCount = new AtomicInteger();5960// associated Executor for timeouts61private ScheduledThreadPoolExecutor timeoutExecutor;6263// task queue for when using a fixed thread pool. In that case, a thread64// waiting on I/O events must be awoken to poll tasks from this queue.65private final Queue<Runnable> taskQueue;6667// group shutdown68private final AtomicBoolean shutdown = new AtomicBoolean();69private final Object shutdownNowLock = new Object();70private volatile boolean terminateInitiated;7172AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,73ThreadPool pool)74{75super(provider);76this.pool = pool;7778if (pool.isFixedThreadPool()) {79taskQueue = new ConcurrentLinkedQueue<>();80} else {81taskQueue = null; // not used82}8384// use default thread factory as thread should not be visible to85// application (it doesn't execute completion handlers).86this.timeoutExecutor = (ScheduledThreadPoolExecutor)87Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());88this.timeoutExecutor.setRemoveOnCancelPolicy(true);89}9091final ExecutorService executor() {92return pool.executor();93}9495final boolean isFixedThreadPool() {96return pool.isFixedThreadPool();97}9899final int fixedThreadCount() {100if (isFixedThreadPool()) {101return pool.poolSize();102} else {103return pool.poolSize() + internalThreadCount;104}105}106107private Runnable bindToGroup(final Runnable task) {108final AsynchronousChannelGroupImpl thisGroup = this;109return new Runnable() {110public void run() {111Invoker.bindToGroup(thisGroup);112task.run();113}114};115}116117@SuppressWarnings("removal")118private void startInternalThread(final Runnable task) {119AccessController.doPrivileged(new PrivilegedAction<>() {120@Override121public Void run() {122// internal threads should not be visible to application so123// cannot use user-supplied thread factory124ThreadPool.defaultThreadFactory().newThread(task).start();125return null;126}127});128}129130protected final void startThreads(Runnable task) {131if (!isFixedThreadPool()) {132for (int i=0; i<internalThreadCount; i++) {133startInternalThread(task);134threadCount.incrementAndGet();135}136}137if (pool.poolSize() > 0) {138task = bindToGroup(task);139try {140for (int i=0; i<pool.poolSize(); i++) {141pool.executor().execute(task);142threadCount.incrementAndGet();143}144} catch (RejectedExecutionException x) {145// nothing we can do146}147}148}149150final int threadCount() {151return threadCount.get();152}153154/**155* Invoked by tasks as they terminate156*/157final int threadExit(Runnable task, boolean replaceMe) {158if (replaceMe) {159try {160if (Invoker.isBoundToAnyGroup()) {161// submit new task to replace this thread162pool.executor().execute(bindToGroup(task));163} else {164// replace internal thread165startInternalThread(task);166}167return threadCount.get();168} catch (RejectedExecutionException x) {169// unable to replace170}171}172return threadCount.decrementAndGet();173}174175/**176* Wakes up a thread waiting for I/O events to execute the given task.177*/178abstract void executeOnHandlerTask(Runnable task);179180/**181* For a fixed thread pool the task is queued to a thread waiting on I/O182* events. For other thread pools we simply submit the task to the thread183* pool.184*/185final void executeOnPooledThread(Runnable task) {186if (isFixedThreadPool()) {187executeOnHandlerTask(task);188} else {189pool.executor().execute(bindToGroup(task));190}191}192193final void offerTask(Runnable task) {194taskQueue.offer(task);195}196197final Runnable pollTask() {198return (taskQueue == null) ? null : taskQueue.poll();199}200201final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {202try {203return timeoutExecutor.schedule(task, timeout, unit);204} catch (RejectedExecutionException rej) {205if (terminateInitiated) {206// no timeout scheduled as group is terminating207return null;208}209throw new AssertionError(rej);210}211}212213@Override214public final boolean isShutdown() {215return shutdown.get();216}217218@Override219public final boolean isTerminated() {220return pool.executor().isTerminated();221}222223/**224* Returns true if there are no channels in the group225*/226abstract boolean isEmpty();227228/**229* Attaches a foreign channel to this group.230*/231abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)232throws IOException;233234/**235* Detaches a foreign channel from this group.236*/237abstract void detachForeignChannel(Object key);238239/**240* Closes all channels in the group241*/242abstract void closeAllChannels() throws IOException;243244/**245* Shutdown all tasks waiting for I/O events.246*/247abstract void shutdownHandlerTasks();248249@SuppressWarnings("removal")250private void shutdownExecutors() {251AccessController.doPrivileged(252new PrivilegedAction<>() {253public Void run() {254pool.executor().shutdown();255timeoutExecutor.shutdown();256return null;257}258},259null,260new RuntimePermission("modifyThread"));261}262263@Override264public final void shutdown() {265if (shutdown.getAndSet(true)) {266// already shutdown267return;268}269// if there are channels in the group then shutdown will continue270// when the last channel is closed271if (!isEmpty()) {272return;273}274// initiate termination (acquire shutdownNowLock to ensure that other275// threads invoking shutdownNow will block).276synchronized (shutdownNowLock) {277if (!terminateInitiated) {278terminateInitiated = true;279shutdownHandlerTasks();280shutdownExecutors();281}282}283}284285@Override286public final void shutdownNow() throws IOException {287shutdown.set(true);288synchronized (shutdownNowLock) {289if (!terminateInitiated) {290terminateInitiated = true;291closeAllChannels();292shutdownHandlerTasks();293shutdownExecutors();294}295}296}297298/**299* For use by AsynchronousFileChannel to release resources without shutting300* down the thread pool.301*/302final void detachFromThreadPool() {303if (shutdown.getAndSet(true))304throw new AssertionError("Already shutdown");305if (!isEmpty())306throw new AssertionError("Group not empty");307shutdownHandlerTasks();308}309310@Override311public final boolean awaitTermination(long timeout, TimeUnit unit)312throws InterruptedException313{314return pool.executor().awaitTermination(timeout, unit);315}316317/**318* Executes the given command on one of the channel group's pooled threads.319*/320@Override321public final void execute(Runnable task) {322@SuppressWarnings("removal")323SecurityManager sm = System.getSecurityManager();324if (sm != null) {325// when a security manager is installed then the user's task326// must be run with the current calling context327@SuppressWarnings("removal")328final AccessControlContext acc = AccessController.getContext();329final Runnable delegate = task;330task = new Runnable() {331@SuppressWarnings("removal")332@Override333public void run() {334AccessController.doPrivileged(new PrivilegedAction<>() {335@Override336public Void run() {337delegate.run();338return null;339}340}, acc);341}342};343}344executeOnPooledThread(task);345}346}347348349