Path: blob/master/src/java.base/share/classes/java/nio/channels/Channels.java
41159 views
/*
 * Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.nio.channels;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.UnsupportedCharsetException;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import sun.nio.ch.ChannelInputStream;
import sun.nio.cs.StreamDecoder;
import sun.nio.cs.StreamEncoder;


/**
 * Utility methods for channels and streams.
 *
 * <p> This class defines static methods that support the interoperation of the
 * stream classes of the {@link java.io} package with the channel classes
 * of this package.  </p>
 *
 *
 * @author Mark Reinhold
 * @author Mike McCloskey
 * @author JSR-51 Expert Group
 * @since 1.4
 */

public final class Channels {

    private Channels() { throw new Error("no instances"); }

    /**
     * Write all remaining bytes in buffer to the given channel.
     * If the channel is selectable then it must be configured blocking.
     *
     * @throws RuntimeException
     *         If a write reports that no bytes were written while the
     *         buffer still has bytes remaining
     */
    private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
        throws IOException
    {
        while (bb.remaining() > 0) {
            int n = ch.write(bb);
            if (n <= 0)
                throw new RuntimeException("no bytes written");
        }
    }

    /**
     * Write all remaining bytes in buffer to the given channel.
     *
     * @throws  IllegalBlockingModeException
     *          If the channel is selectable and configured non-blocking.
     */
    private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
        throws IOException
    {
        if (ch instanceof SelectableChannel sc) {
            // Hold the blocking lock so the mode cannot change mid-write
            synchronized (sc.blockingLock()) {
                if (!sc.isBlocking())
                    throw new IllegalBlockingModeException();
                writeFullyImpl(ch, bb);
            }
        } else {
            writeFullyImpl(ch, bb);
        }
    }

    // -- Byte streams from channels --

    /**
     * Constructs a stream that reads bytes from the given channel.
     *
     * <p> The {@code read} methods of the resulting stream will throw an
     * {@link IllegalBlockingModeException} if invoked while the underlying
     * channel is in non-blocking mode.  The stream will not be buffered, and
     * it will not support the {@link InputStream#mark mark} or {@link
     * InputStream#reset reset} methods.  The stream will be safe for access by
     * multiple concurrent threads.  Closing the stream will in turn cause the
     * channel to be closed.  </p>
     *
     * @param  ch
     *         The channel from which bytes will be read
     *
     * @return  A new input stream
     *
     * @throws  NullPointerException
     *          If {@code ch} is {@code null}
     */
    public static InputStream newInputStream(ReadableByteChannel ch) {
        Objects.requireNonNull(ch, "ch");
        return new ChannelInputStream(ch);
    }

    /**
     * Constructs a stream that writes bytes to the given channel.
     *
     * <p> The {@code write} methods of the resulting stream will throw an
     * {@link IllegalBlockingModeException} if invoked while the underlying
     * channel is in non-blocking mode.  The stream will not be buffered.  The
     * stream will be safe for access by multiple concurrent threads.  Closing
     * the stream will in turn cause the channel to be closed.  </p>
     *
     * @param  ch
     *         The channel to which bytes will be written
     *
     * @return  A new output stream
     *
     * @throws  NullPointerException
     *          If {@code ch} is {@code null}
     */
    public static OutputStream newOutputStream(WritableByteChannel ch) {
        Objects.requireNonNull(ch, "ch");

        return new OutputStream() {

            private ByteBuffer bb;
            private byte[] bs;       // Invoker's previous array
            private byte[] b1;

            @Override
            public synchronized void write(int b) throws IOException {
                if (b1 == null)
                    b1 = new byte[1];
                b1[0] = (byte) b;
                this.write(b1);
            }

            @Override
            public synchronized void write(byte[] bs, int off, int len)
                    throws IOException
            {
                if ((off < 0) || (off > bs.length) || (len < 0) ||
                    ((off + len) > bs.length) || ((off + len) < 0)) {
                    throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                    return;
                }
                // Reuse the wrapper when the invoker passes the same array
                ByteBuffer bb = ((this.bs == bs)
                                 ? this.bb
                                 : ByteBuffer.wrap(bs));
                bb.limit(Math.min(off + len, bb.capacity()));
                bb.position(off);
                this.bb = bb;
                this.bs = bs;
                Channels.writeFully(ch, bb);
            }

            @Override
            public void close() throws IOException {
                ch.close();
            }

        };
    }

