Timing Control¶
In Timing Modes, you learned about the three timing modes available in fastapi-tasks.
This tutorial focuses on the practical usage of after_route and after_response timing modes.
All Background Tasks are Fire-and-Forget
Critical concept: All background tasks in fastapi-tasks are fire-and-forget. They are scheduled at specific points in the request lifecycle, but they DO NOT block the response. The response is sent as soon as tasks are scheduled, not after they complete. Tasks run concurrently in the background.
Quick Recap¶
tasks.schedule()- Runs immediately (concurrent)tasks.after_route.schedule()- Scheduled after endpoint, before responsetasks.after_response.schedule()- Runs after response is sent
Using After-Route Tasks¶
Tasks scheduled with after_route are scheduled after your endpoint function completes but before the response is sent to the client. These tasks are fire-and-forget - they don't block the response, but are guaranteed to be scheduled before the client receives it.
Basic Example¶
from fastapi import FastAPI
from fastapi_tasks import Tasks, add_tasks
app = FastAPI()
add_tasks(app)
async def log_request(endpoint: str, user_id: int) -> None:
# Log to database
print(f"User {user_id} accessed {endpoint}")
@app.get("/data")
async def get_data(user_id: int, tasks: Tasks) -> dict:
data = {"result": [1, 2, 3]}
# This is SCHEDULED after this function returns,
# before the response is sent (but doesn't block the response)
tasks.after_route.schedule(log_request, "/data", user_id)
return data
When to Use After-Route¶
Use after_route when you need to:
- Schedule tasks before response is sent - Operations that must be scheduled before the client knows the request succeeded
- Audit logging - Recording that an action was taken before confirming it to the client
- Cleanup operations - Releasing resources or cleaning up state
- Pre-response validation - Last-chance checks that don't affect the response data
Real-World Example: Request Auditing¶
from datetime import datetime
async def audit_log_to_db(
user_id: int,
action: str,
resource_id: int,
timestamp: datetime
) -> None:
# Save to audit log database
print(f"AUDIT: User {user_id} performed {action} on {resource_id} at {timestamp}")
@app.delete("/posts/{post_id}")
async def delete_post(post_id: int, user_id: int, tasks: Tasks) -> dict:
# Delete the post
delete_post_from_db(post_id)
# Schedule audit log before response is sent
# This ensures the log is scheduled (fire-and-forget) before confirming to client
tasks.after_route.schedule(
audit_log_to_db,
user_id=user_id,
action="delete_post",
resource_id=post_id,
timestamp=datetime.utcnow()
)
return {"status": "deleted", "post_id": post_id}
Response Time Impact
After-route tasks have minimal scheduling overhead. They are fire-and-forget (don't block response) to maintain good performance.
Multiple After-Route Tasks¶
You can schedule multiple after-route tasks. They are scheduled in order:
async def update_cache(key: str) -> None:
print(f"Updating cache: {key}")
async def invalidate_related_cache(keys: list[str]) -> None:
print(f"Invalidating: {keys}")
async def log_cache_update(key: str) -> None:
print(f"Logged cache update: {key}")
@app.put("/items/{item_id}")
async def update_item(item_id: int, data: dict, tasks: Tasks) -> dict:
update_item_in_db(item_id, data)
# These are scheduled in order: update → invalidate → log
tasks.after_route.schedule(update_cache, f"item:{item_id}")
tasks.after_route.schedule(invalidate_related_cache, [f"items:list", f"items:count"])
tasks.after_route.schedule(log_cache_update, f"item:{item_id}")
return {"status": "updated"}
Using After-Response Tasks¶
Tasks scheduled with after_response run after the HTTP response has been sent to the client.
This is the most common timing mode because it provides the fastest response times.
Basic Example¶
async def send_email(to: str, subject: str, body: str) -> None:
# Simulate email sending
print(f"Email sent to {to}: {subject}")
@app.post("/signup")
async def signup(email: str, password: str, tasks: Tasks) -> dict:
user_id = create_user(email, password)
# Email is sent AFTER the client receives the response
tasks.after_response.schedule(
send_email,
to=email,
subject="Welcome!",
body="Thanks for signing up!"
)
return {"user_id": user_id}
When to Use After-Response¶
Use after_response for:
- Notifications - Emails, SMS, push notifications
- External API calls - Webhooks, third-party integrations
- Analytics - Event tracking, metrics collection
- Non-critical operations - Anything that doesn't affect the response
Real-World Example: Order Processing¶
async def send_order_confirmation(email: str, order_id: int) -> None:
print(f"Sending order confirmation to {email}")
async def notify_warehouse(order_id: int, items: list[dict]) -> None:
print(f"Notifying warehouse about order {order_id}")
async def track_analytics(event: str, order_id: int, amount: float) -> None:
print(f"Analytics: {event} - Order {order_id} - ${amount}")
@app.post("/orders")
async def create_order(
user_email: str,
items: list[dict],
tasks: Tasks
) -> dict:
# Create the order
order_id = save_order(user_email, items)
total_amount = calculate_total(items)
# All these happen AFTER the response is sent
tasks.after_response.schedule(
send_order_confirmation,
email=user_email,
order_id=order_id
)
tasks.after_response.schedule(
notify_warehouse,
order_id=order_id,
items=items
)
tasks.after_response.schedule(
track_analytics,
event="order_created",
order_id=order_id,
amount=total_amount
)
# Client gets this immediately
return {
"order_id": order_id,
"status": "confirmed",
"total": total_amount
}
Multiple After-Response Tasks¶
Like after-route tasks, multiple after-response tasks run in order:
@app.post("/publish")
async def publish_article(article_id: int, tasks: Tasks) -> dict:
publish_article_in_db(article_id)
# These are scheduled in order: social → email → analytics
tasks.after_response.schedule(post_to_social_media, article_id)
tasks.after_response.schedule(send_newsletter, article_id)
tasks.after_response.schedule(track_publication, article_id)
return {"status": "published"}
Combining All Three Timing Modes¶
You can use all three timing modes in a single endpoint for complex workflows:
async def validate_payment(payment_id: int) -> None:
print(f"Validating payment {payment_id}")
async def finalize_order(order_id: int) -> None:
print(f"Finalizing order {order_id}")
async def send_confirmation_email(email: str, order_id: int) -> None:
print(f"Sending confirmation to {email}")
async def notify_shipping(order_id: int) -> None:
print(f"Notifying shipping for order {order_id}")
@app.post("/checkout")
async def checkout(
cart_id: int,
email: str,
tasks: Tasks
) -> dict:
# Start payment validation immediately (concurrent)
payment_id = initiate_payment(cart_id)
tasks.schedule(validate_payment, payment_id)
# Create order
order_id = create_order_from_cart(cart_id)
# Schedule finalization before response is sent (fire-and-forget)
tasks.after_route.schedule(finalize_order, order_id)
# Send notifications after response (background)
tasks.after_response.schedule(send_confirmation_email, email, order_id)
tasks.after_response.schedule(notify_shipping, order_id)
return {
"order_id": order_id,
"payment_id": payment_id,
"status": "processing"
}
Execution timeline:
1. initiate_payment() runs (in endpoint)
2. validate_payment() starts immediately (may still be running)
3. create_order_from_cart() runs (in endpoint)
4. Endpoint function completes
5. finalize_order() is scheduled (after-route, fire-and-forget)
6. Response sent to client: {"order_id": ..., "payment_id": ..., "status": "processing"}
7. send_confirmation_email() runs (after-response)
8. notify_shipping() runs (after-response)
Accessing the Same Scheduler¶
The after_route and after_response properties return a scheduler object that has the same API as tasks:
# These are equivalent
tasks.after_route.schedule(my_task)
tasks.after_route.task(name="my_task").schedule(my_function)
# You can also schedule multiple tasks from the same scheduler
after_resp = tasks.after_response
after_resp.schedule(task_1)
after_resp.schedule(task_2)
after_resp.task(shield=True).schedule(task_3)
Choosing the Right Timing Mode¶
Here's a decision guide:
Does the operation affect the response data?
├─ YES → Run in endpoint function (not as background task)
└─ NO → Continue...
Must the operation be scheduled before client receives response?
├─ YES → Use tasks.after_route.schedule()
└─ NO → Continue...
Does the operation need to start as early as possible?
├─ YES → Use tasks.schedule() (immediate)
└─ NO → Use tasks.after_response.schedule()
Timing Mode Comparison Table¶
| Mode | When it starts | Response blocks? | Use for | Typical duration |
|---|---|---|---|---|
| Immediate | Right away | No | Concurrent processing | Any |
| After Route | After endpoint | No (fire-and-forget) | Pre-response scheduling | Any |
| After Response | After response | No | Notifications, analytics | Any |
Performance Tips¶
Optimize After-Route Tasks¶
Since After-route tasks have minimal scheduling overhead, keep them fast:
# Good: Quick database insert
tasks.after_route.schedule(log_to_db, event_data)
# Bad: Slow external API call
tasks.after_route.schedule(call_slow_api, data) # Use after_response instead!
Use Immediate Tasks for CPU-Intensive Work¶
If you have CPU-intensive work that doesn't need to complete before responding:
@app.post("/analyze")
async def analyze_data(data: dict, tasks: Tasks) -> dict:
# Start heavy computation immediately (concurrent)
tasks.schedule(run_ml_model, data)
return {"status": "analyzing"}
Batch After-Response Tasks¶
If you have many similar tasks, consider batching:
async def send_bulk_notifications(user_ids: list[int], message: str) -> None:
# Send to all users in one task
for user_id in user_ids:
send_notification(user_id, message)
@app.post("/broadcast")
async def broadcast_message(message: str, tasks: Tasks) -> dict:
user_ids = get_all_user_ids()
# One task instead of thousands
tasks.after_response.schedule(send_bulk_notifications, user_ids, message)
return {"recipients": len(user_ids)}
Next Steps¶
Now that you understand timing control, learn about:
- Task Configuration - Add names, error handlers, and shielding
- Sync and Async - Understand how sync and async tasks differ
- Error Handling - Handle task failures gracefully