streaq.utils¶
-
class streaq.utils.TimezoneFormatter(fmt: str | None =
None, datefmt: str | None =None, tz: tzinfo | None =None, **kwargs: Any)¶ Bases:
Formatter- converter(*args: Any) struct_time¶
- localtime([seconds]) -> (tm_year,tm_mon,tm_mday,tm_hour,tm_min,
tm_sec,tm_wday,tm_yday,tm_isdst)
Convert seconds since the Epoch to a time tuple expressing local time. When ‘seconds’ is not passed in, convert the current time instead.
-
streaq.utils.asyncify(fn: Callable[[P], R], limiter: CapacityLimiter | None =
None) Callable[[P], Coroutine[Any, Any, R]]¶ Taken from asyncer v0.0.8
Take a blocking function and create an async one that receives the same positional and keyword arguments, and that when called, calls the original function in a worker thread using anyio.to_thread.run_sync().
If the task waiting for its completion is cancelled, the thread will still run its course but its result will be ignored.
Example usage:
def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str: return "stuff" result = await to_thread.asyncify(do_work)( "spam", "ham", kwarg1="a", kwarg2="b" ) print(result)
- streaq.utils.default_log_config(tz: tzinfo, verbose: bool) dict[str, Any]¶
Setup default config. for dictConfig.
- async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], /) tuple[T1, T2]¶
- async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], /) tuple[T1, T2, T3]
- async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], awaitable4: Awaitable[T4], /) tuple[T1, T2, T3, T4]
- async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], awaitable4: Awaitable[T4], awaitable5: Awaitable[T5], /) tuple[T1, T2, T3, T4, T5]
- async streaq.utils.gather(awaitable1: Awaitable[T1], awaitable2: Awaitable[T2], awaitable3: Awaitable[T3], awaitable4: Awaitable[T4], awaitable5: Awaitable[T5], awaitable6: Awaitable[T6], /) tuple[T1, T2, T3, T4, T5, T6]
- async streaq.utils.gather(*awaitables: Awaitable[T1]) tuple[T1, ...]
anyio-compatible implementation of asyncio.gather that runs tasks in a task group and collects the results.
- streaq.utils.import_string(dotted_path: str) Any¶
Taken from pydantic.utils. Import and return the object at a path.