Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java
41171 views
/*
 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;
import static java.lang.String.format;

/**
 * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
 *
 * Call {@link #getBodyParser(Consumer)} to obtain a {@link BodyParser}
 * matching the framing of the response body. The parser is fed the received
 * byte buffers through {@code accept}, forwards the body data to the
 * {@code BodySubscriber}, and reports completion (or failure) to the
 * completion handler it was created with.
 */
class ResponseContent {

    final HttpResponse.BodySubscriber<?> pusher;
    // -2 is treated as HTTP/1.0 content (no length, not chunked);
    // -1 means the Transfer-Encoding header decides whether the body is chunked.
    final long contentLength;
    final HttpHeaders headers;
    // this needs to run before we complete the body
    // so that connection can be returned to pool
    private final Runnable onFinished;
    private final String dbgTag;

    ResponseContent(HttpConnection connection,
                    long contentLength,
                    HttpHeaders h,
                    HttpResponse.BodySubscriber<?> userSubscriber,
                    Runnable onFinished)
    {
        this.pusher = userSubscriber;
        this.contentLength = contentLength;
        this.headers = h;
        this.onFinished = onFinished;
        this.dbgTag = connection.dbgString() + "/ResponseContent";
    }

    static final int LF = 10;
    static final int CR = 13;

    private boolean chunkedContent, chunkedContentInitialized;

    /**
     * Tells whether the response body uses the chunked transfer encoding.
     * The answer is computed once and cached.
     *
     * @return true if the body is chunked
     * @throws IOException if contentLength is -1 and the Transfer-Encoding
     *         header is present with a value other than "chunked"
     */
    boolean contentChunked() throws IOException {
        if (chunkedContentInitialized) {
            return chunkedContent;
        }
        if (contentLength == -2) {
            // HTTP/1.0 content
            chunkedContentInitialized = true;
            chunkedContent = false;
            return chunkedContent;
        }
        if (contentLength == -1) {
            String tc = headers.firstValue("Transfer-Encoding")
                               .orElse("");
            if (!tc.isEmpty()) {
                if (tc.equalsIgnoreCase("chunked")) {
                    chunkedContent = true;
                } else {
                    throw new IOException("invalid content");
                }
            } else {
                chunkedContent = false;
            }
        }
        chunkedContentInitialized = true;
        return chunkedContent;
    }

    interface BodyParser extends Consumer<ByteBuffer> {
        void onSubscribe(AbstractSubscription sub);
        // A current-state message suitable for inclusion in an exception
        // detail message.
        String currentStateMessage();
    }

    // Returns a parser that will take care of parsing the received byte
    // buffers and forward them to the BodySubscriber.
    // When the parser is done, it will call onComplete.
    // If parsing was successful, the throwable parameter will be null.
    // Otherwise it will be the exception that occurred
    // Note: revisit: it might be better to use a CompletableFuture than
    // a completion handler.
    BodyParser getBodyParser(Consumer<Throwable> onComplete)
        throws IOException {
        if (contentChunked()) {
            return new ChunkedBodyParser(onComplete);
        } else {
            return contentLength == -2
                ? new UnknownLengthBodyParser(onComplete)
                : new FixedLengthBodyParser(contentLength, onComplete);
        }
    }


    static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
    static final int MAX_CHUNK_HEADER_SIZE = 2050;

    /**
     * Parser for bodies using the chunked transfer encoding. Each call to
     * {@code accept} parses as many chunks (or parts of chunks) as the
     * supplied buffer contains and pushes them to the subscriber.
     */
    class ChunkedBodyParser implements BodyParser {
        // Sentinel returned by tryReadOneHunk when more input is needed.
        final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
        final Consumer<Throwable> onComplete;
        final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
        final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";

        volatile Throwable closedExceptionally;
        volatile int partialChunklen = 0; // partially read chunk len
        volatile int chunklen = -1;       // number of bytes in chunk; -1 if not yet known
        volatile int bytesremaining;      // number of chunk data bytes left to be read
        volatile boolean cr = false;      // tryReadChunkLength has found CR
        volatile int chunkext = 0;        // number of bytes already read in the chunk extension
        volatile int digits = 0;          // number of chunkLength bytes already read
        volatile int bytesToConsume;      // number of bytes that still need to be consumed before proceeding
        volatile ChunkState state = ChunkState.READING_LENGTH; // current state
        volatile AbstractSubscription sub;

