Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java
41149 views
1
/*
2
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
import java.io.IOException;
25
import java.io.InputStream;
26
import java.io.OutputStream;
27
import java.net.InetAddress;
28
import java.net.InetSocketAddress;
29
import java.net.URI;
30
import java.nio.ByteBuffer;
31
import java.nio.MappedByteBuffer;
32
import java.util.Arrays;
33
import java.util.concurrent.Flow;
34
import java.util.concurrent.Flow.Publisher;
35
import java.util.concurrent.atomic.AtomicBoolean;
36
import java.util.concurrent.atomic.AtomicInteger;
37
import java.util.concurrent.atomic.AtomicLong;
38
import com.sun.net.httpserver.HttpExchange;
39
import com.sun.net.httpserver.HttpHandler;
40
import com.sun.net.httpserver.HttpServer;
41
import com.sun.net.httpserver.HttpsConfigurator;
42
import com.sun.net.httpserver.HttpsServer;
43
import java.net.http.HttpClient;
44
import java.net.http.HttpRequest;
45
import java.net.http.HttpResponse;
46
import jdk.test.lib.net.SimpleSSLContext;
47
import org.testng.annotations.AfterTest;
48
import org.testng.annotations.BeforeTest;
49
import org.testng.annotations.DataProvider;
50
import org.testng.annotations.Test;
51
import javax.net.ssl.SSLContext;
52
import static java.util.stream.Collectors.joining;
53
import static java.nio.charset.StandardCharsets.UTF_8;
54
import static java.net.http.HttpRequest.BodyPublishers.fromPublisher;
55
import static java.net.http.HttpResponse.BodyHandlers.ofString;
56
import static org.testng.Assert.assertEquals;
57
import static org.testng.Assert.assertThrows;
58
import static org.testng.Assert.assertTrue;
59
import static org.testng.Assert.fail;
60
61
/*
62
* @test
63
* @summary Basic tests for Flow adapter Publishers
64
* @modules java.base/sun.net.www.http
65
* java.net.http/jdk.internal.net.http.common
66
* java.net.http/jdk.internal.net.http.frame
67
* java.net.http/jdk.internal.net.http.hpack
68
* java.logging
69
* jdk.httpserver
70
* @library /test/lib http2/server
71
* @build Http2TestServer
72
* @build jdk.test.lib.net.SimpleSSLContext
73
* @run testng/othervm FlowAdapterPublisherTest
74
*/
75
76
public class FlowAdapterPublisherTest {
77
78
SSLContext sslContext;
79
HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ]
80
HttpsServer httpsTestServer; // HTTPS/1.1
81
Http2TestServer http2TestServer; // HTTP/2 ( h2c )
82
Http2TestServer https2TestServer; // HTTP/2 ( h2 )
83
String httpURI;
84
String httpsURI;
85
String http2URI;
86
String https2URI;
87
88
@DataProvider(name = "uris")
89
public Object[][] variants() {
90
return new Object[][]{
91
{ httpURI },
92
{ httpsURI },
93
{ http2URI },
94
{ https2URI },
95
};
96
}
97
98
static final Class<NullPointerException> NPE = NullPointerException.class;
99
static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
100
101
@Test
102
public void testAPIExceptions() {
103
assertThrows(NPE, () -> fromPublisher(null));
104
assertThrows(NPE, () -> fromPublisher(null, 1));
105
assertThrows(IAE, () -> fromPublisher(new BBPublisher(), 0));
106
assertThrows(IAE, () -> fromPublisher(new BBPublisher(), -1));
107
assertThrows(IAE, () -> fromPublisher(new BBPublisher(), Long.MIN_VALUE));
108
109
Publisher publisher = fromPublisher(new BBPublisher());
110
assertThrows(NPE, () -> publisher.subscribe(null));
111
}
112
113
// Flow.Publisher<ByteBuffer>
114
115
@Test(dataProvider = "uris")
116
void testByteBufferPublisherUnknownLength(String url) {
117
String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
118
"when the ", "rain gets ", "warmer." };
119
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
120
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
121
.POST(fromPublisher(new BBPublisher(body))).build();
122
123
HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();
124
String text = response.body();
125
System.out.println(text);
126
assertEquals(response.statusCode(), 200);
127
assertEquals(text, Arrays.stream(body).collect(joining()));
128
}
129
130
@Test(dataProvider = "uris")
131
void testByteBufferPublisherFixedLength(String url) {
132
String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
133
"when the ", "rain gets ", "warmer." };
134
int cl = Arrays.stream(body).mapToInt(String::length).sum();
135
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
136
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
137
.POST(fromPublisher(new BBPublisher(body), cl)).build();
138
139
HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();
140
String text = response.body();
141
System.out.println(text);
142
assertEquals(response.statusCode(), 200);
143
assertEquals(text, Arrays.stream(body).collect(joining()));
144
}
145
146
// Flow.Publisher<MappedByteBuffer>
147
148
@Test(dataProvider = "uris")
149
void testMappedByteBufferPublisherUnknownLength(String url) {
150
String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",
151
"Irish from ", "ruling the ", "world." };
152
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
153
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
154
.POST(fromPublisher(new MBBPublisher(body))).build();
155
156
HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();
157
String text = response.body();
158
System.out.println(text);
159
assertEquals(response.statusCode(), 200);
160
assertEquals(text, Arrays.stream(body).collect(joining()));
161
}
162
163
@Test(dataProvider = "uris")
164
void testMappedByteBufferPublisherFixedLength(String url) {
165
String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",
166
"Irish from ", "ruling the ", "world." };
167
int cl = Arrays.stream(body).mapToInt(String::length).sum();
168
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
169
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
170
.POST(fromPublisher(new MBBPublisher(body), cl)).build();
171
172
HttpResponse<String> response = client.sendAsync(request, ofString(UTF_8)).join();
173
String text = response.body();
174
System.out.println(text);
175
assertEquals(response.statusCode(), 200);
176
assertEquals(text, Arrays.stream(body).collect(joining()));
177
}
178
179
// The following two tests depend on Exception detail messages, which is
180
// not ideal, but necessary to discern correct behavior. They should be
181
// updated if the exception message is updated.
182
183
@Test(dataProvider = "uris")
184
void testPublishTooFew(String url) throws InterruptedException {
185
String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
186
"when the ", "rain gets ", "warmer." };
187
int cl = Arrays.stream(body).mapToInt(String::length).sum() + 1; // length + 1
188
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
189
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
190
.POST(fromPublisher(new BBPublisher(body), cl)).build();
191
192
try {
193
HttpResponse<String> response = client.send(request, ofString(UTF_8));
194
fail("Unexpected response: " + response);
195
} catch (IOException expected) {
196
assertMessage(expected, "Too few bytes returned");
197
}
198
}
199
200
@Test(dataProvider = "uris")
201
void testPublishTooMany(String url) throws InterruptedException {
202
String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
203
"when the ", "rain gets ", "warmer." };
204
int cl = Arrays.stream(body).mapToInt(String::length).sum() - 1; // length - 1
205
HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
206
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
207
.POST(fromPublisher(new BBPublisher(body), cl)).build();
208
209
try {
210
HttpResponse<String> response = client.send(request, ofString(UTF_8));
211
fail("Unexpected response: " + response);
212
} catch (IOException expected) {
213
assertMessage(expected, "Too many bytes in request body");
214
}
215
}
216
217
private void assertMessage(Throwable t, String contains) {
218
if (!t.getMessage().contains(contains)) {
219
String error = "Exception message:[" + t.toString() + "] doesn't contain [" + contains + "]";
220
throw new AssertionError(error, t);
221
}
222
}
223
224
static class BBPublisher extends AbstractPublisher
225
implements Flow.Publisher<ByteBuffer>
226
{
227
BBPublisher(String... bodyParts) { super(bodyParts); }
228
229
@Override
230
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
231
this.subscriber = subscriber;
232
subscriber.onSubscribe(new InternalSubscription());
233
}
234
}
235
236
static class MBBPublisher extends AbstractPublisher
237
implements Flow.Publisher<MappedByteBuffer>
238
{
239
MBBPublisher(String... bodyParts) { super(bodyParts); }
240
241
@Override
242
public void subscribe(Flow.Subscriber<? super MappedByteBuffer> subscriber) {
243
this.subscriber = subscriber;
244
subscriber.onSubscribe(new InternalSubscription());
245
}
246
}
247
248
static abstract class AbstractPublisher {
249
private final String[] bodyParts;
250
protected volatile Flow.Subscriber subscriber;
251
252
AbstractPublisher(String... bodyParts) {
253
this.bodyParts = bodyParts;
254
}
255
256
class InternalSubscription implements Flow.Subscription {
257
258
private final AtomicLong demand = new AtomicLong();
259
private final AtomicBoolean cancelled = new AtomicBoolean();
260
private volatile int position;
261
262
private static final int IDLE = 1;
263
private static final int PUSHING = 2;
264
private static final int AGAIN = 4;
265
private final AtomicInteger state = new AtomicInteger(IDLE);
266
267
@Override
268
public void request(long n) {
269
if (n <= 0L) {
270
subscriber.onError(new IllegalArgumentException(
271
"non-positive subscription request"));
272
return;
273
}
274
if (cancelled.get()) {
275
return;
276
}
277
278
while (true) {
279
long prev = demand.get(), d;
280
if ((d = prev + n) < prev) // saturate
281
d = Long.MAX_VALUE;
282
if (demand.compareAndSet(prev, d))
283
break;
284
}
285
286
while (true) {
287
int s = state.get();
288
if (s == IDLE) {
289
if (state.compareAndSet(IDLE, PUSHING)) {
290
while (true) {
291
push();
292
if (state.compareAndSet(PUSHING, IDLE))
293
return;
294
else if (state.compareAndSet(AGAIN, PUSHING))
295
continue;
296
}
297
}
298
} else if (s == PUSHING) {
299
if (state.compareAndSet(PUSHING, AGAIN))
300
return;
301
} else if (s == AGAIN){
302
// do nothing, the pusher will already rerun
303
return;
304
} else {
305
throw new AssertionError("Unknown state:" + s);
306
}
307
}
308
}
309
310
private void push() {
311
long prev;
312
while ((prev = demand.get()) > 0) {
313
if (!demand.compareAndSet(prev, prev -1))
314
continue;
315
316
int index = position;
317
if (index < bodyParts.length) {
318
position++;
319
subscriber.onNext(ByteBuffer.wrap(bodyParts[index].getBytes(UTF_8)));
320
}
321
}
322
323
if (position == bodyParts.length && !cancelled.get()) {
324
cancelled.set(true);
325
subscriber.onComplete();
326
}
327
}
328
329
@Override
330
public void cancel() {
331
if (cancelled.compareAndExchange(false, true))
332
return; // already cancelled
333
}
334
}
335
}
336
337
static String serverAuthority(HttpServer server) {
338
return InetAddress.getLoopbackAddress().getHostName() + ":"
339
+ server.getAddress().getPort();
340
}
341
342
@BeforeTest
343
public void setup() throws Exception {
344
sslContext = new SimpleSSLContext().get();
345
if (sslContext == null)
346
throw new AssertionError("Unexpected null sslContext");
347
348
InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(),0);
349
httpTestServer = HttpServer.create(sa, 0);
350
httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
351
httpURI = "http://" + serverAuthority(httpTestServer) + "/http1/echo";
352
353
httpsTestServer = HttpsServer.create(sa, 0);
354
httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
355
httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
356
httpsURI = "https://" + serverAuthority(httpsTestServer) + "/https1/echo";
357
358
http2TestServer = new Http2TestServer("localhost", false, 0);
359
http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
360
http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo";
361
362
https2TestServer = new Http2TestServer("localhost", true, sslContext);
363
https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
364
https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo";
365
366
httpTestServer.start();
367
httpsTestServer.start();
368
http2TestServer.start();
369
https2TestServer.start();
370
}
371
372
@AfterTest
373
public void teardown() throws Exception {
374
httpTestServer.stop(0);
375
httpsTestServer.stop(0);
376
http2TestServer.stop();
377
https2TestServer.stop();
378
}
379
380
static class Http1EchoHandler implements HttpHandler {
381
@Override
382
public void handle(HttpExchange t) throws IOException {
383
try (InputStream is = t.getRequestBody();
384
OutputStream os = t.getResponseBody()) {
385
byte[] bytes = is.readAllBytes();
386
t.sendResponseHeaders(200, bytes.length);
387
os.write(bytes);
388
}
389
}
390
}
391
392
static class Http2EchoHandler implements Http2Handler {
393
@Override
394
public void handle(Http2TestExchange t) throws IOException {
395
try (InputStream is = t.getRequestBody();
396
OutputStream os = t.getResponseBody()) {
397
byte[] bytes = is.readAllBytes();
398
t.sendResponseHeaders(200, bytes.length);
399
os.write(bytes);
400
}
401
}
402
}
403
}
404
405