experimental new embedded_httpd_threads, use -version=cgi_multiple_connections_per_thread to try

This commit is contained in:
Adam D. Ruppe 2013-08-10 13:54:19 -04:00
parent f34e9de27f
commit 93248ce308
1 changed files with 117 additions and 66 deletions

183
cgi.d
View File

@ -2426,51 +2426,8 @@ mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defau
} }
} else } else
version(embedded_httpd_threads) { version(embedded_httpd_threads) {
auto manager = new ListeningConnectionManager(listeningPort(8085)); auto manager = new ListeningConnectionManager(listeningPort(8085), &doThreadHttpConnection!(CustomCgi, fun));
foreach(connection; manager) { manager.listen();
scope(failure) {
// catch all for other errors
sendAll(connection, plainHttpError(false, "500 Internal Server Error", null));
connection.close();
}
bool closeConnection;
auto ir = new BufferedInputRange(connection);
while(!ir.empty) {
Cgi cgi;
try {
cgi = new CustomCgi(ir, &closeConnection);
} catch(Throwable t) {
// a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P
// anyway let's kill the connection
stderr.writeln(t.toString());
sendAll(connection, plainHttpError(false, "400 Bad Request", t));
closeConnection = true;
break;
}
assert(cgi !is null);
scope(exit)
cgi.dispose();
try {
fun(cgi);
cgi.close();
} catch(Throwable t) {
// a processing error can be recovered from
stderr.writeln(t.toString);
if(!handleException(cgi, t))
closeConnection = true;
}
if(closeConnection) {
connection.close();
break;
} else {
if(!ir.empty)
ir.popFront(); // get the next
}
}
}
} else } else
version(scgi) { version(scgi) {
import std.exception; import std.exception;
@ -2677,6 +2634,53 @@ mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defau
} }
} }
version(embedded_httpd_threads)
void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) {
scope(failure) {
// catch all for other errors
sendAll(connection, plainHttpError(false, "500 Internal Server Error", null));
connection.close();
}
bool closeConnection;
auto ir = new BufferedInputRange(connection);
while(!ir.empty) {
Cgi cgi;
try {
cgi = new CustomCgi(ir, &closeConnection);
} catch(Throwable t) {
// a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P
// anyway let's kill the connection
stderr.writeln(t.toString());
sendAll(connection, plainHttpError(false, "400 Bad Request", t));
closeConnection = true;
break;
}
assert(cgi !is null);
scope(exit)
cgi.dispose();
try {
fun(cgi);
cgi.close();
} catch(Throwable t) {
// a processing error can be recovered from
stderr.writeln(t.toString);
if(!handleException(cgi, t))
closeConnection = true;
}
if(closeConnection) {
connection.close();
break;
} else {
if(!ir.empty)
ir.popFront(); // get the next
}
}
}
string printDate(DateTime date) { string printDate(DateTime date) {
return format( return format(
"%.3s, %02d %.3s %d %02d:%02d:%02d GMT", // could be UTC too "%.3s, %02d %.3s %d %02d:%02d:%02d GMT", // could be UTC too
@ -2922,20 +2926,39 @@ class BufferedInputRange {
bool sourceClosed; bool sourceClosed;
} }
class ConnectionThread2 : Thread {
import std.concurrency;
this(void function(Socket) handler) {
this.handler = handler;
super(&run);
}
void run() {
tid = thisTid();
available = true;
while(true)
receive(
(/*Socket*/ size_t s) {
available = false;
try {
handler(cast(Socket) cast(void*) s);
} catch(Throwable t) {}
available = true;
}
);
}
bool available;
Tid tid;
void function(Socket) handler;
}
/** /**
To use this thing: To use this thing:
auto manager = new ListeningConnectionManager(80); void handler(Socket s) { do something... }
foreach(connection; manager) { auto manager = new ListeningConnectionManager(80, &handler);
// work with connection manager.listen();
// note: each connection may get its own thread, so this is a kind of concurrent foreach.
// this can have implications if you access local variables in the function, as they are
// implicitly shared!
// FIXME: break does not work
}
I suggest you use BufferedInputRange(connection) to handle the input. As a packet I suggest you use BufferedInputRange(connection) to handle the input. As a packet
comes in, you will get control. You can just continue; though to fetch more. comes in, you will get control. You can just continue; though to fetch more.
@ -2944,7 +2967,43 @@ class BufferedInputRange {
FIXME: should I offer an event based async thing like netman did too? Yeah, probably. FIXME: should I offer an event based async thing like netman did too? Yeah, probably.
*/ */
class ListeningConnectionManager { class ListeningConnectionManager {
this(ushort port) { void listen() {
version(cgi_multiple_connections_per_thread) {
import std.concurrency;
import std.random;
ConnectionThread2[16] pool;
foreach(ref p; pool) {
p = new ConnectionThread2(handler);
p.start();
}
while(true) {
auto connection = listener.accept();
bool handled = false;
retry:
foreach(p; pool)
if(p.available) {
handled = true;
send(p.tid, cast(size_t) cast(void*) connection);
break;
}
// none available right now, make it wait a bit then try again
if(!handled) {
Thread.sleep(dur!"msecs"(25));
goto retry;
}
}
} else {
foreach(connection; this)
handler(connection);
}
}
this(ushort port, void function(Socket) handler) {
this.handler = handler;
listener = new TcpSocket(); listener = new TcpSocket();
listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
listener.bind(new InternetAddress(port)); listener.bind(new InternetAddress(port));
@ -2952,6 +3011,7 @@ class ListeningConnectionManager {
} }
Socket listener; Socket listener;
void function(Socket) handler;
bool running; bool running;
void quit() { void quit() {
@ -2962,15 +3022,6 @@ class ListeningConnectionManager {
running = true; running = true;
shared(int) loopBroken; shared(int) loopBroken;
version(cgi_multiple_connections_per_thread) {
ConnectionThread[16] pool;
foreach(ref t; pool) {
t = new ConnectionThread(null, &loopBroken, dg);
t.start();
}
}
while(!loopBroken && running) { while(!loopBroken && running) {
auto sn = listener.accept(); auto sn = listener.accept();
try { try {