streaq.utils

class streaq.utils.TimezoneFormatter(fmt: str | None = None, datefmt: str | None = None, tz: tzinfo | None = None, **kwargs: Any)

Bases: Formatter

converter(*args: Any) struct_time
localtime([seconds]) -> (tm_year,tm_mon,tm_mday,tm_hour,tm_min,

tm_sec,tm_wday,tm_yday,tm_isdst)

Convert seconds since the Epoch to a time tuple expressing local time. When ‘seconds’ is not passed in, convert the current time instead.

streaq.utils.asyncify(fn: Callable[[P], R], limiter: CapacityLimiter | None = None) Callable[[P], Coroutine[Any, Any, R]]

Taken from asyncer v0.0.8

Take a blocking function and create an async one that receives the same positional and keyword arguments, and that when called, calls the original function in a worker thread using anyio.to_thread.run_sync().

If the task waiting for its completion is cancelled, the thread will still run its course but its result will be ignored.

Example usage:

def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
    return "stuff"

result = await to_thread.asyncify(do_work)(
    "spam",
    "ham",
    kwarg1="a",
    kwarg2="b"
)
print(result)
Parameters:
fn: Callable[[P], R]

a blocking regular callable (e.g. a function)

limiter: CapacityLimiter | None = None

a CapacityLimiter instance to limit the number of concurrent threads running the blocking function.

Returns:

An async function that takes the same positional and keyword arguments as the original one, that when called runs the same original function in a thread worker and returns the result.

streaq.utils.default_log_config(tz: tzinfo, verbose: bool) dict[str, Any]

Setup default config. for dictConfig.

Parameters:
tz: tzinfo

timezone for logs

verbose: bool

level: DEBUG if True, INFO if False

Returns:

dict suitable for logging.config.dictConfig

async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], /) tuple[T1, T2]
async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], /) tuple[T1, T2, T3]
async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], awaitable4: Awaitable[T4], /) tuple[T1, T2, T3, T4]
async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], awaitable4: Awaitable[T4], awaitable5: Awaitable[T5], /) tuple[T1, T2, T3, T4, T5]
async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], awaitable4: Awaitable[T4], awaitable5: Awaitable[T5], awaitable6: Awaitable[T6], /) tuple[T1, T2, T3, T4, T5, T6]
async streaq.utils.gather(*awaitables: Awaitable[T1]) tuple[T1, ...]

anyio-compatible implementation of asyncio.gather that runs tasks in a task group and collects the results.

streaq.utils.import_string(dotted_path: str) Any

Taken from pydantic.utils. Import and return the object at a path.

streaq.utils.now_ms() int

Get current time in milliseconds.

streaq.utils.to_tuple(val: Any) tuple[Any, ...]

Turn the given value into a tuple of one element, unless it’s already a tuple, in which case it’s left untouched.