
# quiv

quiv is a lightweight background task scheduler for Python applications. It is designed to work especially well with FastAPI apps that need predictable, in-process background task orchestration. Supports Python 3.10 through 3.14.
It provides:
- threadpool-backed execution
- support for sync and async task handlers
- cooperative cancellation (`_stop_event`)
- progress callbacks routed to your main async loop (`_progress_hook`)
- event listeners for task and job lifecycle events
- persistent task/job state via SQLModel + SQLite
## When to use quiv

Use quiv when you need in-process background scheduling for app-level jobs, for example:
- polling APIs every N seconds
- periodic cleanup tasks
- one-shot delayed jobs
- progress-aware long-running workloads
## Install

```sh
uv add quiv
# or with pip
pip install quiv
```
## Quick example

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from quiv import Quiv

scheduler = Quiv(timezone="UTC")


def ping(_progress_hook=None):
    for i in range(30):
        # do some work
        if _progress_hook:
            _progress_hook(message="ping", progress=i, total=30)


async def on_progress(**payload):
    # Replace with websocket broadcast, logging, metrics, etc.
    print("progress", payload)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    scheduler.start()
    yield
    # Shutdown
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)


@app.post("/start-heartbeat")
def start_heartbeat():
    task_id = scheduler.add_task(
        task_name="heartbeat",
        func=ping,
        interval=30,
        progress_callback=on_progress,
    )
    return {"task_id": task_id}
```
Async handlers work the same way:

```python
async def fetch_updates(_stop_event=None):
    # async handlers run in thread-local event loops
    await some_async_api_call()


scheduler.add_task(task_name="fetch", func=fetch_updates, interval=60)
```
## FastAPI usage

For a full FastAPI integration example (startup/shutdown lifecycle plus `_stop_event` and `_progress_hook`), see the FastAPI section in Getting Started.
## Concepts

- **Task**: the scheduling definition (`interval`, `run_once`, args/kwargs, status)
- **Job**: one execution record of a task
- Task statuses: `active`, `running`, `paused`
- Job statuses: `scheduled`, `running`, `completed`, `cancelled`, `failed`
## Why quiv?

Python has several task schedulers (APScheduler, arq, rq, sched, schedule, and others). quiv was born out of gaps none of them filled well.
### Cooperative cancellation

I am the developer of Trailarr, an open-source app for downloading and managing trailers for media libraries. Trailarr is a FastAPI app at its core and was using APScheduler for background tasks and other work that shouldn't block the main thread or async loop. As the app grew, users started requesting a way to stop long-running tasks mid-execution. None of the existing schedulers offered a clean mechanism for this.

quiv solves it with `_stop_event`: a per-job `threading.Event` that is injected into your handler so you can check it at natural breakpoints and exit early when cancellation is requested.
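As a sketch of the handler-side pattern (not quiv's internals), a handler can poll the injected event between units of work and exit early when it is set:

```python
import threading
import time


def sync_long_task(_stop_event=None):
    """Simulated long-running handler; returns the number of chunks completed."""
    done = 0
    for _ in range(10):
        # Natural breakpoint: bail out early if cancellation was requested.
        if _stop_event is not None and _stop_event.is_set():
            break
        time.sleep(0.001)  # simulate one unit of work
        done += 1
    return done


# A pre-set event makes the handler exit before doing any work.
cancelled = threading.Event()
cancelled.set()
```

Checking at chunk boundaries keeps cancellation cooperative: the handler decides where it is safe to stop and clean up.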
### Progress callbacks across thread boundaries

Apps with a frontend or any sort of UI often need background tasks to report progress back to the main thread, for example to push websocket messages to a UI. There was no straightforward way to call an async function on the main event loop from inside a threadpool worker.

quiv solves this with `_progress_hook`: your handler calls it with arbitrary payload data, and the scheduler dispatches your registered callback on the main asyncio loop, where it can broadcast over websockets or update application state.
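This kind of cross-thread dispatch can be done with `asyncio.run_coroutine_threadsafe`; the following is a minimal standalone sketch of the mechanism, independent of quiv:

```python
import asyncio
import threading

received = []


async def on_progress(**payload):
    # Runs on the main asyncio loop; a real app might broadcast over websockets.
    received.append(payload)


def worker(loop):
    # Inside a worker thread: schedule the async callback on the main loop.
    for i in range(3):
        asyncio.run_coroutine_threadsafe(on_progress(progress=i, total=3), loop)


async def main():
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop,))
    thread.start()
    thread.join()
    # Give the loop a chance to run the scheduled callbacks.
    while len(received) < 3:
        await asyncio.sleep(0.01)


asyncio.run(main())
```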
Job-level tracing via _job_id
When you need to trace exactly what happened during a specific run of a task,
log correlation is essential. quiv injects a unique _job_id (UUID string)
into every handler invocation, giving you a stable identifier you can attach
to log records, metrics, or spans.
Trailarr uses this today: every task
handler receives _job_id from quiv and sets it as a trace_id on the
logger. All log lines emitted during that run carry the same trace id, so
filtering logs by a single job is a one-line query — no matter how many tasks
ran concurrently.
```python
import logging

from config.logging_context import with_logging_context

logger = logging.getLogger(__name__)


@with_logging_context
def download_trailer(
    media_id: int,
    _job_id: str | None = None,
    _stop_event=None,
):
    # quiv injects _job_id; the with_logging_context decorator stores it as
    # trace_id, and the logging setup attaches that trace_id to every record,
    # so all logs emitted during this run carry the same trace id.
    logger.info("Starting download for media %s", media_id)
    # ... do work; all logs carry the same trace_id ...
```
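`with_logging_context` is Trailarr application code, not part of quiv. A minimal sketch of such a decorator, assuming a `contextvars`-based trace id plus a logging filter that stamps it onto records, might look like:

```python
import contextvars
import functools
import logging

trace_id_var = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    # Copies the current trace_id onto every log record it sees.
    def filter(self, record):
        record.trace_id = trace_id_var.get()
        return True


def with_logging_context(func):
    # Pulls the quiv-injected _job_id out of kwargs and stores it as the
    # trace_id for the duration of this call, restoring the old value after.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        token = trace_id_var.set(kwargs.get("_job_id") or "-")
        try:
            return func(*args, **kwargs)
        finally:
            trace_id_var.reset(token)
    return wrapper
```

Attach `TraceIdFilter` to your handlers and include `%(trace_id)s` in the log format, and every line emitted during a run carries that run's job id.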
If your app needs any of these patterns, quiv might be a good fit.
## Important caveats

- **Temporary database**: each `Quiv` instance creates a temporary SQLite file that is deleted on `shutdown()`. Task/job state does not persist across restarts.
- **Single-process**: the scheduler runs in-process. It is not designed for distributed or multi-process deployments.
- **Picklable args**: `args` and `kwargs` passed to `add_task()` are pickle-serialized for persistence. Most Python objects are supported, but lambdas and inner functions are not picklable. The temporary SQLite database is trusted internal state: only your application code writes to it, and it is deleted on `shutdown()`. Do not expose the database file to untrusted input.
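To check up front whether a value will survive pickle-based persistence, a small round-trip helper (hypothetical, not part of quiv) can be used before passing it to `add_task()`:

```python
import pickle


def is_picklable(value) -> bool:
    # Returns True if value survives a pickle round-trip, i.e. it is safe
    # to pass in args/kwargs under pickle-based persistence.
    try:
        pickle.loads(pickle.dumps(value))
        return True
    except Exception:
        return False
```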