streaq.task

class streaq.task.AsyncRegisteredTask(*, expire: timedelta | int | None, max_schedule_drift: timedelta | int | None, max_tries: int | None, silent: bool, timeout: timedelta | int | None, ttl: timedelta | int | None, unique: bool, fn_name: str, crontab: str | None, worker: Worker[Any], fn: AsyncTask[P, R])

Bases: RegisteredTask, Generic[P, R]

Registered task definition for an async function that allows spawning new tasks to be enqueued.

enqueue(*args: __SPHINX_IMMATERIAL_TYPE_VAR__P_P, **kwargs: __SPHINX_IMMATERIAL_TYPE_VAR__P_P) Task[P, R]

Serialize the task and send it to the queue for later execution by an active worker. Though this isn’t async, it should be awaited as it returns an object that should be.

class streaq.task.RegisteredMiddleware(_wrapped: Callable[[Callable[[...], Coroutine[Any, Any, Any]]], Callable[[...], Coroutine[Any, Any, Any]]])

Bases: object

Registered middleware definition, allows for accessing task context.

property context : TaskContext

Get the current task’s unique context. Only available in running middlewares.

class streaq.task.RegisteredTask(*, expire: timedelta | int | None, max_schedule_drift: timedelta | int | None, max_tries: int | None, silent: bool, timeout: timedelta | int | None, ttl: timedelta | int | None, unique: bool, fn_name: str, crontab: str | None, worker: Worker[Any])

Bases: object

Base task registry definition containing task properties from the decorator.

build_context(id: str, tries: int = 1) TaskContext

Creates the context for a task to be run given task metadata

property context : TaskContext

Get the current task’s unique context. Only available in running tasks.

class streaq.task.SyncRegisteredTask(*, expire: timedelta | int | None, max_schedule_drift: timedelta | int | None, max_tries: int | None, silent: bool, timeout: timedelta | int | None, ttl: timedelta | int | None, unique: bool, fn_name: str, crontab: str | None, worker: Worker[Any], fn: SyncTask[P, R])

Bases: RegisteredTask, Generic[P, R]

Registered task definition for a sync function that allows spawning new tasks to be enqueued.

enqueue(*args: __SPHINX_IMMATERIAL_TYPE_VAR__P_P, **kwargs: __SPHINX_IMMATERIAL_TYPE_VAR__P_P) Task[P, R]

Serialize the task and send it to the queue for later execution by an active worker. Though this isn’t async, it should be awaited as it returns an object that should be.

class streaq.task.Task(args: tuple[Any, ...], kwargs: dict[str, Any], parent: RegisteredTask, worker: Worker[Any], id: str = <factory>, _after: Task[Any, Any] | None = None, after: list[str] = <factory>, delay: timedelta | int | None = None, schedule: datetime | str | None = None, priority: str | None = None, _triggers: Task[Any, Any] | None = None, _fails_over: bool = False, _prev_source: str | None = None)

Bases: Generic[P, R]

Represents a task that has been enqueued or scheduled.

Awaiting the object directly will enqueue it.

async abort(timeout: timedelta | int = 5) bool

Notify workers that the task should be aborted.

Parameters:
timeout: timedelta | int = 5

how long to wait to confirm abortion was successful

Returns:

whether the task was aborted successfully

async info() TaskInfo | None

Fetch info about a previously enqueued task.

Returns:

task info, unless task has finished or doesn’t exist

otherwise(task: AsyncRegisteredTask[P, R] | SyncRegisteredTask[P, R]) Task[P, R]

Enqueues the given task as a fallback of this one. If this task fails, the other task will be run with the same arguments. If this task succeeds, the other task will be skipped and return the same result.

Parameters:
task: AsyncRegisteredTask[P, R] | SyncRegisteredTask[P, R]

task to fall back to

Returns:

task object for newly created, dependent task

async result(timeout: timedelta | int | None = None) TaskResult[R]

Wait for and return the task’s result, optionally with a timeout.

Parameters:
timeout: timedelta | int | None = None

amount of time to wait before raising a TimeoutError

Returns:

wrapped result object

async serialize(enqueue_time: int) Any

Serializes the task data for sending to the queue.

Parameters:
enqueue_time: int

the time at which the task was enqueued

Returns:

serialized task data

start(after: str | Iterable[str] | None = None, delay: timedelta | int | None = None, schedule: datetime | str | None = None, priority: str | None = None) Task[P, R]

Configure the task to modify schedule, queue, or dependencies.

Parameters:
after: str | Iterable[str] | None = None

task ID(s) to wait for before running this task

delay: timedelta | int | None = None

duration to wait before running the task

schedule: datetime | str | None = None

datetime at which to run the task, or crontab for repeated scheduling, follows the specification here.

priority: str | None = None

priority queue to insert the task

Returns:

self

async status() TaskStatus

Fetch the current status of the task.

Returns:

current task status

then(task: AsyncRegisteredTask[Concatenate[R, POther], ROther] | SyncRegisteredTask[Concatenate[R, POther], ROther], *_: __SPHINX_IMMATERIAL_TYPE_VAR__P_POther, **kwargs: __SPHINX_IMMATERIAL_TYPE_VAR__P_POther) Task[Concatenate[R, POther], ROther]
then(task: Callable[[Unpack[Ts]], Coroutine[Any, Any, ROther]] | Callable[[Unpack[Ts]], ROther], **kwargs: Any) Task[Any, ROther]

Enqueues the given task as a dependent of this one. Positional arguments must come from the previous task’s output (tuple outputs will be unpacked), and any additional arguments can be passed as kwargs.

Parameters:
task: AsyncRegisteredTask[Concatenate[R, POther], ROther] | SyncRegisteredTask[Concatenate[R, POther], ROther]
task: Callable[[Unpack[Ts]], Coroutine[Any, Any, ROther]] | Callable[[Unpack[Ts]], ROther]

task to feed output to

Returns:

task object for newly created, dependent task

async unschedule() bool

Stop scheduling the repeating task if registered.

Returns:

whether the task was unscheduled successfully

class streaq.task.TaskInfo(task_id: str, fn_name: str, created_time: int, args: tuple[~typing.Any, ...]=(), kwargs: dict[str, ~typing.Any]=<factory>, tries: int = 0, scheduled: datetime | None = None, dependencies: set[str] = <factory>, dependents: set[str] = <factory>, status: TaskStatus = TaskStatus.NOT_FOUND)

Bases: object

Dataclass containing information about an unfinished task (running or enqueued).

class streaq.task.TaskResult(task_id: str, fn_name: str, created_time: int, enqueue_time: int, success: bool, start_time: int, finish_time: int, tries: int, worker_id: str, _result: R | BaseException)

Bases: Generic[R]

Dataclass wrapping the result of a task with additional information like run time and whether execution terminated successfully.

class streaq.task.TaskStatus(value)

Bases: StrEnum

Enum of possible task statuses:

DONE = 'done'

task was completed

NOT_FOUND = 'missing'

task doesn’t exist in Redis

QUEUED = 'queued'

task is in the live queue

RUNNING = 'running'

task is running on a worker

SCHEDULED = 'scheduled'

task is in the delayed queue