
Getting Started

This guide gets quiv running with recurring tasks, progress callbacks, and clean shutdown behavior.

Install

Install with uv:

uv add quiv

Or with pip:

pip install quiv

For local development:

git clone https://github.com/nandyalu/quiv.git
cd quiv
uv pip install -e ".[dev]"

Or, using pip:

pip install -e ".[dev]"

1) Create a scheduler

You can configure Quiv with either a QuivConfig object or direct keyword arguments.

from quiv import Quiv, QuivConfig

scheduler = Quiv(
    config=QuivConfig(
        pool_size=8,                    # default is 10
        history_retention_seconds=3600, # default is 86400 (1 day)
        timezone="UTC",                 # default is UTC
    )
)

Equivalent direct parameters:

from quiv import Quiv

scheduler = Quiv(
    pool_size=8,                    # default is 10
    history_retention_seconds=3600, # default is 86400 (1 day)
    timezone="UTC",                 # default is UTC
)

Do not mix config=... with direct constructor config args. See Quiv API for full configuration options.

2) Add a task

Sync handler

import threading
from typing import Callable

def my_task(
    _job_id: str | None = None,
    _stop_event: threading.Event | None = None,
    _progress_hook: Callable | None = None,
):
    total = 5
    for step in range(1, total + 1):
        # <do some task work here>
        if _progress_hook:
            _progress_hook(step=step, total=total)
        if _stop_event and _stop_event.is_set():
            return

task_id = scheduler.add_task(
    task_name="demo-task",
    func=my_task,
    interval=10,
    delay=0,
    run_once=False,
    args=(),
    kwargs={},
)

Async handler

Async handlers are fully supported. They run in thread-local event loops created per invocation, so they do not block the scheduler or main loop.

import threading
from typing import Callable

import httpx

async def poll_api(
    _stop_event: threading.Event | None = None,
    _progress_hook: Callable | None = None,
):
    async with httpx.AsyncClient() as client:
        # example of doing some async work
        response = await client.get("https://api.example.com/status")
        if _progress_hook:
            _progress_hook(status_code=response.status_code)
        if _stop_event and _stop_event.is_set():
            return

scheduler.add_task(
    task_name="api-poll",
    func=poll_api,
    interval=30,
)

_job_id, _stop_event, and _progress_hook are injected only if your handler declares them as keyword parameters (or accepts **kwargs); otherwise they are simply not passed. See Progress Callbacks and Cancellation for in-depth guides.
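One way to picture that injection rule is a signature check. The sketch below is a stdlib illustration of the behavior described above, not quiv's actual implementation:

```python
import inspect

def injectable_kwargs(func, available):
    """Return the subset of `available` kwargs that `func` can accept.

    A kwarg is passed only if the handler names it explicitly
    or declares **kwargs -- mirroring the rule described above.
    """
    params = inspect.signature(func).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return dict(available)
    return {k: v for k, v in available.items() if k in params}

def plain(data): ...
def aware(data, _job_id=None, _stop_event=None): ...

available = {"_job_id": "j1", "_stop_event": None, "_progress_hook": None}
injectable_kwargs(plain, available)   # {} -- nothing injected
injectable_kwargs(aware, available)   # only _job_id and _stop_event
```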

Hold onto task_id

add_task() returns a task_id (UUID string). All runtime operations — pause_task(), resume_task(), run_task_immediately(), remove_task(), and get_task() — use this id. Multiple tasks can share the same task_name; each gets its own unique task_id.
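The id model is easy to picture with a stdlib sketch (hypothetical, not quiv's internals): every registration gets a fresh UUID, so names can repeat while ids never do.

```python
import uuid

# Hypothetical registry sketch: each add gets a fresh UUID, even for a
# duplicate name, so runtime operations can target one task precisely.
registry = {}

def fake_add_task(task_name):
    task_id = str(uuid.uuid4())
    registry[task_id] = task_name
    return task_id

id_a = fake_add_task("demo-task")
id_b = fake_add_task("demo-task")   # same name, distinct id
```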

3) Add progress callback (optional)

Progress callbacks can be sync or async. When an asyncio event loop is available, async callbacks run via run_coroutine_threadsafe and sync callbacks run via call_soon_threadsafe on the main loop. If no event loop is available (e.g. in a plain script without asyncio), sync callbacks run directly on the worker thread and async callbacks run in a temporary event loop on the worker thread.

async def on_progress(**payload):
    print("progress", payload)

scheduler.add_task(
    task_name="demo-task-with-progress",
    func=my_task,
    interval=10,
    progress_callback=on_progress,
)

4) Listen for events (optional)

Event listeners let you react to task and job lifecycle events. Register a callback with add_listener():

from quiv import Event
from quiv.models import Task, Job

def on_job_completed(event: Event, task: Task, job: Job):
    print(f"Job {job.id} for '{task.task_name}' completed in {job.duration_seconds}s")

def on_job_failed(event: Event, task: Task, job: Job):
    print(f"Job {job.id} for '{task.task_name}' failed: {job.error_message}")

scheduler.add_listener(Event.JOB_COMPLETED, on_job_completed)
scheduler.add_listener(Event.JOB_FAILED, on_job_failed)

Typed callbacks

TASK_* listeners receive (event, task). JOB_* listeners receive (event, task, job). Both use typed model objects with full IDE autocomplete — no dict key lookups.

Listeners follow the same dispatch model as progress callbacks: async listeners run on the main loop, sync listeners run via call_soon_threadsafe (or directly on the calling thread when no loop is available). Exceptions in listeners are logged and swallowed. See Event Listeners for the full event list and dispatch details.

5) Start and stop

import asyncio

async def main() -> None:
    scheduler.startup()
    await asyncio.sleep(25)
    scheduler.shutdown()

asyncio.run(main())

Always call shutdown() (or stop()) when your app exits.

startup() / shutdown() is the recommended pair, but start() / stop() works identically — they are aliases.

6) Operate tasks at runtime

task_id = scheduler.add_task(
    task_name="demo-task",
    func=my_task,
    interval=10,
)

scheduler.run_task_immediately(task_id)
scheduler.pause_task(task_id)
scheduler.resume_task(task_id)

7) Cancel a running job

jobs = scheduler.get_all_jobs(status="running")
for job in jobs:
    scheduler.cancel_job(job.id)

Cancellation is cooperative: it sets the job's stop event. The handler must check _stop_event.is_set() to actually stop.
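This is the same cooperative pattern as plain threading.Event usage; a minimal stdlib illustration, independent of quiv:

```python
import threading
import time

def worker(stop_event):
    # Long-running loop that honors cancellation between units of work.
    while not stop_event.is_set():
        time.sleep(0.01)  # simulated unit of work

stop = threading.Event()
t = threading.Thread(target=worker, args=(stop,))
t.start()

time.sleep(0.05)
stop.set()          # request cancellation -- cooperative, not forced
t.join(timeout=1)   # worker exits at its next is_set() check
```

If the worker never checks the event, setting it has no effect; that is exactly why handlers must poll _stop_event.is_set().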

8) Inspect state

tasks = scheduler.get_all_tasks(include_run_once=True)
jobs = scheduler.get_all_jobs()
failed_jobs = scheduler.get_all_jobs(status="failed")

FastAPI integration example

quiv is intended for app-integrated task scheduling, especially in FastAPI. Use the lifespan context manager to tie scheduler lifecycle to the app:

from contextlib import asynccontextmanager

from fastapi import FastAPI

from quiv import Quiv

scheduler = Quiv(timezone="UTC")


def reindex_documents(_stop_event=None, _progress_hook=None) -> None:
    total = 100
    for step in range(1, total + 1):
        if _stop_event and _stop_event.is_set():
            return

        # Simulate blocking work
        import time
        time.sleep(0.05)

        if _progress_hook:
            _progress_hook(step=step, total=total, stage="reindex")


async def on_reindex_progress(**payload) -> None:
    # Replace with websocket broadcast, logging, metrics, etc.
    print("progress", payload)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    scheduler.add_task(
        task_name="reindex-docs",
        func=reindex_documents,
        interval=300,
        progress_callback=on_reindex_progress,
    )
    scheduler.start()
    yield
    # Shutdown
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)

Why this matters:

  • _stop_event makes long tasks cancel safely on shutdown.
  • _progress_hook sends task progress back into FastAPI's async context.
  • Scheduler lifecycle is tied cleanly to app lifecycle.

Logging

quiv uses Python's standard logging module. If you do not configure logging, no output is produced (the standard NullHandler convention for libraries).

To see scheduler logs, the quickest route is a global basicConfig:

import logging

logging.basicConfig(level=logging.INFO)

Or configure the "Quiv" logger directly for more control:

import logging

quiv_logger = logging.getLogger("Quiv")
quiv_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
quiv_logger.addHandler(handler)

You can also inject your own logger instance:

import logging

my_logger = logging.getLogger("myapp.scheduler")
scheduler = Quiv(logger=my_logger)

The library logs at these levels:

Level     What is logged
-------   --------------
DEBUG     Database table creation, datetime normalization
INFO      Task added, scheduler loop start, job start/completion, cleanup
WARNING   Progress callback skipped (no event loop or main loop closed)
ERROR     Job failures, scheduler loop errors, progress callback errors

A separate "quiv.models" logger emits DEBUG-level messages for datetime normalization. This logger is not configurable via the constructor and follows standard Python logging configuration.
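Because "quiv.models" is an ordinary Python logger, you can raise its level independently of the main "Quiv" logger, for example:

```python
import logging

# Surface DEBUG-level datetime-normalization messages from quiv.models only.
models_logger = logging.getLogger("quiv.models")
models_logger.setLevel(logging.DEBUG)
models_logger.addHandler(logging.StreamHandler())
```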

Troubleshooting

  • ConfigurationError on startup: check pool_size > 0 and history_retention_seconds >= 0.
  • InvalidTimezoneError: use a valid IANA timezone name (for example UTC or America/New_York).
  • HandlerNotRegisteredError for immediate run: call add_task(...) first, and use the returned task_id.
  • TaskNotScheduledError: the task handler is registered, but the scheduled task row no longer exists in the database.
  • No log output: configure Python logging (see Logging above).
  • Args/kwargs errors: args and kwargs are pickle-serialized, so most Python objects are supported. If you encounter errors, ensure the objects are picklable (e.g. lambdas and inner functions are not).
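A quick way to check serializability up front (plain stdlib, independent of quiv): try pickling the value yourself before passing it as a task argument.

```python
import pickle

def greet(name):          # module-level function: picklable by reference
    return f"hi {name}"

pickle.dumps(("demo", 3, {"k": [1, 2]}))   # plain data: fine
pickle.dumps(greet)                        # top-level function: fine

try:
    pickle.dumps(lambda x: x)              # lambdas are not picklable
except (pickle.PicklingError, AttributeError):
    print("not picklable -- use a module-level function instead")
```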