From d70094465e85c41d13aa7391d080e302b910e9cd Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 27 Sep 2016 16:33:43 -0700 Subject: [PATCH 1/2] new task log APIs --- state/task_log.h | 22 +++++++++++++++++ state/task_queue.h | 33 ------------------------- task.h | 61 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 82 insertions(+), 34 deletions(-) create mode 100644 state/task_log.h delete mode 100644 state/task_queue.h diff --git a/state/task_log.h b/state/task_log.h new file mode 100644 index 0000000..1a0ff43 --- /dev/null +++ b/state/task_log.h @@ -0,0 +1,22 @@ +#ifndef TASK_LOG_H +#define TASK_LOG_H + +#include "db.h" +#include "task.h" + +/* Callback for subscribing to the task log. */ +typedef void (*task_log_callback)(scheduled_task* task, void *userdata); + +/* Initially add a task to the task log. This adds the scheduled task. */ +void task_log_add_task(db_handle *db, task_iid task_iid, scheduled_task* task); + +/* Update task in the task log. This will append the new status to the + * task_iid entry of the task log. */ +void task_log_update_task(db_handle *db, task_iid task_iid, int32_t state, node_id node); + +/* Register callback for a certain event. The node specifies the node whose + * events we want to listen to. If you want to listen to all events for this node, + * use state_filter = TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. */ +void task_log_register_callback(db_handle *db, task_log_callback callback, node_id node, int32_t state_filter, void *userdata); + +#endif /* TASK_LOG_H */ diff --git a/state/task_queue.h b/state/task_queue.h deleted file mode 100644 index 9296870..0000000 --- a/state/task_queue.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef TASK_QUEUE_H -#define TASK_QUEUE_H - -#include "db.h" -#include "task.h" - -/* The task ID is a deterministic hash of the function ID that - * the task executes and the argument IDs or argument values */ -typedef unique_id task_id; - -/* The task instance ID is a globally unique ID generated which - * identifies this particular execution of the task */ -typedef unique_id task_iid; - -/* The node id is an identifier for the node the task is - * scheduled on */ -typedef unique_id node_id; - -/* Callback for subscribing to the task queue. The only argument this - * callback gets is the task_id of the. */ -typedef void (*task_queue_callback)(task_iid *task_iid, task_spec *task); - -/* Submit task to the global scheduler. */ -void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task); - -/* Submit task to a local scheduler based on the decision made by the global - * scheduler. */ -void task_queue_schedule_task(db_handle *db, task_iid task_iid, node_id node); - -/* Subscribe to task queue. */ -void task_queue_register_callback(db_handle *db, task_queue_callback callback); - -#endif diff --git a/task.h b/task.h index 96c97b8..f212a4d 100644 --- a/task.h +++ b/task.h @@ -1,7 +1,7 @@ #ifndef TASK_H #define TASK_H -/* This API specifies the task data structure. It is in C so we can +/* This API specifies the task data structures. It is in C so we can * easily construct tasks from other languages like Python. The datastructures * are also defined in such a way that memory is contiguous and all pointers * are relative, so that we can memcpy the datastructure and ship it over the @@ -15,6 +15,24 @@ typedef unique_id function_id; typedef unique_id object_id; +/* The task ID is a deterministic hash of the function ID that + * the task executes and the argument IDs or argument values */ +typedef unique_id task_id; + +/* The task instance ID is a globally unique ID generated which + * identifies this particular execution of the task */ +typedef unique_id task_iid; + +/* The node id is an identifier for the node the task is + * scheduled on */ +typedef unique_id node_id; + +/* + * TASK SPECIFICATIONS: Contain all the information neccessary + * to execute the task (function id, arguments, return object ids). + * + */ + typedef struct task_spec_impl task_spec; /* If argument is passed by value or reference. */ @@ -68,4 +86,45 @@ void print_task(task_spec *spec, UT_string *output); /* Parse task as printed by print_task. */ task_spec *parse_task(char *task_string, int64_t task_length); +/* + * SCHEDULED TASK: Contains information about a scheduled task: + * the task iid, the task specification and the task status + * (WAITING, SCHEDULED, RUNNING, DONE) and which node the + * task is scheduled on. + * + */ + +/* The scheduling_state can be used as a flag when we are listening for an event, + * for example TASK_WAITING | TASK_SCHEDULED. */ +enum scheduling_state { + TASK_WAITING = 0, + TASK_SCHEDULED = 1, + TASK_RUNNING = 2, + TASK_DONE = 4 +}; + +typedef struct scheduled_task_impl scheduled_task; + +/* Allocate and initialize a new scheduled task. Must be freed with + * scheduled_task_free after use. */ +scheduled_task *make_scheduled_task(task_iid task_iid, task_spec *task, int32_t state, node_id node); + +/* Size of task log entry in bytes. */ +int64_t scheduled_task_size(scheduled_task *task); + +/* Task instance id. */ +task_iid *scheduled_task_iid(scheduled_task *task); + +/* Task specification of this scheduled task. */ +task_spec *scheduled_task_spec(scheduled_task *task); + +/* scheduling_state of the scheduled task. */ +int32_t *scheduled_task_state(scheduled_task *task); + +/* Node this task has been scheduled on. */ +node_id *scheduled_task_node(scheduled_task *task); + +/* Free this scheduled task. */ +void scheduled_task_free(scheduled_task *task); + #endif From e3cdcb8bde553f4164a26b70c08278fec938a32d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 27 Sep 2016 19:07:12 -0700 Subject: [PATCH 2/2] update API --- state/task_log.h | 23 +++++++++++++++++------ task.h | 41 +++++++++++++++++++++-------------------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/state/task_log.h b/state/task_log.h index 1a0ff43..d5aa4f0 100644 --- a/state/task_log.h +++ b/state/task_log.h @@ -4,19 +4,30 @@ #include "db.h" #include "task.h" +/* The task log is a message bus that is used for all communication between + * local and global schedulers (and also persisted to the state database). + * Here are examples of events that are recorded by the task log: + * + * 1) local scheduler writes it when submits a task to the global scheduler; + * 2) global scheduler reads it to get the task submitted by local schedulers; + * 3) global scheduler writes it when assigning the task to a local scheduler; + * 4) local scheduler reads it to get its tasks assigned by global scheduler; + * 5) local scheduler writes it when a task finishes execution; + * 6) global scheduler reads it to get the tasks that have finished; */ + /* Callback for subscribing to the task log. */ -typedef void (*task_log_callback)(scheduled_task* task, void *userdata); +typedef void (*task_log_callback)(task_instance* task_instance, void *userdata); -/* Initially add a task to the task log. This adds the scheduled task. */ -void task_log_add_task(db_handle *db, task_iid task_iid, scheduled_task* task); +/* Initially add a task instance to the task log. */ +void task_log_add_task(db_handle *db, task_iid task_iid, task_instance* task_instance); -/* Update task in the task log. This will append the new status to the - * task_iid entry of the task log. */ +/* Update task instance in the task log. */ void task_log_update_task(db_handle *db, task_iid task_iid, int32_t state, node_id node); /* Register callback for a certain event. The node specifies the node whose * events we want to listen to. If you want to listen to all events for this node, - * use state_filter = TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. */ + * use state_filter = TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. + * If you want to register to updates from all nodes, set node = NIL_ID. */ void task_log_register_callback(db_handle *db, task_log_callback callback, node_id node, int32_t state_filter, void *userdata); #endif /* TASK_LOG_H */ diff --git a/task.h b/task.h index f212a4d..6603f1a 100644 --- a/task.h +++ b/task.h @@ -1,3 +1,4 @@ + #ifndef TASK_H #define TASK_H @@ -97,34 +98,34 @@ task_spec *parse_task(char *task_string, int64_t task_length); /* The scheduling_state can be used as a flag when we are listening for an event, * for example TASK_WAITING | TASK_SCHEDULED. */ enum scheduling_state { - TASK_WAITING = 0, - TASK_SCHEDULED = 1, - TASK_RUNNING = 2, - TASK_DONE = 4 + TASK_WAITING = 1, + TASK_SCHEDULED = 2, + TASK_RUNNING = 4, + TASK_DONE = 8 }; -typedef struct scheduled_task_impl scheduled_task; +/* A task instance is one execution of a task specification. + * It has a unique instance id, a state of execution (see scheduling_state) + * and a node it is scheduled on or running on. */ +typedef struct task_instance_impl task_instance; -/* Allocate and initialize a new scheduled task. Must be freed with +/* Allocate and initialize a new task instance. Must be freed with * scheduled_task_free after use. */ -scheduled_task *make_scheduled_task(task_iid task_iid, task_spec *task, int32_t state, node_id node); - -/* Size of task log entry in bytes. */ -int64_t scheduled_task_size(scheduled_task *task); +task_instance *make_task_instance(task_iid task_iid, task_spec *task, int32_t state, node_id node); -/* Task instance id. */ -task_iid *scheduled_task_iid(scheduled_task *task); +/* Size of task instance structure in bytes. */ +int64_t task_instance_size(task_instance *instance); -/* Task specification of this scheduled task. */ -task_spec *scheduled_task_spec(scheduled_task *task); +/* Instance ID of the task instance. */ +task_iid *task_instance_id(task_instance *instance); -/* scheduling_state of the scheduled task. */ -int32_t *scheduled_task_state(scheduled_task *task); +/* Node this task instance has been assigned to or is running on. */ +node_id *task_instance_node(task_instance *instance); -/* Node this task has been scheduled on. */ -node_id *scheduled_task_node(scheduled_task *task); +/* Task specification of this task instance. */ +task_spec *task_instance_task_spec(task_instance *instance); -/* Free this scheduled task. */ -void scheduled_task_free(scheduled_task *task); +/* Free this task instance datastructure. */ +void task_instance_free(task_instance *instance); #endif