Bigger Applications

In larger FastAPI projects, code is split across multiple packages and modules. This guide shows how to wire up quiv in that setup: a shared scheduler instance, tasks defined in separate files, API endpoints for runtime control, WebSocket progress updates, and graceful cancellation.

Project structure

myapp/
├── main.py              # FastAPI app, lifespan, WebSocket
├── scheduler.py         # Quiv instance (shared singleton)
├── tasks/
│   ├── __init__.py
│   ├── cleanup.py       # DB cleanup task
│   └── report.py        # Report generation task
└── routes/
    ├── __init__.py
    └── tasks.py          # Task control endpoints

1) Create the scheduler instance

Define the Quiv instance in its own module so every other file can import it. Do not call start() here — that happens in the FastAPI lifespan.

# myapp/scheduler.py
from quiv import Quiv

scheduler = Quiv(
    pool_size=4,
    history_retention_seconds=7200,
    timezone="America/New_York",
)

Since Quiv lazily resolves the asyncio event loop, this works at module level before FastAPI or uvicorn creates a loop.

2) Create a logging context (optional)

Create a logging context that holds a trace_id, which log handlers can later attach to every log line.

# myapp/config/logging_context.py
import asyncio
from contextvars import ContextVar, Token
from functools import wraps
import uuid

# Define a ContextVar to hold the Trace ID
trace_id_var: ContextVar[str | None] = ContextVar("trace_id", default=None)


def get_trace_id() -> str | None:
    """Returns the current Trace ID."""
    return trace_id_var.get()


def get_new_trace_id() -> str:
    """Generates and returns a new Trace ID without setting it in the context."""
    return str(uuid.uuid4())


def generate_trace_id(trace_id: str | None = None) -> Token:
    """Sets the given Trace ID in the context, generating one if missing."""
    if trace_id is None:
        trace_id = str(uuid.uuid4())
    return trace_id_var.set(trace_id)


def clear_trace_id(token: Token) -> None:
    """Clears the Trace ID from the context."""
    trace_id_var.reset(token)


def with_logging_context(func):
    """Decorator to add a Trace ID to the logging context for the duration of the function call.
    Args:
        func: The function to decorate.
    Returns:
        The decorated function with Trace ID management.
    """

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        # Prefer the scheduler-injected _job_id, else an explicit trace_id
        trace_id = kwargs.get("_job_id") or kwargs.get("trace_id")

        token = generate_trace_id(trace_id)  # Falls back to uuid4() internally
        try:
            return await func(*args, **kwargs)
        finally:
            clear_trace_id(token)

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        # Prefer the scheduler-injected _job_id, else an explicit trace_id
        trace_id = kwargs.get("_job_id") or kwargs.get("trace_id")

        token = generate_trace_id(trace_id)
        try:
            return func(*args, **kwargs)
        finally:
            clear_trace_id(token)

    # Return the appropriate wrapper based on the source function
    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
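ContextVar is the right tool here because each asyncio task runs in its own copy of the context, so concurrent jobs cannot clobber each other's trace IDs. A minimal, self-contained illustration:

```python
import asyncio
from contextvars import ContextVar

trace: ContextVar[str] = ContextVar("trace", default="none")

async def job(name: str) -> str:
    trace.set(name)         # set inside this task's own context copy
    await asyncio.sleep(0)  # yield so the other task runs interleaved
    return trace.get()      # still this task's value, not the other's

async def main() -> list[str]:
    return await asyncio.gather(job("a"), job("b"))

results = asyncio.run(main())
# results == ["a", "b"]: each task kept its own trace value
```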

Configure your logging adapter, filter, or handler to retrieve the trace_id via get_trace_id() and include it in every log record.

For full working code with logging context, stop events, and progress hooks, see Trailarr.

3) Define tasks in separate files

Each task file imports the shared scheduler to register its task via add_task(). Tasks are plain functions — sync or async.

Cleanup task (sync, with stop event)

# myapp/tasks/cleanup.py
import logging
import threading
import time

from config.logging_context import with_logging_context

logger = logging.getLogger(__name__)

@with_logging_context
def cleanup_stale_records(
    days: int,
    _job_id: str | None = None,
    _stop_event: threading.Event | None = None,
):
    """Delete records older than `days` from the database."""

    # quiv injects _job_id; the with_logging_context decorator stores it as the
    # trace_id, so a handler using get_trace_id() tags every log line from this
    # run with the job's id.

    batches = 10
    for batch in range(1, batches + 1):
        if _stop_event and _stop_event.is_set():
            logger.info("Cleanup cancelled at batch %d/%d", batch, batches)
            return

        # ... delete a batch of old records ...
        time.sleep(1)  # simulate work

    logger.info("Cleanup finished: processed %d batches", batches)

Report task (sync, with stop event and progress hook)

# myapp/tasks/report.py
import logging
import threading
import time
from typing import Callable

from config.logging_context import with_logging_context

logger = logging.getLogger(__name__)

@with_logging_context
def generate_report(
    report_type: str,
    _job_id: str | None = None,
    _stop_event: threading.Event | None = None,
    _progress_hook: Callable | None = None,
):
    """Generate a report with progress updates."""
    steps = 5
    for step in range(1, steps + 1):
        if _stop_event and _stop_event.is_set():
            logger.info("Report generation cancelled at step %d/%d", step, steps)
            return

        # ... do a chunk of report work ...
        time.sleep(2)  # simulate work

        if _progress_hook:
            _progress_hook(
                step=step,
                total=steps,
                report_type=report_type,
            )

    logger.info("Report '%s' generated successfully", report_type)

4) Register tasks and wire up the lifespan

The main module registers tasks, starts the scheduler on startup, and shuts it down on teardown. This is also where you set up WebSocket-based progress callbacks.

# myapp/main.py
import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from quiv import Event
from quiv.models import Task, Job

from myapp.scheduler import scheduler
from myapp.tasks.cleanup import cleanup_stale_records
from myapp.tasks.report import generate_report

logger = logging.getLogger(__name__)

# ---------- WebSocket connection manager ----------

class ConnectionManager:
    """Track active WebSocket connections for progress broadcasts."""

    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.connections.remove(websocket)

    async def broadcast(self, message: dict):
        # Send to every client; drop connections that have gone away
        # so the list doesn't accumulate dead sockets.
        dead: list[WebSocket] = []
        for ws in self.connections:
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.connections.remove(ws)

ws_manager = ConnectionManager()


# ---------- Progress callback ----------

async def on_report_progress(**payload):
    """Forward task progress to all connected WebSocket clients."""
    logger.info("Report progress: %s", payload)
    await ws_manager.broadcast({"event": "progress", "data": payload})


# ---------- Lifespan ----------

async def on_job_event(event: Event, task: Task, job: Job) -> None:
    """Forward job lifecycle events to WebSocket clients."""
    payload: dict = {"event": event.value, "task": task.task_name, "job_id": job.id}
    if job.duration_seconds is not None:
        payload["duration_seconds"] = job.duration_seconds
    if job.error_message is not None:
        payload["error"] = job.error_message
    await ws_manager.broadcast(payload)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Register event listeners
    scheduler.add_listener(Event.JOB_STARTED, on_job_event)
    scheduler.add_listener(Event.JOB_COMPLETED, on_job_event)
    scheduler.add_listener(Event.JOB_FAILED, on_job_event)

    # Register and start tasks
    scheduler.add_task(
        task_name="db-cleanup",
        func=cleanup_stale_records,
        interval=3600,
        kwargs={"days": 30},
    )
    scheduler.add_task(
        task_name="weekly-report",
        func=generate_report,
        interval=604800,
        delay=10,
        kwargs={"report_type": "weekly-summary"},
        progress_callback=on_report_progress,
    )
    scheduler.start()
    yield
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)


# ---------- WebSocket endpoint ----------

@app.websocket("/ws/progress")
async def progress_websocket(websocket: WebSocket):
    await ws_manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        ws_manager.disconnect(websocket)
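The interval values passed to add_task() above are raw seconds (3600, 604800). If you prefer self-documenting schedules, datetime.timedelta converts cleanly:

```python
from datetime import timedelta

# Express schedules as human-readable durations, then convert to seconds.
HOURLY = int(timedelta(hours=1).total_seconds())
WEEKLY = int(timedelta(weeks=1).total_seconds())
# HOURLY == 3600, WEEKLY == 604800 — the literals used in the lifespan above
```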

5) API endpoints for runtime control

A separate router imports the same scheduler instance to expose task management endpoints.

# myapp/routes/tasks.py
from fastapi import APIRouter, HTTPException

from myapp.scheduler import scheduler

router = APIRouter(prefix="/tasks", tags=["tasks"])


@router.post("/{task_id}/run")
def run_task_now(task_id: str):
    """Trigger a scheduled task to run immediately."""
    try:
        count = scheduler.run_task_immediately(task_id)
    except Exception as e:
        raise HTTPException(status_code=404, detail=str(e))
    return {"queued": count}


@router.post("/{task_id}/pause")
def pause_task(task_id: str):
    """Pause a task by id."""
    try:
        scheduler.pause_task(task_id)
    except Exception as e:
        raise HTTPException(status_code=404, detail=str(e))
    return {"status": "paused"}


@router.post("/{task_id}/resume")
def resume_task(task_id: str, delay: int = 0):
    """Resume a paused task, optionally with a delay."""
    try:
        scheduler.resume_task(task_id, delay=delay)
    except Exception as e:
        raise HTTPException(status_code=404, detail=str(e))
    return {"status": "resumed"}


@router.get("/")
def list_tasks():
    """List all scheduled tasks."""
    return scheduler.get_all_tasks()


@router.get("/jobs")
def list_jobs(status: str | None = None):
    """List jobs, optionally filtered by status."""
    return scheduler.get_all_jobs(status=status)


# Declared last: FastAPI matches routes in declaration order, so this
# catch-all path must not shadow the literal /jobs path above.
@router.get("/{task_id}")
def get_task(task_id: str):
    """Get a single task by id."""
    try:
        return scheduler.get_task(task_id)
    except Exception as e:
        raise HTTPException(status_code=404, detail=str(e))


@router.post("/jobs/{job_id}/cancel")
def cancel_job(job_id: str):
    """Cancel a running job."""
    cancelled = scheduler.cancel_job(job_id)
    if not cancelled:
        raise HTTPException(status_code=404, detail="Job not found or not running")
    return {"status": "cancelled"}

Task IDs in your API

Since add_task() returns a task_id (UUID string), you can store it in your application state or return it to clients. All runtime operations (pause_task, resume_task, run_task_immediately, remove_task) use task_id as the identifier.

Task and Job are SQLModel objects, so FastAPI serializes them directly — no manual conversion needed. All datetime fields (next_run_at, started_at, ended_at) are guaranteed to be timezone-aware UTC, so the JSON output will include a +00:00 suffix that browsers can parse and display in the user's local timezone.
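Because every timestamp is timezone-aware UTC, clients can parse it unambiguously and convert to local time. A quick illustration with a sample next_run_at value (the string below is made up for the example):

```python
from datetime import datetime

# A sample ISO-8601 string like FastAPI would emit for next_run_at.
raw = "2025-01-15T09:30:00+00:00"

dt = datetime.fromisoformat(raw)
assert dt.tzinfo is not None   # timezone-aware, not naive

# Convert to the machine's local timezone for display.
local = dt.astimezone()
```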

Register the router in your app:

# add to myapp/main.py
from myapp.routes.tasks import router as tasks_router

app.include_router(tasks_router)

Run the full example

A complete runnable version of this app is in the examples/fastapi_app directory. From the repository root:

uv run uvicorn examples.fastapi_app.main:app --reload

Then open http://127.0.0.1:8000/docs for interactive API docs, or connect to ws://127.0.0.1:8000/ws/progress for live progress updates.

Key takeaways

  • Single instance, shared everywhere. Create Quiv in one module and import it wherever needed. This avoids multiple schedulers and duplicate DB files.
  • Module-level init is safe. Quiv() does not require a running asyncio loop at creation time. The event loop is resolved lazily when progress callbacks fire.
  • Lifespan owns the lifecycle. Call start() and shutdown() in the FastAPI lifespan so the scheduler is tied to the app process.
  • Tasks are plain functions. Define them anywhere. They only need _stop_event and _progress_hook in their signature if they want cancellation or progress support. See Cancellation and Progress Callbacks for detailed guides.
  • Progress goes through WebSocket. Async progress callbacks run on FastAPI's event loop, so they can broadcast to WebSocket clients directly. See Progress Callbacks for dispatch details.
  • Event listeners for observability. Use add_listener() to react to task and job lifecycle events. Async listeners run on the main loop, so they can broadcast to WebSocket clients alongside progress callbacks. See Event Listeners for the full event list.