Skip to content

Progress Callbacks

Progress callbacks let task handlers report live progress back to your application. This is useful for updating UIs, broadcasting WebSocket messages, logging metrics, or tracking long-running work.

How it works

When a handler calls _progress_hook(...), quiv dispatches the registered progress callback for that task. The dispatch path depends on whether an asyncio event loop is available and whether the callback is sync or async.

flowchart TD
    A["Handler calls _progress_hook(...)"] --> B{Callback registered?}
    B -- No --> C[Return silently]
    B -- Yes --> D{Event loop available?}
    D -- Yes --> E{Async callback?}
    D -- No --> F{Async callback?}
    E -- Yes --> G["run_coroutine_threadsafe()
    on main loop"]
    E -- No --> H["call_soon_threadsafe()
    on main loop"]
    F -- Yes --> I["Run in temporary event loop"]
    F -- No --> J["Call directly on
    worker thread"]

The four dispatch paths

Event loop Callback type What happens
Available Async Dispatched via run_coroutine_threadsafe on the main loop
Available Sync Dispatched via call_soon_threadsafe on the main loop
Unavailable Sync Called directly on the worker thread
Unavailable Async Run in a temporary event loop on the worker thread

Event loop resolution

quiv does not require an event loop at startup. The main loop is lazily resolved the first time a progress callback fires:

sequenceDiagram
    participant App as Application
    participant Q as Quiv()
    participant UV as Uvicorn / asyncio

    App->>Q: Quiv() at module level
    Note over Q: _main_loop = None
    UV->>UV: Event loop starts
    App->>Q: scheduler.start()
    Note over Q: Loop thread begins
    Q->>Q: Handler calls _progress_hook
    Q->>Q: _resolve_main_loop()
    Q->>UV: asyncio.get_running_loop()
    Note over Q: _main_loop cached
    Q->>UV: Dispatch callback on loop

This means Quiv() can be instantiated at module level before FastAPI or uvicorn creates an event loop — the common pattern for larger applications.

Adding a progress callback

Pass a progress_callback when adding a task:

async def on_progress(**payload):
    print("progress", payload)

scheduler.add_task(
    task_name="my-task",
    func=my_handler,
    interval=60,
    progress_callback=on_progress,
)

Writing a handler with progress reporting

Add _progress_hook to your handler's signature. quiv inspects the signature and only injects it if the parameter is present.

import threading
from typing import Callable


def process_records(
    batch_size: int,
    _stop_event: threading.Event | None = None,
    _progress_hook: Callable | None = None,
):
    records = fetch_records(batch_size)
    total = len(records)

    for i, record in enumerate(records, 1):
        if _stop_event and _stop_event.is_set():
            return

        process(record)

        if _progress_hook:
            _progress_hook(
                step=i,
                total=total,
                pct=round(i / total * 100),
            )

The handler does not need to know whether the callback is sync or async, or whether an event loop exists. It just calls _progress_hook(...) and quiv handles the dispatch.

Async progress callback

Async callbacks run on the main event loop via run_coroutine_threadsafe. This is ideal for FastAPI apps where you want to broadcast to WebSocket clients:

from fastapi import WebSocket

connected_clients: list[WebSocket] = []


async def on_progress(**payload):
    for ws in connected_clients:
        await ws.send_json({"event": "progress", "data": payload})


scheduler.add_task(
    task_name="etl-pipeline",
    func=run_etl,
    interval=3600,
    progress_callback=on_progress,
)

Since the callback runs on FastAPI's event loop, you can safely use await with WebSockets, database sessions, or any async API.

Sync progress callback

Sync callbacks work identically from the handler's perspective. When an event loop is available, they run on the main loop via call_soon_threadsafe. When no loop is available (e.g. a plain script), they run directly on the worker thread.

import logging

logger = logging.getLogger(__name__)


def log_progress(**payload):
    logger.info("Task progress: %s", payload)


scheduler.add_task(
    task_name="cleanup",
    func=cleanup_handler,
    interval=300,
    progress_callback=log_progress,
)

Without an event loop

In scripts that don't use asyncio, sync progress callbacks still work — they run directly on the worker thread that executes the handler:

from quiv import Quiv

scheduler = Quiv()


def on_progress(**payload):
    print(f"Step {payload['step']}/{payload['total']}")


def my_task(_progress_hook=None):
    for i in range(1, 6):
        if _progress_hook:
            _progress_hook(step=i, total=5)


scheduler.add_task(
    task_name="script-task",
    func=my_task,
    interval=10,
    progress_callback=on_progress,
)
scheduler.start()

Async progress callbacks also work in this scenario — they run in a temporary event loop on the worker thread, so await calls inside the callback will execute correctly.

Error handling

If a progress callback raises an exception, quiv logs the error but does not fail the job. The handler continues running. This prevents a broken callback from disrupting task execution.

flowchart TD
    A[Handler runs] --> B["_progress_hook(...)"]
    B --> C[Callback dispatched]
    C --> D{Callback raises?}
    D -- No --> E[Continue]
    D -- Yes --> F[Log error]
    F --> E

Payload conventions

_progress_hook accepts any *args and **kwargs. There is no enforced schema, but a useful pattern is:

_progress_hook(
    step=3,        # current step
    total=10,      # total steps
    stage="load",  # descriptive label
    pct=30,        # percentage complete
)

The progress callback receives exactly what the handler passes — quiv uses the task_id internally to look up the registered callback, but this is consumed by the dispatch layer and not forwarded to the callback.