Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aether.asd
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
(funcall compile)))
:serial t
:components ((:file "package")
(:file "params")
(:file "utilities")
(:file "queue")
(:file "cheap-heap")
Expand Down
2 changes: 0 additions & 2 deletions src/courier.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
;;; `*LOCAL-COURIER*'.
;;;

(defparameter *local-courier* nil
"Bound to the `COURIER' that services this process.")
(defparameter *courier-processing-clock-rate* 100)
(defparameter *routing-time-step* 1/100)

Expand Down
8 changes: 8 additions & 0 deletions src/params.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
;;;; params.lisp
;;;;
;;;; some global definitions to cope with file circularity

(in-package #:aether)

(defparameter *local-courier* nil
"Bound to the `COURIER' that services this process.")
28 changes: 14 additions & 14 deletions src/process/cast.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
(declare (ignore aborting? handle-rts? message targets))
(error "PUSH-BROADCAST-FRAME only available in DEFINE-BROADCAST-HANDLER."))

(defmacro define-broadcast-handler (handler-name
((process process-type) (message message-type))
(defmacro define-broadcast-handler (((process process-type) (message message-type)
&key (guard nil guard-p))
&body body)
"This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `BROADCAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `BROADCAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-BROADCAST-HANDLER' we have access to a pair of helper functions:

Expand All @@ -33,8 +33,8 @@

WARNING: `RETURN-FROM-CAST' calls `PUSH-BROADCAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early."
(a:with-gensyms (broadcast-frame reply-channel)
`(define-message-handler ,handler-name
((,process ,process-type) (,message ,message-type))
`(define-message-handler ((,process ,process-type) (,message ,message-type)
,@(when guard-p `(:guard ,guard)))
(let ((,broadcast-frame nil)
(,message (copy-message message))
(,reply-channel (message-reply-channel ,message)))
Expand Down Expand Up @@ -164,33 +164,33 @@ Where `REPLIES' is assumed to be a `LIST'. Additionally, when `HANDLE-RTS?' is t
(process-continuation ,process `(CONVERGECAST))
,@body)))))

(defmacro define-convergecast-handler (handler-name
((process process-type)
(message message-type))
(defmacro define-convergecast-handler (((process process-type)
(message message-type)
&optional (guard nil guard-p))
&body body)
"This macro augments `DEFINE-MESSAGE-HANDLER', by pushing a `CONVERGECAST' command onto the `PROCESS's command stack. This command takes no arguments, but expects a `CONVERGECAST-FRAME' to be on the data stack when it is called. Thus, either the handler must prepare the data frame, or it must be prepared by whatever script is pushed onto the command stack by the handler. Additionally, inside the context of `DEFINE-CONVERGECAST-HANDLER' we have access to a pair of helper functions:

1. `PUSH-CONVERGECAST-FRAME', which creates a `CONVERGECAST-FRAME' and pushes it onto the data stack. If no `MESSAGE' is provided to the function, it defaults to the `MESSAGE' currently being handled. If a user calls `PUSH-BROADCAST-FRAME' twice in the same handler, an error will be raised.
2. `RETURN-FROM-CAST', which allows the user to terminate the convergecast operation early by sending up an acknowledgement (optionally specifying its contents) to the original sender of `MESSAGE'. It is recommended that a value is provided when returning from a convergecast, as it will be passed to a function (the function provided to the `CONVERGECAST-FRAME') when received by the original sender.

WARNING: `RETURN-FROM-CAST' calls `PUSH-CONVERGECAST-FRAME' as part of the aborting process. If a frame has already been pushed onto the data stack, we instead alter that frame rather than pushing an additional one (which could have strange consequences). Additionally, it is important to note that `RETURN-FROM-CAST' uses `FINISH-WITH-SCHEDULING' in order to return from the handler early."
`(define-message-handler ,handler-name
((,process ,process-type) (,message ,message-type))
`(define-message-handler ((,process ,process-type) (,message ,message-type)
,@(when guard-p `(:guard ,guard)))
,@(%convergecast-body process message body)))

;; cf. define-message-subordinate
(defmacro define-convergecast-subordinate
(handler-name
((process process-type)
(message message-type))
(((process process-type)
(message message-type)
&optional (guard nil guard-p))
&body body)
"Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process.

This macro mimics DEFINE-CONVERGECAST-HANDLER while setting up this manner of parallel execution."
(a:with-gensyms (command servicer subprocess)
`(progn
(define-message-handler ,handler-name
((,process ,process-type) (,message ,message-type))
(define-message-handler ((,process ,process-type) (,message ,message-type)
,@(when guard-p `(:guard ,guard)))
(let ((,servicer (spawn-process 'process-message-emissary)))
(schedule ,servicer (now))
(setf (process-command-stack ,servicer) (list (list ',command ,process ,message))
Expand Down
10 changes: 2 additions & 8 deletions src/process/emissary.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,19 @@
()
(:documentation "Dummy PROCESS used to host nonblocking message handlers."))

(define-message-dispatch process-message-emissary
;; no replies
)

;; TODO: consider rolling this together with DEFINE-MESSAGE-HANDLER. is the
;; gain in emulation efficiency really worth it? is there any potential
;; for nasty side effects?
;; TODO: unclear that this plays well with message tracing.
(defmacro define-message-subordinate
(handler-name
((process process-type) (message message-type))
(((process process-type) (message message-type))
&body body)
"Interrupt-based RPC handlers are expected to quickly return control to the main thread of execution, and any maneuvers which take nontrivial simulation time are modeled as commands pushed onto the underlying process's command stack. However, this is executed serially with whatever the process was doing when it received the interrupt. It is sometimes more convenient to process the tasks in parallel, which we model by delegating the new task to a newly spawned side-process.

This macro mimics DEFINE-MESSAGE-HANDLER while setting up this manner of parallel execution."
(a:with-gensyms (command servicer subprocess)
`(progn
(define-message-handler ,handler-name
((,process ,process-type) (,message ,message-type))
(define-message-handler ((,process ,process-type) (,message ,message-type))
(let ((,servicer (spawn-process 'process-message-emissary)))
(schedule ,servicer (now))
(setf (process-command-stack ,servicer) (list (list ',command ,process ,message))
Expand Down
3 changes: 1 addition & 2 deletions src/process/lock.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@
"Sent to release a lock."
(result))

(define-message-handler handle-message-lock
((process process-lockable) (message message-lock))
(define-message-handler ((process process-lockable) (message message-lock))
"Attempts to lock PROCESS."
(with-slots (reply-channel) message
(with-slots (aborting? locked?) process
Expand Down
160 changes: 77 additions & 83 deletions src/process/process.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -106,109 +106,103 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object."))
(error "Undefined command ~a for server of type ~a."
command (type-of server))))

(defgeneric %message-dispatch (node message)
(:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here."))

(defun finish-handler ()
"Return from the active aether process handler."
(defgeneric %handle-process-message (process message)
(:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here.")
(:method (process message)
"Bottom message handler: marks the message as not having been handled."
;; this is *not* implemented via DEFINE-MESSAGE-HANDLER bc we don't want to
;; write the log entry.
nil))

(defun finish-handler (&optional return-value (handled t))
"Return from the active aether process handler. If `HANDLED' is high, then the handler is understood as having taken care of the message, which is dropped from the inbox."
(declare (ignore handled))
(error () "Not available outside of an aether process handler."))

(defmacro define-message-handler (handler-name ((process process-type) (message message-type)) &body body)
"Defines a function to be invoked by DEFINE-MESSAGE-DISPATCH."
;; TODO: could add a combo for call-next-method and finish-handler

(defun dmh-form
(process-and-process-type message-and-message-type
guard gf-name
body)
"Defines a passive message handler, which walks the process mailbox at the start of every tick."
(multiple-value-bind (body decls documentation) (a:parse-body body :documentation t)
`(defmethod ,handler-name ((,process ,process-type)
(,message ,message-type))
,@decls
,@(list documentation)
(check-type ,message ,message-type)
(check-type ,process ,process-type)
(macrolet ((finish-handler () '(return-from ,handler-name)))
(flet ((log-entry (&rest initargs)
(when (process-debug? ,process)
(apply #'log-entry
(append initargs
(list :log-level 0
:time (now)
:source ,process))))))
(flet ((send-message (destination payload)
(log-entry :entry-type ':send-message
:destination destination
:payload (copy-structure payload))
(send-message destination payload)))
(declare (ignorable #'send-message))
,@body))))))

(define-message-handler handle-message-RTS
((process process) (message message-RTS))
(destructuring-bind ((process process-type) (message message-type))
(list (if (consp process-and-process-type)
process-and-process-type
(list process-and-process-type t))
(if (consp message-and-message-type)
message-and-message-type
(list message-and-message-type t)))
(a:with-gensyms (return-value handled)
`(defmethod ,gf-name ((,process ,process-type)
(,message ,message-type))
,@(list documentation)
,@decls
(flet ((finish-handler (&optional ,return-value (,handled t))
(return-from ,gf-name (values ,return-value ,handled)))
(log-entry (&rest initargs)
(when (process-debug? ,process)
(apply #'log-entry
(append initargs
(list :log-level 0
:time (now)
:source ,process))))))
(declare (ignorable #'finish-handler))
(flet ((send-message (destination payload)
(log-entry :entry-type ':send-message
:destination destination
:payload (copy-structure payload))
(send-message destination payload)))
(declare (ignorable #'send-message))
(unless ,guard
(multiple-value-bind (,return-value ,handled)
(call-next-method)
(finish-handler ,return-value ,handled)))
(when (process-debug? ,process)
(log-entry :time (now)
:entry-type ':handler-invoked
:source ,process
:message-id (message-message-id ,message)
:payload-type ',message-type
:log-level 0))
(values (progn ,@body) t))))))))

(defmacro define-message-handler
((process-and-process-type message-and-message-type
&key (guard t))
&body body)
"Defines a passive message handler, which walks the process mailbox at the start of every tick."
(dmh-form process-and-process-type message-and-message-type
guard '%handle-process-message
body))

#+#:ignore
(define-message-handler ((process process) (message message-RTS))
"Default handler for when a `PROCESS' receives a `MESSAGE-RTS'. Throws an error."
(error "Got an RTS"))

(defmacro define-message-dispatch (node-type &body clauses)
"Defines \"automatic\" message handlers associated to a particular subtype of `PROCESS'. Each handler is specified by a tuple of form (MESSAGE-TYPE MESSAGE-HANDLER &OPTIONAL GUARD). As with `RECEIVE-MESSAGE', each clause is processed in turn, according to the following rules:

+ If supplied, `GUARD' is evaluated with the `PROCESS' in question bound to the place `PROCESS-TYPE'. If `GUARD' evaluates to NIL, proceed to the next clause.
+ Check the message queue at the public address for an item of type `MESSAGE-TYPE'. If such a message is found, call the associated `MESSAGE-HANDLER' with lambda triple (PROCESS MESSAGE TIME). Otherwise, proceed to the next clause.

There is one exception: (CALL-NEXT-METHOD) is also a legal clause, and it references to the message handling table installed via DEFINE-MESSAGE-DISPATCH on this process type's parent.

NOTES:
+ If no clause is matched, execution proceeds to the semantics specified by `DEFINE-PROCESS-UPKEEP'.
+ Automatically appends a `MESSAGE-RTS' clause which calls `HANDLE-MESSAGE-RTS' and results in an error. Because of this, we set `CATCH-RTS?' to NIL when processing clauses and building `RECEIVE-MESSAGE' blocks. Otherwise, it would be impossible to override the default handling of `MESSAGE-RTS'es. Additionally, this extra handler is _not_ inherited through (CALL-NEXT-METHOD).

WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you will probably stall the underlying `PROCESS' if you perform some waiting action here, like the analogue of a `SYNC-RECEIVE'. See DEFINE-MESSAGE-SUBORDINATE for a workaround."
(a:with-gensyms (node message)
`(defmethod %message-dispatch ((,node ,node-type) ,message)
,@(mapcar
(lambda (clause)
(cond
((and (listp clause)
(= 1 (length clause))
(symbolp (first clause))
(string= "CALL-NEXT-METHOD" (symbol-name (first clause))))
`(when (call-next-method)
(return-from %message-dispatch t)))
((and (listp clause)
(member (length clause) '(2 3)))
(destructuring-bind (message-type receiver . rest) clause
`(when (and
(typep ,message ',message-type)
(let ((,node-type ,node))
(declare (ignorable ,node-type))
,(or (first rest) t)))
(when (process-debug? ,node)
(log-entry :time (now)
:entry-type ':handler-invoked
:source ,node
:message-id (message-message-id ,message)
:payload-type ',message-type
:log-level 0))
(funcall ,receiver ,node ,message)
(return-from %message-dispatch t))))
(t
(error "Bad DEFINE-MESSAGE-DISPATCH clause: ~a" clause))))
clauses))))

(defun message-dispatch (node)
"Use DEFINE-MESSAGE-DISPATCH to install methods here."
(defun handle-process-inbox (node)
"Use DEFINE-MESSAGE-HANDLER to install methods here."
(let* ((address (process-public-address node))
(inbox (gethash (address-channel address)
(courier-inboxes *local-courier*))))
(policy-cond:policy-cond
((= 3 safety) (check-key-secret address))
((> 3 safety) nil))
(doq (message inbox)
(when (%message-dispatch node message)
(when (nth-value 1 (%handle-process-message node message))
(q-deq-first inbox (lambda (x) (eq x message)))
(return-from message-dispatch t))
(return-from handle-process-inbox t))
(when (and (process-debug? node)
(typep message 'message-RTS))
(log-entry :time (now)
:entry-type ':handler-invoked
:source node
:message-id (message-message-id message)
:payload-type (type-of message))
(handle-message-RTS node message)
(return-from message-dispatch t)))))
(error "Got an RTS")
(return-from handle-process-inbox t)))))

;; TODO: DEFINE-DPU-MACRO and DEFINE-DPU-FLET don't check syntactic sanity at
;; their runtime, they wait for DEFINE-PROCESS-UPKEEP to discover it.
Expand Down Expand Up @@ -345,7 +339,7 @@ Locally enables the use of the various functions and macro forms defined in dpu-
(with-slots (process-key process-clock-rate process-exhaust-inbox?) process
(let ((*local-courier* (process-courier process))
(active? nil))
(when (message-dispatch process)
(when (handle-process-inbox process)
(when process-exhaust-inbox?
(schedule process (now))
(return)))
Expand Down
Loading
Loading