Path: blob/master/test/jdk/sun/net/www/httptest/TestHttpServer.java
41154 views
/*1* Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.net.*;24import java.io.*;25import java.nio.*;26import java.nio.channels.*;27import sun.net.www.MessageHeader;28import java.util.*;2930/**31* This class implements a simple HTTP server. It uses multiple threads to32* handle connections in parallel, and also multiple connections/requests33* can be handled per thread.34* <p>35* It must be instantiated with a {@link HttpCallback} object to which36* requests are given and must be handled.37* <p>38* Simple synchronization between the client(s) and server can be done39* using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and40* {@link #rendezvous(String,int)} methods.41*42* NOTE NOTE NOTE NOTE NOTE NOTE NOTE43*44* If changes are made here, please sure they are propagated to45* the HTTPS equivalent in the JSSE regression test suite.46*47* NOTE NOTE NOTE NOTE NOTE NOTE NOTE48*/4950public class TestHttpServer {5152ServerSocketChannel schan;53int threads;54int cperthread;55HttpCallback cb;56Server[] servers;5758/**59* Create a <code>TestHttpServer<code> instance with the specified callback object60* for handling requests. One thread is created to handle requests,61* and up to ten TCP connections will be handled simultaneously.62* @param cb the callback object which is invoked to handle each63* incoming request64*/6566public TestHttpServer (HttpCallback cb) throws IOException {67this (cb, 1, 10, 0);68}6970/**71* Create a <code>TestHttpServer<code> instance with the specified callback object72* for handling requests. One thread is created to handle requests,73* and up to ten TCP connections will be handled simultaneously.74* @param cb the callback object which is invoked to handle each75* incoming request76* @param address the address to bind the server to. <code>Null</code>77* means bind to the wildcard address.78* @param port the port number to bind the server to. <code>Zero</code>79* means choose any free port.80*/8182public TestHttpServer (HttpCallback cb, InetAddress address, int port) throws IOException {83this (cb, 1, 10, address, 0);84}8586/**87* Create a <code>TestHttpServer<code> instance with the specified number of88* threads and maximum number of connections per thread. This functions89* the same as the 4 arg constructor, where the port argument is set to zero.90* @param cb the callback object which is invoked to handle each91* incoming request92* @param threads the number of threads to create to handle requests93* in parallel94* @param cperthread the number of simultaneous TCP connections to95* handle per thread96*/9798public TestHttpServer (HttpCallback cb, int threads, int cperthread)99throws IOException {100this (cb, threads, cperthread, 0);101}102103/**104* Create a <code>TestHttpServer<code> instance with the specified number105* of threads and maximum number of connections per thread and running on106* the specified port. The specified number of threads are created to107* handle incoming requests, and each thread is allowed108* to handle a number of simultaneous TCP connections.109* @param cb the callback object which is invoked to handle110* each incoming request111* @param threads the number of threads to create to handle112* requests in parallel113* @param cperthread the number of simultaneous TCP connections114* to handle per thread115* @param port the port number to bind the server to. <code>Zero</code>116* means choose any free port.117*/118119public TestHttpServer (HttpCallback cb, int threads, int cperthread, int port)120throws IOException {121this(cb, threads, cperthread, null, port);122}123124/**125* Create a <code>TestHttpServer<code> instance with the specified number126* of threads and maximum number of connections per thread and running on127* the specified port. The specified number of threads are created to128* handle incoming requests, and each thread is allowed129* to handle a number of simultaneous TCP connections.130* @param cb the callback object which is invoked to handle131* each incoming request132* @param threads the number of threads to create to handle133* requests in parallel134* @param cperthread the number of simultaneous TCP connections135* to handle per thread136* @param address the address to bind the server to. <code>Null</code>137* means bind to the wildcard address.138* @param port the port number to bind the server to. <code>Zero</code>139* means choose any free port.140*/141142public TestHttpServer (HttpCallback cb, int threads, int cperthread,143InetAddress address, int port)144throws IOException {145schan = ServerSocketChannel.open ();146InetSocketAddress addr = new InetSocketAddress (address, port);147schan.socket().bind (addr);148this.threads = threads;149this.cb = cb;150this.cperthread = cperthread;151servers = new Server [threads];152for (int i=0; i<threads; i++) {153servers[i] = new Server (cb, schan, cperthread);154servers[i].start();155}156}157158/**159* Tell all threads in the server to exit within 5 seconds.160* This is an abortive termination. Just prior to the thread exiting161* all channels in that thread waiting to be closed are forceably closed.162* @throws InterruptedException163*/164165public void terminate () {166for (int i=0; i<threads; i++) {167servers[i].terminate ();168}169170for (int i = 0; i < threads; i++) {171try {172servers[i].join();173} catch (InterruptedException e) {174System.err.println("Unexpected InterruptedException during terminating server");175throw new RuntimeException(e);176}177}178}179180/**181* return the local port number to which the server is bound.182* @return the local port number183*/184185public int getLocalPort () {186return schan.socket().getLocalPort ();187}188189public String getAuthority() {190InetAddress address = schan.socket().getInetAddress();191String hostaddr = address.getHostAddress();192if (address.isAnyLocalAddress()) hostaddr = "localhost";193if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]";194return hostaddr + ":" + getLocalPort();195}196197static class Server extends Thread {198199ServerSocketChannel schan;200Selector selector;201SelectionKey listenerKey;202SelectionKey key; /* the current key being processed */203HttpCallback cb;204ByteBuffer consumeBuffer;205int maxconn;206int nconn;207ClosedChannelList clist;208volatile boolean shutdown;209210Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {211this.schan = schan;212this.maxconn = maxconn;213this.cb = cb;214nconn = 0;215consumeBuffer = ByteBuffer.allocate (512);216clist = new ClosedChannelList ();217try {218selector = Selector.open ();219schan.configureBlocking (false);220listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);221} catch (IOException e) {222System.err.println ("Server could not start: " + e);223throw new RuntimeException("Server could not start: " + e, e);224}225}226227/* Stop the thread as soon as possible */228public void terminate () {229shutdown = true;230}231232public void run () {233try {234while (true) {235selector.select(1000);236Set<SelectionKey> selected = selector.selectedKeys();237Iterator<SelectionKey> iter = selected.iterator();238while (iter.hasNext()) {239key = iter.next();240if (key.equals (listenerKey)) {241SocketChannel sock = schan.accept ();242if (sock == null) {243/* false notification */244iter.remove();245continue;246}247sock.configureBlocking (false);248sock.register (selector, SelectionKey.OP_READ);249nconn ++;250System.out.println("SERVER: new connection. chan[" + sock + "]");251if (nconn == maxconn) {252/* deregister */253listenerKey.cancel ();254listenerKey = null;255}256} else {257if (key.isReadable()) {258boolean closed;259SocketChannel chan = (SocketChannel) key.channel();260System.out.println("SERVER: connection readable. chan[" + chan + "]");261if (key.attachment() != null) {262System.out.println("Server: consume");263closed = consume (chan);264} else {265closed = read (chan, key);266}267if (closed) {268chan.close ();269key.cancel ();270if (nconn == maxconn) {271listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);272}273nconn --;274}275}276}277iter.remove();278}279clist.check();280if (shutdown) {281System.out.println("Force to Shutdown");282SelectionKey sKey = schan.keyFor(selector);283if (sKey != null) {284sKey.cancel();285}286287clist.terminate ();288selector.close();289schan.socket().close();290schan.close();291return;292}293}294} catch (IOException e) {295System.out.println ("Server exception: " + e);296// TODO finish297}298}299300/* read all the data off the channel without looking at it301* return true if connection closed302*/303boolean consume (SocketChannel chan) {304try {305consumeBuffer.clear ();306int c = chan.read (consumeBuffer);307if (c == -1)308return true;309} catch (IOException e) {310return true;311}312return false;313}314315/* return true if the connection is closed, false otherwise */316317private boolean read (SocketChannel chan, SelectionKey key) {318HttpTransaction msg;319boolean res;320try {321InputStream is = new BufferedInputStream (new NioInputStream (chan));322String requestline = readLine (is);323MessageHeader mhead = new MessageHeader (is);324String clen = mhead.findValue ("Content-Length");325String trferenc = mhead.findValue ("Transfer-Encoding");326String data = null;327if (trferenc != null && trferenc.equals ("chunked"))328data = new String (readChunkedData (is));329else if (clen != null)330data = new String (readNormalData (is, Integer.parseInt (clen)));331String[] req = requestline.split (" ");332if (req.length < 2) {333/* invalid request line */334return false;335}336String cmd = req[0];337URI uri = null;338try {339uri = new URI (req[1]);340msg = new HttpTransaction (this, cmd, uri, mhead, data, null, key);341cb.request (msg);342} catch (URISyntaxException e) {343System.err.println ("Invalid URI: " + e);344msg = new HttpTransaction (this, cmd, null, null, null, null, key);345msg.sendResponse (501, "Whatever");346}347res = false;348} catch (IOException e) {349res = true;350}351return res;352}353354byte[] readNormalData (InputStream is, int len) throws IOException {355byte [] buf = new byte [len];356int c, off=0, remain=len;357while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {358remain -= c;359off += c;360}361return buf;362}363364private void readCRLF(InputStream is) throws IOException {365int cr = is.read();366int lf = is.read();367368if (((cr & 0xff) != 0x0d) ||369((lf & 0xff) != 0x0a)) {370throw new IOException(371"Expected <CR><LF>: got '" + cr + "/" + lf + "'");372}373}374375byte[] readChunkedData (InputStream is) throws IOException {376LinkedList l = new LinkedList ();377int total = 0;378for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {379l.add (readNormalData(is, len));380total += len;381readCRLF(is); // CRLF at end of chunk382}383readCRLF(is); // CRLF at end of Chunked Stream.384byte[] buf = new byte [total];385Iterator i = l.iterator();386int x = 0;387while (i.hasNext()) {388byte[] b = (byte[])i.next();389System.arraycopy (b, 0, buf, x, b.length);390x += b.length;391}392return buf;393}394395private int readChunkLen (InputStream is) throws IOException {396int c, len=0;397boolean done=false, readCR=false;398while (!done) {399c = is.read ();400if (c == '\n' && readCR) {401done = true;402} else {403if (c == '\r' && !readCR) {404readCR = true;405} else {406int x=0;407if (c >= 'a' && c <= 'f') {408x = c - 'a' + 10;409} else if (c >= 'A' && c <= 'F') {410x = c - 'A' + 10;411} else if (c >= '0' && c <= '9') {412x = c - '0';413}414len = len * 16 + x;415}416}417}418return len;419}420421private String readLine (InputStream is) throws IOException {422boolean done=false, readCR=false;423byte[] b = new byte [512];424int c, l = 0;425426while (!done) {427c = is.read ();428if (c == '\n' && readCR) {429done = true;430} else {431if (c == '\r' && !readCR) {432readCR = true;433} else {434b[l++] = (byte)c;435}436}437}438return new String (b);439}440441/** close the channel associated with the current key by:442* 1. shutdownOutput (send a FIN)443* 2. mark the key so that incoming data is to be consumed and discarded444* 3. After a period, close the socket445*/446447synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {448SocketChannel ch = (SocketChannel)key.channel ();449System.out.println("SERVER: orderlyCloseChannel chan[" + ch + "]");450ch.socket().shutdownOutput();451key.attach (this);452clist.add (key);453}454455synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {456SocketChannel ch = (SocketChannel)key.channel ();457System.out.println("SERVER: abortiveCloseChannel chan[" + ch + "]");458459Socket s = ch.socket ();460s.setSoLinger (true, 0);461ch.close();462}463}464465466/**467* Implements blocking reading semantics on top of a non-blocking channel468*/469470static class NioInputStream extends InputStream {471SocketChannel channel;472Selector selector;473ByteBuffer chanbuf;474SelectionKey key;475int available;476byte[] one;477boolean closed;478ByteBuffer markBuf; /* reads may be satisifed from this buffer */479boolean marked;480boolean reset;481int readlimit;482483public NioInputStream (SocketChannel chan) throws IOException {484this.channel = chan;485selector = Selector.open();486chanbuf = ByteBuffer.allocate (1024);487key = chan.register (selector, SelectionKey.OP_READ);488available = 0;489one = new byte[1];490closed = marked = reset = false;491}492493public synchronized int read (byte[] b) throws IOException {494return read (b, 0, b.length);495}496497public synchronized int read () throws IOException {498return read (one, 0, 1);499}500501public synchronized int read (byte[] b, int off, int srclen) throws IOException {502503int canreturn, willreturn;504505if (closed)506return -1;507508if (reset) { /* satisfy from markBuf */509canreturn = markBuf.remaining ();510willreturn = canreturn>srclen ? srclen : canreturn;511markBuf.get(b, off, willreturn);512if (canreturn == willreturn) {513reset = false;514}515} else { /* satisfy from channel */516canreturn = available();517if (canreturn == 0) {518block ();519canreturn = available();520}521willreturn = canreturn>srclen ? srclen : canreturn;522chanbuf.get(b, off, willreturn);523available -= willreturn;524525if (marked) { /* copy into markBuf */526try {527markBuf.put (b, off, willreturn);528} catch (BufferOverflowException e) {529marked = false;530}531}532}533return willreturn;534}535536public synchronized int available () throws IOException {537if (closed)538throw new IOException ("Stream is closed");539540if (reset)541return markBuf.remaining();542543if (available > 0)544return available;545546chanbuf.clear ();547available = channel.read (chanbuf);548if (available > 0)549chanbuf.flip();550else if (available == -1)551throw new IOException ("Stream is closed");552return available;553}554555/**556* block() only called when available==0 and buf is empty557*/558private synchronized void block () throws IOException {559//assert available == 0;560int n = selector.select ();561//assert n == 1;562selector.selectedKeys().clear();563available ();564}565566public void close () throws IOException {567if (closed)568return;569channel.close ();570closed = true;571}572573public synchronized void mark (int readlimit) {574if (closed)575return;576this.readlimit = readlimit;577markBuf = ByteBuffer.allocate (readlimit);578marked = true;579reset = false;580}581582public synchronized void reset () throws IOException {583if (closed )584return;585if (!marked)586throw new IOException ("Stream not marked");587marked = false;588reset = true;589markBuf.flip ();590}591}592593static class NioOutputStream extends OutputStream {594SocketChannel channel;595ByteBuffer buf;596SelectionKey key;597Selector selector;598boolean closed;599byte[] one;600601public NioOutputStream (SocketChannel channel) throws IOException {602this.channel = channel;603selector = Selector.open ();604key = channel.register (selector, SelectionKey.OP_WRITE);605closed = false;606one = new byte [1];607}608609public synchronized void write (int b) throws IOException {610one[0] = (byte)b;611write (one, 0, 1);612}613614public synchronized void write (byte[] b) throws IOException {615write (b, 0, b.length);616}617618public synchronized void write (byte[] b, int off, int len) throws IOException {619if (closed)620throw new IOException ("stream is closed");621622buf = ByteBuffer.allocate (len);623buf.put (b, off, len);624buf.flip ();625int n;626while ((n = channel.write (buf)) < len) {627len -= n;628if (len == 0)629return;630selector.select ();631selector.selectedKeys().clear ();632}633}634635public void close () throws IOException {636if (closed)637return;638channel.close ();639closed = true;640}641}642643/**644* Utilities for synchronization. A condition is645* identified by a string name, and is initialized646* upon first use (ie. setCondition() or waitForCondition()). Threads647* are blocked until some thread calls (or has called) setCondition() for the same648* condition.649* <P>650* A rendezvous built on a condition is also provided for synchronizing651* N threads.652*/653654private static HashMap conditions = new HashMap();655656/*657* Modifiable boolean object658*/659private static class BValue {660boolean v;661}662663/*664* Modifiable int object665*/666private static class IValue {667int v;668IValue (int i) {669v =i;670}671}672673674private static BValue getCond (String condition) {675synchronized (conditions) {676BValue cond = (BValue) conditions.get (condition);677if (cond == null) {678cond = new BValue();679conditions.put (condition, cond);680}681return cond;682}683}684685/**686* Set the condition to true. Any threads that are currently blocked687* waiting on the condition, will be unblocked and allowed to continue.688* Threads that subsequently call waitForCondition() will not block.689* If the named condition did not exist prior to the call, then it is created690* first.691*/692693public static void setCondition (String condition) {694BValue cond = getCond (condition);695synchronized (cond) {696if (cond.v) {697return;698}699cond.v = true;700cond.notifyAll();701}702}703704/**705* If the named condition does not exist, then it is created and initialized706* to false. If the condition exists or has just been created and its value707* is false, then the thread blocks until another thread sets the condition.708* If the condition exists and is already set to true, then this call returns709* immediately without blocking.710*/711712public static void waitForCondition (String condition) {713BValue cond = getCond (condition);714synchronized (cond) {715if (!cond.v) {716try {717cond.wait();718} catch (InterruptedException e) {}719}720}721}722723/* conditions must be locked when accessing this */724static HashMap rv = new HashMap();725726/**727* Force N threads to rendezvous (ie. wait for each other) before proceeding.728* The first thread(s) to call are blocked until the last729* thread makes the call. Then all threads continue.730* <p>731* All threads that call with the same condition name, must use the same value732* for N (or the results may be not be as expected).733* <P>734* Obviously, if fewer than N threads make the rendezvous then the result735* will be a hang.736*/737738public static void rendezvous (String condition, int N) {739BValue cond;740IValue iv;741String name = "RV_"+condition;742743/* get the condition */744745synchronized (conditions) {746cond = (BValue)conditions.get (name);747if (cond == null) {748/* we are first caller */749if (N < 2) {750throw new RuntimeException ("rendezvous must be called with N >= 2");751}752cond = new BValue ();753conditions.put (name, cond);754iv = new IValue (N-1);755rv.put (name, iv);756} else {757/* already initialised, just decrement the counter */758iv = (IValue) rv.get (name);759iv.v --;760}761}762763if (iv.v > 0) {764waitForCondition (name);765} else {766setCondition (name);767synchronized (conditions) {768clearCondition (name);769rv.remove (name);770}771}772}773774/**775* If the named condition exists and is set then remove it, so it can776* be re-initialized and used again. If the condition does not exist, or777* exists but is not set, then the call returns without doing anything.778* Note, some higher level synchronization779* may be needed between clear and the other operations.780*/781782public static void clearCondition(String condition) {783BValue cond;784synchronized (conditions) {785cond = (BValue) conditions.get (condition);786if (cond == null) {787return;788}789synchronized (cond) {790if (cond.v) {791conditions.remove (condition);792}793}794}795}796}797798799