Error Handling¶
Robust error handling is crucial for production applications. This tutorial covers advanced error handling patterns
for background tasks in fastapi-tasks.
Default Error Behavior¶
By default, when a task fails:
- The exception is logged
- The error handler (if provided) is called
- The task stops executing
- Other tasks continue running normally
from fastapi import FastAPI
from fastapi_tasks import Tasks, add_tasks

app = FastAPI()
add_tasks(app)

async def failing_task() -> None:
    raise ValueError("Something went wrong!")

@app.post("/fail")
async def failing_endpoint(tasks: Tasks) -> dict:
    # This task will fail, but the endpoint returns successfully
    tasks.schedule(failing_task)
    return {"status": "ok"}
The client receives {"status": "ok"} immediately, and the error is logged on the server.
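To see those log entries during development, standard library logging configuration is enough (a minimal sketch; this configures the root logger rather than any particular fastapi-tasks logger):

import logging

# Make logged task errors visible; adjust level and format to taste
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)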
Custom Error Handlers¶
Define custom error handlers to control failure behavior:
from fastapi_tasks import Task

async def my_error_handler(task: Task, error: Exception) -> None:
    """Called when a task fails"""
    print(f"Task failed: {task.config.name}")
    print(f"Error: {error}")
    print(f"Error type: {type(error).__name__}")

async def risky_task() -> None:
    raise ValueError("Oops!")

@app.post("/custom-error")
async def custom_error_endpoint(tasks: Tasks) -> dict:
    tasks.task(
        name="risky_operation",
        on_error=my_error_handler
    ).schedule(risky_task)
    return {"status": "ok"}
Error Handler Patterns¶
Pattern 1: Logging to External Service¶
Send errors to a monitoring service like Sentry:
import sentry_sdk

async def send_to_sentry(task: Task, error: Exception) -> None:
    """Send task errors to Sentry"""
    with sentry_sdk.push_scope() as scope:
        # Add task context
        scope.set_context("task", {
            "name": task.config.name,
            "function": task.func.__name__,
            "args": str(task.args),
            "kwargs": str(task.kwargs),
        })
        # Capture the exception
        sentry_sdk.capture_exception(error)
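The endpoint below schedules a potentially_failing_task; any task function will do, for example this hypothetical stand-in:

async def potentially_failing_task() -> None:
    # Hypothetical stand-in for real work that may raise
    raise ConnectionError("Upstream API timed out")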
@app.post("/monitored")
async def monitored_endpoint(tasks: Tasks) -> dict:
tasks.task(
name="monitored_task",
on_error=send_to_sentry
).schedule(potentially_failing_task)
return {"status": "ok"}
Pattern 2: Retry Logic¶
Implement automatic retries with exponential backoff:
import asyncio

async def retry_handler(
    task: Task,
    error: Exception,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> None:
    """Retry the task with exponential backoff"""
    for attempt in range(max_retries):
        try:
            # Exponential backoff: 1s, 2s, 4s, ...
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay)
            print(f"Retrying {task.config.name} (attempt {attempt + 1}/{max_retries})")
            # Retry the task
            await task()
            print(f"Task {task.config.name} succeeded on retry {attempt + 1}")
            return
        except Exception as e:
            if attempt == max_retries - 1:
                # Final attempt failed
                print(f"Task {task.config.name} failed after {max_retries} retries")
                await send_to_sentry(task, e)
                raise
            print(f"Retry {attempt + 1} failed: {e}")
import random

async def flaky_task(success_rate: float = 0.3) -> None:
    """Simulates a task that fails randomly"""
    if random.random() > success_rate:
        raise ValueError("Random failure")
    print("Task succeeded!")
@app.post("/retry")
async def retry_endpoint(tasks: Tasks) -> dict:
tasks.task(
name="flaky_task",
on_error=retry_handler
).schedule(flaky_task)
return {"status": "ok"}
Pattern 3: Fallback Actions¶
Perform alternative actions when a task fails:
async def send_email_primary(to: str, subject: str, body: str) -> None:
    # Primary email service
    raise ConnectionError("Primary service unavailable")

async def send_email_fallback(to: str, subject: str, body: str) -> None:
    # Backup email service
    print(f"Sending via fallback service to {to}")

async def email_error_handler(task: Task, error: Exception) -> None:
    """Use fallback email service if primary fails"""
    if isinstance(error, ConnectionError):
        print("Primary email service failed, using fallback")
        # Extract original arguments
        to = task.kwargs.get("to") or task.args[0]
        subject = task.kwargs.get("subject") or task.args[1]
        body = task.kwargs.get("body") or task.args[2]
        # Try fallback service
        await send_email_fallback(to, subject, body)
    else:
        # For other errors, just log
        print(f"Email task failed: {error}")
@app.post("/email-fallback")
async def email_endpoint(email: str, tasks: Tasks) -> dict:
tasks.task(
name="send_email",
on_error=email_error_handler
).schedule(
send_email_primary,
to=email,
subject="Welcome",
body="Thanks for signing up!"
)
return {"status": "ok"}
Pattern 4: Error Notification¶
Alert administrators when critical tasks fail:
import httpx

async def send_slack_alert(message: str) -> None:
    """Send alert to Slack"""
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
            json={"text": message}
        )
async def critical_error_handler(task: Task, error: Exception) -> None:
    """Alert on-call engineer when critical task fails"""
    alert_message = (
        f"🚨 Critical Task Failed!\n"
        f"Task: {task.config.name}\n"
        f"Function: {task.func.__name__}\n"
        f"Error: {type(error).__name__}: {error}\n"
        f"Arguments: {task.args}\n"
        f"Keyword Arguments: {task.kwargs}"
    )
    # Send to Slack
    await send_slack_alert(alert_message)
    # Also send to error tracking
    await send_to_sentry(task, error)
@app.post("/critical")
async def critical_endpoint(tasks: Tasks) -> dict:
tasks.task(
name="critical_payment_processing",
shield=True,
on_error=critical_error_handler
).schedule(process_payment, payment_id=12345)
return {"status": "processing"}
Handling Specific Error Types¶
Handle different errors differently:
async def smart_error_handler(task: Task, error: Exception) -> None:
    """Handle different error types appropriately"""
    if isinstance(error, ConnectionError):
        # Network errors: retry
        print("Network error, will retry...")
        await retry_handler(task, error, max_retries=5)
    elif isinstance(error, ValueError):
        # Validation errors: log and alert
        print(f"Validation error in task {task.config.name}: {error}")
        await send_slack_alert(f"Validation error: {error}")
    elif isinstance(error, PermissionError):
        # Permission errors: critical alert
        await send_slack_alert(f"🚨 Permission error in {task.config.name}!")
        await send_to_sentry(task, error)
    else:
        # Unknown errors: log to Sentry
        print(f"Unknown error: {error}")
        await send_to_sentry(task, error)
@app.post("/smart-handling")
async def smart_handling_endpoint(tasks: Tasks) -> dict:
tasks.task(
name="smart_task",
on_error=smart_error_handler
).schedule(complex_operation)
return {"status": "ok"}
Global Error Handler¶
Create a reusable error handler for all tasks:
import logging
from typing import Awaitable, Callable, Dict, Type

logger = logging.getLogger(__name__)

# Signature shared by all async error handlers
ErrorHandler = Callable[[Task, Exception], Awaitable[None]]

class ErrorHandlerRegistry:
    """Registry for different error handlers based on error type"""

    def __init__(self) -> None:
        self.handlers: Dict[Type[Exception], ErrorHandler] = {}
        self.default_handler: ErrorHandler | None = None

    def register(self, error_type: Type[Exception], handler: ErrorHandler) -> None:
        """Register a handler for a specific error type"""
        self.handlers[error_type] = handler

    def set_default(self, handler: ErrorHandler) -> None:
        """Set the default handler for unregistered error types"""
        self.default_handler = handler

    async def handle(self, task: Task, error: Exception) -> None:
        """Route the error to the most specific registered handler"""
        # Walk the exception's MRO so that subclasses (e.g. ConnectionResetError)
        # are routed to the handler registered for their base class
        for error_type in type(error).__mro__:
            handler = self.handlers.get(error_type)
            if handler is not None:
                await handler(task, error)
                return
        if self.default_handler is not None:
            await self.default_handler(task, error)
        else:
            logger.error(f"Unhandled error in task {task.config.name}: {error}")

# Create global registry
error_registry = ErrorHandlerRegistry()

# Register handlers for specific error types
async def handle_connection_error(task: Task, error: ConnectionError) -> None:
    logger.warning(f"Connection error in {task.config.name}, will retry")
    await retry_handler(task, error)

async def handle_validation_error(task: Task, error: ValueError) -> None:
    logger.error(f"Validation error in {task.config.name}: {error}")
    await send_to_sentry(task, error)

async def handle_default(task: Task, error: Exception) -> None:
    # exc_info=error attaches the traceback even outside an except block
    logger.error(f"Error in task {task.config.name}", exc_info=error)
    await send_to_sentry(task, error)

error_registry.register(ConnectionError, handle_connection_error)
error_registry.register(ValueError, handle_validation_error)
error_registry.set_default(handle_default)
@app.post("/global-handler")
async def global_handler_endpoint(tasks: Tasks) -> dict:
# All tasks use the global error handler
tasks.task(
name="task1",
on_error=error_registry.handle
).schedule(operation1)
tasks.task(
name="task2",
on_error=error_registry.handle
).schedule(operation2)
return {"status": "ok"}
Error Handling with Different Timing Modes¶
Error handlers work with all timing modes:
async def immediate_error_handler(task: Task, error: Exception) -> None:
    print(f"Immediate task {task.config.name} failed: {error}")

async def after_route_error_handler(task: Task, error: Exception) -> None:
    print(f"After-route task {task.config.name} failed: {error}")
    # This is serious - the task failed before the response was sent
    await send_slack_alert(f"After-route task failed: {error}")

async def after_response_error_handler(task: Task, error: Exception) -> None:
    print(f"After-response task {task.config.name} failed: {error}")

@app.post("/timing-errors")
async def timing_errors_endpoint(tasks: Tasks) -> dict:
    # Immediate task with error handler
    tasks.task(
        name="immediate",
        on_error=immediate_error_handler
    ).schedule(task1)
    # After-route task with error handler
    tasks.after_route.task(
        name="after_route",
        on_error=after_route_error_handler
    ).schedule(task2)
    # After-response task with error handler
    tasks.after_response.task(
        name="after_response",
        on_error=after_response_error_handler
    ).schedule(task3)
    return {"status": "ok"}
Structured Error Logging¶
Create detailed, structured error logs:
import json
import traceback
from datetime import datetime, timezone

async def structured_error_handler(task: Task, error: Exception) -> None:
    """Create structured error logs for analysis"""
    error_data = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "task": {
            "name": task.config.name,
            "function": task.func.__name__,
            "args": str(task.args),
            "kwargs": str(task.kwargs),
            "shielded": task.config.shielded,
        },
        "error": {
            "type": type(error).__name__,
            "message": str(error),
            # format_exc() only works inside an except block, so build the
            # traceback from the exception object instead
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
        },
    }
    # Log as JSON for easy parsing
    logger.error(json.dumps(error_data))
    # Also send to error tracking
    await send_to_sentry(task, error)
@app.post("/structured-logging")
async def structured_logging_endpoint(tasks: Tasks) -> dict:
tasks.task(
name="important_operation",
on_error=structured_error_handler
).schedule(important_task)
return {"status": "ok"}
Testing Error Handlers¶
How to test your error handlers:
import asyncio

import pytest
from fastapi.testclient import TestClient

@pytest.mark.asyncio  # assumes pytest-asyncio (or an equivalent async test plugin)
async def test_error_handler():
    """Test that the error handler is called on failure"""
    handled: list[tuple[Task, Exception]] = []

    async def capture_error(task: Task, error: Exception) -> None:
        # Record instead of asserting here: a failed assertion inside a
        # background handler would be swallowed, not reported by pytest
        handled.append((task, error))

    async def failing_task() -> None:
        raise ValueError("Test error")

    # In your test endpoint
    @app.post("/test")
    async def test_endpoint(tasks: Tasks) -> dict:
        tasks.task(
            name="test_task",
            on_error=capture_error
        ).schedule(failing_task)
        return {"status": "ok"}

    # Make request
    client = TestClient(app)
    response = client.post("/test")

    # Wait for the task to complete
    await asyncio.sleep(0.5)

    assert response.status_code == 200
    task, error = handled[0]
    assert isinstance(error, ValueError)
    assert task.config.name == "test_task"
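Fixed sleeps make tests slow and occasionally flaky. A more deterministic approach is to have the handler set an asyncio.Event that the test awaits with a timeout (a sketch):

async def test_error_handler_deterministic():
    handled = asyncio.Event()

    async def signaling_handler(task: Task, error: Exception) -> None:
        handled.set()

    # ... register an endpoint that schedules a failing task with
    # on_error=signaling_handler, then call it with TestClient ...

    # Returns as soon as the handler runs, fails the test after 2 seconds
    await asyncio.wait_for(handled.wait(), timeout=2.0)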
Best Practices¶
- Always handle critical task errors - Don't let payment or data-corruption errors fail silently (a handler combining these practices is sketched after this list)
- Log all errors - Even if you handle them, log for debugging
- Use structured logging - Makes analysis easier
- Alert on critical failures - Don't wait to discover problems
- Implement retries for transient failures - Network errors often resolve themselves
- Don't swallow errors silently - Always log or alert
- Test error handlers - Ensure they work as expected
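As a sketch combining several of these practices, reusing the retry_handler, send_slack_alert, and send_to_sentry helpers defined earlier:

async def production_error_handler(task: Task, error: Exception) -> None:
    """Log every failure, retry transient errors, alert on the rest."""
    # exc_info=error attaches the traceback even outside an except block
    logger.error(f"Task {task.config.name} failed", exc_info=error)
    try:
        if isinstance(error, (ConnectionError, TimeoutError)):
            # Transient failure: retry with backoff
            await retry_handler(task, error)
        else:
            # Everything else: alert and track
            await send_slack_alert(f"Task {task.config.name} failed: {error}")
            await send_to_sentry(task, error)
    except Exception:
        # Never let the handler itself raise (see Pitfall 1 below)
        logger.exception("Error handler failed")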
Common Pitfalls¶
Pitfall 1: Error Handler That Raises¶
# BAD: Error handler that raises
async def bad_error_handler(task: Task, error: Exception) -> None:
    raise RuntimeError("Error handler failed!")  # Don't do this!

# GOOD: Error handler that catches its own errors
async def good_error_handler(task: Task, error: Exception) -> None:
    try:
        await send_to_sentry(task, error)
    except Exception as e:
        # Fallback: at least log it
        logger.exception(f"Error handler failed: {e}")
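The try/except wrapping can also be factored into a reusable decorator so that any handler becomes safe (a sketch):

import functools

def safe_handler(handler):
    """Wrap an async error handler so its own failures are logged, never raised."""
    @functools.wraps(handler)
    async def wrapper(task: Task, error: Exception) -> None:
        try:
            await handler(task, error)
        except Exception:
            logger.exception(f"Error handler {handler.__name__} failed")
    return wrapper

@safe_handler
async def sentry_handler(task: Task, error: Exception) -> None:
    await send_to_sentry(task, error)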
Pitfall 2: Forgetting to Await Async Calls¶
# BAD: Not awaiting async function
async def bad_handler(task: Task, error: Exception) -> None:
    send_to_sentry(task, error)  # Missing await!

# GOOD: Properly awaiting
async def good_handler(task: Task, error: Exception) -> None:
    await send_to_sentry(task, error)
Pitfall 3: Ignoring Error Types¶
# BAD: Treating all errors the same
async def generic_handler(task: Task, error: Exception) -> None:
    await retry_handler(task, error)  # Don't retry validation errors!

# GOOD: Handle different errors appropriately
async def specific_handler(task: Task, error: Exception) -> None:
    if isinstance(error, ConnectionError):
        await retry_handler(task, error)
    else:
        await log_error(task, error)  # log_error: your own logging helper
Next Steps¶
- Task Shielding - Protect critical tasks from cancellation
- Real World Examples - See complete error handling in production scenarios
- API Reference: Error Handlers - Complete error handler API documentation