Getting Started
This guide gets quiv running with recurring tasks, progress callbacks,
and clean shutdown behavior.
Install
uv add quiv
pip install quiv
For local development:
git clone https://github.com/nandyalu/quiv.git
cd quiv
uv pip install -e ".[dev]"
git clone https://github.com/nandyalu/quiv.git
cd quiv
pip install -e ".[dev]"
1) Create a scheduler
You can configure Quiv with either a QuivConfig object or direct args.
from quiv import Quiv, QuivConfig
scheduler = Quiv(
config=QuivConfig(
pool_size=8, # default is 10
history_retention_seconds=3600, # default is 86400 (1 day)
timezone="UTC", # default is UTC
)
)
Equivalent direct parameters:
from quiv import Quiv
scheduler = Quiv(
pool_size=8, # default is 10
history_retention_seconds=3600, # default is 86400 (1 day)
timezone="UTC", # default is UTC
)
Do not mix config=... with direct constructor config args. See Quiv API for full configuration options.
2) Add a task
Sync handler
def my_task(
_job_id: str | None = None,
_stop_event: threading.Event | None = None,
_progress_hook: Callable | None = None,
):
total = 5
for step in range(1, total + 1):
# <do some task work here>
if _progress_hook:
_progress_hook(step=step, total=total)
if _stop_event and _stop_event.is_set():
return
task_id = scheduler.add_task(
task_name="demo-task",
func=my_task,
interval=10,
delay=0,
run_once=False,
args=(),
kwargs={},
)
Async handler
Async handlers are fully supported. They run in thread-local event loops created per invocation, so they do not block the scheduler or main loop.
import httpx
async def poll_api(
_stop_event: threading.Event | None = None,
_progress_hook: Callable | None = None,
):
async with httpx.AsyncClient() as client:
# example of doing some async work
response = await client.get("https://api.example.com/status")
if _progress_hook:
_progress_hook(status_code=response.status_code)
if _stop_event and _stop_event.is_set():
return
scheduler.add_task(
task_name="api-poll",
func=poll_api,
interval=30,
)
_job_id, _stop_event, and _progress_hook are injected only if your
handler accepts those keyword parameters. If your handler signature does not
include them (and does not use **kwargs), they are not injected. See
Progress Callbacks and Cancellation
for in-depth guides.
Hold onto task_id
add_task() returns a task_id (UUID string). All runtime operations —
pause_task(), resume_task(), run_task_immediately(), remove_task(),
and get_task() — use this id. Multiple tasks can share the same
task_name; each gets its own unique task_id.
3) Add progress callback (optional)
Progress callbacks can be sync or async. When an asyncio event loop is
available, async callbacks run via run_coroutine_threadsafe and sync
callbacks run via call_soon_threadsafe on the main loop. If no event loop
is available (e.g. in a plain script without asyncio), sync callbacks run
directly on the worker thread and async callbacks run in a temporary event
loop on the worker thread.
async def on_progress(**payload):
print("progress", payload)
scheduler.add_task(
task_name="demo-task-with-progress",
func=my_task,
interval=10,
progress_callback=on_progress,
)
4) Listen for events (optional)
Event listeners let you react to task and job lifecycle events. Register a
callback with add_listener():
from quiv import Event
from quiv.models import Task, Job
def on_job_completed(event: Event, task: Task, job: Job):
print(f"Job {job.id} for '{task.task_name}' completed in {job.duration_seconds}s")
def on_job_failed(event: Event, task: Task, job: Job):
print(f"Job {job.id} for '{task.task_name}' failed: {job.error_message}")
scheduler.add_listener(Event.JOB_COMPLETED, on_job_completed)
scheduler.add_listener(Event.JOB_FAILED, on_job_failed)
Typed callbacks
TASK_* listeners receive (event, task). JOB_* listeners receive
(event, task, job). Both use typed model objects with full IDE
autocomplete — no dict key lookups.
Listeners follow the same dispatch model as progress callbacks: async listeners
run on the main loop, sync listeners run via call_soon_threadsafe (or
directly on the calling thread when no loop is available). Exceptions in
listeners are logged and swallowed. See Event Listeners
for the full event list and dispatch details.
5) Start and stop
import asyncio
async def main() -> None:
scheduler.startup()
await asyncio.sleep(25)
scheduler.shutdown()
asyncio.run(main())
Always call shutdown() (or stop()) when your app exits.
startup() / shutdown() is the recommended pair, but start() / stop()
works identically — they are aliases.
6) Operate tasks at runtime
task_id = scheduler.add_task(
task_name="demo-task",
func=my_task,
interval=10,
)
scheduler.run_task_immediately(task_id)
scheduler.pause_task(task_id)
scheduler.resume_task(task_id)
7) Cancel a running job
jobs = scheduler.get_all_jobs(status="running")
for job in jobs:
scheduler.cancel_job(job.id)
Cancellation is cooperative: it sets the job's stop event. The handler must
check _stop_event.is_set() to actually stop.
8) Inspect state
tasks = scheduler.get_all_tasks(include_run_once=True)
jobs = scheduler.get_all_jobs()
failed_jobs = scheduler.get_all_jobs(status="failed")
FastAPI integration example
quiv is intended for app-integrated task scheduling, especially in FastAPI.
Use the lifespan context manager to tie scheduler lifecycle to the app:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from quiv import Quiv
scheduler = Quiv(timezone="UTC")
def reindex_documents(_stop_event=None, _progress_hook=None) -> None:
total = 100
for step in range(1, total + 1):
if _stop_event and _stop_event.is_set():
return
# Simulate blocking work
import time
time.sleep(0.05)
if _progress_hook:
_progress_hook(step=step, total=total, stage="reindex")
async def on_reindex_progress(**payload) -> None:
# Replace with websocket broadcast, logging, metrics, etc.
print("progress", payload)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
scheduler.add_task(
task_name="reindex-docs",
func=reindex_documents,
interval=300,
progress_callback=on_reindex_progress,
)
scheduler.start()
yield
# Shutdown
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
Why this matters:
_stop_eventmakes long tasks cancel safely on shutdown._progress_hooksends task progress back into FastAPI's async context.- Scheduler lifecycle is tied cleanly to app lifecycle.
Logging
quiv uses Python's standard logging module. If you do not configure
logging, no output is produced (Python's default NullHandler behavior).
To see scheduler logs, configure the "Quiv" logger:
import logging
logging.basicConfig(level=logging.INFO)
Or configure the "Quiv" logger directly for more control:
import logging
quiv_logger = logging.getLogger("Quiv")
quiv_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
quiv_logger.addHandler(handler)
You can also inject your own logger instance:
import logging
my_logger = logging.getLogger("myapp.scheduler")
scheduler = Quiv(logger=my_logger)
The library logs at these levels:
| Level | What is logged |
|---|---|
| DEBUG | Database table creation, datetime normalization |
| INFO | Task added, scheduler loop start, job start/completion, cleanup |
| WARNING | Progress callback skipped (no event loop or main loop closed) |
| ERROR | Job failures, scheduler loop errors, progress callback errors |
A separate "quiv.models" logger emits DEBUG-level messages for datetime
normalization. This logger is not configurable via the constructor and follows
standard Python logging configuration.
Troubleshooting
ConfigurationErroron startup: checkpool_size > 0andhistory_retention_seconds >= 0.InvalidTimezoneError: use a valid IANA timezone name (for exampleUTCorAmerica/New_York).HandlerNotRegisteredErrorfor immediate run: calladd_task(...)first, and use the returnedtask_id.TaskNotScheduledError: the task handler is registered, but the scheduled task row no longer exists in the database.- No log output: configure Python logging (see Logging above).
- Args/kwargs errors:
argsandkwargsare pickle-serialized, so most Python objects are supported. If you encounter errors, ensure the objects are picklable (e.g. lambdas and inner functions are not).