Cancellation
quiv uses cooperative cancellation via threading.Event. This gives handlers
full control over how and when they stop, enabling clean resource cleanup and
graceful shutdown.
How it works
Each job gets its own threading.Event stop signal. When cancellation is
requested, the event is set. The handler checks the event and decides how to
respond.
sequenceDiagram
participant App as Application
participant S as Scheduler
participant H as Handler Thread
S->>H: Submit job (with stop_event)
H->>H: Working...
App->>S: cancel_job(job_id)
S->>S: stop_event.set()
H->>H: Checks _stop_event.is_set()
H->>H: Cleans up and returns
S->>S: Detects stop_event was set
S->>S: Mark job as "cancelled"
Key points
- Cancellation is cooperative — the handler must check
_stop_eventto actually stop. quiv cannot force-kill a running thread. - The handler decides when to check and how to clean up.
- Job status is automatically set to
cancelledif the stop event was set when the handler returns, regardless of whether the handler accepted_stop_eventin its signature.
Writing a cancellable handler
Add _stop_event to your handler's signature. quiv inspects the signature
and only injects it if the parameter is present.
import threading
import time
def long_running_task(
_stop_event: threading.Event | None = None,
):
"""A task that processes items and can be cancelled between steps."""
items = fetch_items()
for item in items:
if _stop_event and _stop_event.is_set():
# Clean up and exit gracefully
return
process(item)
time.sleep(0.1)
Check frequency
Check _stop_event at natural breakpoints in your handler:
- Between iterations of a loop
- Before starting an expensive operation
- After completing a unit of work
def batch_processor(
_stop_event: threading.Event | None = None,
_progress_hook: Callable | None = None,
):
batches = get_batches()
for i, batch in enumerate(batches):
# Check before each batch
if _stop_event and _stop_event.is_set():
return
result = process_batch(batch) # might take a while
save_result(result)
if _progress_hook:
_progress_hook(step=i + 1, total=len(batches))
Using _stop_event.wait() instead of time.sleep()
If your handler has a sleep/wait period, use _stop_event.wait() instead of
time.sleep(). This makes cancellation responsive even during wait periods:
def polling_task(
_stop_event: threading.Event | None = None,
):
"""Poll an API every 5 seconds, but respond to cancellation immediately."""
while not (_stop_event and _stop_event.is_set()):
result = check_api()
if result.ready:
handle_result(result)
return
# Wait 5 seconds OR until cancelled — whichever comes first
if _stop_event:
_stop_event.wait(timeout=5)
else:
time.sleep(5)
This pattern is especially useful for tasks that poll external services.
Cancelling a job
Use cancel_job(job_id) to signal cancellation:
# Find running jobs
jobs = scheduler.get_all_jobs(status="running")
# Cancel a specific job
for job in jobs:
scheduler.cancel_job(job.id)
cancel_job returns True if the stop event was found and set, False if
the job was not found (already finished or invalid ID).
Cancellation during shutdown
When shutdown() is called, quiv automatically cancels all tracked running
jobs by setting their stop events. Handlers that check _stop_event will
exit gracefully; handlers that don't will run to completion before the
process exits.
flowchart TD
A["shutdown() called"] --> B[Set _shutdown flag]
B --> C[Cancel all tracked jobs]
C --> D[Join scheduler thread]
D --> E[Shutdown thread pool]
E --> F[Dispose DB engine]
F --> G["Delete DB files (.db, -wal, -shm)"]
How status is determined
When a job finishes, quiv checks the stop event to determine the final status.
This happens in the finally block of _run_job, so it works regardless of
how the handler exited:
flowchart TD
A[Handler returns] --> B{Exception raised?}
B -- Yes --> C["status = failed"]
B -- No --> D["status = completed"]
C --> E{"stop_event.is_set()?"}
D --> E
E -- Yes --> F["status = cancelled (overrides failed/completed)"]
E -- No --> G[Keep current status]
F --> H[Finalize job in DB]
G --> H
Note that cancelled takes priority over both completed and failed. If
a handler raises an exception and the stop event is set, the job is marked
as cancelled — the assumption is that the cancellation caused the error.
Handlers without _stop_event
If your handler's signature does not include _stop_event (and does not use
**kwargs), the event is not injected — but it is still tracked
internally. This means:
cancel_job()still sets the event- The job status is still set to
cancelledif the event was set when the handler returns - The handler itself just can't respond to cancellation early
This is useful for short-lived tasks where you don't need mid-execution cancellation but still want correct status tracking on shutdown.
Combining with progress callbacks
A common pattern is to check _stop_event and report progress in the same
loop:
def export_data(
format: str,
_stop_event: threading.Event | None = None,
_progress_hook: Callable | None = None,
):
records = query_records()
total = len(records)
for i, record in enumerate(records, 1):
if _stop_event and _stop_event.is_set():
return
write_record(record, format)
if _progress_hook and i % 100 == 0:
_progress_hook(
step=i,
total=total,
pct=round(i / total * 100),
)
The progress callback and stop event are independent — you can use either or both. quiv injects each one only if the handler's signature accepts it.