From ca852a8c82019c47eae54e23e613222f13a1d8b0 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 14:46:22 +0000 Subject: [PATCH 01/34] Move to use a workerpool --- danode/client.d | 6 ++-- danode/filesystem.d | 2 +- danode/imports.d | 2 ++ danode/router.d | 3 +- danode/server.d | 85 +++++++++++++++++++++---------------------- danode/workerpool.d | 88 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 134 insertions(+), 52 deletions(-) create mode 100644 danode/workerpool.d diff --git a/danode/client.d b/danode/client.d index 99b803b..2d7cd5d 100644 --- a/danode/client.d +++ b/danode/client.d @@ -1,4 +1,4 @@ -/** danode/client.d - Per-connection thread: request/response lifecycle, keep-alive, timeouts +/** danode/client.d - Per-connection handler: request/response lifecycle, keep-alive, timeouts * License: GPLv3 (https://github.com/DannyArends/DaNode) - Danny Arends **/ module danode.client; @@ -20,7 +20,7 @@ immutable size_t MAX_UPLOAD_SIZE = 1024 * 1024 * 100; // 100MB Multipart uploa immutable size_t MAX_SSE_TIME = 60_000; // 60 seconds max SSE lifetime -class Client : Thread, ClientInterface { +class Client : ClientInterface { private: Router router; /// Router class from server DriverInterface driver; /// Driver @@ -33,7 +33,6 @@ class Client : Thread, ClientInterface { this.router = router; this.driver = driver; this.maxtime = maxtime; - super(&run); // initialize the thread } final void run() { @@ -85,7 +84,6 @@ class Client : Thread, ClientInterface { this.log(request, response); } catch(Exception e) { log(Level.Verbose, "Unknown Client Exception: %s", e); stop(); } catch(Error e) { log(Level.Verbose, "Unknown Client Error: %s", e); stop(); } - log(Level.Verbose, "Connection %s:%s (%s) closed. %d requests %s (%s msecs)", ip, port, (driver.isSecure() ? "SSL" : "HTTP"), driver.requests, driver.senddata, starttime); } diff --git a/danode/filesystem.d b/danode/filesystem.d index b80a1a1..db6ee7b 100644 --- a/danode/filesystem.d +++ b/danode/filesystem.d @@ -65,7 +65,7 @@ class FileSystem { // Remove files that no longer exist on disk foreach (k; domain.files.keys) { if (!exists(dname ~ k)) { domain.files.remove(k); } } - log(Level.Verbose, "Domain: '%s' files %s|%s", dname, domain.buffered, domain.entries); + log(Level.Always, "Domain: '%s' files %s|%s", dname, domain.buffered, domain.entries); log(Level.Verbose, "Domain: '%s' size %.2f/%.2f kB", dname, domain.buffersize / 1024.0, domain.size / 1024.0); return(domain); } } diff --git a/danode/imports.d b/danode/imports.d index 023f7cf..1965982 100644 --- a/danode/imports.d +++ b/danode/imports.d @@ -8,6 +8,8 @@ public import core.stdc.stdio : fileno; // Public imported structures and enums from core public import core.atomic; +public import core.memory : GC; +public import core.sync.semaphore : Semaphore; public import core.sync.mutex : Mutex; public import core.thread; diff --git a/danode/router.d b/danode/router.d index 8eb990a..1e3c536 100644 --- a/danode/router.d +++ b/danode/router.d @@ -154,8 +154,7 @@ StringDriver runRequest(Router router, string request = "GET /dmd.d HTTP/1.1\nHo auto driver = new StringDriver(request); auto client = new Client(router, driver, maxtime); log(Level.Verbose, "Router: [I] %s:%s %s", client.ip(), client.port(), request.splitLines()[0]); - client.start(); - client.join(); + client.run(); return driver; } diff --git a/danode/server.d b/danode/server.d index 298e308..b7a4ee0 100644 --- a/danode/server.d +++ b/danode/server.d @@ -8,6 +8,7 @@ import danode.client : Client; import danode.interfaces : DriverInterface; import danode.http : HTTP; import danode.router : Router; +import danode.workerpool : WorkerPool; import danode.log : cv, abort, log, tag, error, Level; version(SSL) { @@ -16,18 +17,13 @@ version(SSL) { import danode.https : HTTPS; } -immutable int MAX_CLIENTS = 2048; -immutable int MAX_CLIENTS_PER_IP = 32; - class Server : Thread { private: Socket socket; // The server socket SocketSet set; // SocketSet for server socket and client listeners - Client[] clients; // List of clients bool terminated; // Server running SysTime starttime; // Start time of the server - Router router; // Router to route requests - long[string] nAlivePerIP; + WorkerPool pool; public: string wwwFolder = "www/"; @@ -46,7 +42,7 @@ class Server : Thread { string sslFolder = ".ssl/", string sslKey = "server.key", string accountKey = "account.key") { starttime = Clock.currTime(); // Start the timer socket = initialize(port, backlog); // Create the HTTP socket - router = new Router(wwwFolder, socket.localAddress()); // Start the router + pool = new WorkerPool(new Router(wwwFolder, socket.localAddress())); version(SSL) { sslPath = sslFolder.resolveFolder(); ssl = sslKey; @@ -73,20 +69,22 @@ class Server : Thread { } // Accept an incoming connection and create a client object - final void accept(ref Appender!(Client[]) persistent, Socket socket, bool secure = false) { - if (set.sISelect(socket, false, 5) <= 0 || nAlive >= MAX_CLIENTS) return; - log(Level.Trace, "Accepting %s request", secure ? "HTTPs" : "HTTP"); + final void accept(Socket socket, bool secure = false) { + if (set.sISelect(socket, false, 5) <= 0) return; try { - DriverInterface driver = null; - if (!secure) driver = new HTTP(socket.accept(), false); - version(SSL) { if (secure) driver = new HTTPS(socket.accept(), false); } - if (driver is null) return; - Client client = new Client(router, driver); - client.start(); - if (nAlivePerIP.from(client.ip, 0L) <= MAX_CLIENTS_PER_IP) { - persistent.put(client); - } else { log(Level.Always, "Rate limit exceeded [%s]", client.ip); client.stop(); } - } catch(Exception e) { error("Unable to accept connection: %s", e.msg); } + Socket accepted = socket.accept(); + string ip = accepted.remoteAddress().toAddrString(); + bool isLoopback = (ip == "127.0.0.1" || ip == "::1"); + DriverInterface driver = null; + if (!secure) driver = new HTTP(accepted, false); + version(SSL) { if (secure) driver = new HTTPS(accepted, false); } + if (driver is null) { accepted.close(); return; } + if (!pool.push(driver, ip, isLoopback)) { + log(Level.Always, "Rate limit or capacity exceeded [%s]", ip); + driver.closeConnection(); + } + } catch(Exception e) { error("Unable to accept connection, Exception: %s", e.msg); + } catch(Error e) { error("Unable to accept connection, Error: %s", e.msg); } } // is the server still running ? @@ -98,19 +96,15 @@ class Server : Thread { } } } - // Stop all clients and shutdown the server - final void stop(){ synchronized { - foreach(ref Client client; clients){ client.stop(); } terminated = true; - } } - - final @property long nAlive() { return nAlivePerIP.byValue.sum; } + // Stop the pool and shutdown the server + final void stop(){ synchronized { pool.stop(); terminated = true; } } // Returns a Duration object holding the server uptime final @property Duration uptime() const { return(Clock.currTime() - starttime); } - // Print some server information - final @property void info() { log(Level.Always, "Uptime %s, Connections: %d / %d", uptime, nAlive, clients.length); } - + // Print some server information + final @property void info() { log(Level.Always, "Uptime %s, Connections: %d / queued: %d", uptime, pool.nAlive, pool.queued); } + // Hostname of the server final @property string hostname() { return(socket.hostName()); } version(SSL) { @@ -119,42 +113,43 @@ class Server : Thread { } final void run() { - Appender!(Client[]) persistent; SysTime lastScan = Clock.currTime(); while(running) { try { - Client[] previous = clients; // Slice reference - persistent.clear(); // Clear the Appender - accept(persistent, socket); - version (SSL) { accept(persistent, sslsocket, true); } - - nAlivePerIP = null; - foreach (Client client; previous) { // Foreach through the Slice reference - if(client.running) { nAlivePerIP[client.ip]++; persistent.put(client); } - else if(!client.isRunning) client.join(); // join finished threads - } - clients = persistent.data; - if (Msecs(lastScan) > 86_400_000) { // Scan for deleted files & expiring certificates every day - router.scan(); + accept(socket); + version (SSL) { accept(sslsocket, true); } + if (Msecs(lastScan) > 86_400_000) { + pool.scan(); version(SSL) { checkAndRenew(sslPath, sslKey, accountKey); } lastScan = Clock.currTime(); } } catch(Exception e) { error("Unspecified top level server exception: %s", e.msg); } catch(Error e) { error("Unspecified top level server error: %s", e.msg); } } - log(Level.Always, "Server socket closed, running: %s", running); + pool.stop(); socket.close(); version (SSL) { sslsocket.closeSSL(); } } } -void parseKeyInput(ref Server server){ +void parseKeyInput(ref Server server) { string line = chomp(stdin.readln()); if (line.startsWith("quit")) server.stop(); if (line.startsWith("info")) server.info(); } +void setLimit() { version(Posix) { + import core.sys.posix.sys.resource; + + rlimit rl; + getrlimit(RLIMIT_NOFILE, &rl); + rl.rlim_cur = rl.rlim_max; + auto res = setrlimit(RLIMIT_NOFILE, &rl); + log(Level.Always, "FD limit: %d [%d]", rl.rlim_cur, res); +} } + void main(string[] args) { + setLimit(); version(unittest){ ushort port = 8080; }else{ ushort port = 80; } int backlog = 100; int verbose = Level.Verbose; diff --git a/danode/workerpool.d b/danode/workerpool.d new file mode 100644 index 0000000..b4245bc --- /dev/null +++ b/danode/workerpool.d @@ -0,0 +1,88 @@ +/** danode/workerpool.d - Fixed thread pool: connection dispatch, per-IP tracking, worker lifecycle + * License: GPLv3 (https://github.com/DannyArends/DaNode) - Danny Arends **/ +module danode.workerpool; + +import danode.imports; +import danode.client : Client; +import danode.interfaces : DriverInterface; +import danode.router : Router; +import danode.log : log, error, Level; + +immutable int MAX_CLIENTS = 2048; +immutable int MAX_CLIENTS_PER_IP = 32; +immutable int POOL_SIZE = 200; + +class WorkerPool { + private: + Router router; + Thread[] workers; + DriverInterface[] queue; + Mutex mutex; + Semaphore sem; + bool stopped; + + public: + long[string] nAlivePerIP; // protected by mutex + + this(Router router) { + this.router = router; + this.mutex = new Mutex(); + this.sem = new Semaphore(0); + foreach (i; 0 .. POOL_SIZE) { + auto t = new Thread(&workerLoop, 256 * 1024); + t.isDaemon = true; + t.start(); + workers ~= t; + } + log(Level.Always, "WorkerPool started with %d threads", POOL_SIZE); + } + + bool push(DriverInterface driver, string ip, bool isLoopback) { + synchronized(mutex) { + if (!isLoopback && nAlivePerIP.get(ip, 0L) >= MAX_CLIENTS_PER_IP) return(false); + if (queue.length >= MAX_CLIENTS) return(false); + queue ~= driver; + } + sem.notify(); + return true; + } + + @property long nAlive() { synchronized(mutex) { return nAlivePerIP.byValue.sum; } } + @property long queued() { synchronized(mutex) { return queue.length; } } + void scan() { router.scan(); } + + + void stop() { + synchronized(mutex) { stopped = true; } + foreach (i; 0 .. POOL_SIZE) sem.notify(); // wake all workers to exit + foreach (t; workers) t.join(); + log(Level.Always, "WorkerPool stopped"); + } + + private: + void workerLoop() { + while (true) { + sem.wait(); + + DriverInterface driver; + synchronized(mutex) { + if (stopped) return; + if (queue.length == 0) continue; // spurious notify from stop() + driver = queue[0]; + queue = queue[1 .. $]; + } + + string ip = driver.ip; + synchronized(mutex) { nAlivePerIP[ip]++; } + try { + auto client = new Client(router, driver); + client.run(); + } catch(Exception e) { error("WorkerPool: client exception [%s]: %s", ip, e.msg); + } catch(Error e) { error("WorkerPool: client error [%s]: %s", ip, e.msg); } + synchronized(mutex) { + if (ip in nAlivePerIP && nAlivePerIP[ip] > 0) nAlivePerIP[ip]--; + if (nAlivePerIP[ip] == 0) nAlivePerIP.remove(ip); + } + } + } +} From 333cb2a197c8c8d32a4cf092321a0995106274b1 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 14:50:40 +0000 Subject: [PATCH 02/34] Minor fixes --- danode/filesystem.d | 2 +- danode/imports.d | 1 - danode/server.d | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/danode/filesystem.d b/danode/filesystem.d index db6ee7b..b80a1a1 100644 --- a/danode/filesystem.d +++ b/danode/filesystem.d @@ -65,7 +65,7 @@ class FileSystem { // Remove files that no longer exist on disk foreach (k; domain.files.keys) { if (!exists(dname ~ k)) { domain.files.remove(k); } } - log(Level.Always, "Domain: '%s' files %s|%s", dname, domain.buffered, domain.entries); + log(Level.Verbose, "Domain: '%s' files %s|%s", dname, domain.buffered, domain.entries); log(Level.Verbose, "Domain: '%s' size %.2f/%.2f kB", dname, domain.buffersize / 1024.0, domain.size / 1024.0); return(domain); } } diff --git a/danode/imports.d b/danode/imports.d index 1965982..732f185 100644 --- a/danode/imports.d +++ b/danode/imports.d @@ -8,7 +8,6 @@ public import core.stdc.stdio : fileno; // Public imported structures and enums from core public import core.atomic; -public import core.memory : GC; public import core.sync.semaphore : Semaphore; public import core.sync.mutex : Mutex; public import core.thread; diff --git a/danode/server.d b/danode/server.d index b7a4ee0..ca45f3e 100644 --- a/danode/server.d +++ b/danode/server.d @@ -126,7 +126,6 @@ class Server : Thread { } catch(Exception e) { error("Unspecified top level server exception: %s", e.msg); } catch(Error e) { error("Unspecified top level server error: %s", e.msg); } } - pool.stop(); socket.close(); version (SSL) { sslsocket.closeSSL(); } } From 8b19712483f52b4a9af7e280ac7f8c3d9c54dc31 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 16:12:57 +0000 Subject: [PATCH 03/34] Cleaning up --- danode/client.d | 3 +- danode/server.d | 73 ++++++++++++++------------------------------- danode/signals.d | 19 +++++++++--- danode/workerpool.d | 42 +++++++++++++++----------- 4 files changed, 64 insertions(+), 73 deletions(-) diff --git a/danode/client.d b/danode/client.d index 2d7cd5d..f8838a6 100644 --- a/danode/client.d +++ b/danode/client.d @@ -19,7 +19,6 @@ immutable size_t MAX_REQUEST_SIZE = 1024 * 1024 * 2; // 2MB Body immutable size_t MAX_UPLOAD_SIZE = 1024 * 1024 * 100; // 100MB Multipart uploads immutable size_t MAX_SSE_TIME = 60_000; // 60 seconds max SSE lifetime - class Client : ClientInterface { private: Router router; /// Router class from server @@ -93,7 +92,7 @@ class Client : ClientInterface { // Stop the client by setting the terminated flag final @property void stop() { - log(Level.Trace, "connection %s:%s stop called", ip, port); + log(Level.Trace, "Connection %s:%s stop called", ip, port); atomicStore(terminated, true); } diff --git a/danode/server.d b/danode/server.d index ca45f3e..5fb8840 100644 --- a/danode/server.d +++ b/danode/server.d @@ -3,8 +3,7 @@ module danode.server; import danode.imports; -import danode.functions : from, Msecs, sISelect, resolveFolder; -import danode.client : Client; +import danode.functions : Msecs, sISelect, resolveFolder; import danode.interfaces : DriverInterface; import danode.http : HTTP; import danode.router : Router; @@ -25,9 +24,6 @@ class Server : Thread { SysTime starttime; // Start time of the server WorkerPool pool; - public: - string wwwFolder = "www/"; - version(SSL) { private: Socket sslsocket; // SSL / HTTPs socket @@ -78,7 +74,7 @@ class Server : Thread { DriverInterface driver = null; if (!secure) driver = new HTTP(accepted, false); version(SSL) { if (secure) driver = new HTTPS(accepted, false); } - if (driver is null) { accepted.close(); return; } + if (driver is null) { return(accepted.close()); } if (!pool.push(driver, ip, isLoopback)) { log(Level.Always, "Rate limit or capacity exceeded [%s]", ip); driver.closeConnection(); @@ -88,12 +84,9 @@ class Server : Thread { } // is the server still running ? - final @property bool running(){ synchronized { - version(SSL) { - return(socket.isAlive() && sslsocket.isAlive() && !terminated); - } else { - return(socket.isAlive() && !terminated); - } + final @property bool running() { synchronized { + version(SSL) { return(socket.isAlive() && sslsocket.isAlive() && !terminated); + } else { return(socket.isAlive() && !terminated); } } } // Stop the pool and shutdown the server @@ -137,19 +130,7 @@ void parseKeyInput(ref Server server) { if (line.startsWith("info")) server.info(); } -void setLimit() { version(Posix) { - import core.sys.posix.sys.resource; - - rlimit rl; - getrlimit(RLIMIT_NOFILE, &rl); - rl.rlim_cur = rl.rlim_max; - auto res = setrlimit(RLIMIT_NOFILE, &rl); - log(Level.Always, "FD limit: %d [%d]", rl.rlim_cur, res); -} } - void main(string[] args) { - setLimit(); - version(unittest){ ushort port = 8080; }else{ ushort port = 80; } int backlog = 100; int verbose = Level.Verbose; bool keyoff = false; @@ -157,7 +138,7 @@ void main(string[] args) { string sslFolder = ".ssl/"; string sslKey = "server.key"; string accountKey = "account.key"; - + getopt(args, "port|p", &port, // Port to listen on "backlog|b", &backlog, // Backlog of clients supported "keyoff|k", &keyoff, // Keyboard on or off @@ -167,32 +148,24 @@ void main(string[] args) { "accountKey", &accountKey, // Server Let's encrypt account key "verbose|v", &verbose); // Verbose level (via commandline) atomicStore(cv, verbose); - version (unittest) { - // Do nothing, unittests will run - } else { - version (Posix) { - import core.sys.posix.signal : signal, SIGPIPE; - import danode.signals : handle_signal; - signal(SIGPIPE, &handle_signal); - } - version (Windows) { - log(Level.Always, "-k was set to true. However, keyboard input under windows is not supported"); - keyoff = true; - } - auto server = new Server(port, backlog, wwwFolder, sslFolder, sslKey, accountKey); - version (SSL) { - loadSSL(server.sslPath, server.sslKey); // Load SSL certificates - checkAndRenew(server.sslPath, server.sslKey, server.accountKey); // checkAndRenew SSL certificates - } - server.start(); - while (server.running) { - if (!keyoff) { server.parseKeyInput(); } - stdout.flush(); - Thread.sleep(dur!"msecs"(250)); - } - log(Level.Always, "Server shutting down: %d", server.running); - server.info(); + version(Posix) setupPosix(); + version (Windows) { + log(Level.Always, "-k was set to true. However, keyboard input under windows is not supported"); + keyoff = true; + } + auto server = new Server(port, backlog, wwwFolder, sslFolder, sslKey, accountKey); + version (SSL) { + loadSSL(server.sslPath, server.sslKey); // Load SSL certificates + checkAndRenew(server.sslPath, server.sslKey, server.accountKey); // checkAndRenew SSL certificates + } + server.start(); + while (server.running) { + if (!keyoff) { server.parseKeyInput(); } + stdout.flush(); + Thread.sleep(dur!"msecs"(250)); } + log(Level.Always, "Server shutting down: %d", server.running); + server.info(); } unittest { diff --git a/danode/signals.d b/danode/signals.d index abc5bef..3864d69 100644 --- a/danode/signals.d +++ b/danode/signals.d @@ -3,13 +3,15 @@ module danode.signals; version(Posix) { + import danode.imports; + + import core.sys.posix.sys.resource; + import core.sys.posix.signal : signal, SIGPIPE; import core.sys.posix.unistd : write; - import core.sys.posix.signal : SIGPIPE; - import danode.imports; - import danode.log : cv; + import danode.log : cv, log, Level; - extern(C) @nogc nothrow void handle_signal(int signal) { + extern(C) @nogc nothrow void handleSignal(int signal) { switch (signal) { case SIGPIPE: if(atomicLoad(cv) > 1) write(2, cast(const(void*)) "[SIG] Broken pipe caught, and ignored\n\0".ptr, 41); @@ -19,5 +21,14 @@ version(Posix) { break; } } + + void setupPosix() { + rlimit rl; + getrlimit(RLIMIT_NOFILE, &rl); + rl.rlim_cur = rl.rlim_max; + auto res = setrlimit(RLIMIT_NOFILE, &rl); + log(Level.Always, "FD limit: %d [%d]", rl.rlim_cur, res); + signal(SIGPIPE, &handleSignal); + } } diff --git a/danode/workerpool.d b/danode/workerpool.d index b4245bc..afc5881 100644 --- a/danode/workerpool.d +++ b/danode/workerpool.d @@ -8,22 +8,21 @@ import danode.interfaces : DriverInterface; import danode.router : Router; import danode.log : log, error, Level; -immutable int MAX_CLIENTS = 2048; -immutable int MAX_CLIENTS_PER_IP = 32; -immutable int POOL_SIZE = 200; +immutable int MAX_CLIENTS = 2048; /// Maximum number of queued connections before dropping +immutable int MAX_CLIENTS_PER_IP = 32; /// Maximum concurrent connections per remote IP +immutable int POOL_SIZE = 200; /// Number of pre-allocated worker threads class WorkerPool { private: - Router router; - Thread[] workers; - DriverInterface[] queue; - Mutex mutex; - Semaphore sem; - bool stopped; + Router router; /// Shared router instance passed to each Client + Thread[] workers; /// Fixed array of pre-allocated worker threads + Mutex mutex; /// Guards queue, stopped, and nAlivePerIP + DriverInterface[] queue; /// Pending connection queue, protected by mutex + long[string] nAlivePerIP; /// Active connection count per remote IP, protected by mutex + bool stopped; /// Set to true on shutdown; workers exit their loop when seen + Semaphore sem; /// Counts pending items in queue; workers block on wait() public: - long[string] nAlivePerIP; // protected by mutex - this(Router router) { this.router = router; this.mutex = new Mutex(); @@ -37,6 +36,8 @@ class WorkerPool { log(Level.Always, "WorkerPool started with %d threads", POOL_SIZE); } + /* Enqueue a new connection driver for handling by the next available worker. + Returns false if the connection should be rejected (rate limit or capacity exceeded). */ bool push(DriverInterface driver, string ip, bool isLoopback) { synchronized(mutex) { if (!isLoopback && nAlivePerIP.get(ip, 0L) >= MAX_CLIENTS_PER_IP) return(false); @@ -47,19 +48,25 @@ class WorkerPool { return true; } - @property long nAlive() { synchronized(mutex) { return nAlivePerIP.byValue.sum; } } - @property long queued() { synchronized(mutex) { return queue.length; } } - void scan() { router.scan(); } + // Total number of connections currently being handled across all workers + @property long nAlive() { synchronized(mutex) { return(nAlivePerIP.byValue.sum); } } + // Number of connections waiting in the queue for a free worker + @property long queued() { synchronized(mutex) { return(queue.length); } } + // Trigger a filesystem rescan on the router (called by the server) + void scan() { router.scan(); } + + // Signal all workers to exit and join them void stop() { - synchronized(mutex) { stopped = true; } + synchronized(mutex) { if (stopped) { return; } stopped = true; } foreach (i; 0 .. POOL_SIZE) sem.notify(); // wake all workers to exit foreach (t; workers) t.join(); log(Level.Always, "WorkerPool stopped"); } private: + // Worker thread body: blocks on semaphore, dequeues one connection, runs it to completion. void workerLoop() { while (true) { sem.wait(); @@ -77,8 +84,8 @@ class WorkerPool { try { auto client = new Client(router, driver); client.run(); - } catch(Exception e) { error("WorkerPool: client exception [%s]: %s", ip, e.msg); - } catch(Error e) { error("WorkerPool: client error [%s]: %s", ip, e.msg); } + } catch(Exception e) { error("WorkerPool: Client exception [%s]: %s", ip, e.msg); + } catch(Error e) { error("WorkerPool: Client error [%s]: %s", ip, e.msg); } synchronized(mutex) { if (ip in nAlivePerIP && nAlivePerIP[ip] > 0) nAlivePerIP[ip]--; if (nAlivePerIP[ip] == 0) nAlivePerIP.remove(ip); @@ -86,3 +93,4 @@ class WorkerPool { } } } + From bb3eb480a4b5c9ff774afa5951568e054a20ac9b Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 16:19:16 +0000 Subject: [PATCH 04/34] Updated server main loop --- danode/server.d | 40 +++++++++------------------------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/danode/server.d b/danode/server.d index 5fb8840..bb6b99e 100644 --- a/danode/server.d +++ b/danode/server.d @@ -16,11 +16,10 @@ version(SSL) { import danode.https : HTTPS; } -class Server : Thread { +class Server { private: Socket socket; // The server socket SocketSet set; // SocketSet for server socket and client listeners - bool terminated; // Server running SysTime starttime; // Start time of the server WorkerPool pool; @@ -47,7 +46,6 @@ class Server : Thread { } set = new SocketSet(1); // Create a server socket set log(Level.Always, "Server '%s' created backlog: %d", this.hostname(), backlog); - super(&run); } // Initialize the listening socket to a certain port and backlog @@ -83,14 +81,8 @@ class Server : Thread { } catch(Error e) { error("Unable to accept connection, Error: %s", e.msg); } } - // is the server still running ? - final @property bool running() { synchronized { - version(SSL) { return(socket.isAlive() && sslsocket.isAlive() && !terminated); - } else { return(socket.isAlive() && !terminated); } - } } - // Stop the pool and shutdown the server - final void stop(){ synchronized { pool.stop(); terminated = true; } } + final void stop(){ pool.stop(); } // Returns a Duration object holding the server uptime final @property Duration uptime() const { return(Clock.currTime() - starttime); } @@ -107,7 +99,7 @@ class Server : Thread { final void run() { SysTime lastScan = Clock.currTime(); - while(running) { + while(socket.isAlive()) { try { accept(socket); version (SSL) { accept(sslsocket, true); } @@ -124,16 +116,10 @@ class Server : Thread { } } -void parseKeyInput(ref Server server) { - string line = chomp(stdin.readln()); - if (line.startsWith("quit")) server.stop(); - if (line.startsWith("info")) server.info(); -} - void main(string[] args) { + ushort port = 80; int backlog = 100; int verbose = Level.Verbose; - bool keyoff = false; string wwwFolder = "www/"; string sslFolder = ".ssl/"; string sslKey = "server.key"; @@ -141,31 +127,23 @@ void main(string[] args) { getopt(args, "port|p", &port, // Port to listen on "backlog|b", &backlog, // Backlog of clients supported - "keyoff|k", &keyoff, // Keyboard on or off "www", &wwwFolder, // Server www root folder "ssl", &sslFolder, // Location of SSL certificates "sslKey", &sslKey, // Server private key "accountKey", &accountKey, // Server Let's encrypt account key "verbose|v", &verbose); // Verbose level (via commandline) atomicStore(cv, verbose); - version(Posix) setupPosix(); - version (Windows) { - log(Level.Always, "-k was set to true. However, keyboard input under windows is not supported"); - keyoff = true; + version(Posix){ + import danode.signals : setupPosix; + setupPosix(); } + auto server = new Server(port, backlog, wwwFolder, sslFolder, sslKey, accountKey); version (SSL) { loadSSL(server.sslPath, server.sslKey); // Load SSL certificates checkAndRenew(server.sslPath, server.sslKey, server.accountKey); // checkAndRenew SSL certificates } - server.start(); - while (server.running) { - if (!keyoff) { server.parseKeyInput(); } - stdout.flush(); - Thread.sleep(dur!"msecs"(250)); - } - log(Level.Always, "Server shutting down: %d", server.running); - server.info(); + server.run(); } unittest { From 283830c53bf005df7f04bc06bf041b17b57f133a Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 16:42:20 +0000 Subject: [PATCH 05/34] Check server.alive --- danode/server.d | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/danode/server.d b/danode/server.d index bb6b99e..e8cade2 100644 --- a/danode/server.d +++ b/danode/server.d @@ -97,9 +97,14 @@ class Server { final @property string accountKey() { return(sslPath ~ account); } } + @property bool alive() { + version(SSL) { return socket.isAlive() && sslsocket.isAlive(); + } else { return socket.isAlive(); } + } + final void run() { SysTime lastScan = Clock.currTime(); - while(socket.isAlive()) { + while(alive) { try { accept(socket); version (SSL) { accept(sslsocket, true); } @@ -133,7 +138,7 @@ void main(string[] args) { "accountKey", &accountKey, // Server Let's encrypt account key "verbose|v", &verbose); // Verbose level (via commandline) atomicStore(cv, verbose); - version(Posix){ + version(Posix) { import danode.signals : setupPosix; setupPosix(); } From 64fccd3a73f21cb845216bf791c8cc4e95589d14 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 16:44:50 +0000 Subject: [PATCH 06/34] Minor fix --- danode/server.d | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/danode/server.d b/danode/server.d index e8cade2..1499f6d 100644 --- a/danode/server.d +++ b/danode/server.d @@ -97,9 +97,9 @@ class Server { final @property string accountKey() { return(sslPath ~ account); } } - @property bool alive() { - version(SSL) { return socket.isAlive() && sslsocket.isAlive(); - } else { return socket.isAlive(); } + @property bool alive() { + version(SSL) { return(socket.isAlive() && sslsocket.isAlive()); + } else { return(socket.isAlive()); } } final void run() { From f829b821696b17d290cd0790d78040bdc76a5052 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 16:55:12 +0000 Subject: [PATCH 07/34] Cleaning and moving code --- danode/client.d | 49 +++++++++------------------------------- danode/interfaces.d | 54 +++++++++++++++++++++++---------------------- 2 files changed, 39 insertions(+), 64 deletions(-) diff --git a/danode/client.d b/danode/client.d index f8838a6..b461d92 100644 --- a/danode/client.d +++ b/danode/client.d @@ -4,20 +4,18 @@ module danode.client; import danode.imports; -import danode.cgi : CGI; import danode.statuscode : StatusCode; import danode.functions: htmltime, Msecs; -import danode.interfaces : DriverInterface, ClientInterface, StringDriver; +import danode.interfaces : DriverInterface, ClientInterface, StringDriver, sendHeaderTooLarge, sendPayloadTooLarge, sendTimedOut; import danode.router : Router, runRequest; -import danode.response : Response, setPayload; +import danode.response : Response; import danode.request : Request; -import danode.payload : PayloadType; import danode.log : log, tag, Level; -immutable size_t MAX_HEADER_SIZE = 1024 * 32; // 32KB Header -immutable size_t MAX_REQUEST_SIZE = 1024 * 1024 * 2; // 2MB Body -immutable size_t MAX_UPLOAD_SIZE = 1024 * 1024 * 100; // 100MB Multipart uploads -immutable size_t MAX_SSE_TIME = 60_000; // 60 seconds max SSE lifetime +immutable size_t MAX_HEADER_SIZE = 1024 * 32; /// 32KB Header +immutable size_t MAX_REQUEST_SIZE = 1024 * 1024 * 2; /// 2MB Body +immutable size_t MAX_UPLOAD_SIZE = 1024 * 1024 * 100; /// 100MB Multipart uploads +immutable size_t MAX_SSE_TIME = 60_000; /// 60 seconds max SSE lifetime class Client : ClientInterface { private: @@ -28,7 +26,6 @@ class Client : ClientInterface { public: this(Router router, DriverInterface driver, long maxtime = 5000) { - log(Level.Trace, "client constructor"); this.router = router; this.driver = driver; this.maxtime = maxtime; @@ -37,7 +34,7 @@ class Client : ClientInterface { final void run() { log(Level.Trace, "New connection established %s:%d", ip(), port() ); try { - if (driver.openConnection() == false) { log(Level.Verbose, "WARN: Unable to open connection"); stop(); } + if (!driver.openConnection()) { log(Level.Verbose, "WARN: Unable to open connection"); return; } Request request; Response response; scope (exit) { @@ -48,11 +45,11 @@ class Client : ClientInterface { while (running) { if (driver.receive(driver.socket) > 0) { // We've received new data if (!driver.hasHeader()) { - if (driver.inbuffer.data.length > MAX_HEADER_SIZE) { driver.setHeaderTooLarge(response); stop(); continue; } + if (driver.inbuffer.data.length > MAX_HEADER_SIZE) { driver.sendHeaderTooLarge(response); stop(); continue; } } else { - if (driver.endOfHeader > MAX_HEADER_SIZE) { driver.setHeaderTooLarge(response); stop(); continue; } + if (driver.endOfHeader > MAX_HEADER_SIZE) { driver.sendHeaderTooLarge(response); stop(); continue; } size_t limit = (driver.header.indexOf("multipart/") >= 0) ? MAX_UPLOAD_SIZE : MAX_REQUEST_SIZE; - if (driver.inbuffer.data.length > limit) { driver.setPayloadTooLarge(response); stop(); continue; } + if (driver.inbuffer.data.length > limit) { driver.sendPayloadTooLarge(response); stop(); continue; } } // Parse the data and try to create a response (Could fail multiple times) if (!response.ready) { router.route(driver, request, response, maxtime); } @@ -75,7 +72,7 @@ class Client : ClientInterface { } if (lastmodified >= maxtime) { // Client are not allowed to be silent for more than maxtime log(Level.Trace, "inactivity: %s > %s", lastmodified, maxtime); - driver.setTimedOut(response); + driver.sendTimedOut(response); stop(); continue; } log(Level.Trace, "Connection %s:%s (%s msecs) %s", ip, port, starttime, to!string(driver.inbuffer.data)); @@ -96,15 +93,10 @@ class Client : ClientInterface { atomicStore(terminated, true); } - // Number of requests served final @property long requests() const { return(driver ? driver.requests : 0); } - // Start time of the client in mseconds (stored in the connection driver) final @property long starttime() const { return(driver.starttime); } - // When was the client last modified in mseconds (stored in the connection driver) final @property long lastmodified() const { return(driver.lastmodified); } - // Port of the client final @property long port() const { return(driver.port()); } - // ip address of the client final @property string ip() const { return(driver.ip()); } } @@ -118,25 +110,6 @@ void log(in ClientInterface cl, in Request rq, in Response rs) { "%s %s:%s %s%s [%d] %.1fkb in %s ms ", htmltime(), cl.ip, cl.port, rq.shorthost, uri.replace("%", "%%"), cl.requests, bytes/1024f, ms); } -// serve a 408 connection timed out page -void setTimedOut(ref DriverInterface driver, ref Response response) { - if(response.payload && response.payload.type == PayloadType.Script){ to!CGI(response.payload).notifyovertime(); } - response.setPayload(StatusCode.TimedOut, "408 - Connection Timed Out\n", "text/plain"); - driver.send(response, driver.socket); -} - -// serve a 431 request header fields too large page -void setHeaderTooLarge(ref DriverInterface driver, ref Response response) { - response.setPayload(StatusCode.HeaderFieldsTooLarge, "431 - Request Header Fields Too Large\n", "text/plain"); - driver.send(response, driver.socket); -} - -// serve a 413 payload too large page -void setPayloadTooLarge(ref DriverInterface driver, ref Response response) { - response.setPayload(StatusCode.PayloadTooLarge, "413 - Payload Too Large\n", "text/plain"); - driver.send(response, driver.socket); -} - unittest { tag(Level.Always, "FILE", "%s", __FILE__); diff --git a/danode/interfaces.d b/danode/interfaces.d index 5015685..2910828 100644 --- a/danode/interfaces.d +++ b/danode/interfaces.d @@ -3,8 +3,11 @@ module danode.interfaces; import danode.imports; + +import danode.cgi : CGI; import danode.functions : Msecs, sISelect, bodystart, endofheader, fullheader; -import danode.response : Response; +import danode.payload : PayloadType; +import danode.response : Response, setPayload; import danode.statuscode : StatusCode; import danode.log : log, error, Level; @@ -62,32 +65,17 @@ abstract class DriverInterface { } long receiveData(ref char[] buffer); - bool openConnection(); /// Open the connection - void closeConnection(); /// Close the connection - @nogc bool isSecure() const nothrow; /// Are we secure ? + bool openConnection(); + void closeConnection(); + @nogc bool isSecure() const nothrow; // Send upto maxsize bytes from the response to the client void send(ref Response response, Socket conn, ptrdiff_t maxsize = 4096); - // port being used for communication - final @property long port() const { - if (address !is null) return(to!long(address.toPortString())); - return(-1); - } - - // IP address connected to - final @property string ip() const { - if (address !is null) return(address.toAddrString()); - return("0.0.0.0"); - } - - // Milliseconds since start of connection + final @property long port() const { if (address !is null){ return(to!long(address.toPortString())); } return(-1); } + final @property string ip() const { if (address !is null){ return(address.toAddrString()); } return("0.0.0.0"); } final @property long starttime() const { return(Msecs(systime)); } - - // Milliseconds since last modified final @property long lastmodified() const { return(Msecs(modtime)); } - - // Byte input converted to header as string final @property string header() const { return(fullheader(inbuffer.data)); } // Byte input converted to body as string @@ -96,16 +84,30 @@ abstract class DriverInterface { return(to!string(inbuffer.data[bodyStart() .. $])); } - // Where does the HTTP request header end ? final @property ptrdiff_t endOfHeader() const { return(endofheader(inbuffer.data)); } - - // Where does the HTTP request body begin ? final @property ptrdiff_t bodyStart() const { return(bodystart(inbuffer.data)); } - - // Do we have a header separator ? "\r\n\r\n" or "\n\n" final @property bool hasHeader() const { return(endOfHeader > 0); } } +// serve a 408 connection timed out page +void sendTimedOut(ref DriverInterface driver, ref Response response) { + if(response.payload && response.payload.type == PayloadType.Script){ to!CGI(response.payload).notifyovertime(); } + response.setPayload(StatusCode.TimedOut, "408 - Connection Timed Out\n", "text/plain"); + driver.send(response, driver.socket); +} + +// serve a 431 request header fields too large page +void sendHeaderTooLarge(ref DriverInterface driver, ref Response response) { + response.setPayload(StatusCode.HeaderFieldsTooLarge, "431 - Request Header Fields Too Large\n", "text/plain"); + driver.send(response, driver.socket); +} + +// serve a 413 payload too large page +void sendPayloadTooLarge(ref DriverInterface driver, ref Response response) { + response.setPayload(StatusCode.PayloadTooLarge, "413 - Payload Too Large\n", "text/plain"); + driver.send(response, driver.socket); +} + class StringDriver : DriverInterface { public: StatusCode lastStatus; From fe938e71525075caa6f237c818ae047578c753f5 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 16:56:52 +0000 Subject: [PATCH 08/34] Minor: @property void xyz(){ } is unusual --- danode/client.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/danode/client.d b/danode/client.d index b461d92..1d47865 100644 --- a/danode/client.d +++ b/danode/client.d @@ -88,7 +88,7 @@ class Client : ClientInterface { final @property bool running() const { return(!atomicLoad(terminated) && driver.socketReady()); } // Stop the client by setting the terminated flag - final @property void stop() { + final void stop() { log(Level.Trace, "Connection %s:%s stop called", ip, port); atomicStore(terminated, true); } From 1e04a980533a3358d0c18ed35250e0e185f43e03 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:03:07 +0000 Subject: [PATCH 09/34] Moving log into client, dropping some cl. --- danode/client.d | 28 ++++++++++++++-------------- danode/interfaces.d | 16 +--------------- danode/server.d | 6 +++--- 3 files changed, 18 insertions(+), 32 deletions(-) diff --git a/danode/client.d b/danode/client.d index 1d47865..8e35f1e 100644 --- a/danode/client.d +++ b/danode/client.d @@ -6,7 +6,7 @@ import danode.imports; import danode.statuscode : StatusCode; import danode.functions: htmltime, Msecs; -import danode.interfaces : DriverInterface, ClientInterface, StringDriver, sendHeaderTooLarge, sendPayloadTooLarge, sendTimedOut; +import danode.interfaces : DriverInterface, StringDriver, sendHeaderTooLarge, sendPayloadTooLarge, sendTimedOut; import danode.router : Router, runRequest; import danode.response : Response; import danode.request : Request; @@ -17,7 +17,7 @@ immutable size_t MAX_REQUEST_SIZE = 1024 * 1024 * 2; /// 2MB Body immutable size_t MAX_UPLOAD_SIZE = 1024 * 1024 * 100; /// 100MB Multipart uploads immutable size_t MAX_SSE_TIME = 60_000; /// 60 seconds max SSE lifetime -class Client : ClientInterface { +class Client { private: Router router; /// Router class from server DriverInterface driver; /// Driver @@ -63,7 +63,7 @@ class Client : ClientInterface { } if (response.ready && response.completed) { // We've completed the request, response cycle driver.requests++; - if(response.keepalive) { this.log(request, response); } + if(response.keepalive) { logRR(request, response); } request.clearUploadFiles(); // Clean uploaded files driver.inbuffer.clear(); // Clear the input buffer if(!response.keepalive){ stop(); continue; } // No keep alive, then stop this client @@ -77,13 +77,23 @@ class Client : ClientInterface { } log(Level.Trace, "Connection %s:%s (%s msecs) %s", ip, port, starttime, to!string(driver.inbuffer.data)); } - this.log(request, response); + logRR(request, response); } catch(Exception e) { log(Level.Verbose, "Unknown Client Exception: %s", e); stop(); } catch(Error e) { log(Level.Verbose, "Unknown Client Error: %s", e); stop(); } log(Level.Verbose, "Connection %s:%s (%s) closed. %d requests %s (%s msecs)", ip, port, (driver.isSecure() ? "SSL" : "HTTP"), driver.requests, driver.senddata, starttime); } + void logRR(in Request rq, in Response rs) { + string uri; + try { uri = decodeComponent(rq.uri); } catch (Exception e) { uri = rq.uri; } + long bytes = (rs.payload && rs.isRange) ? (rs.rangeEnd - rs.rangeStart + 1) : (rs.payload ? rs.payload.length : 0); + int code = cast(int)(rs.payload ? rs.statuscode.code : 0); + long ms = rq.starttime == SysTime.init ? -1 : Msecs(rq.starttime); + tag(Level.Always, format("%d", code), + "%s %s:%s %s%s [%d] %.1fkb in %s ms ", htmltime(), ip, port, rq.shorthost, uri.replace("%", "%%"), requests, bytes/1024f, ms); + } + // Is the client still running, if the socket was gone it's not otherwise check the terminated flag final @property bool running() const { return(!atomicLoad(terminated) && driver.socketReady()); } @@ -100,16 +110,6 @@ class Client : ClientInterface { final @property string ip() const { return(driver.ip()); } } -void log(in ClientInterface cl, in Request rq, in Response rs) { - string uri; - try { uri = decodeComponent(rq.uri); } catch (Exception e) { uri = rq.uri; } - long bytes = (rs.payload && rs.isRange) ? (rs.rangeEnd - rs.rangeStart + 1) : (rs.payload ? rs.payload.length : 0); - int code = cast(int)(rs.payload ? rs.statuscode.code : 0); - long ms = rq.starttime == SysTime.init ? -1 : Msecs(rq.starttime); - tag(Level.Always, format("%d", code), - "%s %s:%s %s%s [%d] %.1fkb in %s ms ", htmltime(), cl.ip, cl.port, rq.shorthost, uri.replace("%", "%%"), cl.requests, bytes/1024f, ms); -} - unittest { tag(Level.Always, "FILE", "%s", __FILE__); diff --git a/danode/interfaces.d b/danode/interfaces.d index 2910828..76cdc08 100644 --- a/danode/interfaces.d +++ b/danode/interfaces.d @@ -11,26 +11,12 @@ import danode.response : Response, setPayload; import danode.statuscode : StatusCode; import danode.log : log, error, Level; -/* Client interface used by the server */ -interface ClientInterface { - @property bool running(); /// Is the client still handling requests - @property long starttime(); /// When was the client last started - @property long lastmodified(); /// When was the client last modified - @property void stop(); /// Stop the client - - @property long requests() const; /// Number of requests served - @property string ip() const; /// IP location of the client - @property long port() const; /// Port at which the client is connected - - void run(); /// Main client loop and logic -} - /* Connection/Driver interface available to the client */ abstract class DriverInterface { public: Appender!(char[]) inbuffer; /// Input appender buffer Socket socket; /// Client socket for reading and writing - SocketSet socketSet; + SocketSet socketSet; /// SocketSet used for non-blocking select on this connection long requests = 0; /// Number of requests we handled long[long] senddata; /// Size of data send per request SysTime systime; /// Time in ms since this process came alive diff --git a/danode/server.d b/danode/server.d index 1499f6d..062be1d 100644 --- a/danode/server.d +++ b/danode/server.d @@ -35,8 +35,8 @@ class Server { public: this(ushort port = 80, int backlog = 100, string wwwFolder = "www/", string sslFolder = ".ssl/", string sslKey = "server.key", string accountKey = "account.key") { - starttime = Clock.currTime(); // Start the timer - socket = initialize(port, backlog); // Create the HTTP socket + starttime = Clock.currTime(); // Start the timer + socket = initialize(port, backlog); // Create the HTTP socket pool = new WorkerPool(new Router(wwwFolder, socket.localAddress())); version(SSL) { sslPath = sslFolder.resolveFolder(); @@ -44,7 +44,7 @@ class Server { account = accountKey; sslsocket = initialize(443, backlog); // Create the SSL / HTTPs socket } - set = new SocketSet(1); // Create a server socket set + set = new SocketSet(); log(Level.Always, "Server '%s' created backlog: %d", this.hostname(), backlog); } From 20f98d255fba68a3f051c5cbb3337a19ffd6edca Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:14:55 +0000 Subject: [PATCH 10/34] Clean shutdown, minot code cleaning --- danode/client.d | 6 +++--- danode/server.d | 12 ++++++------ danode/signals.d | 10 +++++++++- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/danode/client.d b/danode/client.d index 8e35f1e..764e771 100644 --- a/danode/client.d +++ b/danode/client.d @@ -63,7 +63,7 @@ class Client { } if (response.ready && response.completed) { // We've completed the request, response cycle driver.requests++; - if(response.keepalive) { logRR(request, response); } + if(response.keepalive) { logConnection(request, response); } request.clearUploadFiles(); // Clean uploaded files driver.inbuffer.clear(); // Clear the input buffer if(!response.keepalive){ stop(); continue; } // No keep alive, then stop this client @@ -77,14 +77,14 @@ class Client { } log(Level.Trace, "Connection %s:%s (%s msecs) %s", ip, port, starttime, to!string(driver.inbuffer.data)); } - logRR(request, response); + logConnection(request, response); } catch(Exception e) { log(Level.Verbose, "Unknown Client Exception: %s", e); stop(); } catch(Error e) { log(Level.Verbose, "Unknown Client Error: %s", e); stop(); } log(Level.Verbose, "Connection %s:%s (%s) closed. %d requests %s (%s msecs)", ip, port, (driver.isSecure() ? "SSL" : "HTTP"), driver.requests, driver.senddata, starttime); } - void logRR(in Request rq, in Response rs) { + void logConnection(in Request rq, in Response rs) { string uri; try { uri = decodeComponent(rq.uri); } catch (Exception e) { uri = rq.uri; } long bytes = (rs.payload && rs.isRange) ? (rs.rangeEnd - rs.rangeStart + 1) : (rs.payload ? rs.payload.length : 0); diff --git a/danode/server.d b/danode/server.d index 062be1d..1693a08 100644 --- a/danode/server.d +++ b/danode/server.d @@ -3,12 +3,14 @@ module danode.server; import danode.imports; + +import danode.log : cv, abort, log, tag, error, Level; import danode.functions : Msecs, sISelect, resolveFolder; import danode.interfaces : DriverInterface; import danode.http : HTTP; import danode.router : Router; +import danode.signals : shutdownSignal; import danode.workerpool : WorkerPool; -import danode.log : cv, abort, log, tag, error, Level; version(SSL) { import danode.acme : checkAndRenew; @@ -82,14 +84,11 @@ class Server { } // Stop the pool and shutdown the server - final void stop(){ pool.stop(); } + final void stop() { pool.stop(); socket.close(); version(SSL) { sslsocket.closeSSL(); } } - // Returns a Duration object holding the server uptime + // Returns a Duration object holding the server uptime final @property Duration uptime() const { return(Clock.currTime() - starttime); } - // Print some server information - final @property void info() { log(Level.Always, "Uptime %s, Connections: %d / queued: %d", uptime, pool.nAlive, pool.queued); } - // Hostname of the server final @property string hostname() { return(socket.hostName()); } version(SSL) { @@ -98,6 +97,7 @@ class Server { } @property bool alive() { + if (shutdownSignal) return false; version(SSL) { return(socket.isAlive() && sslsocket.isAlive()); } else { return(socket.isAlive()); } } diff --git a/danode/signals.d b/danode/signals.d index 3864d69..5bcf6a1 100644 --- a/danode/signals.d +++ b/danode/signals.d @@ -2,11 +2,13 @@ * License: GPLv3 (https://github.com/DannyArends/DaNode) - Danny Arends **/ module danode.signals; +__gshared bool shutdownSignal = false; + version(Posix) { import danode.imports; import core.sys.posix.sys.resource; - import core.sys.posix.signal : signal, SIGPIPE; + import core.sys.posix.signal : signal, SIGPIPE, SIGTERM, SIGINT; import core.sys.posix.unistd : write; import danode.log : cv, log, Level; @@ -16,6 +18,10 @@ version(Posix) { case SIGPIPE: if(atomicLoad(cv) > 1) write(2, cast(const(void*)) "[SIG] Broken pipe caught, and ignored\n\0".ptr, 41); break; + case SIGTERM, SIGINT: + if(atomicLoad(cv) > 1) write(2, cast(const(void*)) "[SIG] SIGTERM/SIGINT\n\0".ptr, 24); + atomicStore(shutdownSignal, true); + break; default: if(atomicLoad(cv) > 1) write(2, cast(const(void*)) "[SIG] Caught\n\0".ptr, 17); break; @@ -29,6 +35,8 @@ version(Posix) { auto res = setrlimit(RLIMIT_NOFILE, &rl); log(Level.Always, "FD limit: %d [%d]", rl.rlim_cur, res); signal(SIGPIPE, &handleSignal); + signal(SIGTERM, &handleSignal); + signal(SIGINT, &handleSignal); } } From fd121f410049bb5b39cf2be5e85a67f175835b57 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:18:58 +0000 Subject: [PATCH 11/34] Call stop(), drain workers, stop socket & sslsocket --- danode/server.d | 5 ++--- danode/signals.d | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/danode/server.d b/danode/server.d index 1693a08..bcda6da 100644 --- a/danode/server.d +++ b/danode/server.d @@ -97,7 +97,7 @@ class Server { } @property bool alive() { - if (shutdownSignal) return false; + if (atomicLoad!(shutdownSignal)) return false; version(SSL) { return(socket.isAlive() && sslsocket.isAlive()); } else { return(socket.isAlive()); } } @@ -116,8 +116,7 @@ class Server { } catch(Exception e) { error("Unspecified top level server exception: %s", e.msg); } catch(Error e) { error("Unspecified top level server error: %s", e.msg); } } - socket.close(); - version (SSL) { sslsocket.closeSSL(); } + stop(); } } diff --git a/danode/signals.d b/danode/signals.d index 5bcf6a1..6b40625 100644 --- a/danode/signals.d +++ b/danode/signals.d @@ -23,7 +23,7 @@ version(Posix) { atomicStore(shutdownSignal, true); break; default: - if(atomicLoad(cv) > 1) write(2, cast(const(void*)) "[SIG] Caught\n\0".ptr, 17); + if(atomicLoad(cv) > 1) write(2, cast(const(void*)) "[SIG] Caught\n\0".ptr, 16); break; } } From d8f60c9af13d354329fc88c5d41d1aadc2111da3 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:27:43 +0000 Subject: [PATCH 12/34] Minor tweaks --- danode/server.d | 4 ++-- danode/signals.d | 4 ++-- danode/workerpool.d | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/danode/server.d b/danode/server.d index bcda6da..478dd3f 100644 --- a/danode/server.d +++ b/danode/server.d @@ -97,7 +97,7 @@ class Server { } @property bool alive() { - if (atomicLoad!(shutdownSignal)) return false; + if (atomicLoad(shutdownSignal)) return false; version(SSL) { return(socket.isAlive() && sslsocket.isAlive()); } else { return(socket.isAlive()); } } @@ -147,7 +147,7 @@ void main(string[] args) { loadSSL(server.sslPath, server.sslKey); // Load SSL certificates checkAndRenew(server.sslPath, server.sslKey, server.accountKey); // checkAndRenew SSL certificates } - server.run(); + return(server.run()); } unittest { diff --git a/danode/signals.d b/danode/signals.d index 6b40625..57f216f 100644 --- a/danode/signals.d +++ b/danode/signals.d @@ -1,8 +1,8 @@ -/** danode/signals.d - POSIX signal handling: SIGPIPE suppression +/** danode/signals.d - POSIX signal handling: SIGPIPE suppression & clean shutdown via SIGTERM, SIGINT * License: GPLv3 (https://github.com/DannyArends/DaNode) - Danny Arends **/ module danode.signals; -__gshared bool shutdownSignal = false; +shared bool shutdownSignal = false; version(Posix) { import danode.imports; diff --git a/danode/workerpool.d b/danode/workerpool.d index afc5881..f10dec7 100644 --- a/danode/workerpool.d +++ b/danode/workerpool.d @@ -62,7 +62,7 @@ class WorkerPool { synchronized(mutex) { if (stopped) { return; } stopped = true; } foreach (i; 0 .. POOL_SIZE) sem.notify(); // wake all workers to exit foreach (t; workers) t.join(); - log(Level.Always, "WorkerPool stopped"); + log(Level.Trace, "WorkerPool stopped"); } private: From 0f3be7ca402e4345e288ffd3c4cebef49b55753c Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:30:02 +0000 Subject: [PATCH 13/34] Style --- danode/server.d | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/danode/server.d b/danode/server.d index 478dd3f..8165a0a 100644 --- a/danode/server.d +++ b/danode/server.d @@ -74,7 +74,7 @@ class Server { DriverInterface driver = null; if (!secure) driver = new HTTP(accepted, false); version(SSL) { if (secure) driver = new HTTPS(accepted, false); } - if (driver is null) { return(accepted.close()); } + if (driver is null) { accepted.close(); return; } if (!pool.push(driver, ip, isLoopback)) { log(Level.Always, "Rate limit or capacity exceeded [%s]", ip); driver.closeConnection(); From 4685659adeebc62d431c7d4721d7c4607119efe6 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:35:35 +0000 Subject: [PATCH 14/34] Fix a hypothetical symlink swapping attack on shared hosting --- danode/functions.d | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/danode/functions.d b/danode/functions.d index 5d2320e..cc08ebf 100644 --- a/danode/functions.d +++ b/danode/functions.d @@ -49,11 +49,14 @@ string safePath(in string root, in string path) { if (path.canFind("\0")) return null; string full = root ~ (path.startsWith("/") ? path : "/" ~ path); try { + string absroot = root.resolve(); + if (!absroot.endsWith("/")) absroot ~= "/"; if (exists(full)) { string resolved = full.resolve(); - string absroot = root.resolve(); - if (!absroot.endsWith("/")) absroot ~= "/"; if (resolved != absroot[0..$-1] && !resolved.startsWith(absroot)) return null; + } else { + string parent = dirName(full).resolve(); + if (parent != absroot[0..$-1] && !parent.startsWith(absroot)) return null; } } catch (Exception e) { return null; } return full; From 1a1836ebaff8ccf1a9e62d55033ab288e7a8fee9 Mon Sep 17 00:00:00 2001 From: DannyArends Date: Thu, 19 Mar 2026 17:37:24 +0000 Subject: [PATCH 15/34] Add a unit test for safePath --- danode/functions.d | 1 + 1 file changed, 1 insertion(+) diff --git a/danode/functions.d b/danode/functions.d index cc08ebf..07076b6 100644 --- a/danode/functions.d +++ b/danode/functions.d @@ -210,6 +210,7 @@ unittest { assert(safePath("www/localhost", "/\0etc/passwd") is null, "null byte must be blocked"); assert(safePath("www/localhost", "/test.txt") !is null, "valid path must be allowed"); assert(safePath("www/localhost", "/test/1.txt") !is null, "valid subpath must be allowed"); + assert(safePath("www/localhost", "/nonexistent.txt") !is null, "non-existent valid path must be allowed"); // htmlEscape - XSS critical assert(htmlEscape("