Skip to content

quiv Logo


Python Code style: black License: MIT PyPI Pulls

Build Tests Type Check GitHub Issues GitHub last commit

quiv is a lightweight background task scheduler for Python applications.

It is designed to work especially well with FastAPI apps that need predictable, in-process background task orchestration.

Supports Python 3.10 through 3.14.

It provides:

  • threadpool-backed execution
  • support for sync and async task handlers
  • cooperative cancellation (_stop_event)
  • progress callbacks routed to your main async loop (_progress_hook)
  • event listeners for task and job lifecycle events
  • persistent task/job state via SQLModel + SQLite

When to use quiv

Use quiv when you need in-process background scheduling for app-level jobs, for example:

  • polling APIs every N seconds
  • periodic cleanup tasks
  • one-shot delayed jobs
  • progress-aware long-running workloads

Install

uv add quiv
pip install quiv

Quick example

from contextlib import asynccontextmanager

from fastapi import FastAPI

from quiv import Quiv

scheduler = Quiv(timezone="UTC")


def ping(_progress_hook=None):
    for i in range(30):
        # do some work
        if _progress_hook:
            _progress_hook(message="ping", progress=i, total=30)


async def on_progress(**payload):
    # Replace with websocket broadcast, logging, metrics, etc.
    print("progress", payload)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    scheduler.start()
    yield
    # Shutdown
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)

@app.post("/start-heartbeat")
def start_heartbeat():
    task_id = scheduler.add_task(
        task_name="heartbeat",
        func=ping,
        interval=30,
        progress_callback=on_progress,
    )
    return {"task_id": task_id}

Async handlers work the same way:

async def fetch_updates(_stop_event=None):
    # async handlers run in thread-local event loops
    await some_async_api_call()

scheduler.add_task(task_name="fetch", func=fetch_updates, interval=60)

FastAPI usage

For a full FastAPI integration example (startup/shutdown lifecycle plus _stop_event and _progress_hook), see the FastAPI section in Getting Started.

Concepts

  • Task: scheduling definition (interval, run_once, args/kwargs, status)
  • Job: one execution record of a task
  • Task statuses: active, running, paused
  • Job statuses: scheduled, running, completed, cancelled, failed

Why quiv?

Python has several task schedulers — APScheduler, arq, rq, sched, schedule, and others. quiv was born out of gaps none of them filled well.

Cooperative cancellation

I am the developor of Trailarr, an open-source app for downloading and managing trailers for media libraries, Trailarr is a fastapi app at it's core and was using APScheduler for background tasks and things that shouldn't block the main thread/async loop.

As the app grew, users started requesting a way to stop long-running tasks mid-execution. None of the existing schedulers offered a clean mechanism for this.

quiv solves it with _stop_event: a per-job threading.Event that is injected into your handler so you can check it at natural breakpoints and exit early when cancellation is requested.

Progress callbacks across thread boundaries

Apps with a frontend or any sort of UI often need background tasks to report progress back to the main thread — for example, to push websocket messages to a UI.

There was no straightforward way to call an async function on the main event loop from inside a threadpool worker.

quiv solves this with _progress_hook: your handler calls it with arbitrary payload data, and the scheduler dispatches your registered callback on the main asyncio loop, where it can broadcast over websockets or update application state.

Job-level tracing via _job_id

When you need to trace exactly what happened during a specific run of a task, log correlation is essential. quiv injects a unique _job_id (UUID string) into every handler invocation, giving you a stable identifier you can attach to log records, metrics, or spans.

Trailarr uses this today: every task handler receives _job_id from quiv and sets it as a trace_id on the logger. All log lines emitted during that run carry the same trace id, so filtering logs by a single job is a one-line query — no matter how many tasks ran concurrently.

import logging

from config.logging_context import with_logging_context

logger = logging.getLogger(__name__)

@with_logging_context
def download_trailer(
    media_id: int,
    _job_id: str | None = None,
    _stop_event=None,
):
    # quiv injects _job_id and with_logging_context decorator stores it as trace_id
    # logs handler will get it using get_trace_id and adds it to all logs
    # so logs from the task will be logged with that trace_id
    # Attach job_id as trace context for this run
    logger.info("Starting download for media %s", media_id)

    # ... do work, all logs carry the same trace_id ...

If your app needs any of these patterns, quiv might be a good fit.

Important caveats

  • Temporary database: each Quiv instance creates a temporary SQLite file that is deleted on shutdown(). Task/job state does not persist across restarts.
  • Single-process: the scheduler runs in-process. It is not designed for distributed or multi-process deployments.
  • Picklable args: args and kwargs passed to add_task() are pickle-serialized for persistence. Most Python objects are supported, but lambdas and inner functions are not picklable. The temporary SQLite database is trusted internal state — only your application code writes to it, and it is deleted on shutdown(). Do not expose the database file to untrusted input.

Next pages