// Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java
// 41171 views
/*1* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.FileInputStream;28import java.io.FileNotFoundException;29import java.io.FilePermission;30import java.io.IOException;31import java.io.InputStream;32import java.io.UncheckedIOException;33import java.lang.reflect.UndeclaredThrowableException;34import java.net.http.HttpRequest.BodyPublisher;35import java.nio.ByteBuffer;36import java.nio.charset.Charset;37import java.nio.file.Files;38import java.nio.file.Path;39import java.security.AccessControlContext;40import java.security.AccessController;41import java.security.Permission;42import java.security.PrivilegedActionException;43import java.security.PrivilegedExceptionAction;44import java.util.ArrayList;45import java.util.Collections;46import java.util.Iterator;47import java.util.List;48import 
java.util.NoSuchElementException;49import java.util.Objects;50import java.util.Queue;51import java.util.concurrent.ConcurrentLinkedQueue;52import java.util.concurrent.Flow;53import java.util.concurrent.Flow.Publisher;54import java.util.concurrent.atomic.AtomicReference;55import java.util.function.Function;56import java.util.function.Supplier;5758import jdk.internal.net.http.common.Demand;59import jdk.internal.net.http.common.SequentialScheduler;60import jdk.internal.net.http.common.Utils;6162public final class RequestPublishers {6364private RequestPublishers() { }6566public static class ByteArrayPublisher implements BodyPublisher {67private final int length;68private final byte[] content;69private final int offset;70private final int bufSize;7172public ByteArrayPublisher(byte[] content) {73this(content, 0, content.length);74}7576public ByteArrayPublisher(byte[] content, int offset, int length) {77this(content, offset, length, Utils.BUFSIZE);78}7980/* bufSize exposed for testing purposes */81ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {82this.content = content;83this.offset = offset;84this.length = length;85this.bufSize = bufSize;86}8788List<ByteBuffer> copy(byte[] content, int offset, int length) {89List<ByteBuffer> bufs = new ArrayList<>();90while (length > 0) {91ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));92int max = b.capacity();93int tocopy = Math.min(max, length);94b.put(content, offset, tocopy);95offset += tocopy;96length -= tocopy;97b.flip();98bufs.add(b);99}100return bufs;101}102103@Override104public void subscribe(Flow.Subscriber<? 
super ByteBuffer> subscriber) {105List<ByteBuffer> copy = copy(content, offset, length);106var delegate = new PullPublisher<>(copy);107delegate.subscribe(subscriber);108}109110@Override111public long contentLength() {112return length;113}114}115116// This implementation has lots of room for improvement.117public static class IterablePublisher implements BodyPublisher {118private final Iterable<byte[]> content;119private volatile long contentLength;120121public IterablePublisher(Iterable<byte[]> content) {122this.content = Objects.requireNonNull(content);123}124125// The ByteBufferIterator will iterate over the byte[] arrays in126// the content one at the time.127//128class ByteBufferIterator implements Iterator<ByteBuffer> {129final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();130final Iterator<byte[]> iterator = content.iterator();131@Override132public boolean hasNext() {133return !buffers.isEmpty() || iterator.hasNext();134}135136@Override137public ByteBuffer next() {138ByteBuffer buffer = buffers.poll();139while (buffer == null) {140copy();141buffer = buffers.poll();142}143return buffer;144}145146ByteBuffer getBuffer() {147return Utils.getBuffer();148}149150void copy() {151byte[] bytes = iterator.next();152int length = bytes.length;153if (length == 0 && iterator.hasNext()) {154// avoid inserting empty buffers, except155// if that's the last.156return;157}158int offset = 0;159do {160ByteBuffer b = getBuffer();161int max = b.capacity();162163int tocopy = Math.min(max, length);164b.put(bytes, offset, tocopy);165offset += tocopy;166length -= tocopy;167b.flip();168buffers.add(b);169} while (length > 0);170}171}172173public Iterator<ByteBuffer> iterator() {174return new ByteBufferIterator();175}176177@Override178public void subscribe(Flow.Subscriber<? 
super ByteBuffer> subscriber) {179Iterable<ByteBuffer> iterable = this::iterator;180var delegate = new PullPublisher<>(iterable);181delegate.subscribe(subscriber);182}183184static long computeLength(Iterable<byte[]> bytes) {185// Avoid iterating just for the purpose of computing186// a length, in case iterating is a costly operation187// For HTTP/1.1 it means we will be using chunk encoding188// when sending the request body.189// For HTTP/2 it means we will not send the optional190// Content-length header.191return -1;192}193194@Override195public long contentLength() {196if (contentLength == 0) {197synchronized(this) {198if (contentLength == 0) {199contentLength = computeLength(content);200}201}202}203return contentLength;204}205}206207public static class StringPublisher extends ByteArrayPublisher {208public StringPublisher(String content, Charset charset) {209super(content.getBytes(charset));210}211}212213public static class EmptyPublisher implements BodyPublisher {214private final Flow.Publisher<ByteBuffer> delegate =215new PullPublisher<ByteBuffer>(Collections.emptyList(), null);216217@Override218public long contentLength() {219return 0;220}221222@Override223public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {224delegate.subscribe(subscriber);225}226}227228/**229* Publishes the content of a given file.230* <p>231* Privileged actions are performed within a limited doPrivileged that only232* asserts the specific, read, file permission that was checked during the233* construction of this FilePublisher. 
This only applies if the file system234* that created the file provides interoperability with {@code java.io.File}.235*/236public static class FilePublisher implements BodyPublisher {237238private final Path path;239private final long length;240private final Function<Path, InputStream> inputStreamSupplier;241242private static String pathForSecurityCheck(Path path) {243return path.toFile().getPath();244}245246/**247* Factory for creating FilePublisher.248*249* Permission checks are performed here before construction of the250* FilePublisher. Permission checking and construction are deliberately251* and tightly co-located.252*/253public static FilePublisher create(Path path)254throws FileNotFoundException {255@SuppressWarnings("removal")256SecurityManager sm = System.getSecurityManager();257FilePermission filePermission = null;258boolean defaultFS = true;259260try {261String fn = pathForSecurityCheck(path);262if (sm != null) {263FilePermission readPermission = new FilePermission(fn, "read");264sm.checkPermission(readPermission);265filePermission = readPermission;266}267} catch (UnsupportedOperationException uoe) {268defaultFS = false;269// Path not associated with the default file system270// Test early if an input stream can still be obtained271try {272if (sm != null) {273Files.newInputStream(path).close();274}275} catch (IOException ioe) {276if (ioe instanceof FileNotFoundException) {277throw (FileNotFoundException) ioe;278} else {279var ex = new FileNotFoundException(ioe.getMessage());280ex.initCause(ioe);281throw ex;282}283}284}285286// existence check must be after permission checks287if (Files.notExists(path))288throw new FileNotFoundException(path + " not found");289290Permission perm = filePermission;291assert perm == null || perm.getActions().equals("read");292@SuppressWarnings("removal")293AccessControlContext acc = sm != null ?294AccessController.getContext() : null;295boolean finalDefaultFS = defaultFS;296Function<Path, InputStream> inputStreamSupplier = 
(p) ->297createInputStream(p, acc, perm, finalDefaultFS);298299long length;300try {301length = Files.size(path);302} catch (IOException ioe) {303length = -1;304}305306return new FilePublisher(path, length, inputStreamSupplier);307}308309@SuppressWarnings("removal")310private static InputStream createInputStream(Path path,311AccessControlContext acc,312Permission perm,313boolean defaultFS) {314try {315if (acc != null) {316PrivilegedExceptionAction<InputStream> pa = defaultFS317? () -> new FileInputStream(path.toFile())318: () -> Files.newInputStream(path);319return perm != null320? AccessController.doPrivileged(pa, acc, perm)321: AccessController.doPrivileged(pa, acc);322} else {323return defaultFS324? new FileInputStream(path.toFile())325: Files.newInputStream(path);326}327} catch (PrivilegedActionException pae) {328throw toUncheckedException(pae.getCause());329} catch (IOException io) {330throw new UncheckedIOException(io);331}332}333334private static RuntimeException toUncheckedException(Throwable t) {335if (t instanceof RuntimeException)336throw (RuntimeException) t;337if (t instanceof Error)338throw (Error) t;339if (t instanceof IOException)340throw new UncheckedIOException((IOException) t);341throw new UndeclaredThrowableException(t);342}343344private FilePublisher(Path name,345long length,346Function<Path, InputStream> inputStreamSupplier) {347path = name;348this.length = length;349this.inputStreamSupplier = inputStreamSupplier;350}351352@Override353public void subscribe(Flow.Subscriber<? 
super ByteBuffer> subscriber) {354InputStream is = null;355Throwable t = null;356try {357is = inputStreamSupplier.apply(path);358} catch (UncheckedIOException | UndeclaredThrowableException ue) {359t = ue.getCause();360} catch (Throwable th) {361t = th;362}363final InputStream fis = is;364PullPublisher<ByteBuffer> publisher;365if (t == null) {366publisher = new PullPublisher<>(() -> new StreamIterator(fis));367} else {368publisher = new PullPublisher<>(null, t);369}370publisher.subscribe(subscriber);371}372373@Override374public long contentLength() {375return length;376}377}378379/**380* Reads one buffer ahead all the time, blocking in hasNext()381*/382public static class StreamIterator implements Iterator<ByteBuffer> {383final InputStream is;384final Supplier<? extends ByteBuffer> bufSupplier;385private volatile boolean eof;386volatile ByteBuffer nextBuffer;387volatile boolean need2Read = true;388volatile boolean haveNext;389390StreamIterator(InputStream is) {391this(is, Utils::getBuffer);392}393394StreamIterator(InputStream is, Supplier<? 
extends ByteBuffer> bufSupplier) {395this.is = is;396this.bufSupplier = bufSupplier;397}398399// Throwable error() {400// return error;401// }402403private int read() throws IOException {404if (eof)405return -1;406nextBuffer = bufSupplier.get();407nextBuffer.clear();408byte[] buf = nextBuffer.array();409int offset = nextBuffer.arrayOffset();410int cap = nextBuffer.capacity();411int n = is.read(buf, offset, cap);412if (n == -1) {413eof = true;414return -1;415}416//flip417nextBuffer.limit(n);418nextBuffer.position(0);419return n;420}421422/**423* Close stream in this instance.424* UncheckedIOException may be thrown if IOE happens at InputStream::close.425*/426private void closeStream() {427try {428is.close();429} catch (IOException e) {430throw new UncheckedIOException(e);431}432}433434@Override435public synchronized boolean hasNext() {436if (need2Read) {437try {438haveNext = read() != -1;439if (haveNext) {440need2Read = false;441}442} catch (IOException e) {443haveNext = false;444need2Read = false;445throw new UncheckedIOException(e);446} finally {447if (!haveNext) {448closeStream();449}450}451}452return haveNext;453}454455@Override456public synchronized ByteBuffer next() {457if (!hasNext()) {458throw new NoSuchElementException();459}460need2Read = true;461return nextBuffer;462}463464}465466public static class InputStreamPublisher implements BodyPublisher {467private final Supplier<? extends InputStream> streamSupplier;468469public InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {470this.streamSupplier = Objects.requireNonNull(streamSupplier);471}472473@Override474public void subscribe(Flow.Subscriber<? 
super ByteBuffer> subscriber) {475PullPublisher<ByteBuffer> publisher;476InputStream is = streamSupplier.get();477if (is == null) {478Throwable t = new IOException("streamSupplier returned null");479publisher = new PullPublisher<>(null, t);480} else {481publisher = new PullPublisher<>(iterableOf(is), null);482}483publisher.subscribe(subscriber);484}485486protected Iterable<ByteBuffer> iterableOf(InputStream is) {487return () -> new StreamIterator(is);488}489490@Override491public long contentLength() {492return -1;493}494}495496public static final class PublisherAdapter implements BodyPublisher {497498private final Publisher<? extends ByteBuffer> publisher;499private final long contentLength;500501public PublisherAdapter(Publisher<? extends ByteBuffer> publisher,502long contentLength) {503this.publisher = Objects.requireNonNull(publisher);504this.contentLength = contentLength;505}506507@Override508public final long contentLength() {509return contentLength;510}511512@Override513public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {514publisher.subscribe(subscriber);515}516}517518519public static BodyPublisher concat(BodyPublisher... publishers) {520if (publishers.length == 0) {521return new EmptyPublisher();522} else if (publishers.length == 1) {523return Objects.requireNonNull(publishers[0]);524} else {525return new AggregatePublisher(List.of(publishers));526}527}528529/**530* An aggregate publisher acts as a proxy between a subscriber531* and a list of publishers. 
It lazily subscribes to each publisher532* in sequence in order to publish a request body that is533* composed from all the bytes obtained from each publisher.534* For instance, the following two publishers are equivalent, even535* though they may result in a different count of {@code onNext}536* invocations.537* <pre>{@code538* var bp1 = BodyPublishers.ofString("ab");539* var bp2 = BodyPublishers.concat(BodyPublishers.ofString("a"),540* BodyPublisher.ofByteArray(new byte[] {(byte)'b'}));541* }</pre>542*543*/544private static final class AggregatePublisher implements BodyPublisher {545final List<BodyPublisher> bodies;546AggregatePublisher(List<BodyPublisher> bodies) {547this.bodies = bodies;548}549550// -1 must be returned if any publisher returns -1551// Otherwise, we can just sum the contents.552@Override553public long contentLength() {554long length = bodies.stream()555.mapToLong(BodyPublisher::contentLength)556.reduce((a,b) -> a < 0 || b < 0 ? -1 : a + b)557.orElse(0);558// In case of overflow in any operation but the last, length559// will be -1.560// In case of overflow in the last reduce operation, length561// will be negative, but not necessarily -1: in that case,562// return -1563if (length < 0) return -1;564return length;565}566567@Override568public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {569subscriber.onSubscribe(new AggregateSubscription(bodies, subscriber));570}571}572573private static final class AggregateSubscription574implements Flow.Subscription, Flow.Subscriber<ByteBuffer> {575final Flow.Subscriber<? 
super ByteBuffer> subscriber; // upstream576final Queue<BodyPublisher> bodies;577final SequentialScheduler scheduler;578final Demand demand = new Demand(); // from upstream579final Demand demanded = new Demand(); // requested downstream580final AtomicReference<Throwable> error = new AtomicReference<>();581volatile Throwable illegalRequest;582volatile BodyPublisher publisher; // downstream583volatile Flow.Subscription subscription; // downstream584volatile boolean cancelled;585AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {586this.bodies = new ConcurrentLinkedQueue<>(bodies);587this.subscriber = subscriber;588this.scheduler = SequentialScheduler.lockingScheduler(this::run);589}590591@Override592public void request(long n) {593if (cancelled || publisher == null && bodies.isEmpty()) {594return;595}596try {597demand.increase(n);598} catch (IllegalArgumentException x) {599illegalRequest = x;600}601scheduler.runOrSchedule();602}603604@Override605public void cancel() {606cancelled = true;607scheduler.runOrSchedule();608}609610private boolean cancelSubscription() {611Flow.Subscription subscription = this.subscription;612if (subscription != null) {613this.subscription = null;614this.publisher = null;615subscription.cancel();616}617scheduler.stop();618return subscription != null;619}620621public void run() {622try {623while (error.get() == null624&& (!demand.isFulfilled()625|| (publisher == null && !bodies.isEmpty()))) {626boolean cancelled = this.cancelled;627BodyPublisher publisher = this.publisher;628Flow.Subscription subscription = this.subscription;629Throwable illegalRequest = this.illegalRequest;630if (cancelled) {631bodies.clear();632cancelSubscription();633return;634}635if (publisher == null && !bodies.isEmpty()) {636this.publisher = publisher = bodies.poll();637publisher.subscribe(this);638subscription = this.subscription;639} else if (publisher == null) {640return;641}642if (illegalRequest != null) 
{643onError(illegalRequest);644return;645}646if (subscription == null) return;647if (!demand.isFulfilled()) {648long n = demand.decreaseAndGet(demand.get());649demanded.increase(n);650subscription.request(n);651}652}653} catch (Throwable t) {654onError(t);655}656}657658659@Override660public void onSubscribe(Flow.Subscription subscription) {661this.subscription = subscription;662scheduler.runOrSchedule();663}664665@Override666public void onNext(ByteBuffer item) {667// make sure to cancel the subscription if we receive668// an item after the subscription was cancelled or669// an error was reported.670if (cancelled || error.get() != null) {671cancelSubscription();672return;673}674demanded.tryDecrement();675subscriber.onNext(item);676}677678@Override679public void onError(Throwable throwable) {680if (error.compareAndSet(null, throwable)) {681publisher = null;682subscription = null;683subscriber.onError(throwable);684scheduler.stop();685}686}687688@Override689public void onComplete() {690if (publisher != null && !bodies.isEmpty()) {691while (!demanded.isFulfilled()) {692demand.increase(demanded.decreaseAndGet(demanded.get()));693}694publisher = null;695subscription = null;696scheduler.runOrSchedule();697} else {698publisher = null;699subscription = null;700if (!cancelled) {701subscriber.onComplete();702}703scheduler.stop();704}705}706}707}708709710