Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/PushGroup.java
41171 views
/*1* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.security.AccessControlContext;28import java.util.Objects;29import java.util.concurrent.CompletableFuture;30import java.net.http.HttpRequest;31import java.net.http.HttpResponse;32import java.net.http.HttpResponse.BodyHandler;33import java.net.http.HttpResponse.PushPromiseHandler;34import java.util.concurrent.Executor;3536import jdk.internal.net.http.common.MinimalFuture;37import jdk.internal.net.http.common.Log;3839/**40* One PushGroup object is associated with the parent Stream of the pushed41* Streams. This keeps track of all common state associated with the pushes.42*/43class PushGroup<T> {44private final HttpRequest initiatingRequest;4546final CompletableFuture<Void> noMorePushesCF;4748volatile Throwable error; // any exception that occurred during pushes4950// user's subscriber object51final PushPromiseHandler<T> pushPromiseHandler;5253private final Executor executor;5455int numberOfPushes;56int remainingPushes;57boolean noMorePushes = false;5859PushGroup(PushPromiseHandler<T> pushPromiseHandler,60HttpRequestImpl initiatingRequest,61Executor executor) {62this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), executor);63}6465// Check mainBodyHandler before calling nested constructor.66private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler,67HttpRequestImpl initiatingRequest,68CompletableFuture<HttpResponse<T>> mainResponse,69Executor executor) {70this.noMorePushesCF = new MinimalFuture<>();71this.pushPromiseHandler = pushPromiseHandler;72this.initiatingRequest = initiatingRequest;73this.executor = executor;74}7576interface Acceptor<T> {77BodyHandler<T> bodyHandler();78CompletableFuture<HttpResponse<T>> cf();79boolean accepted();80}8182private static class AcceptorImpl<T> implements Acceptor<T> {83private final Executor executor;84private volatile HttpResponse.BodyHandler<T> bodyHandler;85private volatile CompletableFuture<HttpResponse<T>> cf;8687AcceptorImpl(Executor executor) {88this.executor = executor;89}9091CompletableFuture<HttpResponse<T>> accept(BodyHandler<T> bodyHandler) {92Objects.requireNonNull(bodyHandler);93if (this.bodyHandler != null)94throw new IllegalStateException("non-null bodyHandler");95this.bodyHandler = bodyHandler;96cf = new MinimalFuture<>();97return cf.whenCompleteAsync((r,t) -> {}, executor);98}99100@Override public BodyHandler<T> bodyHandler() { return bodyHandler; }101102@Override public CompletableFuture<HttpResponse<T>> cf() { return cf; }103104@Override public boolean accepted() { return cf != null; }105}106107Acceptor<T> acceptPushRequest(HttpRequest pushRequest) {108AcceptorImpl<T> acceptor = new AcceptorImpl<>(executor);109try {110pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept);111} catch (Throwable t) {112if (acceptor.accepted()) {113CompletableFuture<?> cf = acceptor.cf();114cf.completeExceptionally(t);115}116throw t;117}118119synchronized (this) {120if (acceptor.accepted()) {121numberOfPushes++;122remainingPushes++;123}124return acceptor;125}126}127128// This is called when the main body response completes because it means129// no more PUSH_PROMISEs are possible130131synchronized void noMorePushes(boolean noMore) {132noMorePushes = noMore;133checkIfCompleted();134noMorePushesCF.complete(null);135}136137synchronized CompletableFuture<Void> pushesCF() {138return noMorePushesCF;139}140141synchronized boolean noMorePushes() {142return noMorePushes;143}144145synchronized void pushCompleted() {146remainingPushes--;147checkIfCompleted();148}149150synchronized void checkIfCompleted() {151if (Log.trace()) {152Log.logTrace("PushGroup remainingPushes={0} error={1} noMorePushes={2}",153remainingPushes,154(error==null)?error:error.getClass().getSimpleName(),155noMorePushes);156}157if (remainingPushes == 0 && error == null && noMorePushes) {158if (Log.trace()) {159Log.logTrace("push completed");160}161}162}163164synchronized void pushError(Throwable t) {165if (t == null) {166return;167}168this.error = t;169}170}171172173