    /**
     * Constructs a stream that reads bytes from the given channel.
     *
     * <p> The stream will not be buffered, and it will not support the {@link
     * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
     * stream will be safe for access by multiple concurrent threads.  Closing
     * the stream will in turn cause the channel to be closed.  </p>
     *
     * @param  ch
     *         The channel from which bytes will be read
     *
     * @return  A new input stream
     *
     * @throws  NullPointerException
     *          If {@code ch} is {@code null}
     *
     * @since 1.7
     */
    public static InputStream newInputStream(AsynchronousByteChannel ch) {
        Objects.requireNonNull(ch, "ch");
        return new InputStream() {

            private ByteBuffer bb;
            private byte[] bs;       // Invoker's previous array
            private byte[] b1;

            @Override
            public synchronized int read() throws IOException {
                if (b1 == null)
                    b1 = new byte[1];
                int n = this.read(b1);
                if (n == 1)
                    return b1[0] & 0xff;
                return -1;
            }

            @Override
            public synchronized int read(byte[] bs, int off, int len)
                    throws IOException
            {
                if ((off < 0) || (off > bs.length) || (len < 0) ||
                    ((off + len) > bs.length) || ((off + len) < 0)) {
                    throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                    return 0;
                }

                // Reuse the wrapper when the invoker passes the same array
                ByteBuffer bb = ((this.bs == bs)
                                 ? this.bb
                                 : ByteBuffer.wrap(bs));
                // Set the limit before the position: a reused buffer may
                // still carry a limit smaller than off from an earlier call,
                // in which case position(off) would be rejected.
                bb.limit(Math.min(off + len, bb.capacity()));
                bb.position(off);
                this.bb = bb;
                this.bs = bs;

                boolean interrupted = false;
                try {
                    // Initiate the read once; if the wait is interrupted,
                    // keep waiting on the same operation rather than
                    // starting a second read while the first is pending.
                    Future<Integer> pending = ch.read(bb);
                    for (;;) {
                        try {
                            return pending.get();
                        } catch (ExecutionException ee) {
                            throw new IOException(ee.getCause());
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                } finally {
                    // Restore the interrupt status swallowed while waiting
                    if (interrupted)
                        Thread.currentThread().interrupt();
                }
            }

            @Override
            public void close() throws IOException {
                ch.close();
            }
        };
    }

    /**
     * Constructs a stream that writes bytes to the given channel.
     *
     * <p> The stream will not be buffered.  The stream will be safe for access
     * by multiple concurrent threads.  Closing the stream will in turn cause
     * the channel to be closed.  </p>
     *
     * @param  ch
     *         The channel to which bytes will be written
     *
     * @return  A new output stream
     *
     * @throws  NullPointerException
     *          If {@code ch} is {@code null}
     *
     * @since 1.7
     */
    public static OutputStream newOutputStream(AsynchronousByteChannel ch) {
        Objects.requireNonNull(ch, "ch");
        return new OutputStream() {

            private ByteBuffer bb;
            private byte[] bs;   // Invoker's previous array
            private byte[] b1;

            @Override
            public synchronized void write(int b) throws IOException {
                if (b1 == null)
                    b1 = new byte[1];
                b1[0] = (byte) b;
                this.write(b1);
            }

            @Override
            public synchronized void write(byte[] bs, int off, int len)
                    throws IOException
            {
                if ((off < 0) || (off > bs.length) || (len < 0) ||
                    ((off + len) > bs.length) || ((off + len) < 0)) {
                    throw new IndexOutOfBoundsException();
                } else if (len == 0) {
                    return;
                }
                // Reuse the wrapper when the invoker passes the same array
                ByteBuffer bb = ((this.bs == bs)
                                 ? this.bb
                                 : ByteBuffer.wrap(bs));
                bb.limit(Math.min(off + len, bb.capacity()));
                bb.position(off);
                this.bb = bb;
                this.bs = bs;

                boolean interrupted = false;
                try {
                    while (bb.remaining() > 0) {
                        // One write per iteration; an interrupted wait
                        // resumes on the same operation instead of issuing
                        // another write while this one is still pending.
                        Future<Integer> pending = ch.write(bb);
                        for (;;) {
                            try {
                                pending.get();
                                break;
                            } catch (ExecutionException ee) {
                                throw new IOException(ee.getCause());
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                    }
                } finally {
                    // Restore the interrupt status swallowed while waiting
                    if (interrupted)
                        Thread.currentThread().interrupt();
                }
            }

            @Override
            public void close() throws IOException {
                ch.close();
            }
        };
    }


    // -- Channels from streams --

    /**
     * Constructs a channel that reads bytes from the given stream.
     *
     * <p> The resulting channel will not be buffered; it will simply redirect
     * its I/O operations to the given stream.  Closing the channel will in
     * turn cause the stream to be closed.  </p>
     *
     * @param  in
     *         The stream from which bytes are to be read
     *
     * @return  A new readable byte channel
     *
     * @throws  NullPointerException
     *          If {@code in} is {@code null}
     */
    public static ReadableByteChannel newChannel(InputStream in) {
        Objects.requireNonNull(in, "in");

        // Exact class match only: subclasses go through the generic adapter
        if (in.getClass() == FileInputStream.class) {
            return ((FileInputStream) in).getChannel();
        }

        return new ReadableByteChannelImpl(in);
    }

    /**
     * Readable channel that forwards reads to an {@link InputStream},
     * copying through an intermediate byte array.
     */
    private static class ReadableByteChannelImpl
        extends AbstractInterruptibleChannel    // Not really interruptible
        implements ReadableByteChannel
    {
        private final InputStream in;
        private static final int TRANSFER_SIZE = 8192;
        private byte[] buf = new byte[0];
        private final Object readLock = new Object();

        ReadableByteChannelImpl(InputStream in) {
            this.in = in;
        }

        @Override
        public int read(ByteBuffer dst) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }

            int len = dst.remaining();
            int totalRead = 0;
            int bytesRead = 0;
            synchronized (readLock) {
                while (totalRead < len) {
                    int bytesToRead = Math.min((len - totalRead),
                                               TRANSFER_SIZE);
                    // Grow the transfer array on demand; it never shrinks
                    if (buf.length < bytesToRead)
                        buf = new byte[bytesToRead];
                    if ((totalRead > 0) && !(in.available() > 0))
                        break; // block at most once
                    try {
                        begin();
                        bytesRead = in.read(buf, 0, bytesToRead);
                    } finally {
                        end(bytesRead > 0);
                    }
                    if (bytesRead < 0)
                        break;
                    else
                        totalRead += bytesRead;
                    dst.put(buf, 0, bytesRead);
                }
                // End of stream is reported only if nothing was transferred
                if ((bytesRead < 0) && (totalRead == 0))
                    return -1;

                return totalRead;
            }
        }

        @Override
        protected void implCloseChannel() throws IOException {
            in.close();
        }
    }


    /**
     * Constructs a channel that writes bytes to the given stream.
     *
     * <p> The resulting channel will not be buffered; it will simply redirect
     * its I/O operations to the given stream.  Closing the channel will in
     * turn cause the stream to be closed.  </p>
     *
     * @param  out
     *         The stream to which bytes are to be written
     *
     * @return  A new writable byte channel
     *
     * @throws  NullPointerException
     *          If {@code out} is {@code null}
     */
    public static WritableByteChannel newChannel(OutputStream out) {
        Objects.requireNonNull(out, "out");

        // Exact class match only: subclasses go through the generic adapter
        if (out.getClass() == FileOutputStream.class) {
            return ((FileOutputStream) out).getChannel();
        }

        return new WritableByteChannelImpl(out);
    }

    /**
     * Writable channel that forwards writes to an {@link OutputStream},
     * copying through an intermediate byte array.
     */
    private static class WritableByteChannelImpl
        extends AbstractInterruptibleChannel    // Not really interruptible
        implements WritableByteChannel
    {
        private final OutputStream out;
        private static final int TRANSFER_SIZE = 8192;
        private byte[] buf = new byte[0];
        private final Object writeLock = new Object();

        WritableByteChannelImpl(OutputStream out) {
            this.out = out;
        }

        @Override
        public int write(ByteBuffer src) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }

            int len = src.remaining();
            int totalWritten = 0;
            synchronized (writeLock) {
                while (totalWritten < len) {
                    int bytesToWrite = Math.min((len - totalWritten),
                                                TRANSFER_SIZE);
                    // Grow the transfer array on demand; it never shrinks
                    if (buf.length < bytesToWrite)
                        buf = new byte[bytesToWrite];
                    src.get(buf, 0, bytesToWrite);
                    try {
                        begin();
                        out.write(buf, 0, bytesToWrite);
                    } finally {
                        end(bytesToWrite > 0);
                    }
                    totalWritten += bytesToWrite;
                }
                return totalWritten;
            }
        }

        @Override
        protected void implCloseChannel() throws IOException {
            out.close();
        }
    }


