Middleware
Creating middleware
You can define middleware to wrap task execution. This is useful for cross-cutting concerns such as observability and exception handling. Here’s an example that times task execution:
import time
from typing import Any

from streaq.types import ReturnCoroutine

# `worker` is an existing streaQ worker instance
@worker.middleware
def timer(task: ReturnCoroutine) -> ReturnCoroutine:
    async def wrapper(*args, **kwargs) -> Any:
        start_time = time.perf_counter()
        result = await task(*args, **kwargs)
        print(f"Executed task {timer.context.task_id} in {time.perf_counter() - start_time:.3f}s")
        return result

    return wrapper
Middleware is structured as a wrapping function for maximum flexibility: not only can you run code before and after execution, you can also access and even modify the task’s arguments or results. You will often need the task context, which is available as RegisteredMiddleware.context (timer.context in the example above).
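To make the "modify the arguments or results" point concrete, here is a minimal sketch of the same wrapper shape in plain asyncio, without streaQ, so it runs on its own. The names `TaskFn`, `normalize`, and `greet` are hypothetical and only stand in for a real middleware and task; the function is not registered on any worker here.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Stand-in for streaq.types.ReturnCoroutine so the sketch runs without streaQ.
TaskFn = Callable[..., Awaitable[Any]]


def normalize(task: TaskFn) -> TaskFn:
    """Hypothetical middleware: cleans string arguments and wraps the result."""

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Modify the positional arguments before the task sees them.
        cleaned = tuple(a.strip() if isinstance(a, str) else a for a in args)
        result = await task(*cleaned, **kwargs)
        # Modify the result before it is handed back to the caller.
        return {"ok": True, "value": result}

    return wrapper


async def greet(name: str) -> str:
    """Hypothetical task body."""
    return f"Hello, {name}!"


wrapped = normalize(greet)
output = asyncio.run(wrapped("  Ada  "))
print(output)
```

With streaQ, the same function body would be decorated with `@worker.middleware` as in the timer example instead of being applied by hand.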
Note
If you’re trying to set up observability with OpenTelemetry, the opentelemetry-instrumentation-streaq package provides automated, end-to-end distributed tracing for your streaQ task queues.
Stacking middleware
You can register as many middleware functions as you like on a worker; they run in the order in which they were registered.
import time
from typing import Any

from streaq import StreaqRetry
from streaq.types import ReturnCoroutine

@worker.middleware
def timer(task: ReturnCoroutine) -> ReturnCoroutine:
    async def wrapper(*args, **kwargs) -> Any:
        start_time = time.perf_counter()
        result = await task(*args, **kwargs)
        print(f"Executed task {timer.context.task_id} in {time.perf_counter() - start_time:.3f}s")
        return result

    return wrapper

# retry all exceptions up to a max of 3 tries
@worker.middleware
def retry(task: ReturnCoroutine) -> ReturnCoroutine:
    async def wrapper(*args, **kwargs) -> Any:
        try:
            return await task(*args, **kwargs)
        except Exception as e:
            if retry.context.tries <= 3:
                raise StreaqRetry("Retrying on error!", delay=1) from e
            else:
                raise e

    return wrapper
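Ordering matters once middleware is stacked, because an outer wrapper sees everything an inner wrapper returns or raises. The sketch below illustrates this with plain asyncio and no streaQ. It is not streaQ's implementation: `tracer`, `apply_in_order`, and `add` are hypothetical helpers, and the sketch assumes that "run in registration order" means the first registered middleware becomes the outermost wrapper. Verify the actual behavior against your streaQ version before relying on it.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

TaskFn = Callable[..., Awaitable[Any]]
Middleware = Callable[[TaskFn], TaskFn]

events: List[str] = []


def tracer(name: str) -> Middleware:
    """Hypothetical middleware factory that records when it runs."""

    def middleware(task: TaskFn) -> TaskFn:
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            events.append(f"{name}: before")
            try:
                return await task(*args, **kwargs)
            finally:
                # Runs whether the task returned or raised.
                events.append(f"{name}: after")

        return wrapper

    return middleware


def apply_in_order(task: TaskFn, middlewares: List[Middleware]) -> TaskFn:
    # Assumption: the first registered middleware is the outermost wrapper,
    # so the list is wrapped from last to first.
    for middleware in reversed(middlewares):
        task = middleware(task)
    return task


async def add(a: int, b: int) -> int:
    """Hypothetical task body."""
    events.append("task")
    return a + b


stacked = apply_in_order(add, [tracer("first"), tracer("second")])
total = asyncio.run(stacked(2, 3))
print(total)
print(events)
```

Under that assumption the "before" steps run in registration order and the "after" steps run in reverse, which is why the relative position of a timing wrapper and a retrying wrapper changes what each one observes.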