Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelTube.java
41171 views
1
/*
2
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import jdk.internal.net.http.common.Demand;
29
import jdk.internal.net.http.common.FlowTube;
30
import jdk.internal.net.http.common.Logger;
31
import jdk.internal.net.http.common.Utils;
32
import jdk.internal.net.http.websocket.RawChannel;
33
34
import java.io.EOFException;
35
import java.io.IOException;
36
import java.lang.ref.Cleaner;
37
import java.nio.ByteBuffer;
38
import java.nio.channels.ClosedChannelException;
39
import java.nio.channels.SelectionKey;
40
import java.util.ArrayList;
41
import java.util.List;
42
import java.util.concurrent.ConcurrentLinkedQueue;
43
import java.util.concurrent.Flow;
44
import java.util.concurrent.atomic.AtomicBoolean;
45
import java.util.concurrent.atomic.AtomicReference;
46
import java.util.function.Supplier;
47
import java.lang.System.Logger.Level;
48
49
/*
50
* I/O abstraction used to implement WebSocket.
51
*
52
*/
53
public class RawChannelTube implements RawChannel {
54
55
final HttpConnection connection;
56
final FlowTube tube;
57
final WritePublisher writePublisher;
58
final ReadSubscriber readSubscriber;
59
final Supplier<ByteBuffer> initial;
60
final AtomicBoolean inited = new AtomicBoolean();
61
final AtomicBoolean outputClosed = new AtomicBoolean();
62
final AtomicBoolean inputClosed = new AtomicBoolean();
63
final AtomicBoolean closed = new AtomicBoolean();
64
final String dbgTag;
65
final Logger debug;
66
private static final Cleaner cleaner =
67
Utils.ASSERTIONSENABLED && Utils.DEBUG_WS ? Cleaner.create() : null;
68
69
RawChannelTube(HttpConnection connection,
70
Supplier<ByteBuffer> initial) {
71
this.connection = connection;
72
this.tube = connection.getConnectionFlow();
73
this.initial = initial;
74
this.writePublisher = new WritePublisher();
75
this.readSubscriber = new ReadSubscriber();
76
dbgTag = "[WebSocket] RawChannelTube(" + tube +")";
77
debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
78
connection.client().webSocketOpen();
79
connectFlows();
80
if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
81
// this is just for debug...
82
cleaner.register(this, new CleanupChecker(closed, debug));
83
}
84
}
85
86
// Make sure no back reference to RawChannelTube can exist
87
// from this class. In particular it would be dangerous
88
// to reference connection, since connection has a reference
89
// to SocketTube with which a RawChannelTube is registered.
90
// Ditto for HttpClientImpl, which might have a back reference
91
// to the connection.
92
static final class CleanupChecker implements Runnable {
93
final AtomicBoolean closed;
94
final System.Logger debug;
95
CleanupChecker(AtomicBoolean closed, System.Logger debug) {
96
this.closed = closed;
97
this.debug = debug;
98
}
99
100
@Override
101
public void run() {
102
if (!closed.get()) {
103
debug.log(Level.DEBUG,
104
"RawChannelTube was not closed before being released");
105
}
106
}
107
}
108
109
private void connectFlows() {
110
if (debug.on()) debug.log("connectFlows");
111
tube.connectFlows(writePublisher, readSubscriber);
112
}
113
114
class WriteSubscription implements Flow.Subscription {
115
final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
116
final Demand demand = new Demand();
117
volatile boolean cancelled;
118
WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
119
this.subscriber = subscriber;
120
}
121
@Override
122
public void request(long n) {
123
if (debug.on()) debug.log("WriteSubscription::request %d", n);
124
demand.increase(n);
125
RawEvent event;
126
while ((event = writePublisher.events.poll()) != null) {
127
if (debug.on()) debug.log("WriteSubscriber: handling event");
128
event.handle();
129
if (demand.isFulfilled()) break;
130
}
131
}
132
@Override
133
public void cancel() {
134
cancelled = true;
135
if (debug.on()) debug.log("WriteSubscription::cancel");
136
shutdownOutput();
137
RawEvent event;
138
while ((event = writePublisher.events.poll()) != null) {
139
if (debug.on()) debug.log("WriteSubscriber: handling event");
140
event.handle();
141
}
142
}
143
}
144
145
class WritePublisher implements FlowTube.TubePublisher {
146
final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
147
volatile WriteSubscription writeSubscription;
148
@Override
149
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
150
if (debug.on()) debug.log("WritePublisher::subscribe");
151
WriteSubscription subscription = new WriteSubscription(subscriber);
152
subscriber.onSubscribe(subscription);
153
writeSubscription = subscription;
154
}
155
}
156
157
class ReadSubscriber implements FlowTube.TubeSubscriber {
158
159
volatile Flow.Subscription readSubscription;
160
volatile boolean completed;
161
long initialRequest;
162
final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
163
final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
164
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
165
166
void checkEvents() {
167
Flow.Subscription subscription = readSubscription;
168
if (subscription != null) {
169
Throwable error = errorRef.get();
170
while (!buffers.isEmpty() || error != null || closed.get() || completed) {
171
RawEvent event = events.poll();
172
if (event == null) break;
173
if (debug.on()) debug.log("ReadSubscriber: handling event");
174
event.handle();
175
}
176
}
177
}
178
179
@Override
180
public void onSubscribe(Flow.Subscription subscription) {
181
//buffers.add(initial.get());
182
long n;
183
synchronized (this) {
184
readSubscription = subscription;
185
n = initialRequest;
186
initialRequest = 0;
187
}
188
if (debug.on()) debug.log("ReadSubscriber::onSubscribe");
189
if (n > 0) {
190
Throwable error = errorRef.get();
191
if (error == null && !closed.get() && !completed) {
192
if (debug.on()) debug.log("readSubscription: requesting " + n);
193
subscription.request(n);
194
}
195
}
196
checkEvents();
197
}
198
199
@Override
200
public void onNext(List<ByteBuffer> item) {
201
if (debug.on()) debug.log(() -> "ReadSubscriber::onNext "
202
+ Utils.remaining(item) + " bytes");
203
buffers.addAll(item);
204
checkEvents();
205
}
206
207
@Override
208
public void onError(Throwable throwable) {
209
if (closed.get() || errorRef.compareAndSet(null, throwable)) {
210
if (debug.on()) debug.log("ReadSubscriber::onError", throwable);
211
if (buffers.isEmpty()) {
212
checkEvents();
213
shutdownInput();
214
}
215
}
216
}
217
218
@Override
219
public void onComplete() {
220
if (debug.on()) debug.log("ReadSubscriber::onComplete");
221
completed = true;
222
if (buffers.isEmpty()) {
223
checkEvents();
224
shutdownInput();
225
}
226
}
227
}
228
229
230
/*
231
* Registers given event whose callback will be called once only (i.e.
232
* register new event for each callback).
233
*
234
* Memory consistency effects: actions in a thread calling registerEvent
235
* happen-before any subsequent actions in the thread calling event.handle
236
*/
237
public void registerEvent(RawEvent event) throws IOException {
238
int interestOps = event.interestOps();
239
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
240
if (debug.on()) debug.log("register write event");
241
if (outputClosed.get()) throw new IOException("closed output");
242
writePublisher.events.add(event);
243
WriteSubscription writeSubscription = writePublisher.writeSubscription;
244
if (writeSubscription != null) {
245
while (!writeSubscription.demand.isFulfilled()) {
246
event = writePublisher.events.poll();
247
if (event == null) break;
248
event.handle();
249
}
250
}
251
}
252
if ((interestOps & SelectionKey.OP_READ) != 0) {
253
if (debug.on()) debug.log("register read event");
254
if (inputClosed.get()) throw new IOException("closed input");
255
readSubscriber.events.add(event);
256
readSubscriber.checkEvents();
257
if (readSubscriber.buffers.isEmpty()
258
&& !readSubscriber.events.isEmpty()) {
259
Flow.Subscription readSubscription =
260
readSubscriber.readSubscription;
261
if (readSubscription == null) {
262
synchronized (readSubscriber) {
263
readSubscription = readSubscriber.readSubscription;
264
if (readSubscription == null) {
265
readSubscriber.initialRequest = 1;
266
return;
267
}
268
}
269
}
270
assert readSubscription != null;
271
if (debug.on()) debug.log("readSubscription: requesting 1");
272
readSubscription.request(1);
273
}
274
}
275
}
276
277
/**
278
* Hands over the initial bytes. Once the bytes have been returned they are
279
* no longer available and the method will throw an {@link
280
* IllegalStateException} on each subsequent invocation.
281
*
282
* @return the initial bytes
283
* @throws IllegalStateException
284
* if the method has been already invoked
285
*/
286
public ByteBuffer initialByteBuffer() throws IllegalStateException {
287
if (inited.compareAndSet(false, true)) {
288
return initial.get();
289
} else throw new IllegalStateException("initial buffer already drained");
290
}
291
292
/*
293
* Returns a ByteBuffer with the data read or null if EOF is reached. Has no
294
* remaining bytes if no data available at the moment.
295
*/
296
public ByteBuffer read() throws IOException {
297
if (debug.on()) debug.log("read");
298
Flow.Subscription readSubscription = readSubscriber.readSubscription;
299
if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;
300
ByteBuffer buffer = readSubscriber.buffers.poll();
301
if (buffer != null) {
302
if (debug.on()) debug.log("read: " + buffer.remaining());
303
return buffer;
304
}
305
Throwable error = readSubscriber.errorRef.get();
306
if (error != null) error = Utils.getIOException(error);
307
if (error instanceof EOFException) {
308
if (debug.on()) debug.log("read: EOFException");
309
shutdownInput();
310
return null;
311
}
312
if (error != null) {
313
if (debug.on()) debug.log("read: " + error);
314
if (closed.get()) {
315
return null;
316
}
317
shutdownInput();
318
throw Utils.getIOException(error);
319
}
320
if (readSubscriber.completed) {
321
if (debug.on()) debug.log("read: EOF");
322
shutdownInput();
323
return null;
324
}
325
if (inputClosed.get()) {
326
if (debug.on()) debug.log("read: CLOSED");
327
throw new IOException("closed output");
328
}
329
if (debug.on()) debug.log("read: nothing to read");
330
return Utils.EMPTY_BYTEBUFFER;
331
}
332
333
/*
334
* Writes a sequence of bytes to this channel from a subsequence of the
335
* given buffers.
336
*/
337
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
338
if (outputClosed.get()) {
339
if (debug.on()) debug.log("write: CLOSED");
340
throw new IOException("closed output");
341
}
342
WriteSubscription writeSubscription = writePublisher.writeSubscription;
343
if (writeSubscription == null) {
344
if (debug.on()) debug.log("write: unsubscribed: 0");
345
return 0;
346
}
347
if (writeSubscription.cancelled) {
348
if (debug.on()) debug.log("write: CANCELLED");
349
shutdownOutput();
350
throw new IOException("closed output");
351
}
352
if (writeSubscription.demand.tryDecrement()) {
353
List<ByteBuffer> buffers = copy(srcs, offset, length);
354
long res = Utils.remaining(buffers);
355
if (debug.on()) debug.log("write: writing %d", res);
356
writeSubscription.subscriber.onNext(buffers);
357
return res;
358
} else {
359
if (debug.on()) debug.log("write: no demand: 0");
360
return 0;
361
}
362
}
363
364
/**
365
* Shutdown the connection for reading without closing the channel.
366
*
367
* <p> Once shutdown for reading then further reads on the channel will
368
* return {@code null}, the end-of-stream indication. If the input side of
369
* the connection is already shutdown then invoking this method has no
370
* effect.
371
*
372
* @throws ClosedChannelException
373
* If this channel is closed
374
* @throws IOException
375
* If some other I/O error occurs
376
*/
377
public void shutdownInput() {
378
if (inputClosed.compareAndSet(false, true)) {
379
if (debug.on()) debug.log("shutdownInput");
380
// TransportImpl will eventually call RawChannel::close.
381
// We must not call it here as this would close the socket
382
// and can cause an exception to back fire before
383
// TransportImpl and WebSocketImpl have updated their state.
384
}
385
}
386
387
/**
388
* Shutdown the connection for writing without closing the channel.
389
*
390
* <p> Once shutdown for writing then further attempts to write to the
391
* channel will throw {@link ClosedChannelException}. If the output side of
392
* the connection is already shutdown then invoking this method has no
393
* effect.
394
*
395
* @throws ClosedChannelException
396
* If this channel is closed
397
* @throws IOException
398
* If some other I/O error occurs
399
*/
400
public void shutdownOutput() {
401
if (outputClosed.compareAndSet(false, true)) {
402
if (debug.on()) debug.log("shutdownOutput");
403
// TransportImpl will eventually call RawChannel::close.
404
// We must not call it here as this would close the socket
405
// and can cause an exception to back fire before
406
// TransportImpl and WebSocketImpl have updated their state.
407
}
408
}
409
410
/**
411
* Closes this channel.
412
*
413
* @throws IOException
414
* If an I/O error occurs
415
*/
416
@Override
417
public void close() {
418
if (closed.compareAndSet(false, true)) {
419
if (debug.on()) debug.log("close");
420
connection.client().webSocketClose();
421
connection.close();
422
}
423
}
424
425
private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {
426
int count = Math.min(len, src.length - offset);
427
if (count <= 0) return Utils.EMPTY_BB_LIST;
428
if (count == 1) return List.of(Utils.copy(src[offset]));
429
if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));
430
List<ByteBuffer> list = new ArrayList<>(count);
431
for (int i = 0; i < count; i++) {
432
list.add(Utils.copy(src[offset + i]));
433
}
434
return list;
435
}
436
}
437
438