    // -- Character streams from channels --

    /**
     * Constructs a reader that decodes bytes from the given channel using the
     * given decoder.
     *
     * <p> The resulting stream will contain an internal input buffer of at
     * least {@code minBufferCap} bytes.  The stream's {@code read} methods
     * will, as needed, fill the buffer by reading bytes from the underlying
     * channel; if the channel is in non-blocking mode when bytes are to be
     * read then an {@link IllegalBlockingModeException} will be thrown.  The
     * resulting stream will not otherwise be buffered, and it will not support
     * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
     * Closing the stream will in turn cause the channel to be closed.  </p>
     *
     * @param  ch
     *         The channel from which bytes will be read
     *
     * @param  dec
     *         The charset decoder to be used
     *
     * @param  minBufferCap
     *         The minimum capacity of the internal byte buffer,
     *         or {@code -1} if an implementation-dependent
     *         default capacity is to be used
     *
     * @return  A new reader
     */
    public static Reader newReader(ReadableByteChannel ch,
                                   CharsetDecoder dec,
                                   int minBufferCap)
    {
        Objects.requireNonNull(ch, "ch");
        return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
    }

    /**
     * Constructs a reader that decodes bytes from the given channel according
     * to the named charset.
     *
     * <p> An invocation of this method of the form
     *
     * <pre> {@code
     *     Channels.newReader(ch, csName)
     * } </pre>
     *
     * behaves in exactly the same way as the expression
     *
     * <pre> {@code
     *     Channels.newReader(ch, Charset.forName(csName))
     * } </pre>
     *
     * @param  ch
     *         The channel from which bytes will be read
     *
     * @param  csName
     *         The name of the charset to be used
     *
     * @return  A new reader
     *
     * @throws  UnsupportedCharsetException
     *          If no support for the named charset is available
     *          in this instance of the Java virtual machine
     */
    public static Reader newReader(ReadableByteChannel ch,
                                   String csName)
    {
        Objects.requireNonNull(csName, "csName");
        return newReader(ch, Charset.forName(csName).newDecoder(), -1);
    }

