diff --git a/cgi.d b/cgi.d index 8935db5..b15d660 100644 --- a/cgi.d +++ b/cgi.d @@ -2426,51 +2426,8 @@ mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defau } } else version(embedded_httpd_threads) { - auto manager = new ListeningConnectionManager(listeningPort(8085)); - foreach(connection; manager) { - scope(failure) { - // catch all for other errors - sendAll(connection, plainHttpError(false, "500 Internal Server Error", null)); - connection.close(); - } - bool closeConnection; - auto ir = new BufferedInputRange(connection); - - while(!ir.empty) { - Cgi cgi; - try { - cgi = new CustomCgi(ir, &closeConnection); - } catch(Throwable t) { - // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P - // anyway let's kill the connection - stderr.writeln(t.toString()); - sendAll(connection, plainHttpError(false, "400 Bad Request", t)); - closeConnection = true; - break; - } - assert(cgi !is null); - scope(exit) - cgi.dispose(); - - try { - fun(cgi); - cgi.close(); - } catch(Throwable t) { - // a processing error can be recovered from - stderr.writeln(t.toString); - if(!handleException(cgi, t)) - closeConnection = true; - } - - if(closeConnection) { - connection.close(); - break; - } else { - if(!ir.empty) - ir.popFront(); // get the next - } - } - } + auto manager = new ListeningConnectionManager(listeningPort(8085), &doThreadHttpConnection!(CustomCgi, fun)); + manager.listen(); } else version(scgi) { import std.exception; @@ -2677,6 +2634,53 @@ mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defau } } +version(embedded_httpd_threads) +void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { + scope(failure) { + // catch all for other errors + sendAll(connection, plainHttpError(false, "500 Internal Server Error", null)); + connection.close(); + } + bool closeConnection; + auto ir = new BufferedInputRange(connection); + + while(!ir.empty) { + Cgi cgi; + try { + cgi = new CustomCgi(ir, &closeConnection); + } catch(Throwable t) { + // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P + // anyway let's kill the connection + stderr.writeln(t.toString()); + sendAll(connection, plainHttpError(false, "400 Bad Request", t)); + closeConnection = true; + break; + } + assert(cgi !is null); + scope(exit) + cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + } catch(Throwable t) { + // a processing error can be recovered from + stderr.writeln(t.toString); + if(!handleException(cgi, t)) + closeConnection = true; + } + + if(closeConnection) { + connection.close(); + break; + } else { + if(!ir.empty) + ir.popFront(); // get the next + } + } +} + + string printDate(DateTime date) { return format( "%.3s, %02d %.3s %d %02d:%02d:%02d GMT", // could be UTC too @@ -2922,20 +2926,39 @@ class BufferedInputRange { bool sourceClosed; } +class ConnectionThread2 : Thread { + import std.concurrency; + this(void function(Socket) handler) { + this.handler = handler; + super(&run); + } + + void run() { + tid = thisTid(); + available = true; + while(true) + receive( + (/*Socket*/ size_t s) { + available = false; + try { + handler(cast(Socket) cast(void*) s); + } catch(Throwable t) {} + available = true; + } + ); + } + + bool available; + Tid tid; + void function(Socket) handler; +} + /** To use this thing: - auto manager = new ListeningConnectionManager(80); - foreach(connection; manager) { - // work with connection - // note: each connection may get its own thread, so this is a kind of concurrent foreach. - - // this can have implications if you access local variables in the function, as they are - // implicitly shared! - - // FIXME: break does not work - } - + void handler(Socket s) { do something... } + auto manager = new ListeningConnectionManager(80, &handler); + manager.listen(); I suggest you use BufferedInputRange(connection) to handle the input. As a packet comes in, you will get control. You can just continue; though to fetch more. @@ -2944,7 +2967,43 @@ class BufferedInputRange { FIXME: should I offer an event based async thing like netman did too? Yeah, probably. */ class ListeningConnectionManager { - this(ushort port) { + void listen() { + version(cgi_multiple_connections_per_thread) { + import std.concurrency; + import std.random; + ConnectionThread2[16] pool; + foreach(ref p; pool) { + p = new ConnectionThread2(handler); + p.start(); + } + + while(true) { + auto connection = listener.accept(); + + bool handled = false; + retry: + foreach(p; pool) + if(p.available) { + handled = true; + send(p.tid, cast(size_t) cast(void*) connection); + break; + } + + // none available right now, make it wait a bit then try again + if(!handled) { + Thread.sleep(dur!"msecs"(25)); + goto retry; + } + } + } else { + foreach(connection; this) + handler(connection); + + } + } + + this(ushort port, void function(Socket) handler) { + this.handler = handler; listener = new TcpSocket(); listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); listener.bind(new InternetAddress(port)); @@ -2952,6 +3011,7 @@ class ListeningConnectionManager { } Socket listener; + void function(Socket) handler; bool running; void quit() { @@ -2962,15 +3022,6 @@ class ListeningConnectionManager { running = true; shared(int) loopBroken; - version(cgi_multiple_connections_per_thread) { - ConnectionThread[16] pool; - foreach(ref t; pool) { - t = new ConnectionThread(null, &loopBroken, dg); - t.start(); - } - } - - while(!loopBroken && running) { auto sn = listener.accept(); try {