Getting started¶
To start, you’ll need to define your global dependencies which your tasks will need access to at run time. This is done with a dataclass or NamedTuple:
worker.py¶
from dataclasses import dataclass
from httpx import AsyncClient
@dataclass
class WorkerContext:
"""
Type safe way of defining the dependencies of your tasks.
e.g. HTTP client, database connection, settings.
"""
http_client: AsyncClient
Now, when creating a Worker object, you can provide an async context manager “lifespan” which will initialize any global dependencies you want to have access to in your tasks:
worker.py¶
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncGenerator
from httpx import AsyncClient
from streaq import Worker
@dataclass
class WorkerContext:
"""
Type safe way of defining the dependencies of your tasks.
e.g. HTTP client, database engine, settings.
"""
http_client: AsyncClient
@asynccontextmanager
async def lifespan() -> AsyncGenerator[WorkerContext, None]:
"""
Here, we initialize the worker's dependencies.
You can also do any startup/shutdown work here
"""
async with AsyncClient() as http_client:
yield WorkerContext(http_client)
my_worker = Worker(redis_url="redis://localhost:6379", lifespan=lifespan)
You can then register async tasks to the worker like this:
worker.py¶
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncGenerator
from httpx import AsyncClient
from streaq import Worker
@dataclass
class WorkerContext:
"""
Type safe way of defining the dependencies of your tasks.
e.g. HTTP client, database connection, settings.
"""
http_client: AsyncClient
@asynccontextmanager
async def lifespan() -> AsyncGenerator[WorkerContext, None]:
"""
Here, we initialize the worker's dependencies.
You can also do any startup/shutdown work here
"""
async with AsyncClient() as http_client:
yield WorkerContext(http_client)
my_worker = Worker(redis_url="redis://localhost:6379", lifespan=lifespan)
@my_worker.task(timeout=5)
async def fetch(url: str) -> int:
res = await my_worker.context.http_client.get(url)
return len(res.text)
Now let’s save the file and spin up a worker which will pick up future tasks:
$ streaq run worker:my_worker
Note
The worker path format for the CLI is module.submodule:object.
Finally, let’s create a script to queue up some tasks via the worker’s async context manager:
script.py¶
from anyio import run
from script import my_worker, fetch
async def main():
async with my_worker:
await fetch.enqueue("https://tastyware.dev/")
# enqueue returns a task object that can be used to get results/info
task = await fetch.enqueue("https://github.com/tastyware/streaq").start(delay=3)
print(await task.info())
print(await task.result(timeout=5))
run(main) # necessary for async code, or you could use an async REPL
We can run this script with $ python script.py. You should see output like:
TaskInfo(fn_name='fetch', enqueue_time=1756365588232, tries=0, scheduled=datetime.datetime(2025, 8, 28, 7, 19, 51, 232000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
TaskResult(fn_name='fetch', enqueue_time=1756365588232, success=True, start_time=1756365591327, finish_time=1756365592081, tries=1, worker_id='12195ce1', _result=303659)
And the worker logs should look like this:
[INFO] 2025-09-23 07:19:48: task fetch □ 45d7ff032e6d42239e9f479a2fc4b70e → worker 12195ce1
[INFO] 2025-09-23 07:19:48: task fetch ■ 45d7ff032e6d42239e9f479a2fc4b70e ← 15
[INFO] 2025-09-23 07:19:51: task fetch □ 65e687f9ba644a1fbe23096fa246dfe1 → worker 12195ce1
[INFO] 2025-09-23 07:19:52: task fetch ■ 65e687f9ba644a1fbe23096fa246dfe1 ← 303659