    /**
     * Constructs a reader that decodes bytes from the given channel according
     * to the given charset.
     *
     * <p> An invocation of this method of the form
     *
     * <pre> {@code
     *     Channels.newReader(ch, charset)
     * } </pre>
     *
     * behaves in exactly the same way as the expression
     *
     * <pre> {@code
     *     Channels.newReader(ch, charset.newDecoder(), -1)
     * } </pre>
     *
     * <p> The reader's default action for malformed-input and unmappable-character
     * errors is to {@linkplain java.nio.charset.CodingErrorAction#REPORT report}
     * them. When more control over the error handling is required, the constructor
     * that takes a {@linkplain java.nio.charset.CharsetDecoder} should be used.
     *
     * @param  ch The channel from which bytes will be read
     *
     * @param  charset The charset to be used
     *
     * @return  A new reader
     */
    public static Reader newReader(ReadableByteChannel ch, Charset charset) {
        Objects.requireNonNull(charset, "charset");
        return newReader(ch, charset.newDecoder(), -1);
    }

    /**
     * Constructs a writer that encodes characters using the given encoder and
     * writes the resulting bytes to the given channel.
     *
     * <p> The resulting stream will contain an internal output buffer of at
     * least {@code minBufferCap} bytes.  The stream's {@code write} methods
     * will, as needed, flush the buffer by writing bytes to the underlying
     * channel; if the channel is in non-blocking mode when bytes are to be
     * written then an {@link IllegalBlockingModeException} will be thrown.
     * The resulting stream will not otherwise be buffered.  Closing the stream
     * will in turn cause the channel to be closed.  </p>
     *
     * @param  ch
     *         The channel to which bytes will be written
     *
     * @param  enc
     *         The charset encoder to be used
     *
     * @param  minBufferCap
     *         The minimum capacity of the internal byte buffer,
     *         or {@code -1} if an implementation-dependent
     *         default capacity is to be used
     *
     * @return  A new writer
     */
    public static Writer newWriter(WritableByteChannel ch,
                                   CharsetEncoder enc,
                                   int minBufferCap)
    {
        Objects.requireNonNull(ch, "ch");
        return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
    }

