Error Handlers API Reference¶
Error handlers allow you to customize behavior when tasks fail.
Type Definition¶
from typing import TypeAlias, Callable, Awaitable
TaskErrorHandler: TypeAlias = (
Callable[["Task[..., Any]", Exception], Any] |
Callable[["Task[..., Any]", Exception], Awaitable[Any]]
)
An error handler can be either:
- A synchronous function: (Task, Exception) -> Any
- An asynchronous function: (Task, Exception) -> Awaitable[Any]
Signature¶
Both sync and async error handlers have the same signature:
# Async error handler
async def my_error_handler(task: Task, error: Exception) -> None:
pass
# Sync error handler
def my_sync_error_handler(task: Task, error: Exception) -> None:
pass
Parameters¶
task: Task¶
The task object that failed. Provides access to:
task.config.name- Task name (if configured)task.func- The function that was executedtask.args- Positional argumentstask.kwargs- Keyword argumentstask.config.shielded- Whether task is shieldedtask.started- Event indicating if task started
error: Exception¶
The exception that was raised by the task.
Return Value¶
Return value is ignored. Error handlers are called for side effects (logging, alerting, retrying, etc.).
Usage¶
Basic Error Handler¶
from fastapi_tasks import Task
async def log_error(task: Task, error: Exception) -> None:
"""Simple error logging"""
print(f"Task {task.config.name} failed: {error}")
@app.post("/example")
async def example(tasks: Tasks) -> dict:
tasks.task(on_error=log_error).schedule(my_function)
return {"status": "ok"}
Accessing Task Details¶
async def detailed_error_handler(task: Task, error: Exception) -> None:
"""Access all task information"""
error_info = {
"task_name": task.config.name,
"function": task.func.__name__,
"args": task.args,
"kwargs": task.kwargs,
"error_type": type(error).__name__,
"error_message": str(error),
"is_shielded": task.config.shielded,
}
# Log structured error data
import json
print(json.dumps(error_info))
Sync Error Handler¶
def sync_error_handler(task: Task, error: Exception) -> None:
"""Synchronous error handler"""
# Sync operations only
with open("errors.log", "a") as f:
f.write(f"Error in {task.config.name}: {error}\n")
Common Patterns¶
Pattern 1: Send to Error Tracking¶
import sentry_sdk
async def send_to_sentry(task: Task, error: Exception) -> None:
"""Send error to Sentry"""
with sentry_sdk.push_scope() as scope:
scope.set_context("task", {
"name": task.config.name,
"function": task.func.__name__,
})
sentry_sdk.capture_exception(error)
Pattern 2: Retry with Backoff¶
import asyncio
async def retry_handler(task: Task, error: Exception) -> None:
"""Retry task with exponential backoff"""
max_retries = 3
for attempt in range(max_retries):
try:
await asyncio.sleep(2 ** attempt)
result = await task()
print(f"Task succeeded on retry {attempt + 1}")
return result
except Exception as e:
if attempt == max_retries - 1:
print(f"Task failed after {max_retries} retries")
raise
Pattern 3: Conditional Handling¶
async def smart_error_handler(task: Task, error: Exception) -> None:
"""Handle different errors differently"""
if isinstance(error, ConnectionError):
# Retry network errors
await retry_handler(task, error)
elif isinstance(error, ValueError):
# Log validation errors
await send_to_sentry(task, error)
else:
# Alert on unknown errors
await send_alert(f"Unknown error in {task.config.name}: {error}")
Pattern 4: Fallback Action¶
async def fallback_handler(task: Task, error: Exception) -> None:
"""Perform fallback action on error"""
# Try alternative approach
print(f"Primary task failed, trying fallback")
# Extract original arguments
if task.func.__name__ == "send_email_primary":
email = task.args[0] if task.args else task.kwargs.get("email")
await send_email_backup(email)
Error Handler Behavior¶
Execution¶
Error handlers are called when: 1. A task function raises an exception 2. The exception propagates to the task execution wrapper 3. The error is logged (always happens) 4. The error handler is invoked (if configured)
Exceptions in Error Handlers¶
If an error handler itself raises an exception:
async def bad_error_handler(task: Task, error: Exception) -> None:
raise RuntimeError("Handler failed!") # This will be logged
The exception is logged, but the application continues. Error handlers should be defensive:
async def safe_error_handler(task: Task, error: Exception) -> None:
"""Error handler that doesn't fail"""
try:
await send_to_sentry(task, error)
except Exception as handler_error:
# Fallback logging
import logging
logging.error(f"Error handler failed: {handler_error}")
Async vs Sync¶
Both async and sync error handlers work with both async and sync tasks:
# Async handler + async task ✓
tasks.task(on_error=async_handler).schedule(async_task)
# Async handler + sync task ✓
tasks.task(on_error=async_handler).schedule(sync_task)
# Sync handler + async task ✓
tasks.task(on_error=sync_handler).schedule(async_task)
# Sync handler + sync task ✓
tasks.task(on_error=sync_handler).schedule(sync_task)
Default Behavior (No Handler)¶
If no error handler is provided:
tasks.schedule(failing_task) # No on_error
The error is logged using Python's logging module:
logger.exception("Exception occurred in task %r", task)
The task stops executing, but other tasks continue normally.
Multiple Error Handlers¶
You cannot assign multiple error handlers to a single task. If you need multiple actions:
async def combined_error_handler(task: Task, error: Exception) -> None:
"""Combine multiple error handling actions"""
# Action 1: Log
await log_error(task, error)
# Action 2: Send to Sentry
await send_to_sentry(task, error)
# Action 3: Alert
await send_alert(task, error)
Testing Error Handlers¶
import pytest
async def test_error_handler():
"""Test that error handler is called"""
error_handled = False
async def test_handler(task: Task, error: Exception) -> None:
nonlocal error_handled
error_handled = True
assert isinstance(error, ValueError)
async def failing_task() -> None:
raise ValueError("Test error")
# Create task with error handler
# (in actual code, within an endpoint)
task = Task(
func=failing_task,
args=(),
kwargs={},
config=TaskConfig(on_error=test_handler)
)
# Execute task
await task()
# Verify handler was called
assert error_handled
Best Practices¶
- Always log errors - Even if you handle them, log for debugging
- Be defensive - Error handlers shouldn't raise exceptions
- Use structured logging - Makes analysis easier
- Don't swallow errors - At minimum, log them
- Consider retries carefully - Not all errors should be retried
- Alert on critical failures - Don't wait to discover problems
- Test error handlers - Ensure they work as expected
See Also¶
- Error Handling Tutorial - Complete error handling guide
- Tasks API - Tasks dependency
- TaskConfig - Task configuration