        ChunkedBodyParser(Consumer<Throwable> onComplete) {
            this.onComplete = onComplete;
        }

        String dbgString() {
            return dbgTag;
        }

        // best effort - we're assuming UTF-8 text and breaks at character boundaries
        // for this debug output. Not called.
        private void debugBuffer(ByteBuffer b) {
            if (!debug.on()) return;
            // Work on a read-only view so the caller's position is untouched.
            ByteBuffer printable = b.asReadOnlyBuffer();
            byte[] bytes = new byte[printable.remaining()];
            printable.get(bytes, 0, bytes.length);
            String msg = "============== accepted ==================\n"
                    + new String(bytes, StandardCharsets.UTF_8)
                    + "\n==========================================\n";
            debug.log(msg);
        }

        @Override
        public void onSubscribe(AbstractSubscription sub) {
            if (debug.on())
                debug.log("onSubscribe: " + pusher.getClass().getName());
            pusher.onSubscribe(this.sub = sub);
        }

        @Override
        public String currentStateMessage() {
            return format("chunked transfer encoding, state: %s", state);
        }

        /**
         * Parses the given buffer and pushes any complete hunks to the
         * subscriber. When the final chunk has been parsed, runs onFinished,
         * completes the subscriber and invokes the completion handler with
         * null. Any Throwable raised while processing is recorded and
         * passed to the completion handler instead of being rethrown.
         */
        @Override
        public void accept(ByteBuffer b) {
            if (closedExceptionally != null) {
                if (debug.on())
                    debug.log("already closed: " + closedExceptionally);
                return;
            }
            // debugBuffer(b);
            boolean completed = false;
            try {
                List<ByteBuffer> out = new ArrayList<>();
                do {
                    if (tryPushOneHunk(b, out)) {
                        // We're done! (true if the final chunk was parsed).
                        if (!out.isEmpty()) {
                            // push what we have and complete
                            // only reduce demand if we actually push something.
                            // we would not have come here if there was no
                            // demand.
                            boolean hasDemand = sub.demand().tryDecrement();
                            assert hasDemand;
                            pusher.onNext(Collections.unmodifiableList(out));
                            if (debug.on()) debug.log("Chunks sent");
                        }
                        if (debug.on()) debug.log("done!");
                        assert closedExceptionally == null;
                        assert state == ChunkState.DONE;
                        onFinished.run();
                        pusher.onComplete();
                        if (debug.on()) debug.log("subscriber completed");
                        completed = true;
                        onComplete.accept(closedExceptionally); // should be null
                        break;
                    }
                    // the buffer may contain several hunks, and therefore
                    // we must loop while it's not exhausted.
                } while (b.hasRemaining());

                if (!completed && !out.isEmpty()) {
                    // push what we have.
                    // only reduce demand if we actually push something.
                    // we would not have come here if there was no
                    // demand.
                    boolean hasDemand = sub.demand().tryDecrement();
                    assert hasDemand;
                    pusher.onNext(Collections.unmodifiableList(out));
                    if (debug.on()) debug.log("Chunk sent");
                }
                assert state == ChunkState.DONE || !b.hasRemaining();
            } catch(Throwable t) {
                if (debug.on())
                    debug.log("Error while processing buffer: %s", (Object)t );
                closedExceptionally = t;
                // don't signal the handler twice if it was already invoked
                if (!completed) onComplete.accept(t);
            }
        }