    /**
     * Constructs a writer that encodes characters according to the named
     * charset and writes the resulting bytes to the given channel.
     *
     * <p> An invocation of this method of the form
     *
     * <pre> {@code
     *     Channels.newWriter(ch, csName)
     * } </pre>
     *
     * behaves in exactly the same way as the expression
     *
     * <pre> {@code
     *     Channels.newWriter(ch, Charset.forName(csName))
     * } </pre>
     *
     * @param  ch
     *         The channel to which bytes will be written
     *
     * @param  csName
     *         The name of the charset to be used
     *
     * @return  A new writer
     *
     * @throws  UnsupportedCharsetException
     *          If no support for the named charset is available
     *          in this instance of the Java virtual machine
     */
    public static Writer newWriter(WritableByteChannel ch,
                                   String csName)
    {
        Objects.requireNonNull(csName, "csName");
        return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
    }

    /**
     * Constructs a writer that encodes characters according to the given
     * charset and writes the resulting bytes to the given channel.
     *
     * <p> An invocation of this method of the form
     *
     * <pre> {@code
     *     Channels.newWriter(ch, charset)
     * } </pre>
     *
     * behaves in exactly the same way as the expression
     *
     * <pre> {@code
     *     Channels.newWriter(ch, charset.newEncoder(), -1)
     * } </pre>
     *
     * <p> The writer's default action for malformed-input and unmappable-character
     * errors is to {@linkplain java.nio.charset.CodingErrorAction#REPORT report}
     * them. When more control over the error handling is required, the constructor
     * that takes a {@linkplain java.nio.charset.CharsetEncoder} should be used.
     *
     * @param  ch
     *         The channel to which bytes will be written
     *
     * @param  charset
     *         The charset to be used
     *
     * @return  A new writer
     */
    public static Writer newWriter(WritableByteChannel ch, Charset charset) {
        Objects.requireNonNull(charset, "charset");
        return newWriter(ch, charset.newEncoder(), -1);
    }
}