diff --git a/aether.asd b/aether.asd index b952691..bbc4aa4 100644 --- a/aether.asd +++ b/aether.asd @@ -19,6 +19,7 @@ (funcall compile))) :serial t :components ((:file "package") + (:file "params") (:file "utilities") (:file "queue") (:file "cheap-heap") diff --git a/src/courier.lisp b/src/courier.lisp index 5e70fc7..2506737 100644 --- a/src/courier.lisp +++ b/src/courier.lisp @@ -10,8 +10,6 @@ ;;; `*LOCAL-COURIER*'. ;;; -(defparameter *local-courier* nil - "Bound to the `COURIER' that services this process.") (defparameter *courier-processing-clock-rate* 100) (defparameter *routing-time-step* 1/100) diff --git a/src/params.lisp b/src/params.lisp new file mode 100644 index 0000000..ef9d4d3 --- /dev/null +++ b/src/params.lisp @@ -0,0 +1,8 @@ +;;;; params.lisp +;;;; +;;;; some global definitions to cope with file circularity + +(in-package #:aether) + +(defparameter *local-courier* nil + "Bound to the `COURIER' that services this process.") diff --git a/src/process/cast.lisp b/src/process/cast.lisp index 20fe7a0..fb9efa2 100644 --- a/src/process/cast.lisp +++ b/src/process/cast.lisp @@ -23,8 +23,8 @@ (declare (ignore aborting? handle-rts? message targets)) (error "PUSH-BROADCAST-FRAME only available in DEFINE-BROADCAST-HANDLER.")) -(defmacro define-broadcast-handler (handler-name - ((process process-type) (message message-type)) +(defmacro define-broadcast-handler (((process process-type) (message message-type) + &key (guard nil guard-p)) &body body) "This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `BROADCAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `BROADCAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-BROADCAST-HANDLER' we have access to a pair of helper functions: @@ -33,8 +33,8 @@ WARNING: `RETURN-FROM-CAST' calls `PUSH-BROADCAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early." (a:with-gensyms (broadcast-frame reply-channel) - `(define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + `(define-message-handler ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) (let ((,broadcast-frame nil) (,message (copy-message message)) (,reply-channel (message-reply-channel ,message))) @@ -164,9 +164,9 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t (process-continuation ,process `(CONVERGECAST)) ,@body))))) -(defmacro define-convergecast-handler (handler-name - ((process process-type) - (message message-type)) +(defmacro define-convergecast-handler (((process process-type) + (message message-type) + &optional (guard nil guard-p)) &body body) "This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `CONVERGECAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `CONVERGECAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-CONVERGECAST-HANDLER' we have access to a pair of helper functions: @@ -174,23 +174,23 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t 2. `RETURN-FROM-CAST', which allows the user to terminate the convergecast operation early by sending up an acknowledgement (optionally specifying its contents) to the original sender of `MESSAGE'. It is recommended that a value is provided when returning from a convergecast, as it will be passed to a function (the function provided to the `CONVERGECAST-FRAME') when received by the original sender. WARNING: `RETURN-FROM-CAST' calls `PUSH-CONVERGECAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early." - `(define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + `(define-message-handler ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) ,@(%convergecast-body process message body))) ;; cf. define-message-subordinate (defmacro define-convergecast-subordinate - (handler-name - ((process process-type) - (message message-type)) + (((process process-type) + (message message-type) + &optional (guard nil guard-p)) &body body) "Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process. This macro mimics DEFINE-CONVERGECAST-HANDLER while setting up this manner of parallel execution." (a:with-gensyms (command servicer subprocess) `(progn - (define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + (define-message-handler ((,process ,process-type) (,message ,message-type) + ,@(when guard-p `(:guard ,guard))) (let ((,servicer (spawn-process 'process-message-emissary))) (schedule ,servicer (now)) (setf (process-command-stack ,servicer) (list (list ',command ,process ,message)) diff --git a/src/process/emissary.lisp b/src/process/emissary.lisp index e0b3d00..30090d6 100644 --- a/src/process/emissary.lisp +++ b/src/process/emissary.lisp @@ -8,25 +8,19 @@ () (:documentation "Dummy PROCESS used to host nonblocking message handlers.")) -(define-message-dispatch process-message-emissary - ;; no replies - ) - ;; TODO: consider rolling this together with DEFINE-MESSAGE-HANDLER. is the ;; gain in emulation efficiency really worth it? is there any potential ;; for nasty side effects? ;; TODO: unclear that this plays well with message tracing. (defmacro define-message-subordinate - (handler-name - ((process process-type) (message message-type)) + (((process process-type) (message message-type)) &body body) "Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process. This macro mimics DEFINE-MESSAGE-HANDLER while setting up this manner of parallel execution." (a:with-gensyms (command servicer subprocess) `(progn - (define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) + (define-message-handler ((,process ,process-type) (,message ,message-type)) (let ((,servicer (spawn-process 'process-message-emissary))) (schedule ,servicer (now)) (setf (process-command-stack ,servicer) (list (list ',command ,process ,message)) diff --git a/src/process/lock.lisp b/src/process/lock.lisp index 1faa85c..13bafed 100644 --- a/src/process/lock.lisp +++ b/src/process/lock.lisp @@ -125,8 +125,7 @@ "Sent to release a lock." (result)) -(define-message-handler handle-message-lock - ((process process-lockable) (message message-lock)) +(define-message-handler ((process process-lockable) (message message-lock)) "Attempts to lock PROCESS." (with-slots (reply-channel) message (with-slots (aborting? locked?) process diff --git a/src/process/process.lisp b/src/process/process.lisp index b1a4744..ffb8e2c 100644 --- a/src/process/process.lisp +++ b/src/process/process.lisp @@ -106,90 +106,84 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object.")) (error "Undefined command ~a for server of type ~a." command (type-of server)))) -(defgeneric %message-dispatch (node message) - (:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here.")) - -(defun finish-handler () - "Return from the active aether process handler." +(defgeneric %handle-process-message (process message) + (:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here.") + (:method (process message) + "Bottom message handler: marks the message as not having been handled." + ;; this is *not* implemented via DEFINE-MESSAGE-HANDLER bc we don't want to + ;; write the log entry. + nil)) + +(defun finish-handler (&optional return-value (handled t)) + "Return from the active aether process handler. If `HANDLED' is high, then the handler is understood as having taken care of the message, which is dropped from the inbox." + (declare (ignore handled)) (error () "Not available outside of an aether process handler.")) -(defmacro define-message-handler (handler-name ((process process-type) (message message-type)) &body body) - "Defines a function to be invoked by DEFINE-MESSAGE-DISPATCH." +;; TODO: could add a combo for call-next-method and finish-handler + +(defun dmh-form + (process-and-process-type message-and-message-type + guard gf-name + body) + "Defines a passive message handler, which walks the process mailbox at the start of every tick." (multiple-value-bind (body decls documentation) (a:parse-body body :documentation t) - `(defmethod ,handler-name ((,process ,process-type) - (,message ,message-type)) - ,@decls - ,@(list documentation) - (check-type ,message ,message-type) - (check-type ,process ,process-type) - (macrolet ((finish-handler () '(return-from ,handler-name))) - (flet ((log-entry (&rest initargs) - (when (process-debug? ,process) - (apply #'log-entry - (append initargs - (list :log-level 0 - :time (now) - :source ,process)))))) - (flet ((send-message (destination payload) - (log-entry :entry-type ':send-message - :destination destination - :payload (copy-structure payload)) - (send-message destination payload))) - (declare (ignorable #'send-message)) - ,@body)))))) - -(define-message-handler handle-message-RTS - ((process process) (message message-RTS)) + (destructuring-bind ((process process-type) (message message-type)) + (list (if (consp process-and-process-type) + process-and-process-type + (list process-and-process-type t)) + (if (consp message-and-message-type) + message-and-message-type + (list message-and-message-type t))) + (a:with-gensyms (return-value handled) + `(defmethod ,gf-name ((,process ,process-type) + (,message ,message-type)) + ,@(list documentation) + ,@decls + (flet ((finish-handler (&optional ,return-value (,handled t)) + (return-from ,gf-name (values ,return-value ,handled))) + (log-entry (&rest initargs) + (when (process-debug? ,process) + (apply #'log-entry + (append initargs + (list :log-level 0 + :time (now) + :source ,process)))))) + (declare (ignorable #'finish-handler)) + (flet ((send-message (destination payload) + (log-entry :entry-type ':send-message + :destination destination + :payload (copy-structure payload)) + (send-message destination payload))) + (declare (ignorable #'send-message)) + (unless ,guard + (multiple-value-bind (,return-value ,handled) + (call-next-method) + (finish-handler ,return-value ,handled))) + (when (process-debug? ,process) + (log-entry :time (now) + :entry-type ':handler-invoked + :source ,process + :message-id (message-message-id ,message) + :payload-type ',message-type + :log-level 0)) + (values (progn ,@body) t)))))))) + +(defmacro define-message-handler + ((process-and-process-type message-and-message-type + &key (guard t)) + &body body) + "Defines a passive message handler, which walks the process mailbox at the start of every tick." + (dmh-form process-and-process-type message-and-message-type + guard '%handle-process-message + body)) + +#+#:ignore +(define-message-handler ((process process) (message message-RTS)) "Default handler for when a `PROCESS' receives a `MESSAGE-RTS'. Throws an error." (error "Got an RTS")) -(defmacro define-message-dispatch (node-type &body clauses) - "Defines \"automatic\" message handlers associated to a particular subtype of `PROCESS'. Each handler is specified by a tuple of form (MESSAGE-TYPE MESSAGE-HANDLER &OPTIONAL GUARD). As with `RECEIVE-MESSAGE', each clause is processed in turn, according to the following rules: - - + If supplied, `GUARD' is evaluated with the `PROCESS' in question bound to the place `PROCESS-TYPE'. If `GUARD' evaluates to NIL, proceed to the next clause. - + Check the message queue at the public address for an item of type `MESSAGE-TYPE'. If such a message is found, call the associated `MESSAGE-HANDLER' with lambda triple (PROCESS MESSAGE TIME). Otherwise, proceed to the next clause. - -There is one exception: (CALL-NEXT-METHOD) is also a legal clause, and it references to the message handling table installed via DEFINE-MESSAGE-DISPATCH on this process type's parent. - -NOTES: - + If no clause is matched, execution proceeds to the semantics specified by `DEFINE-PROCESS-UPKEEP'. - + Automatically appends a `MESSAGE-RTS' clause which calls `HANDLE-MESSAGE-RTS' and results in an error. Because of this, we set `CATCH-RTS?' to NIL when processing clauses and building `RECEIVE-MESSAGE' blocks. Otherwise, it would be impossible to override the default handling of `MESSAGE-RTS'es. Additionally, this extra handler is _not_ inherited through (CALL-NEXT-METHOD). - -WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you will probably stall the underlying `PROCESS' if you perform some waiting action here, like the analogue of a `SYNC-RECEIVE'. See DEFINE-MESSAGE-SUBORDINATE for a workaround." - (a:with-gensyms (node message) - `(defmethod %message-dispatch ((,node ,node-type) ,message) - ,@(mapcar - (lambda (clause) - (cond - ((and (listp clause) - (= 1 (length clause)) - (symbolp (first clause)) - (string= "CALL-NEXT-METHOD" (symbol-name (first clause)))) - `(when (call-next-method) - (return-from %message-dispatch t))) - ((and (listp clause) - (member (length clause) '(2 3))) - (destructuring-bind (message-type receiver . rest) clause - `(when (and - (typep ,message ',message-type) - (let ((,node-type ,node)) - (declare (ignorable ,node-type)) - ,(or (first rest) t))) - (when (process-debug? ,node) - (log-entry :time (now) - :entry-type ':handler-invoked - :source ,node - :message-id (message-message-id ,message) - :payload-type ',message-type - :log-level 0)) - (funcall ,receiver ,node ,message) - (return-from %message-dispatch t)))) - (t - (error "Bad DEFINE-MESSAGE-DISPATCH clause: ~a" clause)))) - clauses)))) - -(defun message-dispatch (node) - "Use DEFINE-MESSAGE-DISPATCH to install methods here." +(defun handle-process-inbox (node) + "Use DEFINE-MESSAGE-HANDLER to install methods here." (let* ((address (process-public-address node)) (inbox (gethash (address-channel address) (courier-inboxes *local-courier*)))) @@ -197,9 +191,9 @@ WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you ((= 3 safety) (check-key-secret address)) ((> 3 safety) nil)) (doq (message inbox) - (when (%message-dispatch node message) + (when (nth-value 1 (%handle-process-message node message)) (q-deq-first inbox (lambda (x) (eq x message))) - (return-from message-dispatch t)) + (return-from handle-process-inbox t)) (when (and (process-debug? node) (typep message 'message-RTS)) (log-entry :time (now) @@ -207,8 +201,8 @@ WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you :source node :message-id (message-message-id message) :payload-type (type-of message)) - (handle-message-RTS node message) - (return-from message-dispatch t))))) + (error "Got an RTS") + (return-from handle-process-inbox t))))) ;; TODO: DEFINE-DPU-MACRO and DEFINE-DPU-FLET don't check syntactic sanity at ;; their runtime, they wait for DEFINE-PROCESS-UPKEEP to discover it. @@ -345,7 +339,7 @@ Locally enables the use of the various functions and macro forms defined in dpu- (with-slots (process-key process-clock-rate process-exhaust-inbox?) process (let ((*local-courier* (process-courier process)) (active? nil)) - (when (message-dispatch process) + (when (handle-process-inbox process) (when process-exhaust-inbox? (schedule process (now)) (return))) diff --git a/src/process/rpc.lisp b/src/process/rpc.lisp index 70a40de..788be3d 100644 --- a/src/process/rpc.lisp +++ b/src/process/rpc.lisp @@ -5,18 +5,34 @@ (in-package #:aether) -;; TODO: this traps RETURN-FROM, but not FINISH-WITH-SCHEDULING. -(defmacro define-rpc-handler (handler-name - ((process process-type) (message message-type)) +(defgeneric %handle-rpc-handler (process message) + (:documentation "Use DEFINE-RPC-HANDLER to add entries here.")) + +(defmacro define-rpc-handler ((process-and-process-type message-and-message-type + &key (guard t)) &body body) "Interrupt-based RPC handlers are expected to emit a reply to the caller. This macro augments DEFINE-MESSAGE-HANDLER to reply to the caller with the last evaluated form." - (a:with-gensyms (return-value reply-channel) - `(define-message-handler ,handler-name - ((,process ,process-type) (,message ,message-type)) - (let (,return-value) - (setf ,return-value (block ,handler-name ,@body)) - (a:when-let ((,reply-channel (message-reply-channel ,message))) - (send-message ,reply-channel (make-message-rpc-done :result ,return-value))))))) + (a:with-gensyms (block-name return-value reply-channel handled) + (let ((process (if (listp process-and-process-type) (first process-and-process-type) process-and-process-type)) + (message (if (listp message-and-message-type) (first message-and-message-type) message-and-message-type))) + `(progn + ;; actually computes RPC value + ,(dmh-form process-and-process-type message-and-message-type + guard '%handle-rpc-handler + body) + ;; hook from message handler subsystem to RPC subsystem. + ;; when hook returns, package RPC reply and send. + ,(dmh-form process-and-process-type message-and-message-type + guard '%handle-process-message + `((multiple-value-bind (,return-value ,handled) + (block ,block-name + (flet ((finish-handler (&optional ,return-value (,handled t)) + (return-from ,block-name (values ,return-value ,handled)))) + (declare (ignorable #'finish-handler)) + (%handle-rpc-handler ,process ,message))) + (a:when-let ((,reply-channel (message-reply-channel ,message))) + (send-message ,reply-channel (make-message-rpc-done :result ,return-value))) + (finish-handler ,return-value ,handled)))))))) (defmacro sync-rpc (message (result-place-or-list destination diff --git a/tests/cast.lisp b/tests/cast.lisp index 5e937f2..a91cf6d 100644 --- a/tests/cast.lisp +++ b/tests/cast.lisp @@ -46,14 +46,12 @@ ;;; message handlers ;;; -(define-broadcast-handler handle-broadcast-test - ((process process-tree-cast-test) (message broadcast-test)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test)) "Pushes the `PROCESS's ID to a global list." (push (process-tree-id process) *broadcast-events*) (push-broadcast-frame :targets (process-tree-children process))) -(define-broadcast-handler handle-broadcast-test-abort - ((process process-tree-cast-test) (message broadcast-test-abort)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test-abort)) "Pushes the `PROCESS's ID to a global list, and potentially aborts the broadcast if the `PROCESS's ID is equal to `ABORT-ID'." (with-slots (abort-id) message (push (process-tree-id process) *broadcast-events*) @@ -61,29 +59,25 @@ (when (= abort-id (process-tree-id process)) (return-from-cast)))) -(define-broadcast-handler handle-broadcast-test-script - ((process process-tree-cast-test) (message broadcast-test-script)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test-script)) "Pushes the `:PUSH-TIMES' command onto the stack." (with-slots (scalar) message (process-continuation process `(PUSH-TIMES ,scalar)) (push-broadcast-frame :targets (process-tree-children process)))) -(define-broadcast-handler handle-broadcast-test-rts - ((process process-tree-cast-test) (message broadcast-test-rts)) +(define-broadcast-handler ((process process-tree-cast-test) (message broadcast-test-rts)) "Pushes the `PROCESS's ID to a global list." (push (process-tree-id process) *broadcast-events*) (push-broadcast-frame :targets (process-tree-children process) :handle-rts? t)) -(define-convergecast-handler handle-convergecast-test - ((process process-tree-cast-test) (message convergecast-test)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test)) "Puts the `PROCESS's ID as `INPUT' to the convergcast frame." (push-convergecast-frame :targets (process-tree-children process) :func #'aether::reduce+ :input (process-tree-id process))) -(define-convergecast-handler handle-convergecast-test-abort - ((process process-tree-cast-test) (message convergecast-test-abort)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test-abort)) "Puts the `PROCESS's ID as `INPUT' to the convergecast frame, unless it is equal to `ABORT-ID', which triggers a `RETURN-FROM-CAST' and thus an abort of the convergecast operation." (with-slots (abort-id) message (push-convergecast-frame :targets (process-tree-children process) @@ -92,8 +86,7 @@ (when (= abort-id (process-tree-id process)) (return-from-cast 0)))) -(define-convergecast-handler handle-convergecast-test-script - ((process process-tree-cast-test) (message convergecast-test-script)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test-script)) "Pushes the `:SET-TIMES' command onto the stack." (with-slots (scalar) message (process-continuation process `(SET-TIMES ,scalar)) @@ -101,8 +94,7 @@ :func #'aether::reduce+ :input (process-tree-id process)))) -(define-convergecast-handler handle-convergecast-test-rts - ((process process-tree-cast-test) (message convergecast-test-rts)) +(define-convergecast-handler ((process process-tree-cast-test) (message convergecast-test-rts)) "Puts the `PROCESS's ID as `INPUT' to the convergcast frame. In order to handle RTSes gracefully, our `FUNC' must also handle NILs." (labels ((null+ (input replies) (loop :for value :in (list* input replies) @@ -134,19 +126,6 @@ "Kills the process." (process-die)) -;;; -;;; message dispatch - -(define-message-dispatch process-tree-cast-test - (broadcast-test 'handle-broadcast-test) - (broadcast-test-abort 'handle-broadcast-test-abort) - (broadcast-test-script 'handle-broadcast-test-script) - (broadcast-test-rts 'handle-broadcast-test-rts) - (convergecast-test 'handle-convergecast-test) - (convergecast-test-abort 'handle-convergecast-test-abort) - (convergecast-test-script 'handle-convergecast-test-script) - (convergecast-test-rts 'handle-convergecast-test-rts)) - ;;; ;;; test definitions ;;; diff --git a/tests/examples/coloring.lisp b/tests/examples/coloring.lisp index 4edbedb..c5607ca 100644 --- a/tests/examples/coloring.lisp +++ b/tests/examples/coloring.lisp @@ -49,8 +49,7 @@ (defstruct (message-color-query (:include message)) "An RPC request for a foreign node's current color value.") -(define-rpc-handler handle-message-color-query - ((process process-coloring) (message message-color-query)) +(define-rpc-handler ((process process-coloring) (message message-color-query)) "Responds with this node's current color value." (process-coloring-color process)) @@ -79,8 +78,7 @@ (old nil :type (or null address)) (new nil :type (or null address))) -(define-rpc-handler handle-message-swap-neighbor - ((process process-coloring) (message message-swap-neighbor)) +(define-rpc-handler ((process process-coloring) (message message-swap-neighbor)) "Handles a SWAP-NEIGHBOR message." (let ((new (message-swap-neighbor-new message)) (old (message-swap-neighbor-old message))) @@ -96,8 +94,7 @@ :test #'address=)))) (incf (process-coloring-workloads process)))) -(define-rpc-handler handle-message-inject - ((process process-coloring) (message message-inject)) +(define-rpc-handler ((process process-coloring) (message message-inject)) "Handles an INJECT message." (let ((neighbors (message-inject-neighbors message)) (address (process-public-address process))) @@ -116,8 +113,7 @@ (defstruct (message-kill (:include message)) "Instruct a node to remove itself from the line.") -(define-rpc-handler handle-message-kill - ((process process-coloring) (message message-kill)) +(define-rpc-handler ((process process-coloring) (message message-kill)) "Given a node, tell its neighbors to sew over it, restart their coloring processes, and stop this node." (let ((neighbors (process-coloring-neighbors process)) (address (process-public-address process))) @@ -132,14 +128,6 @@ (setf (process-coloring-workloads process) 0) t)) -;;; generic things - -(define-message-dispatch process-coloring - (message-color-query 'handle-message-color-query) - (message-kill 'handle-message-kill) - (message-inject 'handle-message-inject) - (message-swap-neighbor 'handle-message-swap-neighbor)) - ;;; ;;; tests ;;; diff --git a/tests/examples/data-frame.lisp b/tests/examples/data-frame.lisp index 7dbff3a..4c0f0b6 100644 --- a/tests/examples/data-frame.lisp +++ b/tests/examples/data-frame.lisp @@ -33,8 +33,7 @@ "Requests the computation of n!." (n nil :type (integer 0))) -(define-message-handler handle-message-factorial - ((process arithmetic-server) (message message-factorial)) +(define-message-handler ((process arithmetic-server) (message message-factorial)) "Sets up the computation of n! to service an inbound request." (process-continuation process `(FACTORIAL-STEP ,(message-factorial-n message)) @@ -59,8 +58,7 @@ If appropriate, it first calls FACTORIAL-STEP recursively, which sets up a retur (defstruct (message-factorial-tco (:include message-factorial)) "Also a factorial query, handled slightly differently by the server.") -(define-message-handler handle-message-factorial-tco - ((process arithmetic-server) (message message-factorial-tco)) +(define-message-handler ((process arithmetic-server) (message message-factorial-tco)) "Sets up the computation of n! to service an inbound request." (process-continuation process `(PUSH 1) @@ -78,12 +76,6 @@ This assumes that the return-value has already been set up, contributes the fact `(MULTIPLY) `(FACTORIAL-STEP-TCO ,(1- n))))) -;;; handlers - -(define-message-dispatch arithmetic-server - (message-factorial-tco 'handle-message-factorial-tco) - (message-factorial 'handle-message-factorial)) - ;;; ;;; tests ;;; diff --git a/tests/examples/distributed-mst.lisp b/tests/examples/distributed-mst.lisp index 820b0cc..3a23cc9 100644 --- a/tests/examples/distributed-mst.lisp +++ b/tests/examples/distributed-mst.lisp @@ -144,8 +144,8 @@ ;;; ;; section 3 of the paper pseudocode: response to Connect -(define-message-handler handle-msg-connect - ((node fragment-node) (message msg-connect)) +(define-message-handler ((node fragment-node) (message msg-connect) + :guard (not (eql ':SLEEPING (fragment-node-state node)))) "If we get a Connect message from a lower-level fragment, then mark that edge as a Branch and send an Initate message to absorb that fragment. Else, if the sender is at the end of a Basic edge, then wait (via a self-send). Otherwise, we've received a Connect message from a fragment of equal level, and thus we would like to merge with it. We do so by sending it an Initiate message with a higher level, and with the edge weight. NOTE: the following line is implemented using a guard in `DEFINE-MESSAGE-DISPATCH' @@ -201,8 +201,7 @@ if SN = sleeping then execute procedure wakeup" :weight edge-weight))))))))) ;; section 4 of the paper pseudocode: response to Initiate -(define-message-handler handle-msg-initiate - ((node fragment-node) (message msg-initiate)) +(define-message-handler ((node fragment-node) (message msg-initiate)) "An Initiate is a broadcast that is triggered in response to handling a Connect. In step 1, we update our internal state. In step 2, we continue the broadcast if we have any branches. Finally, in step 3, we push a `TEST' onto the stack if we are in the Find state." (let ((address (process-public-address node))) (with-slots (adjacent-edges find-count) node @@ -242,8 +241,8 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(TEST))))))) ;; section 6 of the paper pseudocode: response to Test -(define-message-handler handle-msg-test - ((node fragment-node) (message msg-test)) +(define-message-handler ((node fragment-node) (message msg-test) + :guard (not (eql ':SLEEPING (fragment-node-state node)))) "Response to a Test message. If our level is <= to that of the message, then we wait (by self-sending the message). Else, if we are part of a different fragment, then we send an Accept. Otherwise, we should reject the edge that this message came from. We only mark the edge as Rejected if it is Basic -- if it's not Basic, then this is essentially a stale Test. However, even in the case that it is stale, we likely want to send back a Reject message to allow the sender to resume progress. Finally, we push a `TEST' onto the stack. @@ -295,8 +294,7 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(TEST))))))))))) ;; section 7 of the paper pseudocode: response to Accept -(define-message-handler handle-msg-accept - ((node fragment-node) (message msg-accept)) +(define-message-handler ((node fragment-node) (message msg-accept)) "If accepted edge weight is less than `BEST-WEIGHT', update `BEST-WEIGHT' and `BEST-EDGE'. Either way, push `REPORT' onto the stack." ;; extract best-edge, best-wt, test-edge from node (with-slots (adjacent-edges best-edge best-weight test-edge) node @@ -314,8 +312,7 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(REPORT)))))) ;; section 8 of the paper pseudocode: response to Reject -(define-message-handler handle-msg-reject - ((node fragment-node) (message msg-reject)) +(define-message-handler ((node fragment-node) (message msg-reject)) "Mark edge as Rejected (if it is Basic), and push `TEST' onto the stack." (with-slots (adjacent-edges) node ;; extract j from message @@ -328,8 +325,7 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(TEST)))))) ;; section 10 of the paper pseudocode: response to Report -(define-message-handler handle-msg-report - ((node fragment-node) (message msg-report)) +(define-message-handler ((node fragment-node) (message msg-report)) "If we get a Report from someone that's not our `IN-BRANCH', then we decrement `FIND-COUNT', potentially update our best edge & weight, and then proceed to `REPORT'. Else, if we're in the Find state, wait (via a self-send). Else, if the Report weight is lower than `BEST-WEIGHT', we should `CHANGE-ROOT'. Finally, if the Report weight and `BEST-WEIGHT' are both inifinity, then we're part of the core and have discovered the full MST, so it's time to stop." ;; extract best-edge, best-wt, find-count, in-branch, SN from node (with-slots (best-edge best-weight find-count in-branch) node @@ -374,29 +370,10 @@ if SN = sleeping then execute procedure wakeup" (process-continuation node `(HALT))))))) ;; section 12 of the paper pseudocode: response to Change-root -(define-message-handler handle-msg-change-root - ((node fragment-node) (message msg-change-root)) +(define-message-handler ((node fragment-node) (message msg-change-root)) "Push a `CHANGE-ROOT' command onto the stack." (process-continuation node `(CHANGE-ROOT))) -;;; -;;; message dispatch -;;; - -;; NOTE: the guards are used to force nodes to wake up before handling messages -(define-message-dispatch fragment-node - (msg-connect 'handle-msg-connect - (not (eql (fragment-node-state fragment-node) - ':SLEEPING))) - (msg-initiate 'handle-msg-initiate) - (msg-test 'handle-msg-test - (not (eql (fragment-node-state fragment-node) - ':SLEEPING))) - (msg-accept 'handle-msg-accept) - (msg-reject 'handle-msg-reject) - (msg-report 'handle-msg-report) - (msg-change-root 'handle-msg-change-root)) - ;;; ;;; process upkeep ;;; diff --git a/tests/examples/flooding.lisp b/tests/examples/flooding.lisp index fb427ad..72187b2 100644 --- a/tests/examples/flooding.lisp +++ b/tests/examples/flooding.lisp @@ -51,8 +51,7 @@ ;;; handler definitions ;;; -(define-message-handler handle-message-flood - ((process process-flooding) (message message-flood)) +(define-message-handler ((process process-flooding) (message message-flood)) "Handles a `FLOOD' message. If the `PROCESS's parent is unset, sets it to the `PARENT-ADDRESS' and forwards along the broadcast by pushing the `BROADCAST-FLOOD' command onto the stack. Otherwise, lets the `REPLY-CHANNEL' know that we already have a parent." (with-slots (parent-address reply-channel) message (let ((parent (process-flooding-parent process))) @@ -65,13 +64,6 @@ (setf (process-flooding-parent process) parent-address) (process-continuation process `(BROADCAST-FLOOD ,reply-channel))))))) -;;; -;;; message dispatch -;;; - -(define-message-dispatch process-flooding - (message-flood 'handle-message-flood)) - ;;; ;;; process upkeep ;;; diff --git a/tests/examples/lock.lisp b/tests/examples/lock.lisp index a73c783..4518814 100644 --- a/tests/examples/lock.lisp +++ b/tests/examples/lock.lisp @@ -32,7 +32,7 @@ (define-process-upkeep ((process process-reader)) (START) (process-continuation process `(START))) -(define-rpc-handler handle-message-write ((process process-reader) (message message-write)) +(define-rpc-handler ((process process-reader) (message message-write)) (push (message-write-payload message) (process-reader-receive-list process))) (defmethod process-lockable-targets ((process process-reader)) @@ -62,14 +62,6 @@ `(TRANSMIT) `(START-NO-LOCKS)))) -(define-message-dispatch process-writer - ;; writers don't have to receive any messages - ) - -(define-message-dispatch process-reader - (message-lock 'handle-message-lock) - (message-write 'handle-message-write)) - (defmethod process-lockable-targets ((process process-writer)) nil) diff --git a/tests/examples/tree-coloring.lisp b/tests/examples/tree-coloring.lisp index 4f40baa..db6b4b9 100644 --- a/tests/examples/tree-coloring.lisp +++ b/tests/examples/tree-coloring.lisp @@ -16,16 +16,12 @@ "Sets the color of a process to one of two `COLOR' values." (color 0 :type bit)) -(define-broadcast-handler handle-broadcast-coloring - ((process process-tree-coloring) (message broadcast-coloring)) +(define-broadcast-handler ((process process-tree-coloring) (message broadcast-coloring)) "Sets this process's color according to the message, and inverts the message's color." (setf (process-tree-coloring-color process) (broadcast-coloring-color message)) (setf (broadcast-coloring-color message) (- 1 (broadcast-coloring-color message))) (push-broadcast-frame :targets (process-tree-children process))) -(define-message-dispatch process-tree-coloring - (broadcast-coloring 'handle-broadcast-coloring)) - (deftest test-process-tree-2-coloring () "Sends a message to the root of a tree of processes telling it to 2-color itself and its children, and then checks that the 2-coloring is correct. See `ADD-TREE-PROCESSES' for the fixture used to build the tree of processes." (with-address-dereferencing () diff --git a/tests/examples/tree-operations.lisp b/tests/examples/tree-operations.lisp index 697edd0..a5750a7 100644 --- a/tests/examples/tree-operations.lisp +++ b/tests/examples/tree-operations.lisp @@ -21,23 +21,20 @@ (defstruct (convergecast-tree-leaves (:include message)) "Used to determined the number of leaves in a tree of processes.") -(define-convergecast-handler handle-convergecast-tree-size - ((process process-tree) (message convergecast-tree-size)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-size)) "Computes the size of the tree of processes." (push-convergecast-frame :targets (process-tree-children process) :func #'aether::reduce+ :input 1)) -(define-convergecast-handler handle-convergecast-tree-depth - ((process process-tree) (message convergecast-tree-depth)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-depth)) "Computes the depth of a tree of processes by incrementing a carry." (incf (convergecast-tree-depth-depth message)) (push-convergecast-frame :targets (process-tree-children process) :func #'aether::reduce-max :input (convergecast-tree-depth-depth message))) -(define-convergecast-handler handle-convergecast-tree-depth-no-carry - ((process process-tree) (message convergecast-tree-depth-no-carry)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-depth-no-carry)) "Computes the depth of a tree of processes without using a carry. Demonstrates the power of `FUNCALL' over `REDUCE'." (flet ((1+reduce-max (input replies) (declare (ignore input)) @@ -46,8 +43,7 @@ :func #'1+reduce-max :input 1))) -(define-convergecast-handler handle-convergecast-tree-leaves - ((process process-tree) (message convergecast-tree-leaves)) +(define-convergecast-handler ((process process-tree) (message convergecast-tree-leaves)) "Computes the number of leaves of a tree of processes." (cond ((process-tree-children process) @@ -59,12 +55,6 @@ :func #'aether::reduce+ :input 1)))) -(define-message-dispatch process-tree - (convergecast-tree-size 'handle-convergecast-tree-size) - (convergecast-tree-depth 'handle-convergecast-tree-depth) - (convergecast-tree-depth-no-carry 'handle-convergecast-tree-depth-no-carry) - (convergecast-tree-leaves 'handle-convergecast-tree-leaves)) - (deftest test-process-tree-operations () "Tests that we can implement some common tree operations in a distributed setting using built-in convergecast facilities. See `ADD-TREE-PROCESSES' for the fixture used to build the tree of processes." (with-address-dereferencing () diff --git a/tests/process.lisp b/tests/process.lisp index 55c9a42..7fc2926 100644 --- a/tests/process.lisp +++ b/tests/process.lisp @@ -34,16 +34,10 @@ ;; the message handlers -(define-message-handler handle-msg-ping - ((box chatterbox) (message msg-ping)) +(define-message-handler ((box chatterbox) (message msg-ping)) (with-slots (reply-channel) message (send-message reply-channel (make-msg-pong :vote (vote box))))) -;;; the message dispatch - -(define-message-dispatch chatterbox - (msg-ping 'handle-msg-ping)) - ;;; the process commands (define-process-upkeep ((box chatterbox)) diff --git a/tests/recursive-lock.lisp b/tests/recursive-lock.lisp index 9b58a75..30136d6 100644 --- a/tests/recursive-lock.lisp +++ b/tests/recursive-lock.lisp @@ -13,8 +13,9 @@ (defstruct (message-test-lock (:include message)) "Demo message type. Supposed to be delayed by an established lock on a process.") -(define-message-handler handle-message-test-lock - ((process process-lockable-test) (message message-test-lock)) +(define-message-handler ((process process-lockable-test) (message message-test-lock) + ; don't reply if locked + :guard (not (process-lockable-locked? process))) "Records the receipt of a test lock message." (push `(,(process-tree-id process) ,(now)) *locking-events*)) @@ -22,12 +23,6 @@ "See `PROCESS-TREE-CHILDREN' for more information." (process-tree-children process)) -(define-message-dispatch process-lockable-test - (message-lock 'handle-message-lock) - (message-test-lock 'handle-message-test-lock - (not (process-lockable-locked? ; don't reply if locked - process-lockable-test)))) - (deftest test-process-lockable-successful () "Tests that message processing can be blocked by a recursive lock." (with-address-dereferencing () ; used by process-lockable-targets