Python implementation of the actor model, built around cooperative scheduling with generator-based tasks.
```sh
pip install actress
```

```python
from actress import task

def worker(name: str, delay_ms: int):
    yield from task.sleep(delay_ms)
    yield from task.send(f"done:{name}")
    return name

def main():
    a = yield from task.fork(worker("a", 10))
    b = yield from task.fork(worker("b", 5))
    result_a = yield from task.join(a)
    result_b = yield from task.join(b)
    return [result_a, result_b]

result = await task.fork(main())
# result == ["a", "b"] (order depends on join order; messages are concurrent)
```

- `Task[T, X, M]`: a generator representing concurrent work.
  - returns `T` on success
  - can fail with `X` (raised as exceptions; not enforced at runtime)
  - can emit messages `M`
- `Effect[M]`: a generator stream of messages/events.
- `Fork[T, X, M]`: handle returned by `task.fork(...)`; can be `await`-ed or `yield from`-ed.
Tasks run on a cooperative scheduler. They yield control explicitly via library operations like sleep, wait, suspend, and message-producing effects.
A `Task` represents a unit of computation that runs concurrently, a lightweight process. You can spawn many tasks, and the cooperative scheduler interleaves their execution.
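The interleaving can be illustrated with a toy round-robin scheduler over plain generators. This is a sketch of the model only, not the library's actual scheduler:

```python
from collections import deque

def run(*tasks):
    # round-robin: each next() runs one task until its next yield
    queue = deque(tasks)
    trace = []
    while queue:
        gen = queue.popleft()
        try:
            trace.append(next(gen))
            queue.append(gen)  # still running: requeue for another turn
        except StopIteration:
            pass  # task finished: drop it
    return trace

def task_gen(name, steps):
    for i in range(steps):
        yield f"{name}:{i}"  # yield control back to the scheduler

trace = run(task_gen("a", 2), task_gen("b", 2))
# steps from "a" and "b" interleave: a:0, b:0, a:1, b:1
```

Each `yield` is an explicit cooperation point; a task that never yields would monopolize the scheduler.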
Tasks have three type variables:

- `T` describes the return type of a successful computation.
- `X` describes the error type of a failed computation (raised exceptions).
- `M` describes the type of messages this task may produce.

Python does not enforce exception type checking at runtime, so `X` is descriptive rather than guaranteed.
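One minimal way to express this shape with Python's `typing` module, assuming tasks are plain generators (a sketch; the library may use richer annotations), is:

```python
from typing import Generator, TypeVar

M = TypeVar("M")  # message type
T = TypeVar("T")  # success value

# X (the error type) has no slot in Generator's signature, which is
# exactly why it stays descriptive: exceptions are raised, not typed.
Task = Generator[M, None, T]

def worker() -> Task[int, str]:
    yield 1        # emit a message (M = int)
    return "done"  # return a value (T = str)
```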
`task.fork(...)` creates a new concurrent task. It is the primary way to activate a task from outside a task context, and usually how you start the main work. It returns `Fork[T, Exception, M]`, which is awaitable.
```python
async def entry():
    result = await task.fork(main())
    print(result)

def main():
    return 0
    yield  # unreachable, but makes main a generator function
```

You can also start concurrent tasks from other tasks. Forked tasks are detached from the parent task unless joined.
```python
def main():
    worker = yield from task.fork(work())
    print("prints first")

def work():
    print("prints second")
    yield
```

When a task forks, it gets a `Fork` reference that can be used to join the forked task back in. The joining task is suspended until the fork completes, then resumes with the fork's return value. If the fork fails, `join` raises the same error.
Messages from the fork propagate through the task it is joined with.
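The resume-with-return-value behavior builds on a plain Python mechanism: a generator's `return` value surfaces as `StopIteration.value`, which is also what `yield from` evaluates to. A minimal illustration of that mechanism (not library code):

```python
def work():
    yield "working"  # cooperate once
    return 42        # the value a join would resume with

def join(gen):
    # drive the generator to completion and capture its return value,
    # the same thing `yield from` does under the hood
    while True:
        try:
            next(gen)
        except StopIteration as stop:
            return stop.value

assert join(work()) == 42
```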
```python
def main():
    worker = yield from task.fork(work())
    yield from do_some_other_work()
    try:
        value = yield from task.join(worker)
    except Exception:
        # join raises whatever error the fork failed with
        pass
```

A forked task may be aborted by another task that holds a reference to it.
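Aborting a generator-based task plausibly maps onto Python's `generator.throw`, which raises an exception at the task's current yield point. This is an illustration of the underlying mechanism, not the library's implementation:

```python
def work():
    try:
        while True:
            yield  # suspended here when the abort arrives
    except RuntimeError as error:
        # the task observes the abort as a raised exception
        yield f"aborted: {error}"

g = work()
next(g)  # advance to the first yield
message = g.throw(RuntimeError("die"))
# message == "aborted: die"
```

Because the exception is raised inside the generator, `try`/`finally` cleanup in the task runs as usual.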
```python
def main():
    worker = yield from task.fork(work())
    yield from task.sleep(10)
    yield from task.abort(worker, Exception("die"))
```

A forked task may also be exited successfully by another task that holds a reference to it.
```python
def main():
    worker = yield from task.fork(work())
    result = yield from do_something_concurrently()
    yield from task.exit_(worker, result)
```

`task.spawn(...)` starts a concurrent detached task. It is a lightweight alternative to `fork`; however, detached tasks cannot be joined, aborted, or exited via a handle.
`task.spawn(work())` creates a task that spawns the provided task when executed. Unlike `fork`, it is not awaitable as a `Fork`.
```python
def main():
    yield from task.spawn(work())
    response = yield from task.wait(fetch())

def work():
    try:
        pass  # do some work
    except Exception:
        pass  # errors in a detached task stay local to it
    yield
```

`task.main(...)` executes a top-level task directly (without returning a `Fork`).
```python
def app_main():
    try:
        while True:
            yield  # cooperate with the scheduler each iteration
            break  # exit condition elided for brevity
    except Exception:
        return

task.main(app_main())
```

More commonly, tasks describe asynchronous operations that may fail (HTTP requests, database operations, etc.) and do not produce messages.
These tasks are similar to futures/promises, but they describe asynchronous operations rather than in-flight result objects.
`task.current()` gets the controller of the currently running task. A controller is usually obtained when a task needs to suspend until an outside event occurs.

`task.suspend()` suspends the current task, which can later be resumed from another task or an external callback by calling `task.resume(controller_or_fork)`. This task never fails, although it may never resume. `finally` blocks still run if execution is aborted.
```python
import asyncio

def sleep_ms(duration):
    controller = yield from task.current()
    loop = asyncio.get_running_loop()
    handle = loop.call_later(duration / 1000, lambda: task.resume(controller))
    try:
        yield from task.suspend()
    finally:
        handle.cancel()
```

`task.sleep(duration)` suspends execution for the given duration (milliseconds), after which execution resumes (unless the task is terminated or aborted in the meantime).
```python
def work():
    print("I'm going to take a small nap")
    yield from task.sleep(200)
    print("I am back to work")
```

`task.wait(value)` provides the equivalent of `await` inside task functions. It takes a value you can wait on (`Awaitable[T] | T`) and suspends execution until the result is available. It is useful when dealing with sometimes-async operations.
```python
def fetch_json(url):
    response = yield from task.wait(fetch(url))
    json_data = yield from task.wait(response.json())
    return json_data
```

Execution is suspended even if the input value is not awaitable. The scheduler resumes in the same event-loop turn after processing other queued tasks, avoiding race conditions in mixed sync/async flows.
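The suspend-either-way behavior can be sketched with asyncio. The `wait` helper here is hypothetical and only illustrates the idea that sync and async inputs take the same path:

```python
import asyncio
import inspect

async def wait(value):
    # awaitables are awaited; plain values still defer one event-loop
    # turn, so both kinds of input suspend the caller
    if inspect.isawaitable(value):
        return await value
    await asyncio.sleep(0)  # let other queued tasks run first
    return value

async def demo():
    a = await wait(42)                   # plain value
    b = await wait(asyncio.sleep(0, 7))  # awaitable resolving to 7
    return [a, b]
```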
`task.all_(tasks)` takes an iterable of tasks and runs them concurrently, returning results in the same order as the input tasks (not completion order). If any task fails, all other tasks are aborted and the error is thrown into the caller.
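These semantics resemble an asyncio gather that cancels its siblings on the first failure. A rough analogue under that assumption (not the library's implementation):

```python
import asyncio

async def all_(awaitables):
    # run concurrently, preserve input order, cancel the rest on failure
    tasks = [asyncio.ensure_future(a) for a in awaitables]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for t in tasks:
            t.cancel()
        raise

async def demo():
    async def after(ms, value):
        await asyncio.sleep(ms / 1000)
        return value
    # "b" finishes first, but results come back in input order
    return await all_([after(20, "a"), after(5, "b")])
```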
An `Effect` is another task variant: instead of describing an asynchronous operation that may return or fail, it describes an asynchronous operation that may produce a cascade of messages. Effects represent finite streams and complete.
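Conceptually, an effect is just a generator that yields its messages and then finishes; draining it collects the stream. A sketch of the model, not library code:

```python
def countdown_effect(n):
    # an "effect": a finite stream of messages, then completion
    while n > 0:
        yield {"type": "tick", "value": n}
        n -= 1

messages = list(countdown_effect(3))
# three "tick" messages (values 3, 2, 1), then the effect completes
```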
`task.send(message)` creates an effect that sends the given message.
```python
def work(url):
    try:
        response = yield from task.wait(fetch(url))
        value = yield from task.wait(response.json())
        yield from task.send({"ok": True, "value": value})
    except Exception as error:
        yield from task.send({"ok": False, "error": error})
```

`task.effect(...)` turns a task (one that never fails or sends messages) into an effect of its result.
`task.listen(...)` takes several effects and merges them into a single tagged effect, so the source of each message can be identified via its `type` field.
```python
task.listen({
    "read": task.effect(db_read()),
    "write": task.effect(db_write()),
})
```

`task.none_()` returns an empty `Effect`, one that produces no messages. It is analogous to `[]` or `""`: useful when you need to interact with an API that takes an `Effect` but in your case you produce none.
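The tagging done by `listen` can be sketched with plain generators: wrap each source message in an envelope carrying its tag. This sequential sketch is illustrative only; the real scheduler interleaves sources concurrently:

```python
def tagged(tag, effect):
    # wrap each message from `effect` in an envelope naming its source
    for message in effect:
        yield {"type": tag, "message": message}

def listen(sources):
    # sequential sketch; real merging is concurrent
    for tag, effect in sources.items():
        yield from tagged(tag, effect)

stream = list(listen({
    "read": iter(["r1"]),
    "write": iter(["w1"]),
}))
```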
Takes several effects and combines them into one.
Takes several tasks and creates a combined effect from their results.
Helpers for tagged effect streams.
Runs feedback loops where each emitted message schedules another effect.
Transforms a task result:

- success -> `resolve(value)`
- failure -> `reject(error)`
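For generator-based tasks, this transform can be sketched as a wrapper that drives the task and routes its outcome to one of two continuations. The helper names and signature here are hypothetical; the library's `then_` may differ:

```python
def then_(task_gen, resolve, reject):
    # route success to resolve(...) and failure to reject(...)
    def transformed():
        try:
            value = yield from task_gen
        except Exception as error:
            return reject(error)
        return resolve(value)
    return transformed()

def ok():
    yield
    return 2

def run(gen):
    # minimal driver: exhaust the generator, return its value
    while True:
        try:
            next(gen)
        except StopIteration as stop:
            return stop.value

doubled = run(then_(ok(), resolve=lambda v: v * 2, reject=lambda e: e))
# doubled == 4
```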
This library tracks the same model, but there are Python-specific differences:

- API names adapted around Python keywords and builtins: `all_` instead of `all`, `none_` instead of `none`, `exit_` instead of `exit`, `then_` instead of `then`.
- Tasks/effects are Python generators, not iterators/promises.
```sh
pip install -e ".[dev]"
pytest -q
```

All welcome! storacha.network is open-source.
Dual-licensed under Apache-2.0 OR MIT