streaq.worker¶
- class streaq.worker.Worker(redis_url: str = 'redis://localhost:6379', redis_pool: ConnectionPool[Any] | ClusterConnectionPool | None = None, redis_kwargs: dict[str, Any] | None = None, concurrency: int = 16, sync_concurrency: int | None = None, queue_name: str = 'default', priorities: list[str] | None = None, prefetch: int | None = None, lifespan: Callable[[], AbstractAsyncContextManager[C]] = <function _lifespan>, serializer: Callable[[Any], bytes | str | Awaitable[bytes | str]] = <built-in function dumps>, deserializer: Callable[[bytes], Any] = <built-in function loads>, tz: tzinfo = datetime.timezone.utc, handle_signals: bool = True, signing_secret: str | None = None, idle_timeout: timedelta | float = 60, grace_period: int = 0, anyio_backend: Literal['asyncio', 'trio'] = 'asyncio', anyio_kwargs: dict[str, Any] | None = None, sentinel_nodes: list[tuple[str, int]] | None = None, sentinel_master: str = 'mymaster', sentinel_kwargs: dict[str, Any] | None = None, cluster_nodes: list[tuple[str, int]] | None = None, id: str | None = None)¶
Bases: AsyncContextManagerMixin, Generic[C]
Worker object that fetches and executes tasks from a queue.
- Parameters:¶
- redis_url
connection URI for Redis
- redis_pool
coredis connection pool for Redis client
- redis_kwargs
additional keyword arguments for Redis client
- concurrency
number of tasks the worker can run simultaneously
- sync_concurrency
max number of synchronous tasks the worker can run simultaneously in separate threads; defaults to the same as concurrency
- queue_name
name of queue in Redis
- priorities
list of priorities from lowest to highest
- prefetch
max number of tasks to prefetch from Redis; defaults to the same as concurrency
- lifespan
async context manager that wraps worker execution and provides task dependencies
- serializer
function to serialize task data for Redis
- deserializer
function to deserialize task data from Redis
- tz
timezone to use for cron jobs
- handle_signals
whether to handle signals for graceful shutdown
- signing_secret
if provided, used to sign data stored in Redis, which can improve security especially if using pickle. For binary serializers only. You can generate a key using secrets, for example: secrets.token_urlsafe(32)
- idle_timeout
the number of seconds to wait before re-enqueuing idle tasks (either prefetched tasks that don’t run, or running tasks that become unresponsive)
- grace_period
the number of seconds after receiving SIGINT or SIGTERM to wait for tasks to finish before performing a hard shutdown
- anyio_backend
anyio backend to use, either Trio or asyncio
- anyio_kwargs
extra arguments to pass to anyio backend
- sentinel_nodes
list of (address, port) tuples to create sentinel from
- sentinel_master
name of sentinel master to use
- sentinel_kwargs
extra arguments to pass to sentinel (but not instances)
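As an illustration of the serializer, deserializer, and signing_secret parameters above, here is a minimal sketch of a JSON-based serializer pair whose signatures match the ones the constructor expects, plus key generation with secrets. The names json_serializer and json_deserializer are hypothetical, and the wiring into Worker appears only as a comment because it needs streaq and a Redis server.

```python
import json
import secrets
from typing import Any


def json_serializer(data: Any) -> bytes:
    """Serialize task data to UTF-8 encoded JSON bytes (hypothetical helper)."""
    return json.dumps(data, separators=(",", ":")).encode("utf-8")


def json_deserializer(raw: bytes) -> Any:
    """Decode JSON bytes back into Python objects (hypothetical helper)."""
    return json.loads(raw)


# Generate a signing key as suggested in the signing_secret description.
signing_secret = secrets.token_urlsafe(32)

# Hypothetical wiring, not executed here:
# worker = Worker(
#     serializer=json_serializer,
#     deserializer=json_deserializer,
#     signing_secret=signing_secret,
# )

# Round-trip check that the pair is symmetric.
payload = {"args": [1, 2], "kwargs": {"retries": 3}}
assert json_deserializer(json_serializer(payload)) == payload
```

JSON only handles basic types, so this pair suits tasks whose arguments and results are plain dicts, lists, strings, and numbers.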
- async abort_by_id(task_id: str, timeout: timedelta | int | None = 5) bool¶
Notify workers that the task should be aborted, then wait for confirmation.
- abort_tasks(tasks: set[str]) None¶
Aborts tasks scheduled for abortion if they’re present on this worker.
- burst¶
whether to shut down the worker when the queue is empty; set via CLI
- async consumer(queue: MemoryObjectReceiveStream[StreamMessage], limiter: CapacityLimiter, scope: CancelScope) None¶
Listen for and run tasks from the queue.
- property context : C¶
Get the worker’s unique context. Only available in running workers.
- counters : dict[str, int]¶
mapping of type of task -> number of tasks of that type, e.g.
{"completed": 4, "failed": 1, "retried": 0}
- cron(tab: str, *, max_schedule_drift: timedelta | int | None = None, max_tries: int | None = 3, name: str | None = None, silent: bool = False, timeout: timedelta | int | None = datetime.timedelta(seconds=3600), ttl: timedelta | int | None = datetime.timedelta(seconds=300), unique: bool = True)¶
Registers a task to be run at regular intervals as specified.
- Parameters:¶
- tab: str¶
crontab for scheduling, follows the specification here.
- max_schedule_drift: timedelta | int | None = None¶
maximum amount of time a cron task can be delayed from its scheduled execution time before getting discarded. If None, no check is performed.
- max_tries: int | None = 3¶
number of times to retry the task should it fail during execution
- name: str | None = None¶
use a custom name for the cron job instead of the function name
- silent: bool = False¶
whether to silence task logs; defaults to False
- timeout: timedelta | int | None = datetime.timedelta(seconds=3600)¶
time after which to abort the task, if None will never time out
- ttl: timedelta | int | None = datetime.timedelta(seconds=300)¶
time to store results in Redis, if None will never expire
- unique: bool = True¶
whether multiple instances of the task can exist simultaneously
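The max_schedule_drift check described above can be pictured with the following sketch. It is not streaq's implementation: the function name exceeds_drift is hypothetical, and treating integer values as seconds is an assumption.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def exceeds_drift(
    scheduled: datetime,
    now: datetime,
    max_schedule_drift: timedelta | int | None,
) -> bool:
    """Return True if a cron run is too late and should be discarded."""
    if max_schedule_drift is None:
        # No check is performed when no limit is configured.
        return False
    if isinstance(max_schedule_drift, int):
        # Assumption: integer values are interpreted as seconds.
        max_schedule_drift = timedelta(seconds=max_schedule_drift)
    return now - scheduled > max_schedule_drift


scheduled = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
late_by_90s = scheduled + timedelta(seconds=90)

assert exceeds_drift(scheduled, late_by_90s, 60) is True
assert exceeds_drift(scheduled, late_by_90s, timedelta(minutes=2)) is False
assert exceeds_drift(scheduled, late_by_90s, None) is False
```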
- async deserialize(data: Any) Any¶
Wrap deserializer to validate signature from last 32 bytes if applicable.
- async enqueue_many(tasks: Sequence[Task[Any, Any]]) None¶
Enqueue multiple tasks for immediate execution. This uses a Redis pipeline, so it’s more efficient than awaiting each individual task. Not reliable for tasks with dependencies, which should be enqueued individually.
Example usage:
    # importantly, we're not using `await` here
    tasks = [foobar.enqueue(i) for i in range(10)]
    async with worker:
        await worker.enqueue_many(tasks)
- enqueue_unsafe(fn_name: str, *args: Any, **kwargs: Any) Task[Any, Any]¶
Allows for enqueuing a task that is registered elsewhere without having access to the worker it’s registered to. This is unsafe because it doesn’t check if the task is registered with the worker and doesn’t enforce types, so it should only be used if you need to separate the task queuing and task execution code. You also lose the ability to control certain parameters like uniqueness and queue expiration time. Consider using type stubs instead as explained here.
- async fail_task_dependents(dependents: list[str], otherwise: str | None) None¶
Fail dependents for the given task.
- async finish_failed_task(msg: StreamMessage, exc: BaseException, tries: int, created_time: int, fn_name: str = 'Unknown', ttl: timedelta | int | None = 300, otherwise: str | None = None) None¶
Serialize a failed task with metadata and handle failure.
- async finish_task(msg: StreamMessage, finish: bool, schedule: int | None, return_value: Any, start_time: int, finish_time: int, created_time: int, fn_name: str, success: bool, silent: bool, ttl: timedelta | int | None, triggers: str | None, lock_key: str | None, tries: int, otherwise: str | None) None¶
Cleanup for a task that executed successfully or will be retried.
- async get_tasks_by_status(status: Literal[TaskStatus.SCHEDULED] | Literal[TaskStatus.QUEUED], *, priority: str | None = None, limit: int = 100) list[TaskInfo]¶
- async get_tasks_by_status(status: Literal[TaskStatus.RUNNING], *, limit: int = 100) list[TaskInfo]
- async get_tasks_by_status(status: Literal[TaskStatus.DONE], *, limit: int = 100) list[TaskResult[Any]]
- async get_tasks_by_status(status: TaskStatus, *, limit: int = 100) list[TaskInfo] | list[TaskResult[Any]]
Get tasks by their status.
- id¶
unique ID of worker
- include(other: Worker[Any]) None¶
Copy another worker’s tasks and cron jobs to the current worker.
This works by modifying the included worker’s tasks to point to this worker instead. Since only one worker should be running per process, this generally works as expected. If you want to run a worker that is included in another worker elsewhere, make sure the included worker isn’t aware of its parent worker at import time.
- middleware(new_middleware: Callable[[Callable[[...], Coroutine[Any, Any, Any]]], Callable[[...], Coroutine[Any, Any, Any]]]) RegisteredMiddleware¶
Registers the given middleware with the worker.
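Judging by the signature above, a middleware is a function that takes a task coroutine function and returns a wrapped coroutine function. The sketch below shows a timing middleware of that shape using only the standard library. It applies the wrapper by hand instead of going through Worker.middleware, and it assumes the worker calls the returned wrapper with the task's own arguments; the names timing_middleware, durations, and add are hypothetical.

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from typing import Any

TaskFn = Callable[..., Coroutine[Any, Any, Any]]

# Collected run times in seconds, one entry per wrapped call.
durations: list[float] = []


def timing_middleware(task: TaskFn) -> TaskFn:
    """Wrap a task so that its run time is recorded, even on failure."""

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return await task(*args, **kwargs)
        finally:
            durations.append(time.perf_counter() - start)

    return wrapper


async def add(a: int, b: int) -> int:
    return a + b


# Apply the middleware manually and run the wrapped task once.
wrapped = timing_middleware(add)
result = asyncio.run(wrapped(2, 3))
assert result == 5
assert len(durations) == 1
```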
- async producer(queue: MemoryObjectSendStream[StreamMessage], limiter: CapacityLimiter, scope: CancelScope) None¶
Listen for new tasks or stale tasks from the stream and add them to the queue.
- async queue_size(include_scheduled: bool = True) int¶
Returns the number of tasks currently queued in Redis.
- property redis : Redis[str] | RedisCluster[str]¶
Worker’s coredis client instance. Only available inside the worker’s context manager.
- registry : dict[str, AsyncRegisteredTask[Any, Any] | SyncRegisteredTask[Any, Any]]¶
mapping of task name -> task wrapper
- async renew_idle_timeouts(scope: CancelScope) None¶
Periodically renew idle timeout for running tasks. This allows the queue to be resilient to sudden shutdowns. Additionally marks worker as healthy.
- async result_by_id(task_id: str, timeout: timedelta | int | None = None) TaskResult[Any]¶
Wait for and return the given task’s result, optionally with a timeout.
- async run_async(*, task_status: TaskStatus[None] = <anyio._core._tasks._IgnoredTaskStatus object>) None¶
Run the worker asynchronously: groups together and runs the worker’s tasks, then closes the worker’s connections on exit.
- async run_consumers(receive: MemoryObjectReceiveStream[StreamMessage], renew_scope: CancelScope, produce_scope: CancelScope, *, task_status: TaskStatus[CapacityLimiter] = <anyio._core._tasks._IgnoredTaskStatus object>) None¶
Run all consumers in a dedicated task group, finally clean up.
- async run_task(msg: StreamMessage) None¶
Execute the registered task, then store the result in Redis.
- async schedule_cron_jobs(ready: tuple[str, ...]) None¶
Schedules any pending cron jobs for future execution.
- async schedule_delayed_tasks() None¶
Schedule tasks in the delayed queue for execution, schedule cron jobs, and cancel tasks marked for abortion.
- async serialize(data: Any) str | bytes¶
Wrap serializer to append signature as last 32 bytes if applicable.
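The "signature as last 32 bytes" scheme used by serialize and deserialize can be sketched as follows. The source does not name the algorithm; HMAC-SHA256 is an assumption chosen because its digest is exactly 32 bytes. The function names and the use of pickle are illustrative only.

```python
import hashlib
import hmac
import pickle
from typing import Any

SIGNATURE_SIZE = 32  # an HMAC-SHA256 digest is 32 bytes long


def sign_and_serialize(data: Any, secret: str) -> bytes:
    """Serialize data and append a 32-byte signature (assumed HMAC-SHA256)."""
    body = pickle.dumps(data)
    signature = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return body + signature


def verify_and_deserialize(blob: bytes, secret: str) -> Any:
    """Check the trailing signature before deserializing the body."""
    if len(blob) < SIGNATURE_SIZE:
        raise ValueError("payload too short to contain a signature")
    body, signature = blob[:-SIGNATURE_SIZE], blob[-SIGNATURE_SIZE:]
    expected = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(signature, expected):
        raise ValueError("invalid signature")
    return pickle.loads(body)


blob = sign_and_serialize({"task": "foobar", "args": [1]}, "example-secret")
assert verify_and_deserialize(blob, "example-secret") == {"task": "foobar", "args": [1]}
```

Verifying before unpickling is the point of the design: pickle can execute arbitrary code on load, so data that fails the signature check is never passed to it.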
- async signal_handler(scope: CancelScope) None¶
Gracefully shut down the worker when a signal is received. Doesn’t work on Windows!
- async status_by_id(task_id: str) TaskStatus¶
Fetch the current status of the given task.
- task(fn: Callable[[P], Coroutine[Any, Any, R]], /) AsyncRegisteredTask[P, R]¶
- task(fn: Callable[[P], R], /) SyncRegisteredTask[P, R]
- task(*, expire: timedelta | int | None = None, max_tries: int | None = 3, name: str | None = None, silent: bool = False, timeout: timedelta | int | None = None, ttl: timedelta | int | None = timedelta(minutes=5), unique: bool = False) TaskDecorator
Registers a task with the worker which can later be enqueued by the user.
- Parameters:¶
- expire: timedelta | int | None = None
time after which to dequeue the task, if None will never be dequeued
- max_tries: int | None = 3
number of times to retry the task should it fail during execution
- name: str | None = None
use a custom name for the task instead of the function name
- silent: bool = False
whether to silence task logs; defaults to False
- timeout: timedelta | int | None = None
time after which to abort the task, if None will never time out
- ttl: timedelta | int | None = timedelta(minutes=5)
time to store results in Redis, if None will never expire
- unique: bool = False
whether multiple instances of the task can exist simultaneously
- expire: timedelta | int | None =