streaq.task¶
- class streaq.task.AsyncRegisteredTask(*, expire: timedelta | int | None, max_schedule_drift: timedelta | int | None, max_tries: int | None, silent: bool, timeout: timedelta | int | None, ttl: timedelta | int | None, unique: bool, fn_name: str, crontab: str | None, worker: Worker[Any], fn: AsyncTask[P, R])¶
Bases: RegisteredTask, Generic[P, R]
Registered task definition for an async function that allows spawning new tasks to be enqueued.
- class streaq.task.RegisteredMiddleware(_wrapped: Callable[[Callable[[...], Coroutine[Any, Any, Any]]], Callable[[...], Coroutine[Any, Any, Any]]])¶
Bases: object
Registered middleware definition, allows for accessing task context.
- property context : TaskContext¶
Get the current task’s unique context. Only available in running middlewares.
- class streaq.task.RegisteredTask(*, expire: timedelta | int | None, max_schedule_drift: timedelta | int | None, max_tries: int | None, silent: bool, timeout: timedelta | int | None, ttl: timedelta | int | None, unique: bool, fn_name: str, crontab: str | None, worker: Worker[Any])¶
Bases: object
Base task registry definition containing task properties from the decorator.
- build_context(id: str, tries: int = 1) TaskContext¶
Creates the context for a task to be run given task metadata.
- property context : TaskContext¶
Get the current task’s unique context. Only available in running tasks.
- class streaq.task.SyncRegisteredTask(*, expire: timedelta | int | None, max_schedule_drift: timedelta | int | None, max_tries: int | None, silent: bool, timeout: timedelta | int | None, ttl: timedelta | int | None, unique: bool, fn_name: str, crontab: str | None, worker: Worker[Any], fn: SyncTask[P, R])¶
Bases: RegisteredTask, Generic[P, R]
Registered task definition for a sync function that allows spawning new tasks to be enqueued.
- class streaq.task.Task(args: tuple[Any, ...], kwargs: dict[str, Any], parent: RegisteredTask, worker: Worker[Any], id: str = <factory>, _after: Task[Any, Any] | None = None, after: list[str] = <factory>, delay: timedelta | int | None = None, schedule: datetime | str | None = None, priority: str | None = None, _triggers: Task[Any, Any] | None = None, _fails_over: bool = False, _prev_source: str | None = None)¶
Bases: Generic[P, R]
Represents a task that has been enqueued or scheduled.
Awaiting the object directly will enqueue it.
- async info() TaskInfo | None¶
Fetch info about a previously enqueued task.
- Returns:¶
task info, unless task has finished or doesn’t exist
- otherwise(task: AsyncRegisteredTask[P, R] | SyncRegisteredTask[P, R]) Task[P, R]¶
Enqueues the given task as a fallback of this one. If this task fails, the other task will be run with the same arguments. If this task succeeds, the other task will be skipped and return the same result.
- Parameters:¶
- task: AsyncRegisteredTask[P, R] | SyncRegisteredTask[P, R]¶
task to fall back to
- Returns:¶
task object for newly created, dependent task
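The fallback rule described for `otherwise` can be illustrated in plain Python. This is only a minimal sketch of the semantics (run the fallback with the same arguments if the first callable fails, skip it otherwise); it does not use streaq or Redis, and `run_with_fallback`, `flaky_fetch`, `stable_fetch`, and `cached_fetch` are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_fallback(
    primary: Callable[..., Awaitable[Any]],
    fallback: Callable[..., Awaitable[Any]],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Run `primary`; if it raises, run `fallback` with the same arguments."""
    try:
        # On success the fallback is skipped and the primary result is used.
        return await primary(*args, **kwargs)
    except Exception:
        # On failure the fallback receives exactly the same arguments.
        return await fallback(*args, **kwargs)


async def flaky_fetch(key: str) -> str:
    # Hypothetical primary task that always fails.
    raise ConnectionError("primary source unavailable")


async def stable_fetch(key: str) -> str:
    # Hypothetical primary task that succeeds.
    return f"live:{key}"


async def cached_fetch(key: str) -> str:
    # Hypothetical fallback task.
    return f"cached:{key}"


failed_over = asyncio.run(run_with_fallback(flaky_fetch, cached_fetch, "user-1"))
succeeded = asyncio.run(run_with_fallback(stable_fetch, cached_fetch, "user-1"))
```

In the library itself the two tasks are separate queue entries linked as dependencies; the sketch only mirrors the control flow a caller can expect.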
- async result(timeout: timedelta | int | None = None) TaskResult[R]¶
Wait for and return the task’s result, optionally with a timeout.
- async serialize(enqueue_time: int) Any¶
Serializes the task data for sending to the queue.
- start(after: str | Iterable[str] | None = None, delay: timedelta | int | None = None, schedule: datetime | str | None = None, priority: str | None = None) Task[P, R]¶
Configure the task to modify schedule, queue, or dependencies.
- Parameters:¶
- after: str | Iterable[str] | None = None¶
task ID(s) to wait for before running this task
- delay: timedelta | int | None = None¶
duration to wait before running the task
- schedule: datetime | str | None = None¶
datetime at which to run the task, or crontab for repeated scheduling, follows the specification here.
- priority: str | None = None¶
priority queue to insert the task
- Returns:¶
self
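Several parameters on this page accept `timedelta | int | None`. The sketch below shows one way such a value could be normalized before computing when a delayed task becomes runnable. It is not streaq's implementation: `to_timedelta` and `earliest_run` are hypothetical helpers, and treating a bare `int` as a number of seconds is an assumption that this page does not confirm.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

Duration = Union[timedelta, int, None]


def to_timedelta(value: Duration) -> Optional[timedelta]:
    """Normalize a duration; ASSUMPTION: a bare int means seconds."""
    if value is None or isinstance(value, timedelta):
        return value
    return timedelta(seconds=value)


def earliest_run(now: datetime, delay: Duration = None) -> datetime:
    """Return the earliest time a task with the given delay could run."""
    offset = to_timedelta(delay)
    return now if offset is None else now + offset


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
immediate = earliest_run(now)
delayed = earliest_run(now, delay=90)
explicit = earliest_run(now, delay=timedelta(minutes=5))
```

Accepting both types keeps call sites short for simple delays while still allowing an explicit `timedelta` where the unit should be unambiguous.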
- async status() TaskStatus¶
Fetch the current status of the task.
- Returns:¶
current task status
- then(task: AsyncRegisteredTask[Concatenate[R, POther], ROther] | SyncRegisteredTask[Concatenate[R, POther], ROther], *_: POther.args, **kwargs: POther.kwargs) Task[Concatenate[R, POther], ROther]¶
- then(task: Callable[[Unpack[Ts]], Coroutine[Any, Any, ROther]] | Callable[[Unpack[Ts]], ROther], **kwargs: Any) Task[Any, ROther]
Enqueues the given task as a dependent of this one. Positional arguments must come from the previous task’s output (tuple outputs will be unpacked), and any additional arguments can be passed as kwargs.
- Parameters:¶
- task: AsyncRegisteredTask[Concatenate[R, POther], ROther] | SyncRegisteredTask[Concatenate[R, POther], ROther]¶
- task: Callable[[Unpack[Ts]], Coroutine[Any, Any, ROther]] | Callable[[Unpack[Ts]], ROther]
task to feed output to
- Returns:¶
task object for newly created, dependent task
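The argument-passing rule for `then` (positional arguments come from the previous output, tuple outputs are unpacked, everything else is passed as keyword arguments) can be sketched without the library. `feed_output`, `scale`, and `add` are hypothetical names used only to illustrate the rule; the sketch calls the dependent function directly rather than enqueuing anything.

```python
from typing import Any, Callable


def feed_output(output: Any, fn: Callable[..., Any], **kwargs: Any) -> Any:
    """Call `fn` with a previous task's output as its positional arguments."""
    # A tuple output is unpacked into several positional arguments;
    # any other value is passed as a single positional argument.
    positional = output if isinstance(output, tuple) else (output,)
    return fn(*positional, **kwargs)


def scale(x: int, factor: int = 1) -> int:
    # Hypothetical dependent task taking one positional input.
    return x * factor


def add(a: int, b: int, offset: int = 0) -> int:
    # Hypothetical dependent task taking two positional inputs.
    return a + b + offset


single = feed_output(21, scale, factor=2)
pair = feed_output((2, 3), add, offset=10)
```

One consequence of this rule is that a task which should hand a tuple to its dependent as a single value needs to wrap it in another container first.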
- class streaq.task.TaskInfo(task_id: str, fn_name: str, created_time: int, args: tuple[Any, ...] = (), kwargs: dict[str, Any] = <factory>, tries: int = 0, scheduled: datetime | None = None, dependencies: set[str] = <factory>, dependents: set[str] = <factory>, status: TaskStatus = TaskStatus.NOT_FOUND)¶
Bases: object
Dataclass containing information about an unfinished task (running or enqueued).
- class streaq.task.TaskResult(task_id: str, fn_name: str, created_time: int, enqueue_time: int, success: bool, start_time: int, finish_time: int, tries: int, worker_id: str, _result: R | BaseException)¶
Bases: Generic[R]
Dataclass wrapping the result of a task with additional information like run time and whether execution terminated successfully.
- class streaq.task.TaskStatus(value)¶
Bases: StrEnum
Enum of possible task statuses:
- DONE = 'done'¶
task was completed
- NOT_FOUND = 'missing'¶
task doesn’t exist in Redis
- QUEUED = 'queued'¶
task is in the live queue
- RUNNING = 'running'¶
task is running on a worker
- SCHEDULED = 'scheduled'¶
task is in the delayed queue
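Because the statuses are string-valued, they can be compared with and parsed from plain strings. The sketch below uses a local stand-in enum that copies the member values listed above; it is not imported from streaq, it uses a `str` and `Enum` mixin to approximate `StrEnum` on older Python versions, and `is_pending` is a hypothetical helper.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local stand-in mirroring the documented member values."""

    DONE = "done"
    NOT_FOUND = "missing"
    QUEUED = "queued"
    RUNNING = "running"
    SCHEDULED = "scheduled"


def is_pending(status: TaskStatus) -> bool:
    """True while a task is queued, scheduled, or running."""
    return status in {TaskStatus.QUEUED, TaskStatus.RUNNING, TaskStatus.SCHEDULED}


# Members can be looked up by value, e.g. when reading a stored string.
parsed = TaskStatus("missing")

# Members compare equal to their string values.
matches_string = TaskStatus.DONE == "done"
```

Note that the member name `NOT_FOUND` and its value `'missing'` differ, so lookups by value must use `'missing'`.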