        // Reads the chunk header and returns the chunk length (the number of
        // data bytes in the chunk; the trailing CR LF is accounted for
        // separately through bytesToConsume).
        // On return the position of chunkbuf is just after the LF that
        // terminates the chunk header, i.e. at the first byte of the chunk.
        // Returns -1 if more bytes are needed; the partial state is kept in
        // partialChunklen / digits / chunkext / cr.
        // Throws IOException if the header is malformed, too long, or if the
        // chunk size does not fit in an int.
        private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
            assert state == ChunkState.READING_LENGTH;
            while (chunkbuf.hasRemaining()) {
                if (chunkext + digits >= MAX_CHUNK_HEADER_SIZE) {
                    throw new IOException("Chunk header size too long: " + (chunkext + digits));
                }
                int c = chunkbuf.get();
                if (cr) {
                    if (c == LF) {
                        return partialChunklen;
                    } else {
                        throw new IOException("invalid chunk header");
                    }
                }
                if (c == CR) {
                    cr = true;
                    if (digits == 0 && debug.on()) {
                        debug.log("tryReadChunkLen: invalid chunk header? No digits in chunkLen?");
                    }
                } else if (cr == false && chunkext > 0) {
                    // we have seen a non digit character after the chunk length.
                    // skip anything until CR is found.
                    chunkext++;
                    if (debug.on()) {
                        debug.log("tryReadChunkLen: More extraneous character after chunk length: " + c);
                    }
                } else {
                    int digit = toDigit(c);
                    if (digit < 0) {
                        if (digits > 0) {
                            // first non-digit character after chunk length.
                            // skip anything until CR is found.
                            chunkext++;
                            if (debug.on()) {
                                debug.log("tryReadChunkLen: Extraneous character after chunk length: " + c);
                            }
                        } else {
                            // there should be at least one digit in chunk length
                            throw new IOException("Illegal character in chunk size: " + c);
                        }
                    } else {
                        digits++;
                        int len = partialChunklen;
                        // Reject sizes that do not fit in an int. Without this
                        // check the accumulation below silently wraps and can
                        // yield a negative length (including -1, which is the
                        // "need more bytes" sentinel) or an unrelated positive
                        // length that mis-frames the body.
                        if (len > (Integer.MAX_VALUE - digit) / 16) {
                            throw new IOException("Chunk size too large: exceeds "
                                    + Integer.MAX_VALUE);
                        }
                        partialChunklen = len * 16 + digit;
                    }
                }
            }
            return -1;
        }


