diff --git a/state/task_log.h b/state/task_log.h new file mode 100644 index 0000000..d5aa4f0 --- /dev/null +++ b/state/task_log.h @@ -0,0 +1,33 @@ +#ifndef TASK_LOG_H +#define TASK_LOG_H + +#include "db.h" +#include "task.h" + +/* The task log is a message bus that is used for all communication between + * local and global schedulers (and also persisted to the state database). + * Here are examples of events that are recorded by the task log: + * + * 1) local scheduler writes it when it submits a task to the global scheduler; + * 2) global scheduler reads it to get the tasks submitted by local schedulers; + * 3) global scheduler writes it when assigning the task to a local scheduler; + * 4) local scheduler reads it to get the tasks assigned to it by the global scheduler; + * 5) local scheduler writes it when a task finishes execution; + * 6) global scheduler reads it to get the tasks that have finished; */ + +/* Callback for subscribing to the task log; it is passed a task instance and an opaque userdata pointer. */ +typedef void (*task_log_callback)(task_instance* task_instance, void *userdata); + +/* Initially add a task instance to the task log. */ +void task_log_add_task(db_handle *db, task_iid task_iid, task_instance* task_instance); + +/* Update the task instance identified by task_iid in the task log with the given state and node. */ +void task_log_update_task(db_handle *db, task_iid task_iid, int32_t state, node_id node); + +/* Register callback for a certain event. The node specifies the node whose + * events we want to listen to. If you want to listen to all events for this node, + * use state_filter = TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. + * If you want to register for updates from all nodes, set node = NIL_ID. 
*/ +void task_log_register_callback(db_handle *db, task_log_callback callback, node_id node, int32_t state_filter, void *userdata); + +#endif /* TASK_LOG_H */ diff --git a/state/task_queue.h b/state/task_queue.h deleted file mode 100644 index 9296870..0000000 --- a/state/task_queue.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef TASK_QUEUE_H -#define TASK_QUEUE_H - -#include "db.h" -#include "task.h" - -/* The task ID is a deterministic hash of the function ID that - * the task executes and the argument IDs or argument values */ -typedef unique_id task_id; - -/* The task instance ID is a globally unique ID generated which - * identifies this particular execution of the task */ -typedef unique_id task_iid; - -/* The node id is an identifier for the node the task is - * scheduled on */ -typedef unique_id node_id; - -/* Callback for subscribing to the task queue. The only argument this - * callback gets is the task_id of the. */ -typedef void (*task_queue_callback)(task_iid *task_iid, task_spec *task); - -/* Submit task to the global scheduler. */ -void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task); - -/* Submit task to a local scheduler based on the decision made by the global - * scheduler. */ -void task_queue_schedule_task(db_handle *db, task_iid task_iid, node_id node); - -/* Subscribe to task queue. */ -void task_queue_register_callback(db_handle *db, task_queue_callback callback); - -#endif diff --git a/task.h b/task.h index 96c97b8..6603f1a 100644 --- a/task.h +++ b/task.h @@ -1,7 +1,8 @@ + #ifndef TASK_H #define TASK_H -/* This API specifies the task data structure. It is in C so we can +/* This API specifies the task data structures. It is in C so we can * easily construct tasks from other languages like Python. 
The datastructures * are also defined in such a way that memory is contiguous and all pointers * are relative, so that we can memcpy the datastructure and ship it over the @@ -15,6 +16,24 @@ typedef unique_id function_id; typedef unique_id object_id; +/* The task ID is a deterministic hash of the function ID that + * the task executes and the argument IDs or argument values */ +typedef unique_id task_id; + +/* The task instance ID is a globally unique ID generated which + * identifies this particular execution of the task */ +typedef unique_id task_iid; + +/* The node id is an identifier for the node the task is + * scheduled on */ +typedef unique_id node_id; + +/* + * TASK SPECIFICATIONS: Contain all the information necessary + * to execute the task (function id, arguments, return object ids). + * + */ + typedef struct task_spec_impl task_spec; /* If argument is passed by value or reference. */ @@ -68,4 +87,45 @@ void print_task(task_spec *spec, UT_string *output); /* Parse task as printed by print_task. */ task_spec *parse_task(char *task_string, int64_t task_length); +/* + * TASK INSTANCE: Contains information about a scheduled task: + * the task iid, the task specification and the task status + * (WAITING, SCHEDULED, RUNNING, DONE) and which node the + * task is scheduled on. + * + */ + +/* The scheduling_state values are distinct bit flags, so several states can be combined + * when we are listening for an event, for example TASK_WAITING | TASK_SCHEDULED. */ +enum scheduling_state { + TASK_WAITING = 1, + TASK_SCHEDULED = 2, + TASK_RUNNING = 4, + TASK_DONE = 8 +}; + +/* A task instance is one execution of a task specification. + * It has a unique instance id, a state of execution (see scheduling_state) + * and a node it is scheduled on or running on. */ +typedef struct task_instance_impl task_instance; + +/* Allocate and initialize a new task instance. Must be freed with + * task_instance_free after use. 
*/ +task_instance *make_task_instance(task_iid task_iid, task_spec *task, int32_t state, node_id node); + +/* Size of task instance structure in bytes. */ +int64_t task_instance_size(task_instance *instance); + +/* Instance ID of the task instance. */ +task_iid *task_instance_id(task_instance *instance); + +/* Node this task instance has been assigned to or is running on. */ +node_id *task_instance_node(task_instance *instance); + +/* Task specification of this task instance. */ +task_spec *task_instance_task_spec(task_instance *instance); + +/* Free this task instance data structure. */ +void task_instance_free(task_instance *instance); + +#endif