Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java
41171 views
/*1* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.nio.ByteBuffer;28import java.nio.CharBuffer;29import java.nio.charset.CharacterCodingException;30import java.nio.charset.Charset;31import java.nio.charset.CharsetDecoder;32import java.nio.charset.CoderResult;33import java.nio.charset.CodingErrorAction;34import java.util.List;35import java.util.Objects;36import java.util.concurrent.CompletableFuture;37import java.util.concurrent.CompletionStage;38import java.util.concurrent.ConcurrentLinkedDeque;39import java.util.concurrent.Flow;40import java.util.concurrent.Flow.Subscriber;41import java.util.concurrent.Flow.Subscription;42import java.util.concurrent.atomic.AtomicBoolean;43import java.util.concurrent.atomic.AtomicLong;44import java.util.concurrent.atomic.AtomicReference;45import java.util.function.Function;46import jdk.internal.net.http.common.Demand;47import java.net.http.HttpResponse.BodySubscriber;48import jdk.internal.net.http.common.MinimalFuture;49import jdk.internal.net.http.common.SequentialScheduler;5051/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */52public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>53implements BodySubscriber<R> {54private final CompletableFuture<R> cf = new MinimalFuture<>();55private final S subscriber;56private final Function<? super S, ? extends R> finisher;57private final Charset charset;58private final String eol;59private final AtomicBoolean subscribed = new AtomicBoolean();60private volatile LineSubscription downstream;6162private LineSubscriberAdapter(S subscriber,63Function<? super S, ? extends R> finisher,64Charset charset,65String eol) {66if (eol != null && eol.isEmpty())67throw new IllegalArgumentException("empty line separator");68this.subscriber = Objects.requireNonNull(subscriber);69this.finisher = Objects.requireNonNull(finisher);70this.charset = Objects.requireNonNull(charset);71this.eol = eol;72}7374@Override75public void onSubscribe(Subscription subscription) {76Objects.requireNonNull(subscription);77if (!subscribed.compareAndSet(false, true)) {78subscription.cancel();79return;80}8182downstream = LineSubscription.create(subscription,83charset,84eol,85subscriber,86cf);87subscriber.onSubscribe(downstream);88}8990@Override91public void onNext(List<ByteBuffer> item) {92Objects.requireNonNull(item);93try {94downstream.submit(item);95} catch (Throwable t) {96onError(t);97}98}99100@Override101public void onError(Throwable throwable) {102Objects.requireNonNull(throwable);103try {104downstream.signalError(throwable);105} finally {106cf.completeExceptionally(throwable);107}108}109110@Override111public void onComplete() {112try {113downstream.signalComplete();114} finally {115cf.complete(finisher.apply(subscriber));116}117}118119@Override120public CompletionStage<R> getBody() {121return cf;122}123124public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>125create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)126{127if (eol != null && eol.isEmpty())128throw new IllegalArgumentException("empty line separator");129return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),130Objects.requireNonNull(finisher),131Objects.requireNonNull(charset),132eol);133}134135static final class LineSubscription implements Flow.Subscription {136final Flow.Subscription upstreamSubscription;137final CharsetDecoder decoder;138final String newline;139final Demand downstreamDemand;140final ConcurrentLinkedDeque<ByteBuffer> queue;141final SequentialScheduler scheduler;142final Flow.Subscriber<? super String> upstream;143final CompletableFuture<?> cf;144private final AtomicReference<Throwable> errorRef = new AtomicReference<>();145private final AtomicLong demanded = new AtomicLong();146private volatile boolean completed;147private volatile boolean cancelled;148149private final char[] chars = new char[1024];150private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);151private final CharBuffer buffer = CharBuffer.wrap(chars);152private final StringBuilder builder = new StringBuilder();153private String nextLine;154155private LineSubscription(Flow.Subscription s,156CharsetDecoder dec,157String separator,158Flow.Subscriber<? super String> subscriber,159CompletableFuture<?> completion) {160downstreamDemand = new Demand();161queue = new ConcurrentLinkedDeque<>();162upstreamSubscription = Objects.requireNonNull(s);163decoder = Objects.requireNonNull(dec);164newline = separator;165upstream = Objects.requireNonNull(subscriber);166cf = Objects.requireNonNull(completion);167scheduler = SequentialScheduler.lockingScheduler(this::loop);168}169170@Override171public void request(long n) {172if (cancelled) return;173if (downstreamDemand.increase(n)) {174scheduler.runOrSchedule();175}176}177178@Override179public void cancel() {180cancelled = true;181upstreamSubscription.cancel();182}183184public void submit(List<ByteBuffer> list) {185queue.addAll(list);186demanded.decrementAndGet();187scheduler.runOrSchedule();188}189190public void signalComplete() {191completed = true;192scheduler.runOrSchedule();193}194195public void signalError(Throwable error) {196if (errorRef.compareAndSet(null,197Objects.requireNonNull(error))) {198scheduler.runOrSchedule();199}200}201202// This method looks at whether some bytes where left over (in leftover)203// from decoding the previous buffer when the previous buffer was in204// underflow. If so, it takes bytes one by one from the new buffer 'in'205// and combines them with the leftover bytes until 'in' is exhausted or a206// character was produced in 'out', resolving the previous underflow.207// Returns true if the buffer is still in underflow, false otherwise.208// However, in both situation some chars might have been produced in 'out'.209private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)210throws CharacterCodingException {211int limit = leftover.position();212if (limit == 0) {213// no leftover214return false;215} else {216CoderResult res = null;217while (in.hasRemaining()) {218leftover.position(limit);219leftover.limit(++limit);220leftover.put(in.get());221leftover.position(0);222res = decoder.decode(leftover, out,223endOfInput && !in.hasRemaining());224int remaining = leftover.remaining();225if (remaining > 0) {226assert leftover.position() == 0;227leftover.position(remaining);228} else {229leftover.position(0);230}231leftover.limit(leftover.capacity());232if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {233continue;234}235if (res.isError()) {236res.throwException();237}238assert !res.isOverflow();239return false;240}241return !endOfInput;242}243}244245// extract characters from start to end and remove them from246// the StringBuilder247private static String take(StringBuilder b, int start, int end) {248assert start == 0;249String line;250if (end == start) return "";251line = b.substring(start, end);252b.delete(start, end);253return line;254}255256// finds end of line, returns -1 if not found, or the position after257// the line delimiter if found, removing the delimiter in the process.258private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {259int len = b.length();260if (eol != null) { // delimiter explicitly specified261int i = b.indexOf(eol);262if (i >= 0) {263// remove the delimiter and returns the position264// of the char after it.265b.delete(i, i + eol.length());266return i;267}268} else { // no delimiter specified, behaves as BufferedReader::readLine269boolean crfound = false;270for (int i = 0; i < len; i++) {271char c = b.charAt(i);272if (c == '\n') {273// '\n' or '\r\n' found.274// remove the delimiter and returns the position275// of the char after it.276b.delete(crfound ? i - 1 : i, i + 1);277return crfound ? i - 1 : i;278} else if (crfound) {279// previous char was '\r', c != '\n'280assert i != 0;281// remove the delimiter and returns the position282// of the char after it.283b.delete(i - 1, i);284return i - 1;285}286crfound = c == '\r';287}288if (crfound && endOfInput) {289// remove the delimiter and returns the position290// of the char after it.291b.delete(len - 1, len);292return len - 1;293}294}295return endOfInput && len > 0 ? len : -1;296}297298// Looks at whether the StringBuilder contains a line.299// Returns null if more character are needed.300private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {301int next = endOfLine(b, eol, endOfInput);302return (next > -1) ? take(b, 0, next) : null;303}304305// Attempts to read the next line. Returns the next line if306// the delimiter was found, null otherwise. The delimiters are307// consumed.308private String nextLine()309throws CharacterCodingException {310assert nextLine == null;311LINES:312while (nextLine == null) {313boolean endOfInput = completed && queue.isEmpty();314nextLine = nextLine(builder, newline,315endOfInput && leftover.position() == 0);316if (nextLine != null) return nextLine;317ByteBuffer b;318BUFFERS:319while ((b = queue.peek()) != null) {320if (!b.hasRemaining()) {321queue.poll();322continue BUFFERS;323}324BYTES:325while (b.hasRemaining()) {326buffer.position(0);327buffer.limit(buffer.capacity());328boolean endofInput = completed && queue.size() <= 1;329if (isUnderFlow(b, buffer, endofInput)) {330assert !b.hasRemaining();331if (buffer.position() > 0) {332buffer.flip();333builder.append(buffer);334}335continue BUFFERS;336}337CoderResult res = decoder.decode(b, buffer, endofInput);338if (res.isError()) res.throwException();339if (buffer.position() > 0) {340buffer.flip();341builder.append(buffer);342continue LINES;343}344if (res.isUnderflow() && b.hasRemaining()) {345//System.out.println("underflow: adding " + b.remaining() + " bytes");346leftover.put(b);347assert !b.hasRemaining();348continue BUFFERS;349}350}351}352353assert queue.isEmpty();354if (endOfInput) {355// Time to cleanup: there may be some undecoded leftover bytes356// We need to flush them out.357// The decoder has been configured to replace malformed/unmappable358// chars with some replacement, in order to behave like359// InputStreamReader.360leftover.flip();361buffer.position(0);362buffer.limit(buffer.capacity());363364// decode() must be called just before flush, even if there365// is nothing to decode. We must do this even if leftover366// has no remaining bytes.367CoderResult res = decoder.decode(leftover, buffer, endOfInput);368if (buffer.position() > 0) {369buffer.flip();370builder.append(buffer);371}372if (res.isError()) res.throwException();373374// Now call decoder.flush()375buffer.position(0);376buffer.limit(buffer.capacity());377res = decoder.flush(buffer);378if (buffer.position() > 0) {379buffer.flip();380builder.append(buffer);381}382if (res.isError()) res.throwException();383384// It's possible that we reach here twice - just for the385// purpose of checking that no bytes were left over, so386// we reset leftover/decoder to make the function reentrant.387leftover.position(0);388leftover.limit(leftover.capacity());389decoder.reset();390391// if some chars were produced then this call will392// return them.393return nextLine = nextLine(builder, newline, endOfInput);394}395return null;396}397return null;398}399400// The main sequential scheduler loop.401private void loop() {402try {403while (!cancelled) {404Throwable error = errorRef.get();405if (error != null) {406cancelled = true;407scheduler.stop();408upstream.onError(error);409cf.completeExceptionally(error);410return;411}412if (nextLine == null) nextLine = nextLine();413if (nextLine == null) {414if (completed) {415scheduler.stop();416if (leftover.position() != 0) {417// Underflow: not all bytes could be418// decoded, but no more bytes will be coming.419// This should not happen as we should already420// have got a MalformedInputException, or421// replaced the unmappable chars.422errorRef.compareAndSet(null,423new IllegalStateException(424"premature end of input ("425+ leftover.position()426+ " undecoded bytes)"));427continue;428} else {429upstream.onComplete();430}431return;432} else if (demanded.get() == 0433&& !downstreamDemand.isFulfilled()) {434long incr = Math.max(1, downstreamDemand.get());435demanded.addAndGet(incr);436upstreamSubscription.request(incr);437continue;438} else return;439}440assert nextLine != null;441assert newline != null && !nextLine.endsWith(newline)442|| !nextLine.endsWith("\n") || !nextLine.endsWith("\r");443if (downstreamDemand.tryDecrement()) {444String forward = nextLine;445nextLine = null;446upstream.onNext(forward);447} else return; // no demand: come back later448}449} catch (Throwable t) {450try {451upstreamSubscription.cancel();452} finally {453signalError(t);454}455}456}457458static LineSubscription create(Flow.Subscription s,459Charset charset,460String lineSeparator,461Flow.Subscriber<? super String> upstream,462CompletableFuture<?> cf) {463return new LineSubscription(Objects.requireNonNull(s),464Objects.requireNonNull(charset).newDecoder()465// use the same decoder configuration than466// java.io.InputStreamReader467.onMalformedInput(CodingErrorAction.REPLACE)468.onUnmappableCharacter(CodingErrorAction.REPLACE),469lineSeparator,470Objects.requireNonNull(upstream),471Objects.requireNonNull(cf));472}473}474}475476477478