Event Listeners
Event listeners let you react to scheduler lifecycle events — tasks being added, removed, paused, or resumed, and jobs starting, completing, failing, or being cancelled. This is useful for logging, metrics, alerting, or updating UI state.
How it works
Register a callback for one or more Event types via add_listener().
When the event fires, quiv dispatches your callback on the main event loop
(same dispatch model as progress callbacks).
flowchart TD
A["Scheduler emits event"] --> B{Listeners registered?}
B -- No --> C[Return silently]
B -- Yes --> D{Event loop available?}
D -- Yes --> E{Async listener?}
D -- No --> F{Async listener?}
E -- Yes --> G["run_coroutine_threadsafe()
on main loop"]
E -- No --> H["call_soon_threadsafe()
on main loop"]
F -- Yes --> I["Run in temporary event loop"]
F -- No --> J["Call directly on
calling thread"]
Dispatch paths
| Event loop | Listener type | What happens |
|---|---|---|
| Available | Async | Dispatched via run_coroutine_threadsafe on the main loop |
| Available | Sync | Dispatched via call_soon_threadsafe on the main loop |
| Unavailable | Sync | Called directly on the calling thread |
| Unavailable | Async | Run in a temporary event loop on the calling thread |
Events
All events are defined in the Event enum:
| Event | When it fires | Callback receives |
|---|---|---|
TASK_ADDED |
After add_task() completes |
event, task |
TASK_REMOVED |
After remove_task() completes |
event, task1 |
TASK_PAUSED |
After pause_task() completes |
event, task |
TASK_RESUMED |
After resume_task() completes |
event, task |
JOB_STARTED |
When a job begins execution | event, task, job |
JOB_COMPLETED |
When a job finishes successfully | event, task, job |
JOB_FAILED |
When a job ends with an exception | event, task, job |
JOB_CANCELLED |
When a job is cancelled via stop event | event, task, job |
Callback signatures
Listeners receive typed model objects — better callbacks with predictable inputs. The signature depends on the event group:
Task events (TASK_*)
from quiv import Event
from quiv.models import Task
def on_task_event(event: Event, task: Task) -> None:
print(f"[{event.value}] Task '{task.task_name}' (id={task.id})")
Job events (JOB_*)
from quiv import Event
from quiv.models import Task, Job
def on_job_event(event: Event, task: Task, job: Job) -> None:
print(f"[{event.value}] Job {job.id} for '{task.task_name}'")
if job.duration_seconds is not None:
print(f" Duration: {job.duration_seconds:.2f}s")
if job.error_message is not None:
print(f" Error: {job.error_message}")
Async callbacks use the same signatures:
async def on_task_event(event: Event, task: Task) -> None:
...
async def on_job_event(event: Event, task: Task, job: Job) -> None:
...
Full type safety
Since listeners receive Task and Job model objects, you get IDE
autocomplete and type checking on every field — no more guessing dict
keys at runtime.
Registering listeners
Use add_listener() to register a callback for a specific event:
from quiv import Quiv, Event
from quiv.models import Task
scheduler = Quiv()
def on_task_added(event: Event, task: Task) -> None:
print(f"Task '{task.task_name}' added with ID {task.id}")
scheduler.add_listener(Event.TASK_ADDED, on_task_added)
Multiple listeners
You can register multiple listeners for the same event. They are called in registration order:
scheduler.add_listener(Event.JOB_FAILED, log_failure)
scheduler.add_listener(Event.JOB_FAILED, send_alert)
Multiple events
Register the same callback for different events within the same event group:
from quiv.models import Task, Job
def job_audit_log(event: Event, task: Task, job: Job) -> None:
print(f"[{event.value}] task={task.task_name} job={job.id}")
scheduler.add_listener(Event.JOB_COMPLETED, job_audit_log)
scheduler.add_listener(Event.JOB_FAILED, job_audit_log)
Removing listeners
Use remove_listener() to unregister a previously added callback:
scheduler.remove_listener(Event.TASK_ADDED, on_task_added)
If the callback is not found, the call is silently ignored.
Async listeners
Async listeners run on the main event loop via run_coroutine_threadsafe,
just like async progress callbacks. This makes them ideal for FastAPI apps
where you want to broadcast events to WebSocket clients:
from quiv.models import Task, Job
async def on_job_completed(event: Event, task: Task, job: Job) -> None:
await ws_manager.broadcast({
"type": "job_completed",
"task": task.task_name,
"duration_seconds": job.duration_seconds,
})
scheduler.add_listener(Event.JOB_COMPLETED, on_job_completed)
Error handling
If a listener raises an exception, quiv logs the error but does not fail the scheduler or the job. Other listeners for the same event still run. This prevents a broken listener from disrupting task execution.
flowchart TD
A["Event emitted"] --> B["Dispatch listener 1"]
B --> C{Raises?}
C -- No --> D["Dispatch listener 2"]
C -- Yes --> E["Log error"]
E --> D
D --> F["Continue"]
Without an event loop
In scripts without asyncio, sync event listeners work normally — they run directly on the calling thread:
from quiv import Quiv, Event
from quiv.models import Task
scheduler = Quiv()
def on_added(event: Event, task: Task) -> None:
print(f"Added: {task.task_name}")
scheduler.add_listener(Event.TASK_ADDED, on_added)
scheduler.add_task("my-task", lambda: None, interval=10)
# Prints: Added: my-task
Async listeners also work in this scenario — they run in a temporary event
loop on the calling thread, so await calls inside the listener execute
correctly.
FastAPI example
A complete example wiring event listeners into a FastAPI app with WebSocket notifications:
import logging
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from quiv import Event, Quiv
from quiv.models import Task, Job
scheduler = Quiv(timezone="UTC")
logger = logging.getLogger(__name__)
connected_clients: list[WebSocket] = []
async def broadcast(message: dict) -> None:
for ws in connected_clients:
try:
await ws.send_json(message)
except Exception:
pass
async def on_job_event(event: Event, task: Task, job: Job) -> None:
"""Broadcast job lifecycle events to WebSocket clients."""
payload: dict[str, Any] = {
"event": event.value,
"task_name": task.task_name,
"job_id": job.id,
}
if job.duration_seconds is not None:
payload["duration_seconds"] = job.duration_seconds
if job.error_message is not None:
payload["error"] = job.error_message
await broadcast(payload)
def sync_task() -> None:
pass # your task logic
@asynccontextmanager
async def lifespan(app: FastAPI):
# Register event listeners
scheduler.add_listener(Event.JOB_STARTED, on_job_event)
scheduler.add_listener(Event.JOB_COMPLETED, on_job_event)
scheduler.add_listener(Event.JOB_FAILED, on_job_event)
scheduler.add_listener(Event.JOB_CANCELLED, on_job_event)
scheduler.add_task("my-task", sync_task, interval=60)
scheduler.start()
yield
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
@app.websocket("/ws/events")
async def events_websocket(websocket: WebSocket):
await websocket.accept()
connected_clients.append(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
connected_clients.remove(websocket)
-
For
TASK_REMOVED, thetaskobject is a snapshot taken before deletion. ↩