streaq.worker

class streaq.worker.Worker(redis_url: str = 'redis://localhost:6379', redis_pool: ConnectionPool[Any] | ClusterConnectionPool | None = None, redis_kwargs: dict[str, Any] | None = None, concurrency: int = 16, sync_concurrency: int | None = None, queue_name: str = 'default', priorities: list[str] | None = None, prefetch: int | None = None, lifespan: Callable[[], AbstractAsyncContextManager[C]] = <function _lifespan>, serializer: Callable[[Any], bytes | str | Awaitable[bytes | str]] = <built-in function dumps>, deserializer: Callable[[bytes], Any] = <built-in function loads>, tz: tzinfo = datetime.timezone.utc, handle_signals: bool = True, signing_secret: str | None = None, idle_timeout: timedelta | float = 60, grace_period: int = 0, anyio_backend: Literal['asyncio', 'trio'] = 'asyncio', anyio_kwargs: dict[str, Any] | None = None, sentinel_nodes: list[tuple[str, int]] | None = None, sentinel_master: str = 'mymaster', sentinel_kwargs: dict[str, Any] | None = None, cluster_nodes: list[tuple[str, int]] | None = None, id: str | None = None)

Bases: AsyncContextManagerMixin, Generic[C]

Worker object that fetches and executes tasks from a queue.

Parameters:
redis_url

connection URI for Redis

redis_pool

coredis connection pool for Redis client

redis_kwargs

additional keyword arguments for Redis client

concurrency

number of tasks the worker can run simultaneously

sync_concurrency

max number of synchronous tasks the worker can run simultaneously in separate threads; defaults to the same as concurrency

queue_name

name of queue in Redis

priorities

list of priorities from lowest to highest

prefetch

max number of tasks to prefetch from Redis; defaults to the same as concurrency

lifespan

async context manager that wraps worker execution and provides task dependencies

serializer

function to serialize task data for Redis

deserializer

function to deserialize task data from Redis

tz

timezone to use for cron jobs

handle_signals

whether to handle signals for graceful shutdown

signing_secret

if provided, used to sign data stored in Redis, which can improve security, especially when using pickle. Works with binary serializers only. A key can be generated with the secrets module, for example: secrets.token_urlsafe(32)

idle_timeout

the number of seconds to wait before re-enqueuing idle tasks (either prefetched tasks that don’t run, or running tasks that become unresponsive)

grace_period

the number of seconds after receiving SIGINT or SIGTERM to wait for tasks to finish before performing a hard shutdown

anyio_backend

anyio backend to use, either 'asyncio' or 'trio'

anyio_kwargs

extra arguments to pass to anyio backend

sentinel_nodes

list of (address, port) tuples to create sentinel from

sentinel_master

name of sentinel master to use

sentinel_kwargs

extra arguments to pass to sentinel (but not instances)

cluster_nodes

list of (address, port) tuples to create cluster from

id

unique ID of worker
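
The lifespan parameter expects a zero-argument callable that returns an async context manager. A minimal sketch of such a callable, using only the standard library, might look like the following. The Context dataclass and its fields are hypothetical and not part of streaq.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


@dataclass
class Context:
    """Hypothetical container for dependencies shared by tasks."""

    settings: dict[str, str] = field(default_factory=dict)
    closed: bool = False


@asynccontextmanager
async def lifespan() -> AsyncIterator[Context]:
    # Set up dependencies before the worker starts running tasks.
    ctx = Context(settings={"api_base": "https://example.invalid"})
    try:
        yield ctx
    finally:
        # Tear down dependencies when the worker stops.
        ctx.closed = True
```

Going by the signature above, a function like this would be passed as Worker(lifespan=lifespan), and the yielded value would presumably be what the context property returns in a running worker.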

async abort_by_id(task_id: str, timeout: timedelta | int | None = 5) bool

Notify workers that the task should be aborted, then wait for confirmation.

Parameters:
task_id: str

ID of the task to abort

timeout: timedelta | int | None = 5

how long to wait for confirmation that the abort succeeded. None means wait forever; 0 means don’t wait at all.

Returns:

whether the task was aborted successfully
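
The three timeout cases (None, 0, and a positive value) can be illustrated with a small standard-library sketch. This is not streaq’s implementation: wait_for_confirmation is a hypothetical helper, an asyncio.Event stands in for the confirmation signal, and a plain int is assumed to mean seconds.

```python
import asyncio
from datetime import timedelta
from typing import Optional, Union


async def wait_for_confirmation(
    confirmed: asyncio.Event,
    timeout: Optional[Union[timedelta, int]] = 5,
) -> bool:
    """Hypothetical helper: report whether confirmation arrived in time."""
    if timeout is None:
        # None: wait forever.
        await confirmed.wait()
        return True
    seconds = (
        timeout.total_seconds() if isinstance(timeout, timedelta) else float(timeout)
    )
    if seconds <= 0:
        # 0: don't wait at all, just report the current state.
        return confirmed.is_set()
    try:
        await asyncio.wait_for(confirmed.wait(), seconds)
    except asyncio.TimeoutError:
        return False
    return True
```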

abort_tasks(tasks: set[str]) None

Aborts tasks scheduled for abortion if they’re present on this worker.

burst

whether to shut down the worker when the queue is empty; set via CLI

async consumer(queue: MemoryObjectReceiveStream[StreamMessage], limiter: CapacityLimiter, scope: CancelScope) None

Listen for and run tasks from the queue.

property context : C

Get the worker’s unique context. Only available in running workers.

counters : dict[str, int]

mapping of type of task -> number of tasks of that type, e.g. {"completed": 4, "failed": 1, "retried": 0}

cron(tab: str, *, max_schedule_drift: timedelta | int | None = None, max_tries: int | None = 3, name: str | None = None, silent: bool = False, timeout: timedelta | int | None = datetime.timedelta(seconds=3600), ttl: timedelta | int | None = datetime.timedelta(seconds=300), unique: bool = True)

Registers a task to be run at regular intervals as specified.

Parameters:
tab: str

crontab for scheduling, follows the specification here.

max_schedule_drift: timedelta | int | None = None

maximum amount of time a cron task can be delayed from its scheduled execution time before getting discarded. If None, no check is performed.

max_tries: int | None = 3

number of times to retry the task should it fail during execution

name: str | None = None

use a custom name for the cron job instead of the function name

silent: bool = False

whether to silence task logs; defaults to False

timeout: timedelta | int | None = datetime.timedelta(seconds=3600)

time after which to abort the task, if None will never time out

ttl: timedelta | int | None = datetime.timedelta(seconds=300)

time to store results in Redis, if None will never expire

unique: bool = True

whether to prevent multiple instances of the task from existing simultaneously
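
The max_schedule_drift check described above can be sketched as a simple comparison. This is an illustration only, not streaq’s code: should_discard is a hypothetical helper, times are assumed to be millisecond timestamps, and a plain int is assumed to mean seconds.

```python
from datetime import timedelta
from typing import Optional, Union


def should_discard(
    scheduled_ms: int,
    now_ms: int,
    max_schedule_drift: Optional[Union[timedelta, int]] = None,
) -> bool:
    """Hypothetical helper: True if the cron run is too late to execute."""
    if max_schedule_drift is None:
        # No limit configured, so no check is performed.
        return False
    if isinstance(max_schedule_drift, timedelta):
        allowed = max_schedule_drift.total_seconds()
    else:
        allowed = float(max_schedule_drift)
    delay_seconds = (now_ms - scheduled_ms) / 1000
    return delay_seconds > allowed
```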

async deserialize(data: Any) Any

Wrap deserializer to validate signature from last 32 bytes if applicable.

async enqueue_many(tasks: Sequence[Task[Any, Any]]) None

Enqueue multiple tasks for immediate execution. This uses a Redis pipeline, so it’s more efficient than awaiting each individual task. Not reliable for tasks with dependencies, which should be enqueued individually.

Parameters:
tasks: Sequence[Task[Any, Any]]

sequence of task objects to enqueue

Example usage:

# importantly, we're not using `await` here
tasks = [foobar.enqueue(i) for i in range(10)]
async with worker:
    await worker.enqueue_many(tasks)

enqueue_unsafe(fn_name: str, *args: Any, **kwargs: Any) Task[Any, Any]

Allows for enqueuing a task that is registered elsewhere without having access to the worker it’s registered to. This is unsafe because it doesn’t check if the task is registered with the worker and doesn’t enforce types, so it should only be used if you need to separate the task queuing and task execution code. You also lose the ability to control certain parameters like uniqueness and queue expiration time. Consider using type stubs instead as explained here.

Parameters:
fn_name: str

name of the function to run

*args: Any

positional arguments for the task

**kwargs: Any

keyword arguments for the task

Returns:

task object

async fail_task_dependents(dependents: list[str], otherwise: str | None) None

Fail dependents for the given task.

async finish_failed_task(msg: StreamMessage, exc: BaseException, tries: int, created_time: int, fn_name: str = 'Unknown', ttl: timedelta | int | None = 300, otherwise: str | None = None) None

Serialize a failed task with metadata and handle failure.

async finish_task(msg: StreamMessage, finish: bool, schedule: int | None, return_value: Any, start_time: int, finish_time: int, created_time: int, fn_name: str, success: bool, silent: bool, ttl: timedelta | int | None, triggers: str | None, lock_key: str | None, tries: int, otherwise: str | None) None

Cleanup for a task that executed successfully or will be retried.

async get_tasks_by_status(status: Literal[TaskStatus.SCHEDULED] | Literal[TaskStatus.QUEUED], *, priority: str | None = None, limit: int = 100) list[TaskInfo]
async get_tasks_by_status(status: Literal[TaskStatus.RUNNING], *, limit: int = 100) list[TaskInfo]
async get_tasks_by_status(status: Literal[TaskStatus.DONE], *, limit: int = 100) list[TaskResult[Any]]
async get_tasks_by_status(status: TaskStatus, *, limit: int = 100) list[TaskInfo] | list[TaskResult[Any]]

Get tasks by their status.

Parameters:
status: TaskStatus

the task status to filter by

priority: str | None = None

filter by priority queue; per the overloads above, only accepted when status is SCHEDULED or QUEUED

limit: int = 100

maximum number of tasks to return

Returns:

list of tasks with given status

id

unique ID of worker

include(other: Worker[Any]) None

Copy another worker’s tasks and cron jobs to the current worker.

This works by modifying the included worker’s tasks to point to this worker instead. Since only one worker should be running per process, this generally works as expected. If you want to run a worker that is included in another worker elsewhere, make sure the included worker isn’t aware of its parent worker at import time.

Parameters:
other: Worker[Any]

worker to copy tasks and cron jobs from

async info_by_id(task_id: str) TaskInfo | None

Fetch info about an unfinished task.

Parameters:
task_id: str

ID of the task to get info for

Returns:

task info, unless task has finished or doesn’t exist

middleware(new_middleware: Callable[[Callable[[...], Coroutine[Any, Any, Any]]], Callable[[...], Coroutine[Any, Any, Any]]]) RegisteredMiddleware

Registers the given middleware with the worker.
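
Per the signature, a middleware takes an async callable and returns an async callable. A minimal sketch of that shape, using only the standard library, is shown below. The timer function and the timings list are hypothetical, and the arguments the worker actually passes to the wrapped callable are not specified here, so the wrapper forwards whatever it receives.

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from functools import wraps
from typing import Any

TaskFn = Callable[..., Coroutine[Any, Any, Any]]

# Hypothetical sink for measured durations, in seconds.
timings: list[float] = []


def timer(task: TaskFn) -> TaskFn:
    """Wrap an async callable and record how long each call takes."""

    @wraps(task)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return await task(*args, **kwargs)
        finally:
            # Record the duration whether the call succeeded or raised.
            timings.append(time.perf_counter() - start)

    return wrapper
```

Given a worker instance, the signature suggests this would be registered with worker.middleware(timer).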

middlewares : list[Middleware]

list of middlewares added to the worker

next_run(tab: str) int

Given a cron tab, get the next run time in ms.

Parameters:
tab: str

cron tab to calculate next run for
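
Assuming the returned value is a Unix timestamp in milliseconds (the docstring only says “ms”), it could be converted to a timezone-aware datetime as sketched below. ms_to_datetime is a hypothetical helper, not part of streaq.

```python
from datetime import datetime, timezone, tzinfo


def ms_to_datetime(ms: int, tz: tzinfo = timezone.utc) -> datetime:
    """Convert a millisecond Unix timestamp to an aware datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=tz)
```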

async producer(queue: MemoryObjectSendStream[StreamMessage], limiter: CapacityLimiter, scope: CancelScope) None

Listen for new tasks or stale tasks from the stream and add them to the queue.
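
The producer and consumer methods follow a familiar pattern: one side feeds an in-memory queue, the other drains it while a limiter caps concurrency. The sketch below illustrates that pattern only. It is not streaq’s implementation: asyncio.Queue and asyncio.Semaphore stand in for the anyio memory object streams and CapacityLimiter named in the signatures, and the doubling “task” is a placeholder.

```python
import asyncio
from collections.abc import Iterable


async def producer(queue: asyncio.Queue, items: Iterable[int]) -> None:
    # Push work onto the queue, then a sentinel to signal completion.
    for item in items:
        await queue.put(item)
    await queue.put(None)


async def consumer(
    queue: asyncio.Queue, limiter: asyncio.Semaphore, results: list[int]
) -> None:
    async def run(item: int) -> None:
        # The limiter caps how many tasks execute at the same time.
        async with limiter:
            await asyncio.sleep(0)  # placeholder for real work
            results.append(item * 2)

    running = []
    while (item := await queue.get()) is not None:
        running.append(asyncio.create_task(run(item)))
    await asyncio.gather(*running)


async def main(n: int = 5, concurrency: int = 2) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=concurrency)
    limiter = asyncio.Semaphore(concurrency)
    results: list[int] = []
    await asyncio.gather(
        producer(queue, range(n)), consumer(queue, limiter, results)
    )
    return sorted(results)
```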

async queue_size(include_scheduled: bool = True) int

Returns the number of tasks currently queued in Redis.

Parameters:
include_scheduled: bool = True

whether to also include tasks in the delayed queue

property redis : Redis[str] | RedisCluster[str]

Worker’s coredis client instance. Only available inside the worker’s context manager.

registry : dict[str, AsyncRegisteredTask[Any, Any] | SyncRegisteredTask[Any, Any]]

mapping of task name -> task wrapper

async renew_idle_timeouts(scope: CancelScope) None

Periodically renew idle timeout for running tasks. This allows the queue to be resilient to sudden shutdowns. Additionally marks worker as healthy.

async result_by_id(task_id: str, timeout: timedelta | int | None = None) TaskResult[Any]

Wait for and return the given task’s result, optionally with a timeout.

Parameters:
task_id: str

ID of the task to get results for

timeout: timedelta | int | None = None

amount of time to wait before raising a TimeoutError

Returns:

wrapped result object

async run_async(*, task_status: TaskStatus[None] = <anyio._core._tasks._IgnoredTaskStatus object>) None

Async function to run the worker. Groups together and runs the worker’s tasks, and closes the worker’s connections on exit.

async run_consumers(receive: MemoryObjectReceiveStream[StreamMessage], renew_scope: CancelScope, produce_scope: CancelScope, *, task_status: TaskStatus[CapacityLimiter] = <anyio._core._tasks._IgnoredTaskStatus object>) None

Run all consumers in a dedicated task group, then clean up.

run_sync() None

Sync function to run the worker; closes the worker’s connections on exit.

async run_task(msg: StreamMessage) None

Execute the registered task, then store the result in Redis.

running() int

Get the number of currently running tasks in the worker.

async schedule_cron_jobs(ready: tuple[str, ...]) None

Schedules any pending cron jobs for future execution.

async schedule_delayed_tasks() None

Schedule tasks in the delayed queue for execution, schedule cron jobs, and cancel tasks marked for abortion.

async serialize(data: Any) str | bytes

Wrap serializer to append signature as last 32 bytes if applicable.
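
The serialize and deserialize wrappers are described as appending and validating a signature in the last 32 bytes. One way such a scheme could work is sketched below with HMAC-SHA256, whose digest happens to be 32 bytes. The choice of HMAC-SHA256 is an assumption, and sign and verify are hypothetical helpers, not streaq’s API.

```python
import hashlib
import hmac
import pickle
import secrets

SIGNATURE_LENGTH = 32  # size of a SHA-256 digest in bytes


def sign(payload: bytes, secret: bytes) -> bytes:
    """Append an HMAC-SHA256 signature to the serialized payload."""
    signature = hmac.new(secret, payload, hashlib.sha256).digest()
    return payload + signature


def verify(signed: bytes, secret: bytes) -> bytes:
    """Check the trailing signature and return the payload without it."""
    payload, signature = signed[:-SIGNATURE_LENGTH], signed[-SIGNATURE_LENGTH:]
    expected = hmac.new(secret, payload, hashlib.sha256).digest()
    # compare_digest avoids leaking information through timing.
    if not hmac.compare_digest(signature, expected):
        raise ValueError("invalid signature")
    return payload


# Generate a key as suggested for signing_secret, then round-trip a value.
secret = secrets.token_urlsafe(32).encode()
signed = sign(pickle.dumps({"task": "example"}), secret)
restored = pickle.loads(verify(signed, secret))
```

Verifying before unpickling matters because pickle.loads can execute arbitrary code on untrusted input.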

async signal_handler(scope: CancelScope) None

Gracefully shut down the worker when a signal is received. Doesn’t work on Windows!

async status_by_id(task_id: str) TaskStatus

Fetch the current status of the given task.

Parameters:
task_id: str

ID of the task to check

Returns:

status of the task

task(fn: Callable[[P], Coroutine[Any, Any, R]], /) AsyncRegisteredTask[P, R]
task(fn: Callable[[P], R], /) SyncRegisteredTask[P, R]
task(*, expire: timedelta | int | None = None, max_tries: int | None = 3, name: str | None = None, silent: bool = False, timeout: timedelta | int | None = None, ttl: timedelta | int | None = timedelta(minutes=5), unique: bool = False) TaskDecorator

Registers a task with the worker which can later be enqueued by the user.

Parameters:
expire: timedelta | int | None = None

time after which to dequeue the task, if None will never be dequeued

max_tries: int | None = 3

number of times to retry the task should it fail during execution

name: str | None = None

use a custom name for the task instead of the function name

silent: bool = False

whether to silence task logs; defaults to False

timeout: timedelta | int | None = None

time after which to abort the task, if None will never time out

ttl: timedelta | int | None = timedelta(minutes=5)

time to store results in Redis, if None will never expire

unique: bool = False

whether to prevent multiple instances of the task from existing simultaneously

async unschedule_by_id(task_id: str) bool

Stop scheduling the repeating task if registered.

Parameters:
task_id: str

ID of the task to unregister

Returns:

whether the task was unscheduled successfully