        // try to consume as many bytes as specified by bytesToConsume.
        // returns the number of bytes that still need to be consumed.
        // In practice this method is only called to consume one CRLF pair
        // with bytesToConsume set to 2, so it will only return 0 (if completed),
        // 1, or 2 (if chunkbuf doesn't have the 2 chars).
        private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
            int n = bytesToConsume;
            if (n > 0) {
                int e = Math.min(chunkbuf.remaining(), n);

                // verifies some assertions
                // this method is called only to consume CRLF
                if (Utils.ASSERTIONSENABLED) {
                    assert n <= 2 && e <= 2;
                    ByteBuffer tmp = chunkbuf.slice();
                    // if n == 2 assert that we will first consume CR
                    assert (n == 2 && e > 0) ? tmp.get() == CR : true;
                    // if n == 1 || n == 2 && e == 2 assert that we then consume LF
                    assert (n == 1 || e == 2) ? tmp.get() == LF : true;
                }

                chunkbuf.position(chunkbuf.position() + e);
                n -= e;
                bytesToConsume = n;
            }
            assert n >= 0;
            return n;
        }

        /**
         * Returns a ByteBuffer containing chunk of data or a "hunk" of data
         * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
         * If the given chunk does not have enough data this method return
         * an empty ByteBuffer (READMORE).
         * If we encounter the final chunk (an empty chunk) this method
         * returns null.
         */
        ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
            int unfulfilled = bytesremaining;
            int toconsume = bytesToConsume;
            ChunkState st = state;
            if (st == ChunkState.READING_LENGTH && chunklen == -1) {
                if (debug.on()) debug.log(() -> "Trying to read chunk len"
                        + " (remaining in buffer:"+chunk.remaining()+")");
                int clen = chunklen = tryReadChunkLen(chunk);
                if (clen == -1) return READMORE;
                digits = chunkext = 0;
                if (debug.on()) debug.log("Got chunk len %d", clen);
                cr = false; partialChunklen = 0;
                unfulfilled = bytesremaining = clen;
                if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
                else st = state = ChunkState.READING_DATA; // read the data
            }

            if (toconsume > 0) {
                if (debug.on())
                    debug.log("Trying to consume bytes: %d (remaining in buffer: %s)",
                            toconsume, chunk.remaining());
                if (tryConsumeBytes(chunk) > 0) {
                    return READMORE;
                }
            }

            toconsume = bytesToConsume;
            assert toconsume == 0;


            if (st == ChunkState.READING_LENGTH) {
                // we will come here only if chunklen was 0, after having
                // consumed the trailing CRLF
                int clen = chunklen;
                assert clen == 0;
                if (debug.on()) debug.log("No more chunks: %d", clen);
                // the DONE state is not really needed but it helps with
                // assertions...
                state = ChunkState.DONE;
                return null;
            }

            int clen = chunklen;
            assert clen > 0;
            assert st == ChunkState.READING_DATA;

            ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
            if (unfulfilled > 0) {
                int bytesread = chunk.remaining();
                if (debug.on())
                    debug.log("Reading chunk: available %d, needed %d",
                              bytesread, unfulfilled);

                int bytes2return = Math.min(bytesread, unfulfilled);
                if (debug.on())
                    debug.log( "Returning chunk bytes: %d", bytes2return);
                returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
                unfulfilled = bytesremaining -= bytes2return;
                if (unfulfilled == 0) bytesToConsume = 2;
            }

            assert unfulfilled >= 0;

            if (unfulfilled == 0) {
                if (debug.on())
                    debug.log("No more bytes to read - %d yet to consume.",
                              unfulfilled);
                // check whether the trailing CRLF is consumed, try to
                // consume it if not. If tryConsumeBytes needs more bytes
                // then we will come back here later - skipping the block
                // that reads data because remaining==0, and finding
                // that the two bytes are now consumed.
                if (tryConsumeBytes(chunk) == 0) {
                    // we're done for this chunk! reset all states and
                    // prepare to read the next chunk.
                    chunklen = -1;
                    partialChunklen = 0;
                    cr = false;
                    digits = chunkext = 0;
                    state = ChunkState.READING_LENGTH;
                    if (debug.on()) debug.log("Ready to read next chunk");
                }
            }
            if (returnBuffer == READMORE) {
                if (debug.on()) debug.log("Need more data");
            }
            return returnBuffer;
        }


        // Attempt to parse and push one hunk from the buffer.
        // Returns true if the final chunk was parsed.
        // Returns false if we need to push more chunks.
        private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
                throws IOException {
            assert state != ChunkState.DONE;
            ByteBuffer b1 = tryReadOneHunk(b);
            if (b1 != null) {
                //assert b1.hasRemaining() || b1 == READMORE;
                if (b1.hasRemaining()) {
                    if (debug.on())
                        debug.log("Sending chunk to consumer (%d)", b1.remaining());
                    out.add(b1);
                }
                return false; // we haven't parsed the final chunk yet.
            } else {
                return true; // we're done! the final chunk was parsed.
            }
        }

        // Returns the value of the ASCII hexadecimal digit b (0-9, A-F, a-f),
        // or -1 if b is not a hexadecimal digit.
        private int toDigit(int b) {
            if (b >= 0x30 && b <= 0x39) {
                return b - 0x30;
            }
            if (b >= 0x41 && b <= 0x46) {
                return b - 0x41 + 10;
            }
            if (b >= 0x61 && b <= 0x66) {
                return b - 0x61 + 10;
            }
            return -1;
        }

    }

    /**
     * Parser used when contentLength is -2 (HTTP/1.0 content): every
     * non-empty buffer is forwarded to the subscriber as-is, and the end of
     * the body is signalled externally through {@link #complete()}.
     */
    class UnknownLengthBodyParser implements BodyParser {
        final Consumer<Throwable> onComplete;
        final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
        final String dbgTag = ResponseContent.this.dbgTag + "/UnknownLengthBodyParser";
        volatile Throwable closedExceptionally;
        volatile AbstractSubscription sub;
        // Running byte count; within this file it is only read to build the
        // diagnostic message in currentStateMessage().
        volatile int breceived = 0;

        UnknownLengthBodyParser(Consumer<Throwable> onComplete) {
            this.onComplete = onComplete;
        }

        String dbgString() {
            return dbgTag;
        }

        @Override
        public void onSubscribe(AbstractSubscription sub) {
            if (debug.on())
                debug.log("onSubscribe: " + pusher.getClass().getName());
            pusher.onSubscribe(this.sub = sub);
        }

        @Override
        public String currentStateMessage() {
            return format("http1_0 content, bytes received: %d", breceived);
        }

        /**
         * Forwards the buffer to the subscriber if it has remaining bytes.
         * This method never completes the body itself; any Throwable raised
         * is recorded and passed to the completion handler.
         */
        @Override
        public void accept(ByteBuffer b) {
            if (closedExceptionally != null) {
                if (debug.on())
                    debug.log("already closed: " + closedExceptionally);
                return;
            }
            try {
                if (debug.on())
                    debug.log("Parser got %d bytes ", b.remaining());

                if (b.hasRemaining()) {
                    // only reduce demand if we actually push something.
                    // we would not have come here if there was no
                    // demand.
                    boolean hasDemand = sub.demand().tryDecrement();
                    assert hasDemand;
                    breceived += b.remaining();
                    pusher.onNext(List.of(b.asReadOnlyBuffer()));
                }
            } catch (Throwable t) {
                if (debug.on()) debug.log("Unexpected exception", t);
                closedExceptionally = t;
                onComplete.accept(t);
            }
        }

        /**
         * Must be called externally when connection has closed
         * and therefore no more bytes can be read
         */
        public void complete() {
            // We're done! All data has been received.
            if (debug.on())
                debug.log("Parser got all expected bytes: completing");
            assert closedExceptionally == null;
            onFinished.run();
            pusher.onComplete();
            onComplete.accept(closedExceptionally); // should be null
        }
    }

    /**
     * Parser for bodies whose length is given up front: forwards at most
     * contentLength bytes to the subscriber and completes once they have
     * all been delivered.
     */
    class FixedLengthBodyParser implements BodyParser {
        final long contentLength;
        final Consumer<Throwable> onComplete;
        final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
        final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
        volatile long remaining; // number of body bytes not yet delivered
        volatile Throwable closedExceptionally;
        volatile AbstractSubscription sub;

        FixedLengthBodyParser(long contentLength, Consumer<Throwable> onComplete) {
            this.contentLength = this.remaining = contentLength;
            this.onComplete = onComplete;
        }

        String dbgString() {
            return dbgTag;
        }

        /**
         * Subscribes the pusher. A zero-length body is completed right away,
         * since no data will be delivered through accept for it.
         */
        @Override
        public void onSubscribe(AbstractSubscription sub) {
            if (debug.on())
                debug.log("length=" + contentLength +", onSubscribe: "
                           + pusher.getClass().getName());
            pusher.onSubscribe(this.sub = sub);
            try {
                if (contentLength == 0) {
                    onFinished.run();
                    pusher.onComplete();
                    onComplete.accept(null);
                }
            } catch (Throwable t) {
                closedExceptionally = t;
                try {
                    pusher.onError(t);
                } finally {
                    onComplete.accept(t);
                }
            }
        }

        @Override
        public String currentStateMessage() {
            return format("fixed content-length: %d, bytes received: %d",
                          contentLength, contentLength - remaining);
        }

        /**
         * Forwards up to {@code remaining} bytes of the buffer to the
         * subscriber and completes the body once all expected bytes have
         * been delivered. Any Throwable raised is recorded and passed to the
         * completion handler unless completion was already signalled.
         */
        @Override
        public void accept(ByteBuffer b) {
            if (closedExceptionally != null) {
                if (debug.on())
                    debug.log("already closed: " + closedExceptionally);
                return;
            }
            boolean completed = false;
            try {
                long unfulfilled = remaining;
                if (debug.on())
                    debug.log("Parser got %d bytes (%d remaining / %d)",
                              b.remaining(), unfulfilled, contentLength);
                assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;

                // body already fully delivered: nothing more to do
                if (unfulfilled == 0 && contentLength > 0) return;

                if (b.hasRemaining() && unfulfilled > 0) {
                    // only reduce demand if we actually push something.
                    // we would not have come here if there was no
                    // demand.
                    boolean hasDemand = sub.demand().tryDecrement();
                    assert hasDemand;
                    // bounded by b.remaining(), which is an int
                    int amount = (int)Math.min(b.remaining(), unfulfilled); // safe cast
                    unfulfilled = remaining -= amount;
                    ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
                    pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
                }
                if (unfulfilled == 0) {
                    // We're done! All data has been received.
                    if (debug.on())
                        debug.log("Parser got all expected bytes: completing");
                    assert closedExceptionally == null;
                    onFinished.run();
                    pusher.onComplete();
                    completed = true;
                    onComplete.accept(closedExceptionally); // should be null
                } else {
                    assert b.remaining() == 0;
                }
            } catch (Throwable t) {
                if (debug.on()) debug.log("Unexpected exception", t);
                closedExceptionally = t;
                if (!completed) {
                    onComplete.accept(t);
                }
            }
        }
    }
}