From b6c4ffae9895be8b291b79da05d65fc3a96a7b18 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sat, 28 Mar 2026 14:11:05 -0700 Subject: [PATCH 1/6] move *local-courier* into separate file to avoid circularity --- aether.asd | 1 + src/courier.lisp | 2 -- src/params.lisp | 8 ++++++++ 3 files changed, 9 insertions(+), 2 deletions(-) create mode 100644 src/params.lisp diff --git a/aether.asd b/aether.asd index b952691..bbc4aa4 100644 --- a/aether.asd +++ b/aether.asd @@ -19,6 +19,7 @@ (funcall compile))) :serial t :components ((:file "package") + (:file "params") (:file "utilities") (:file "queue") (:file "cheap-heap") diff --git a/src/courier.lisp b/src/courier.lisp index 5e70fc7..2506737 100644 --- a/src/courier.lisp +++ b/src/courier.lisp @@ -10,8 +10,6 @@ ;;; `*LOCAL-COURIER*'. ;;; -(defparameter *local-courier* nil - "Bound to the `COURIER' that services this process.") (defparameter *courier-processing-clock-rate* 100) (defparameter *routing-time-step* 1/100) diff --git a/src/params.lisp b/src/params.lisp new file mode 100644 index 0000000..ef9d4d3 --- /dev/null +++ b/src/params.lisp @@ -0,0 +1,8 @@ +;;;; params.lisp +;;;; +;;;; some global definitions to cope with file circularity + +(in-package #:aether) + +(defparameter *local-courier* nil + "Bound to the `COURIER' that services this process.") From d1611cc2f385d94fa7a286b47270722c10696d58 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sat, 28 Mar 2026 14:11:30 -0700 Subject: [PATCH 2/6] remove define-message-dispatch --- src/process/cast.lisp | 18 ++-- src/process/emissary.lisp | 10 +- src/process/lock.lisp | 3 +- src/process/process.lisp | 147 ++++++++++++---------------- src/process/rpc.lisp | 27 +++-- tests/cast.lisp | 37 ++----- tests/examples/coloring.lisp | 20 +--- tests/examples/data-frame.lisp | 12 +-- tests/examples/distributed-mst.lisp | 41 ++------ tests/examples/flooding.lisp | 10 +- tests/examples/lock.lisp | 10 +- tests/examples/tree-coloring.lisp | 6 +- tests/examples/tree-operations.lisp | 18 +--- tests/process.lisp | 8 +- tests/recursive-lock.lisp | 11 +-- 15 files changed, 126 insertions(+), 252 deletions(-) diff --git a/src/process/cast.lisp b/src/process/cast.lisp index 20fe7a0..e823cf2 100644 --- a/src/process/cast.lisp +++ b/src/process/cast.lisp @@ -23,8 +23,7 @@ (declare (ignore aborting? handle-rts? message targets)) (error "PUSH-BROADCAST-FRAME only available in DEFINE-BROADCAST-HANDLER.")) -(defmacro define-broadcast-handler (handler-name - ((process process-type) (message message-type)) +(defmacro define-broadcast-handler (((process process-type) (message message-type)) &body body) "This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `BROADCAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `BROADCAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-BROADCAST-HANDLER' we have access to a pair of helper functions: @@ -33,8 +32,7 @@ WARNING: `RETURN-FROM-CAST' calls `PUSH-BROADCAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early." (a:with-gensyms (broadcast-frame reply-channel) - `(define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + `(define-message-handler ((,process ,process-type) (,message ,message-type)) (let ((,broadcast-frame nil) (,message (copy-message message)) (,reply-channel (message-reply-channel ,message))) @@ -164,8 +162,7 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t (process-continuation ,process `(CONVERGECAST)) ,@body))))) -(defmacro define-convergecast-handler (handler-name - ((process process-type) +(defmacro define-convergecast-handler (((process process-type) (message message-type)) &body body) "This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `CONVERGECAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `CONVERGECAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-CONVERGECAST-HANDLER' we have access to a pair of helper functions: @@ -174,14 +171,12 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t 2. `RETURN-FROM-CAST', which allows the user to terminate the convergecast operation early by sending up an acknowledgement (optionally specifying its contents) to the original sender of `MESSAGE'. It is recommended that a value is provided when returning from a convergecast, as it will be passed to a function (the function provided to the `CONVERGECAST-FRAME') when received by the original sender. WARNING: `RETURN-FROM-CAST' calls `PUSH-CONVERGECAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early." - `(define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + `(define-message-handler ((,process ,process-type) (,message ,message-type)) ,@(%convergecast-body process message body))) ;; cf. define-message-subordinate (defmacro define-convergecast-subordinate - (handler-name - ((process process-type) + (((process process-type) (message message-type)) &body body) "Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process. @@ -189,8 +184,7 @@ WARNING: `RETURN-FROM-CAST' calls `PUSH-CONVERGECAST-FRAME' as part of the abort This macro mimics DEFINE-CONVERGECAST-HANDLER while setting up this manner of parallel execution." (a:with-gensyms (command servicer subprocess) `(progn - (define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + (define-message-handler ((,process ,process-type) (,message ,message-type)) (let ((,servicer (spawn-process 'process-message-emissary))) (schedule ,servicer (now)) (setf (process-command-stack ,servicer) (list (list ',command ,process ,message)) diff --git a/src/process/emissary.lisp b/src/process/emissary.lisp index e0b3d00..30090d6 100644 --- a/src/process/emissary.lisp +++ b/src/process/emissary.lisp @@ -8,25 +8,19 @@ () (:documentation "Dummy PROCESS used to host nonblocking message handlers.")) -(define-message-dispatch process-message-emissary - ;; no replies - ) - ;; TODO: consider rolling this together with DEFINE-MESSAGE-HANDLER. is the ;; gain in emulation efficiency really worth it? is there any potential ;; for nasty side effects? ;; TODO: unclear that this plays well with message tracing. (defmacro define-message-subordinate - (handler-name - ((process process-type) (message message-type)) + (((process process-type) (message message-type)) &body body) "Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process. This macro mimics DEFINE-MESSAGE-HANDLER while setting up this manner of parallel execution." (a:with-gensyms (command servicer subprocess) `(progn - (define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + (define-message-handler ((,process ,process-type) (,message ,message-type)) (let ((,servicer (spawn-process 'process-message-emissary))) (schedule ,servicer (now)) (setf (process-command-stack ,servicer) (list (list ',command ,process ,message)) diff --git a/src/process/lock.lisp b/src/process/lock.lisp index 1faa85c..13bafed 100644 --- a/src/process/lock.lisp +++ b/src/process/lock.lisp @@ -125,8 +125,7 @@ "Sent to release a lock." (result)) -(define-message-handler handle-message-lock - ((process process-lockable) (message message-lock)) +(define-message-handler ((process process-lockable) (message message-lock)) "Attempts to lock PROCESS." (with-slots (reply-channel) message (with-slots (aborting? locked?) process diff --git a/src/process/process.lisp b/src/process/process.lisp index b1a4744..7df12b8 100644 --- a/src/process/process.lisp +++ b/src/process/process.lisp @@ -106,90 +106,71 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) (error "Undefined command ~a for server of type ~a." command (type-of server)))) -(defgeneric %message-dispatch (node message) - (:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here.")) - -(defun finish-handler () - "Return from the active aether process handler." +(defgeneric %handle-process-message (process message) + (:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here.") + (:method (process message) + "Bottom message handler: marks the message as not having been handled." + ;; this is *not* implemented via DEFINE-MESSAGE-HANDLER bc we don't want to + ;; write the log entry. + nil)) + +(defun finish-handler (&optional (handled t)) + "Return from the active aether process handler. If `HANDLED' is high, then the handler is understood as having taken care of the message, which is dropped from the inbox." + (declare (ignore handled)) (error () "Not available outside of an aether process handler.")) -(defmacro define-message-handler (handler-name ((process process-type) (message message-type)) &body body) - "Defines a function to be invoked by DEFINE-MESSAGE-DISPATCH." +(defmacro define-message-handler + ((process-and-process-type message-and-message-type + &key (guard nil guard-p)) + &body body) + "Defines a passive message handler, which walks the process mailbox at the start of every tick." (multiple-value-bind (body decls documentation) (a:parse-body body :documentation t) - `(defmethod ,handler-name ((,process ,process-type) - (,message ,message-type)) - ,@decls - ,@(list documentation) - (check-type ,message ,message-type) - (check-type ,process ,process-type) - (macrolet ((finish-handler () '(return-from ,handler-name))) - (flet ((log-entry (&rest initargs) - (when (process-debug? ,process) - (apply #'log-entry - (append initargs - (list :log-level 0 - :time (now) - :source ,process)))))) - (flet ((send-message (destination payload) - (log-entry :entry-type ':send-message - :destination destination - :payload (copy-structure payload)) - (send-message destination payload))) - (declare (ignorable #'send-message)) - ,@body)))))) - -(define-message-handler handle-message-RTS - ((process process) (message message-RTS)) + (destructuring-bind ((process process-type) (message message-type)) + (list (if (consp process-and-process-type) + process-and-process-type + (list process-and-process-type t)) + (if (consp message-and-message-type) + message-and-message-type + (list message-and-message-type t))) + (a:with-gensyms (handled) + `(defmethod %handle-process-message ((,process ,process-type) + (,message ,message-type)) + ,@(list documentation) + ,@decls + (flet ((finish-handler (&optional (,handled t)) + (return-from %handle-process-message ,handled)) + (log-entry (&rest initargs) + (when (process-debug? ,process) + (apply #'log-entry + (append initargs + (list :log-level 0 + :time (now) + :source ,process)))))) + (declare (ignorable #'finish-handler)) + (flet ((send-message (destination payload) + (log-entry :entry-type ':send-message + :destination destination + :payload (copy-structure payload)) + (send-message destination payload))) + (declare (ignorable #'send-message)) + ,(if guard-p `(unless ,guard (finish-handler (call-next-method)))) + (when (process-debug? ,process) + (log-entry :time (now) + :entry-type ':handler-invoked + :source ,process + :message-id (message-message-id ,message) + :payload-type ',message-type + :log-level 0)) + ,@body + (finish-handler t)))))))) + +#+#:ignore +(define-message-handler ((process process) (message message-RTS)) "Default handler for when a `PROCESS' receives a `MESSAGE-RTS'. Throws an error." (error "Got an RTS")) -(defmacro define-message-dispatch (node-type &body clauses) - "Defines \"automatic\" message handlers associated to a particular subtype of `PROCESS'. Each handler is specified by a tuple of form (MESSAGE-TYPE MESSAGE-HANDLER &OPTIONAL GUARD). As with `RECEIVE-MESSAGE', each clause is processed in turn, according to the following rules: - - + If supplied, `GUARD' is evaluated with the `PROCESS' in question bound to the place `PROCESS-TYPE'. If `GUARD' evaluates to NIL, proceed to the next clause. - + Check the message queue at the public address for an item of type `MESSAGE-TYPE'. If such a message is found, call the associated `MESSAGE-HANDLER' with lambda triple (PROCESS MESSAGE TIME). Otherwise, proceed to the next clause. - -There is one exception: (CALL-NEXT-METHOD) is also a legal clause, and it references to the message handling table installed via DEFINE-MESSAGE-DISPATCH on this process type's parent. - -NOTES: - + If no clause is matched, execution proceeds to the semantics specified by `DEFINE-PROCESS-UPKEEP'. - + Automatically appends a `MESSAGE-RTS' clause which calls `HANDLE-MESSAGE-RTS' and results in an error. Because of this, we set `CATCH-RTS?' to NIL when processing clauses and building `RECEIVE-MESSAGE' blocks. Otherwise, it would be impossible to override the default handling of `MESSAGE-RTS'es. Additionally, this extra handler is _not_ inherited through (CALL-NEXT-METHOD). - -WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you will probably stall the underlying `PROCESS' if you perform some waiting action here, like the analogue of a `SYNC-RECEIVE'. See DEFINE-MESSAGE-SUBORDINATE for a workaround." - (a:with-gensyms (node message) - `(defmethod %message-dispatch ((,node ,node-type) ,message) - ,@(mapcar - (lambda (clause) - (cond - ((and (listp clause) - (= 1 (length clause)) - (symbolp (first clause)) - (string= "CALL-NEXT-METHOD" (symbol-name (first clause)))) - `(when (call-next-method) - (return-from %message-dispatch t))) - ((and (listp clause) - (member (length clause) '(2 3))) - (destructuring-bind (message-type receiver . rest) clause - `(when (and - (typep ,message ',message-type) - (let ((,node-type ,node)) - (declare (ignorable ,node-type)) - ,(or (first rest) t))) - (when (process-debug? ,node) - (log-entry :time (now) - :entry-type ':handler-invoked - :source ,node - :message-id (message-message-id ,message) - :payload-type ',message-type - :log-level 0)) - (funcall ,receiver ,node ,message) - (return-from %message-dispatch t)))) - (t - (error "Bad DEFINE-MESSAGE-DISPATCH clause: ~a" clause)))) - clauses)))) - -(defun message-dispatch (node) - "Use DEFINE-MESSAGE-DISPATCH to install methods here." +(defun handle-process-inbox (node) + "Use DEFINE-MESSAGE-HANDLER to install methods here." (let* ((address (process-public-address node)) (inbox (gethash (address-channel address) (courier-inboxes *local-courier*)))) @@ -197,9 +178,9 @@ WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you ((= 3 safety) (check-key-secret address)) ((> 3 safety) nil)) (doq (message inbox) - (when (%message-dispatch node message) + (when (%handle-process-message node message) (q-deq-first inbox (lambda (x) (eq x message))) - (return-from message-dispatch t)) + (return-from handle-process-inbox t)) (when (and (process-debug? node) (typep message 'message-RTS)) (log-entry :time (now) @@ -207,8 +188,8 @@ WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you :source node :message-id (message-message-id message) :payload-type (type-of message)) - (handle-message-RTS node message) - (return-from message-dispatch t))))) + (error "Got an RTS") + (return-from handle-process-inbox t))))) ;; TODO: DEFINE-DPU-MACRO and DEFINE-DPU-FLET don't check syntactic sanity at ;; their runtime, they wait for DEFINE-PROCESS-UPKEEP to discover it. @@ -345,7 +326,7 @@ Locally enables the use of the various functions and macro forms defined in dpu- (with-slots (process-key process-clock-rate process-exhaust-inbox?) process (let ((*local-courier* (process-courier process)) (active? nil)) - (when (message-dispatch process) + (when (handle-process-inbox process) (when process-exhaust-inbox? (schedule process (now)) (return))) diff --git a/src/process/rpc.lisp b/src/process/rpc.lisp index 70a40de..2ddbaee 100644 --- a/src/process/rpc.lisp +++ b/src/process/rpc.lisp @@ -5,18 +5,29 @@ (in-package #:aether) +(defun finish-rpc-handler (&optional return-value (handled t)) + "Early escape from DEFINE-RPC-HANDLER. Like FINISH-HANDLER, but with a retval." + (declare (ignore return-value handled)) + (error () "Cannot call FINISH-RPC-HANDLER outside of a DEFINE-RPC-HANDLER body.")) + ;; TODO: this traps RETURN-FROM, but not FINISH-WITH-SCHEDULING. -(defmacro define-rpc-handler (handler-name - ((process process-type) (message message-type)) +(defmacro define-rpc-handler (((process process-type) (message message-type) + &key (guard nil guard-p)) &body body) "Interrupt-based RPC handlers are expected to emit a reply to the caller. This macro augments DEFINE-MESSAGE-HANDLER to reply to the caller with the last evaluated form." - (a:with-gensyms (return-value reply-channel) - `(define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) - (let (,return-value) - (setf ,return-value (block ,handler-name ,@body)) + (a:with-gensyms (block-name return-value reply-channel handled) + `(define-message-handler + ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) + (multiple-value-bind (,return-value ,handled) + (block ,block-name + (flet ((finish-handler (&optional ,return-value (,handled t)) + (return-from ,block-name (values ,return-value ,handled)))) + (declare (ignorable #'finish-handler)) + (values (progn ,@body) t))) (a:when-let ((,reply-channel (message-reply-channel ,message))) - (send-message ,reply-channel (make-message-rpc-done :result ,return-value))))))) + (send-message ,reply-channel (make-message-rpc-done :result ,return-value))) + (finish-handler ,handled))))) (defmacro sync-rpc (message (result-place-or-list destination diff --git a/tests/cast.lisp b/tests/cast.lisp index 5e937f2..a91cf6d 100644 --- a/tests/cast.lisp +++ b/tests/cast.lisp @@ -46,14 +46,12 @@ ;;; message handlers ;;; -(define-broadcast-handler handle-broadcast-test - ((process process-tree-cast-test) (message broadcast-test)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test)) "Pushes the `PROCESS's ID to a global list." (push (process-tree-id process) *broadcast-events*) (push-broadcast-frame :targets (process-tree-children process))) -(define-broadcast-handler handle-broadcast-test-abort - ((process process-tree-cast-test) (message broadcast-test-abort)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test-abort)) "Pushes the `PROCESS's ID to a global list, and potentially aborts the broadcast if the `PROCESS's ID is equal to `ABORT-ID'." (with-slots (abort-id) message (push (process-tree-id process) *broadcast-events*) @@ -61,29 +59,25 @@ (when (= abort-id (process-tree-id process)) (return-from-cast)))) -(define-broadcast-handler handle-broadcast-test-script - ((process process-tree-cast-test) (message broadcast-test-script)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test-script)) "Pushes the `:PUSH-TIMES' command onto the stack." (with-slots (scalar) message (process-continuation process `(PUSH-TIMES ,scalar)) (push-broadcast-frame :targets (process-tree-children process)))) -(define-broadcast-handler handle-broadcast-test-rts - ((process process-tree-cast-test) (message broadcast-test-rts)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test-rts)) "Pushes the `PROCESS's ID to a global list." (push (process-tree-id process) *broadcast-events*) (push-broadcast-frame :targets (process-tree-children process) :handle-rts? t)) -(define-convergecast-handler handle-convergecast-test - ((process process-tree-cast-test) (message convergecast-test)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test)) "Puts the `PROCESS's ID as `INPUT' to the convergcast frame." (push-convergecast-frame :targets (process-tree-children process) :func #'aether::reduce+ :input (process-tree-id process))) -(define-convergecast-handler handle-convergecast-test-abort - ((process process-tree-cast-test) (message convergecast-test-abort)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test-abort)) "Puts the `PROCESS's ID as `INPUT' to the convergecast frame, unless it is equal to `ABORT-ID', which triggers a `RETURN-FROM-CAST' and thus an abort of the convergecast operation." (with-slots (abort-id) message (push-convergecast-frame :targets (process-tree-children process) @@ -92,8 +86,7 @@ (when (= abort-id (process-tree-id process)) (return-from-cast 0)))) -(define-convergecast-handler handle-convergecast-test-script - ((process process-tree-cast-test) (message convergecast-test-script)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test-script)) "Pushes the `:SET-TIMES' command onto the stack." (with-slots (scalar) message (process-continuation process `(SET-TIMES ,scalar)) @@ -101,8 +94,7 @@ :func #'aether::reduce+ :input (process-tree-id process)))) -(define-convergecast-handler handle-convergecast-test-rts - ((process process-tree-cast-test) (message convergecast-test-rts)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test-rts)) "Puts the `PROCESS's ID as `INPUT' to the convergcast frame. In order to handle RTSes gracefully, our `FUNC' must also handle NILs." (labels ((null+ (input replies) (loop :for value :in (list* input replies) @@ -134,19 +126,6 @@ "Kills the process." (process-die)) -;;; -;;; message dispatch - -(define-message-dispatch process-tree-cast-test - (broadcast-test 'handle-broadcast-test) - (broadcast-test-abort 'handle-broadcast-test-abort) - (broadcast-test-script 'handle-broadcast-test-script) - (broadcast-test-rts 'handle-broadcast-test-rts) - (convergecast-test 'handle-convergecast-test) - (convergecast-test-abort 'handle-convergecast-test-abort) - (convergecast-test-script 'handle-convergecast-test-script) - (convergecast-test-rts 'handle-convergecast-test-rts)) - ;;; ;;; test definitions ;;; diff --git a/tests/examples/coloring.lisp b/tests/examples/coloring.lisp index 4edbedb..c5607ca 100644 --- a/tests/examples/coloring.lisp +++ b/tests/examples/coloring.lisp @@ -49,8 +49,7 @@ (defstruct (message-color-query (:include message)) "An RPC request for a foreign node's current color value.") -(define-rpc-handler handle-message-color-query - ((process process-coloring) (message message-color-query)) +(define-rpc-handler ((process process-coloring) (message message-color-query)) "Responds with this node's current color value." (process-coloring-color process)) @@ -79,8 +78,7 @@ (old nil :type (or null address)) (new nil :type (or null address))) -(define-rpc-handler handle-message-swap-neighbor - ((process process-coloring) (message message-swap-neighbor)) +(define-rpc-handler ((process process-coloring) (message message-swap-neighbor)) "Handles a SWAP-NEIGHBOR message." (let ((new (message-swap-neighbor-new message)) (old (message-swap-neighbor-old message))) @@ -96,8 +94,7 @@ :test #'address=)))) (incf (process-coloring-workloads process)))) -(define-rpc-handler handle-message-inject - ((process process-coloring) (message message-inject)) +(define-rpc-handler ((process process-coloring) (message message-inject)) "Handles an INJECT message." (let ((neighbors (message-inject-neighbors message)) (address (process-public-address process))) @@ -116,8 +113,7 @@ (defstruct (message-kill (:include message)) "Instruct a node to remove itself from the line.") -(define-rpc-handler handle-message-kill - ((process process-coloring) (message message-kill)) +(define-rpc-handler ((process process-coloring) (message message-kill)) "Given a node, tell its neighbors to sew over it, restart their coloring processes, and stop this node." (let ((neighbors (process-coloring-neighbors process)) (address (process-public-address process))) @@ -132,14 +128,6 @@ (setf (process-coloring-workloads process) 0) t)) -;;; generic things - -(define-message-dispatch process-coloring - (message-color-query 'handle-message-color-query) - (message-kill 'handle-message-kill) - (message-inject 'handle-message-inject) - (message-swap-neighbor 'handle-message-swap-neighbor)) - ;;; ;;; tests ;;; diff --git a/tests/examples/data-frame.lisp b/tests/examples/data-frame.lisp index 7dbff3a..4c0f0b6 100644 --- a/tests/examples/data-frame.lisp +++ b/tests/examples/data-frame.lisp @@ -33,8 +33,7 @@ "Requests the computation of n!." (n nil :type (integer 0))) -(define-message-handler handle-message-factorial - ((process arithmetic-server) (message message-factorial)) +(define-message-handler ((process arithmetic-server) (message message-factorial)) "Sets up the computation of n! to service an inbound request." (process-continuation process `(FACTORIAL-STEP ,(message-factorial-n message)) @@ -59,8 +58,7 @@ If appropriate, it first calls FACTORIAL-STEP recursively, which sets up a retur (defstruct (message-factorial-tco (:include message-factorial)) "Also a factorial query, handled slightly differently by the server.") -(define-message-handler handle-message-factorial-tco - ((process arithmetic-server) (message message-factorial-tco)) +(define-message-handler ((process arithmetic-server) (message message-factorial-tco)) "Sets up the computation of n! to service an inbound request." (process-continuation process `(PUSH 1) @@ -78,12 +76,6 @@ This assumes that the return-value has already been set up, contributes the fact `(MULTIPLY) `(FACTORIAL-STEP-TCO ,(1- n))))) -;;; handlers - -(define-message-dispatch arithmetic-server - (message-factorial-tco 'handle-message-factorial-tco) - (message-factorial 'handle-message-factorial)) - ;;; ;;; tests ;;; diff --git a/tests/examples/distributed-mst.lisp b/tests/examples/distributed-mst.lisp index 820b0cc..3a23cc9 100644 --- a/tests/examples/distributed-mst.lisp +++ b/tests/examples/distributed-mst.lisp @@ -144,8 +144,8 @@ ;;; ;; section 3 of the paper pseudocode: response to Connect -(define-message-handler handle-msg-connect - ((node fragment-node) (message msg-connect)) +(define-message-handler ((node fragment-node) (message msg-connect) + :guard (not (eql ':SLEEPING (fragment-node-state node)))) "If we get a Connect message from a lower-level fragment, then mark that edge as a Branch and send an Initate message to absorb that fragment. Else, if the sender is at the end of a Basic edge, then wait (via a self-send). Otherwise, we've received a Connect message from a fragment of equal level, and thus we would like to merge with it. We do so by sending it an Initiate message with a higher level, and with the edge weight. NOTE: the following line is implemented using a guard in `DEFINE-MESSAGE-DISPATCH' @@ -201,8 +201,7 @@ if SN = sleeping then execute procedure wakeup" :weight edge-weight))))))))) ;; section 4 of the paper pseudocode: response to Initiate -(define-message-handler handle-msg-initiate - ((node fragment-node) (message msg-initiate)) +(define-message-handler ((node fragment-node) (message msg-initiate)) "An Initiate is a broadcast that is triggered in response to handling a Connect. In step 1, we update our internal state. In step 2, we continue the broadcast if we have any branches. Finally, in step 3, we push a `TEST' onto the stack if we are in the Find state." (let ((address (process-public-address node))) (with-slots (adjacent-edges find-count) node @@ -242,8 +241,8 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(TEST))))))) ;; section 6 of the paper pseudocode: response to Test -(define-message-handler handle-msg-test - ((node fragment-node) (message msg-test)) +(define-message-handler ((node fragment-node) (message msg-test) + :guard (not (eql ':SLEEPING (fragment-node-state node)))) "Response to a Test message. If our level is <= to that of the message, then we wait (by self-sending the message). Else, if we are part of a different fragment, then we send an Accept. Otherwise, we should reject the edge that this message came from. We only mark the edge as Rejected if it is Basic -- if it's not Basic, then this is essentially a stale Test. However, even in the case that it is stale, we likely want to send back a Reject message to allow the sender to resume progress. Finally, we push a `TEST' onto the stack. @@ -295,8 +294,7 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(TEST))))))))))) ;; section 7 of the paper pseudocode: response to Accept -(define-message-handler handle-msg-accept - ((node fragment-node) (message msg-accept)) +(define-message-handler ((node fragment-node) (message msg-accept)) "If accepted edge weight is less than `BEST-WEIGHT', update `BEST-WEIGHT' and `BEST-EDGE'. Either way, push `REPORT' onto the stack." ;; extract best-edge, best-wt, test-edge from node (with-slots (adjacent-edges best-edge best-weight test-edge) node @@ -314,8 +312,7 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(REPORT)))))) ;; section 8 of the paper pseudocode: response to Reject -(define-message-handler handle-msg-reject - ((node fragment-node) (message msg-reject)) +(define-message-handler ((node fragment-node) (message msg-reject)) "Mark edge as Rejected (if it is Basic), and push `TEST' onto the stack." (with-slots (adjacent-edges) node ;; extract j from message @@ -328,8 +325,7 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(TEST)))))) ;; section 10 of the paper pseudocode: response to Report -(define-message-handler handle-msg-report - ((node fragment-node) (message msg-report)) +(define-message-handler ((node fragment-node) (message msg-report)) "If we get a Report from someone that's not our `IN-BRANCH', then we decrement `FIND-COUNT', potentially update our best edge & weight, and then proceed to `REPORT'. Else, if we're in the Find state, wait (via a self-send). Else, if the Report weight is lower than `BEST-WEIGHT', we should `CHANGE-ROOT'. Finally, if the Report weight and `BEST-WEIGHT' are both inifinity, then we're part of the core and have discovered the full MST, so it's time to stop." ;; extract best-edge, best-wt, find-count, in-branch, SN from node (with-slots (best-edge best-weight find-count in-branch) node @@ -374,29 +370,10 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(HALT))))))) ;; section 12 of the paper pseudocode: response to Change-root -(define-message-handler handle-msg-change-root - ((node fragment-node) (message msg-change-root)) +(define-message-handler ((node fragment-node) (message msg-change-root)) "Push a `CHANGE-ROOT' command onto the stack." (process-continuation node `(CHANGE-ROOT))) -;;; -;;; message dispatch -;;; - -;; NOTE: the guards are used to force nodes to wake up before handling messages -(define-message-dispatch fragment-node - (msg-connect 'handle-msg-connect - (not (eql (fragment-node-state fragment-node) - ':SLEEPING))) - (msg-initiate 'handle-msg-initiate) - (msg-test 'handle-msg-test - (not (eql (fragment-node-state fragment-node) - ':SLEEPING))) - (msg-accept 'handle-msg-accept) - (msg-reject 'handle-msg-reject) - (msg-report 'handle-msg-report) - (msg-change-root 'handle-msg-change-root)) - ;;; ;;; process upkeep ;;; diff --git a/tests/examples/flooding.lisp b/tests/examples/flooding.lisp index fb427ad..72187b2 100644 --- a/tests/examples/flooding.lisp +++ b/tests/examples/flooding.lisp @@ -51,8 +51,7 @@ ;;; handler definitions ;;; -(define-message-handler handle-message-flood - ((process process-flooding) (message message-flood)) +(define-message-handler ((process process-flooding) (message message-flood)) "Handles a `FLOOD' message. If the `PROCESS's parent is unset, sets it to the `PARENT-ADDRESS' and forwards along the broadcast by pushing the `BROADCAST-FLOOD' command onto the stack. Otherwise, lets the `REPLY-CHANNEL' know that we already have a parent." (with-slots (parent-address reply-channel) message (let ((parent (process-flooding-parent process))) @@ -65,13 +64,6 @@ (setf (process-flooding-parent process) parent-address) (process-continuation process `(BROADCAST-FLOOD ,reply-channel))))))) -;;; -;;; message dispatch -;;; - -(define-message-dispatch process-flooding - (message-flood 'handle-message-flood)) - ;;; ;;; process upkeep ;;; diff --git a/tests/examples/lock.lisp b/tests/examples/lock.lisp index a73c783..4518814 100644 --- a/tests/examples/lock.lisp +++ b/tests/examples/lock.lisp @@ -32,7 +32,7 @@ (define-process-upkeep ((process process-reader)) (START) (process-continuation process `(START))) -(define-rpc-handler handle-message-write ((process process-reader) (message message-write)) +(define-rpc-handler ((process process-reader) (message message-write)) (push (message-write-payload message) (process-reader-receive-list process))) (defmethod process-lockable-targets ((process process-reader)) @@ -62,14 +62,6 @@ `(TRANSMIT) `(START-NO-LOCKS)))) -(define-message-dispatch process-writer - ;; writers don't have to receive any messages - ) - -(define-message-dispatch process-reader - (message-lock 'handle-message-lock) - (message-write 'handle-message-write)) - (defmethod process-lockable-targets ((process process-writer)) nil) diff --git a/tests/examples/tree-coloring.lisp b/tests/examples/tree-coloring.lisp index 4f40baa..db6b4b9 100644 --- a/tests/examples/tree-coloring.lisp +++ b/tests/examples/tree-coloring.lisp @@ -16,16 +16,12 @@ "Sets the color of a process to one of two `COLOR' values." (color 0 :type bit)) -(define-broadcast-handler handle-broadcast-coloring - ((process process-tree-coloring) (message broadcast-coloring)) +(define-broadcast-handler ((process process-tree-coloring) (message broadcast-coloring)) "Sets this process's color according to the message, and inverts the message's color." (setf (process-tree-coloring-color process) (broadcast-coloring-color message)) (setf (broadcast-coloring-color message) (- 1 (broadcast-coloring-color message))) (push-broadcast-frame :targets (process-tree-children process))) -(define-message-dispatch process-tree-coloring - (broadcast-coloring 'handle-broadcast-coloring)) - (deftest test-process-tree-2-coloring () "Sends a message to the root of a tree of processes telling it to 2-color itself and its children, and then checks that the 2-coloring is correct. See `ADD-TREE-PROCESSES' for the fixture used to build the tree of processes." (with-address-dereferencing () diff --git a/tests/examples/tree-operations.lisp b/tests/examples/tree-operations.lisp index 697edd0..a5750a7 100644 --- a/tests/examples/tree-operations.lisp +++ b/tests/examples/tree-operations.lisp @@ -21,23 +21,20 @@ (defstruct (convergecast-tree-leaves (:include message)) "Used to determined the number of leaves in a tree of processes.") -(define-convergecast-handler handle-convergecast-tree-size - ((process process-tree) (message convergecast-tree-size)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-size)) "Computes the size of the tree of processes." (push-convergecast-frame :targets (process-tree-children process) :func #'aether::reduce+ :input 1)) -(define-convergecast-handler handle-convergecast-tree-depth - ((process process-tree) (message convergecast-tree-depth)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-depth)) "Computes the depth of a tree of processes by incrementing a carry." (incf (convergecast-tree-depth-depth message)) (push-convergecast-frame :targets (process-tree-children process) :func #'aether::reduce-max :input (convergecast-tree-depth-depth message))) -(define-convergecast-handler handle-convergecast-tree-depth-no-carry - ((process process-tree) (message convergecast-tree-depth-no-carry)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-depth-no-carry)) "Computes the depth of a tree of processes without using a carry. Demonstrates the power of `FUNCALL' over `REDUCE'." (flet ((1+reduce-max (input replies) (declare (ignore input)) @@ -46,8 +43,7 @@ :func #'1+reduce-max :input 1))) -(define-convergecast-handler handle-convergecast-tree-leaves - ((process process-tree) (message convergecast-tree-leaves)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-leaves)) "Computes the number of leaves of a tree of processes." (cond ((process-tree-children process) @@ -59,12 +55,6 @@ :func #'aether::reduce+ :input 1)))) -(define-message-dispatch process-tree - (convergecast-tree-size 'handle-convergecast-tree-size) - (convergecast-tree-depth 'handle-convergecast-tree-depth) - (convergecast-tree-depth-no-carry 'handle-convergecast-tree-depth-no-carry) - (convergecast-tree-leaves 'handle-convergecast-tree-leaves)) - (deftest test-process-tree-operations () "Tests that we can implement some common tree operations in a distributed setting using built-in convergecast facilities. See `ADD-TREE-PROCESSES' for the fixture used to build the tree of processes." (with-address-dereferencing () diff --git a/tests/process.lisp b/tests/process.lisp index 55c9a42..7fc2926 100644 --- a/tests/process.lisp +++ b/tests/process.lisp @@ -34,16 +34,10 @@ ;; the message handlers -(define-message-handler handle-msg-ping - ((box chatterbox) (message msg-ping)) +(define-message-handler ((box chatterbox) (message msg-ping)) (with-slots (reply-channel) message (send-message reply-channel (make-msg-pong :vote (vote box))))) -;;; the message dispatch - -(define-message-dispatch chatterbox - (msg-ping 'handle-msg-ping)) - ;;; the process commands (define-process-upkeep ((box chatterbox)) diff --git a/tests/recursive-lock.lisp b/tests/recursive-lock.lisp index 9b58a75..30136d6 100644 --- a/tests/recursive-lock.lisp +++ b/tests/recursive-lock.lisp @@ -13,8 +13,9 @@ (defstruct (message-test-lock (:include message)) "Demo message type. Supposed to be delayed by an established lock on a process.") -(define-message-handler handle-message-test-lock - ((process process-lockable-test) (message message-test-lock)) +(define-message-handler ((process process-lockable-test) (message message-test-lock) + ; don't reply if locked + :guard (not (process-lockable-locked? process))) "Records the receipt of a test lock message." (push `(,(process-tree-id process) ,(now)) *locking-events*)) @@ -22,12 +23,6 @@ "See `PROCESS-TREE-CHILDREN' for more information." (process-tree-children process)) -(define-message-dispatch process-lockable-test - (message-lock 'handle-message-lock) - (message-test-lock 'handle-message-test-lock - (not (process-lockable-locked? ; don't reply if locked - process-lockable-test)))) - (deftest test-process-lockable-successful () "Tests that message processing can be blocked by a recursive lock." (with-address-dereferencing () ; used by process-lockable-targets From d79157c1e25d6026f14fd9436adec8f933053cc5 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sat, 28 Mar 2026 15:59:52 -0700 Subject: [PATCH 3/6] guards on cast handlers --- src/process/cast.lisp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/process/cast.lisp b/src/process/cast.lisp index e823cf2..fb9efa2 100644 --- a/src/process/cast.lisp +++ b/src/process/cast.lisp @@ -23,7 +23,8 @@ (declare (ignore aborting? handle-rts? message targets)) (error "PUSH-BROADCAST-FRAME only available in DEFINE-BROADCAST-HANDLER.")) -(defmacro define-broadcast-handler (((process process-type) (message message-type)) +(defmacro define-broadcast-handler (((process process-type) (message message-type) + &key (guard nil guard-p)) &body body) "This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `BROADCAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `BROADCAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-BROADCAST-HANDLER' we have access to a pair of helper functions: @@ -32,7 +33,8 @@ WARNING: `RETURN-FROM-CAST' calls `PUSH-BROADCAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early." (a:with-gensyms (broadcast-frame reply-channel) - `(define-message-handler ((,process ,process-type) (,message ,message-type)) + `(define-message-handler ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) (let ((,broadcast-frame nil) (,message (copy-message message)) (,reply-channel (message-reply-channel ,message))) @@ -163,7 +165,8 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t ,@body))))) (defmacro define-convergecast-handler (((process process-type) - (message message-type)) + (message message-type) + &optional (guard nil guard-p)) &body body) "This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `CONVERGECAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `CONVERGECAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-CONVERGECAST-HANDLER' we have access to a pair of helper functions: @@ -171,20 +174,23 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t 2. `RETURN-FROM-CAST', which allows the user to terminate the convergecast operation early by sending up an acknowledgement (optionally specifying its contents) to the original sender of `MESSAGE'. It is recommended that a value is provided when returning from a convergecast, as it will be passed to a function (the function provided to the `CONVERGECAST-FRAME') when received by the original sender. WARNING: `RETURN-FROM-CAST' calls `PUSH-CONVERGECAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early." - `(define-message-handler ((,process ,process-type) (,message ,message-type)) + `(define-message-handler ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) ,@(%convergecast-body process message body))) ;; cf. define-message-subordinate (defmacro define-convergecast-subordinate (((process process-type) - (message message-type)) + (message message-type) + &optional (guard nil guard-p)) &body body) "Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process. This macro mimics DEFINE-CONVERGECAST-HANDLER while setting up this manner of parallel execution." (a:with-gensyms (command servicer subprocess) `(progn - (define-message-handler ((,process ,process-type) (,message ,message-type)) + (define-message-handler ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) (let ((,servicer (spawn-process 'process-message-emissary))) (schedule ,servicer (now)) (setf (process-command-stack ,servicer) (list (list ',command ,process ,message)) From c69e07ab7a9be1a3a46708cb6d0bc5917e3bca24 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sat, 28 Mar 2026 16:12:20 -0700 Subject: [PATCH 4/6] make progress signal scheme more compatible with c-n-m --- src/process/process.lisp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/process/process.lisp b/src/process/process.lisp index 7df12b8..34f7ccc 100644 --- a/src/process/process.lisp +++ b/src/process/process.lisp @@ -114,7 +114,7 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) ;; write the log entry. nil)) -(defun finish-handler (&optional (handled t)) +(defun finish-handler (&optional return-value (handled t)) "Return from the active aether process handler. If `HANDLED' is high, then the handler is understood as having taken care of the message, which is dropped from the inbox." (declare (ignore handled)) (error () "Not available outside of an aether process handler.")) @@ -132,13 +132,13 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) (if (consp message-and-message-type) message-and-message-type (list message-and-message-type t))) - (a:with-gensyms (handled) + (a:with-gensyms (return-value handled) `(defmethod %handle-process-message ((,process ,process-type) (,message ,message-type)) ,@(list documentation) ,@decls - (flet ((finish-handler (&optional (,handled t)) - (return-from %handle-process-message ,handled)) + (flet ((finish-handler (&optional ,return-value (,handled t)) + (return-from %handle-process-message (values ,return-value ,handled))) (log-entry (&rest initargs) (when (process-debug? ,process) (apply #'log-entry @@ -153,7 +153,10 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) :payload (copy-structure payload)) (send-message destination payload))) (declare (ignorable #'send-message)) - ,(if guard-p `(unless ,guard (finish-handler (call-next-method)))) + ,(if guard-p `(unless ,guard + (multiple-value-bind (,return-value ,handled) + (call-next-method) + (finish-handler ,return-value ,handled)))) (when (process-debug? ,process) (log-entry :time (now) :entry-type ':handler-invoked @@ -161,8 +164,7 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) :message-id (message-message-id ,message) :payload-type ',message-type :log-level 0)) - ,@body - (finish-handler t)))))))) + (values (progn ,@body) t)))))))) #+#:ignore (define-message-handler ((process process) (message message-RTS)) @@ -178,7 +180,7 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) ((= 3 safety) (check-key-secret address)) ((> 3 safety) nil)) (doq (message inbox) - (when (%handle-process-message node message) + (when (nth-value 1 (%handle-process-message node message)) (q-deq-first inbox (lambda (x) (eq x message))) (return-from handle-process-inbox t)) (when (and (process-debug? node) From b274850200ef60394f8800c902ae47ab21a716c0 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sat, 28 Mar 2026 16:41:47 -0700 Subject: [PATCH 5/6] improve c-n-m within rpc handlers --- src/process/process.lisp | 31 +++++++++++++++++++---------- src/process/rpc.lisp | 43 ++++++++++++++++++++++------------------ 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/process/process.lisp b/src/process/process.lisp index 34f7ccc..b234097 100644 --- a/src/process/process.lisp +++ b/src/process/process.lisp @@ -119,10 +119,10 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) (declare (ignore handled)) (error () "Not available outside of an aether process handler.")) -(defmacro define-message-handler - ((process-and-process-type message-and-message-type - &key (guard nil guard-p)) - &body body) +(defun dmh-form + (process-and-process-type message-and-message-type + guard gf-name + body) "Defines a passive message handler, which walks the process mailbox at the start of every tick." (multiple-value-bind (body decls documentation) (a:parse-body body :documentation t) (destructuring-bind ((process process-type) (message message-type)) @@ -133,12 +133,12 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) message-and-message-type (list message-and-message-type t))) (a:with-gensyms (return-value handled) - `(defmethod %handle-process-message ((,process ,process-type) - (,message ,message-type)) + `(defmethod ,gf-name ((,process ,process-type) + (,message ,message-type)) ,@(list documentation) ,@decls (flet ((finish-handler (&optional ,return-value (,handled t)) - (return-from %handle-process-message (values ,return-value ,handled))) + (return-from ,gf-name (values ,return-value ,handled))) (log-entry (&rest initargs) (when (process-debug? ,process) (apply #'log-entry @@ -153,10 +153,10 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) :payload (copy-structure payload)) (send-message destination payload))) (declare (ignorable #'send-message)) - ,(if guard-p `(unless ,guard - (multiple-value-bind (,return-value ,handled) - (call-next-method) - (finish-handler ,return-value ,handled)))) + (unless ,guard + (multiple-value-bind (,return-value ,handled) + (call-next-method) + (finish-handler ,return-value ,handled))) (when (process-debug? ,process) (log-entry :time (now) :entry-type ':handler-invoked @@ -166,6 +166,15 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) :log-level 0)) (values (progn ,@body) t)))))))) +(defmacro define-message-handler + ((process-and-process-type message-and-message-type + &key (guard t)) + &body body) + "Defines a passive message handler, which walks the process mailbox at the start of every tick." + (dmh-form process-and-process-type message-and-message-type + guard '%handle-process-message + body)) + #+#:ignore (define-message-handler ((process process) (message message-RTS)) "Default handler for when a `PROCESS' receives a `MESSAGE-RTS'. Throws an error." diff --git a/src/process/rpc.lisp b/src/process/rpc.lisp index 2ddbaee..788be3d 100644 --- a/src/process/rpc.lisp +++ b/src/process/rpc.lisp @@ -5,29 +5,34 @@ (in-package #:aether) -(defun finish-rpc-handler (&optional return-value (handled t)) - "Early escape from DEFINE-RPC-HANDLER. Like FINISH-HANDLER, but with a retval." - (declare (ignore return-value handled)) - (error () "Cannot call FINISH-RPC-HANDLER outside of a DEFINE-RPC-HANDLER body.")) +(defgeneric %handle-rpc-handler (process message) + (:documentation "Use DEFINE-RPC-HANDLER to add entries here.")) -;; TODO: this traps RETURN-FROM, but not FINISH-WITH-SCHEDULING. -(defmacro define-rpc-handler (((process process-type) (message message-type) - &key (guard nil guard-p)) +(defmacro define-rpc-handler ((process-and-process-type message-and-message-type + &key (guard t)) &body body) "Interrupt-based RPC handlers are expected to emit a reply to the caller. This macro augments DEFINE-MESSAGE-HANDLER to reply to the caller with the last evaluated form." (a:with-gensyms (block-name return-value reply-channel handled) - `(define-message-handler - ((,process ,process-type) (,message ,message-type) - ,@(when guard-p `(:guard ,guard))) - (multiple-value-bind (,return-value ,handled) - (block ,block-name - (flet ((finish-handler (&optional ,return-value (,handled t)) - (return-from ,block-name (values ,return-value ,handled)))) - (declare (ignorable #'finish-handler)) - (values (progn ,@body) t))) - (a:when-let ((,reply-channel (message-reply-channel ,message))) - (send-message ,reply-channel (make-message-rpc-done :result ,return-value))) - (finish-handler ,handled))))) + (let ((process (if (listp process-and-process-type) (first process-and-process-type) process-and-process-type)) + (message (if (listp message-and-message-type) (first message-and-message-type) message-and-message-type))) + `(progn + ;; actually computes RPC value + ,(dmh-form process-and-process-type message-and-message-type + guard '%handle-rpc-handler + body) + ;; hook from message handler subsystem to RPC subsystem. + ;; when hook returns, package RPC reply and send. + ,(dmh-form process-and-process-type message-and-message-type + guard '%handle-process-message + `((multiple-value-bind (,return-value ,handled) + (block ,block-name + (flet ((finish-handler (&optional ,return-value (,handled t)) + (return-from ,block-name (values ,return-value ,handled)))) + (declare (ignorable #'finish-handler)) + (%handle-rpc-handler ,process ,message))) + (a:when-let ((,reply-channel (message-reply-channel ,message))) + (send-message ,reply-channel (make-message-rpc-done :result ,return-value))) + (finish-handler ,return-value ,handled)))))))) (defmacro sync-rpc (message (result-place-or-list destination From 89fb7c72ea6ab7606658c24a1b1794e7d1770c63 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sat, 28 Mar 2026 16:57:17 -0700 Subject: [PATCH 6/6] TODO --- src/process/process.lisp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/process/process.lisp b/src/process/process.lisp index b234097..ffb8e2c 100644 --- a/src/process/process.lisp +++ b/src/process/process.lisp @@ -119,6 +119,8 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) (declare (ignore handled)) (error () "Not available outside of an aether process handler.")) +;; TODO: could add a combo for call-next-method and finish-handler + (defun dmh-form (process-and-process-type message-and-message-type guard gf-name