Skip to content

Real World Examples

This tutorial provides complete, production-ready examples of using fastapi-tasks in real-world scenarios.

Example 1: E-commerce Order Processing

Complete order processing with payment, inventory, and notifications:

import asyncio
import json
from typing import List, Optional

import httpx
from fastapi import FastAPI, HTTPException, UploadFile
from fastapi_tasks import Task, Tasks, add_tasks
from pydantic import BaseModel

# Application instance with fastapi-tasks installed (provides the `tasks: Tasks`
# dependency used by the endpoints below).
app = FastAPI()
add_tasks(app)


# Models
class OrderItem(BaseModel):
    """A single line item within an order."""
    # Identifier of the product being purchased.
    product_id: int
    # Number of units ordered.
    quantity: int
    # Unit price — line total is price * quantity.
    price: float


class OrderCreate(BaseModel):
    """Request body for POST /orders."""
    # Purchasing user's id.
    user_id: int
    # Address for the confirmation email.
    email: str
    # Line items making up the order.
    items: List[OrderItem]
    # Opaque token from the payment gateway's client-side SDK.
    payment_token: str


# Database operations (simplified)
async def create_order_record(user_id: int, items: List[OrderItem]) -> int:
    """Persist a new order row and return its generated id (stubbed)."""
    # Placeholder for the real INSERT — a fixed id stands in for the
    # database-generated one.
    return 12345


async def update_order_status(order_id: int, status: str) -> None:
    """Set the order's status (e.g. "payment_failed") — stub, no DB write yet."""
    # Update order status
    pass


# Payment processing
async def process_payment(
    order_id: int,
    amount: float,
    payment_token: str
) -> dict:
    """Charge the customer through the external payment gateway.

    Returns the gateway's JSON response body; raises httpx.HTTPStatusError
    on a non-2xx reply.
    """
    charge = {
        "amount": amount,
        "currency": "USD",
        "token": payment_token,
        "metadata": {"order_id": order_id}
    }
    async with httpx.AsyncClient() as http:
        resp = await http.post(
            "https://api.payment-gateway.com/charge",
            json=charge,
            headers={"Authorization": "Bearer YOUR_API_KEY"}
        )
        resp.raise_for_status()
        return resp.json()


async def payment_error_handler(task: Task, error: Exception) -> None:
    """On payment-task failure: flag the order, alert ops, report the exception."""
    failed_order = task.kwargs.get("order_id")

    # Flag the order so support/UI can see the charge did not go through.
    await update_order_status(failed_order, "payment_failed")

    # Human-facing alert first...
    await send_slack_alert(f"Payment failed for order {failed_order}: {error}")

    # ...then structured error tracking.
    import sentry_sdk
    sentry_sdk.capture_exception(error)


# Inventory management
async def reserve_inventory(order_id: int, items: List[OrderItem]) -> None:
    """Atomically reserve stock for every line item, or fail the whole order.

    Raises ValueError if any product lacks sufficient available stock; the
    surrounding transaction then rolls back all reservations.
    """
    async with db.transaction():
        for line in items:
            # Conditional UPDATE: only matches a row when enough stock exists,
            # so rowcount == 0 means insufficient inventory.
            result = await db.execute(
                "UPDATE inventory SET reserved = reserved + ? "
                "WHERE product_id = ? AND available >= ?",
                line.quantity, line.product_id, line.quantity
            )
            if result.rowcount == 0:
                raise ValueError(f"Insufficient inventory for product {line.product_id}")

        # Persist what was reserved for this order, for later release/fulfilment.
        await db.execute(
            "INSERT INTO reservations (order_id, items) VALUES (?, ?)",
            order_id, json.dumps([line.dict() for line in items])
        )


# Notifications
async def send_order_confirmation(email: str, order_id: int, items: List[OrderItem]) -> None:
    """Email the customer an order confirmation via the template service."""
    order_total = sum(i.price * i.quantity for i in items)

    message = {
        "to": email,
        "subject": f"Order #{order_id} Confirmed",
        "template": "order_confirmation",
        "data": {
            "order_id": order_id,
            "items": [i.dict() for i in items],
            "total": order_total
        }
    }
    async with httpx.AsyncClient() as http:
        await http.post("https://api.email-service.com/send", json=message)


async def notify_warehouse(order_id: int, items: List[OrderItem]) -> None:
    """Ask the internal warehouse API to start preparing the shipment."""
    shipment = {
        "order_id": order_id,
        "items": [{"sku": i.product_id, "qty": i.quantity} for i in items]
    }
    async with httpx.AsyncClient() as http:
        await http.post(
            "https://warehouse-api.internal/prepare-shipment",
            json=shipment
        )


# Analytics
async def track_order_event(event: str, order_id: int, metadata: dict) -> None:
    """Fire an analytics event for the order pipeline (fire-and-forget POST)."""
    payload = {
        "event": event,
        "properties": {"order_id": order_id, **metadata}
    }
    async with httpx.AsyncClient() as http:
        await http.post("https://analytics.internal/track", json=payload)


# Main endpoint
@app.post("/orders")
async def create_order(order_data: OrderCreate, tasks: Tasks) -> dict:
    """
    Create a new order with complete processing pipeline:
    1. Create order record
    2. Process payment (shielded, critical)
    3. Reserve inventory (before response)
    4. Send confirmation email (after response)
    5. Notify warehouse (after response)
    6. Track analytics (after response)
    """

    # Calculate total
    total_amount = sum(item.price * item.quantity for item in order_data.items)

    # Create order in database
    order_id = await create_order_record(order_data.user_id, order_data.items)

    # Process payment immediately (critical, shielded)
    tasks.task(
        name=f"payment_{order_id}",
        shield=True,
        on_error=payment_error_handler
    ).schedule(
        process_payment,
        order_id=order_id,
        amount=total_amount,
        payment_token=order_data.payment_token
    )

    # Reserve inventory before response (must complete)
    tasks.after_route.task(
        name=f"inventory_{order_id}",
        shield=True
    ).schedule(reserve_inventory, order_id, order_data.items)

    # Send confirmation email after response
    tasks.after_response.schedule(
        send_order_confirmation,
        order_data.email,
        order_id,
        order_data.items
    )

    # Notify warehouse after response
    tasks.after_response.schedule(
        notify_warehouse,
        order_id,
        order_data.items
    )

    # Track analytics after response
    tasks.after_response.schedule(
        track_order_event,
        "order_created",
        order_id,
        {
            "user_id": order_data.user_id,
            "total": total_amount,
            "items_count": len(order_data.items)
        }
    )

    return {
        "order_id": order_id,
        "status": "processing",
        "total": total_amount
    }

Example 2: User Registration with Email Verification

Complete user registration flow with email verification:

import secrets
from datetime import datetime, timedelta


# Models
class UserRegister(BaseModel):
    """Request body for POST /register."""
    # Address the verification email is sent to.
    email: str
    # Plaintext password; hashed with bcrypt before storage.
    password: str
    # Display name used in email templates.
    name: str


# Database operations
async def create_user_record(email: str, password_hash: str, name: str) -> int:
    """Insert an unverified user row and return the new user id."""
    # New accounts always start unverified; /verify flips the flag later.
    return await db.execute(
        "INSERT INTO users (email, password_hash, name, verified) VALUES (?, ?, ?, ?)",
        email, password_hash, name, False
    )


async def create_verification_token(user_id: int) -> str:
    """Create, persist, and return a URL-safe token valid for 24 hours."""
    new_token = secrets.token_urlsafe(32)
    # NOTE(review): naive UTC timestamps are used consistently in this file;
    # confirm the DB driver compares them the same way on /verify.
    deadline = datetime.utcnow() + timedelta(hours=24)

    await db.execute(
        "INSERT INTO verification_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
        user_id, new_token, deadline
    )

    return new_token


# Email operations
async def send_verification_email(email: str, token: str, name: str) -> None:
    """Send the account-verification link to a freshly registered user."""
    link = f"https://yourapp.com/verify?token={token}"

    message = {
        "to": email,
        "subject": "Verify your email",
        "template": "email_verification",
        "data": {
            "name": name,
            "verification_url": link
        }
    }
    async with httpx.AsyncClient() as http:
        await http.post("https://api.email-service.com/send", json=message)


async def send_welcome_email(email: str, name: str) -> None:
    """Greet a newly verified user by email."""
    message = {
        "to": email,
        "subject": "Welcome to Our App!",
        "template": "welcome",
        "data": {"name": name}
    }
    async with httpx.AsyncClient() as http:
        await http.post("https://api.email-service.com/send", json=message)


# Analytics
async def track_user_event(event: str, user_id: int, metadata: Optional[dict] = None) -> None:
    """Record a user-level analytics event.

    Args:
        event: Event name, e.g. "user_registered".
        user_id: The user the event belongs to.
        metadata: Optional extra event properties (sent as {} when omitted).
    """
    # Fix: the default is None, so the annotation must be Optional[dict],
    # not a bare `dict`.
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://analytics.internal/track",
            json={
                "user_id": user_id,
                "event": event,
                "timestamp": datetime.utcnow().isoformat(),
                "properties": metadata or {}
            }
        )


# Error handlers
async def email_error_handler(task: Task, error: Exception) -> None:
    """Retry a failed email task with exponential backoff (1s, 2s, 4s waits)."""
    attempts = 3

    for n in range(attempts):
        try:
            # Back off before each retry: 2**0, 2**1, 2**2 seconds.
            await asyncio.sleep(2 ** n)
            await task()
            return  # retry succeeded
        except Exception as exc:
            if n == attempts - 1:
                # Out of retries: record the failure and page the team.
                import logging
                logging.error(f"Email task failed after {attempts} retries: {exc}")
                await send_slack_alert(f"Email delivery failed: {exc}")


# Endpoints
@app.post("/register")
async def register_user(user_data: UserRegister, tasks: Tasks) -> dict:
    """Register a new user with email verification"""

    # Hash password (sync operation)
    import bcrypt
    password_hash = bcrypt.hashpw(
        user_data.password.encode(),
        bcrypt.gensalt()
    ).decode()

    # Create user record
    user_id = await create_user_record(
        user_data.email,
        password_hash,
        user_data.name
    )

    # Create verification token
    verification_token = await create_verification_token(user_id)

    # Send verification email (with retry on failure)
    tasks.after_response.task(
        name=f"verification_email_{user_id}",
        on_error=email_error_handler
    ).schedule(
        send_verification_email,
        user_data.email,
        verification_token,
        user_data.name
    )

    # Track registration event
    tasks.after_response.schedule(
        track_user_event,
        "user_registered",
        user_id,
        {"email": user_data.email}
    )

    return {
        "user_id": user_id,
        "message": "Registration successful. Please check your email to verify your account."
    }


@app.post("/verify")
async def verify_email(token: str, tasks: Tasks) -> dict:
    """Verify user email with token"""

    # Verify token
    result = await db.fetchone(
        "SELECT user_id FROM verification_tokens "
        "WHERE token = ? AND expires_at > ? AND used = FALSE",
        token, datetime.utcnow()
    )

    if not result:
        raise HTTPException(status_code=400, detail="Invalid or expired token")

    user_id = result["user_id"]

    # Mark user as verified
    await db.execute(
        "UPDATE users SET verified = TRUE WHERE id = ?",
        user_id
    )

    # Mark token as used
    await db.execute(
        "UPDATE verification_tokens SET used = TRUE WHERE token = ?",
        token
    )

    # Get user details
    user = await db.fetchone("SELECT email, name FROM users WHERE id = ?", user_id)

    # Send welcome email
    tasks.after_response.schedule(
        send_welcome_email,
        user["email"],
        user["name"]
    )

    # Track verification
    tasks.after_response.schedule(
        track_user_event,
        "email_verified",
        user_id
    )

    return {"message": "Email verified successfully"}

Example 3: Image Processing Pipeline

Complete image upload and processing with thumbnails and CDN upload:

from PIL import Image
import aiofiles
import os
from pathlib import Path


# Models
class ImageUpload(BaseModel):
    """Request model for image uploads."""
    # Owner of the uploaded image.
    # NOTE(review): /upload-image takes user_id as a plain parameter and does
    # not reference this model — confirm whether it is still needed.
    user_id: int


# Image processing (sync operations with PIL)
def create_thumbnail(source_path: str, thumb_path: str, size: tuple) -> None:
    """Write a thumbnail of *source_path* to *thumb_path*, bounded by *size* (w, h).

    Uses the image as a context manager so the source file handle is closed
    promptly — Image.open is lazy and otherwise keeps the file open.
    """
    with Image.open(source_path) as img:
        img.thumbnail(size)
        img.save(thumb_path, optimize=True, quality=85)


def optimize_image(source_path: str, output_path: str) -> None:
    """Re-encode *source_path* to *output_path* with web-friendly settings.

    Uses the image as a context manager so the source file handle is closed
    promptly — Image.open is lazy and otherwise keeps the file open.
    """
    with Image.open(source_path) as img:
        img.save(output_path, optimize=True, quality=90)


# CDN upload
async def upload_to_cdn(local_path: str, cdn_path: str) -> str:
    """Push a local file to the CDN and return its public URL.

    Raises httpx.HTTPStatusError if the CDN rejects the upload.
    """
    async with aiofiles.open(local_path, 'rb') as src:
        payload = await src.read()

    async with httpx.AsyncClient() as http:
        resp = await http.put(
            f"https://cdn-api.example.com/upload/{cdn_path}",
            content=payload,
            headers={"Authorization": "Bearer YOUR_CDN_KEY"}
        )
        resp.raise_for_status()
        return resp.json()["url"]


# Database
async def save_image_record(
    user_id: int,
    original_url: str,
    optimized_url: str,
    thumbnail_url: str,
    metadata: dict
) -> int:
    """Persist the image's CDN URLs plus JSON metadata; return the new image id."""
    return await db.execute(
        "INSERT INTO images (user_id, original_url, optimized_url, thumbnail_url, metadata) "
        "VALUES (?, ?, ?, ?, ?)",
        user_id, original_url, optimized_url, thumbnail_url, json.dumps(metadata)
    )


# Complete processing pipeline
async def process_uploaded_image(
    user_id: int,
    upload_path: str,
    filename: str
) -> None:
    """Complete image processing pipeline: optimize, thumbnail, CDN, DB, analytics.

    Fixes vs. original: the CDN object keys now include the uploaded filename
    (the placeholder had been lost, so every upload would collide on the same
    key), and temp files are removed in a finally block so a failed step does
    not leak files in /tmp.
    """

    # Generate temp output paths derived from the original filename
    base_name = Path(filename).stem
    ext = Path(filename).suffix

    optimized_path = f"/tmp/{base_name}_optimized{ext}"
    thumb_path = f"/tmp/{base_name}_thumb{ext}"

    try:
        # Step 1: Create optimized version (sync)
        optimize_image(upload_path, optimized_path)

        # Step 2: Create thumbnail (sync)
        create_thumbnail(upload_path, thumb_path, (300, 300))

        # Step 3: Upload all versions to CDN (async, parallel)
        original_url, optimized_url, thumb_url = await asyncio.gather(
            upload_to_cdn(upload_path, f"images/{user_id}/original/{filename}"),
            upload_to_cdn(optimized_path, f"images/{user_id}/optimized/{filename}"),
            upload_to_cdn(thumb_path, f"images/{user_id}/thumb/{filename}"),
        )

        # Step 4: Save to database
        image_id = await save_image_record(
            user_id,
            original_url,
            optimized_url,
            thumb_url,
            {
                "filename": filename,
                "upload_date": datetime.utcnow().isoformat()
            }
        )
    finally:
        # Step 5: Always clean up temp files, even when a step above failed
        for path in (upload_path, optimized_path, thumb_path):
            if os.path.exists(path):
                os.remove(path)

    # Step 6: Track analytics
    await track_user_event("image_uploaded", user_id, {"image_id": image_id})


# Error handler for image processing
async def image_processing_error(task: Task, error: Exception) -> None:
    """Cleanup and log when the image-processing task fails."""
    # Scheduled args were (user_id, upload_path, filename): index 1 is the temp file.
    leftover = task.args[1] if len(task.args) > 1 else None
    if leftover and os.path.exists(leftover):
        os.remove(leftover)

    import logging
    logging.error(f"Image processing failed: {error}")


# Upload endpoint
@app.post("/upload-image")
async def upload_image(
    file: UploadFile,
    user_id: int,
    tasks: Tasks
) -> dict:
    """Upload and process image"""

    # Save uploaded file to temp location
    upload_path = f"/tmp/{file.filename}"

    async with aiofiles.open(upload_path, 'wb') as f:
        content = await file.read()
        await f.write(content)

    # Start processing immediately (long-running)
    tasks.task(
        name=f"process_image_{user_id}_{file.filename}",
        on_error=image_processing_error
    ).schedule(
        process_uploaded_image,
        user_id,
        upload_path,
        file.filename
    )

    return {
        "status": "processing",
        "message": "Image uploaded successfully and is being processed"
    }

Example 4: Analytics Event Processing

Batch analytics event processing with retry and fallback:

from collections import defaultdict
from datetime import datetime
import asyncio


# In-memory buffer for batching
# Per-user in-memory buffer of pending analytics events, flushed in batches.
event_buffer = defaultdict(list)
# Serializes access to event_buffer across concurrent request handlers.
buffer_lock = asyncio.Lock()


async def flush_events_to_analytics(events: list[dict]) -> None:
    """POST a batch of events to the analytics service; raise on non-2xx."""
    async with httpx.AsyncClient() as http:
        resp = await http.post(
            "https://analytics.example.com/batch",
            json={"events": events}
        )
        resp.raise_for_status()


async def save_events_to_db_fallback(events: list[dict]) -> None:
    """Fallback sink: persist events locally when the analytics service is down."""
    rows = [
        (e["event"], e["user_id"], json.dumps(e["properties"]), e["timestamp"])
        for e in events
    ]
    await db.execute_many(
        "INSERT INTO analytics_events (event_type, user_id, properties, timestamp) "
        "VALUES (?, ?, ?, ?)",
        rows
    )


async def analytics_error_handler(task: Task, error: Exception) -> None:
    """On analytics-service failure, fall back to the DB; alert if that also fails."""
    # The batch was the first positional argument to the scheduled task.
    batch = task.args[0] if task.args else []

    try:
        await save_events_to_db_fallback(batch)
        import logging
        logging.warning(f"Analytics service failed, saved {len(batch)} events to DB")
    except Exception as fallback_error:
        # Both sinks failed — the events are lost; page the team.
        await send_slack_alert(
            f"Analytics pipeline failed completely: {error}\n"
            f"Fallback also failed: {fallback_error}\n"
            f"Lost {len(batch)} events"
        )


# Batch processor
async def process_event_batch(events: list[dict], tasks: Tasks) -> None:
    """Schedule a background task that ships *events* to the analytics service."""
    if not events:
        return  # nothing to flush

    batch_task = tasks.task(
        name=f"analytics_batch_{len(events)}_events",
        on_error=analytics_error_handler
    )
    batch_task.schedule(flush_events_to_analytics, events)


# Track event endpoint
@app.post("/track")
async def track_event(
    event: str,
    user_id: int,
    properties: dict,
    tasks: Tasks
) -> dict:
    """Track an analytics event with batching"""

    event_data = {
        "event": event,
        "user_id": user_id,
        "properties": properties,
        "timestamp": datetime.utcnow().isoformat()
    }

    async with buffer_lock:
        event_buffer[user_id].append(event_data)

        # Flush if buffer is large enough
        if len(event_buffer[user_id]) >= 10:
            events_to_flush = event_buffer[user_id].copy()
            event_buffer[user_id].clear()

            # Process batch in background
            await process_event_batch(events_to_flush, tasks)

    return {"status": "tracked"}

Common Patterns Across Examples

Pattern 1: Critical Operations First

Always process critical operations (payment, inventory) before non-critical ones (emails, analytics):

# Critical: Payment
tasks.task(shield=True).schedule(process_payment, ...)

# Non-critical: Email
tasks.after_response.schedule(send_confirmation, ...)

Pattern 2: Proper Error Handling

All production tasks should have error handlers:

tasks.task(
    name="descriptive_name",
    on_error=appropriate_error_handler
).schedule(task_function, ...)

Pattern 3: Shield Sparingly

Only shield operations that absolutely must complete:

# Shield: Payment processing
tasks.task(shield=True).schedule(finalize_payment, ...)

# Don't shield: Email notifications
tasks.schedule(send_email, ...)  # Can retry if it fails

Pattern 4: Use Timing Modes Appropriately

  • Immediate: Long-running operations that don't affect the response
  • After-route: Quick operations that must complete before the response is sent
  • After-response: All non-critical notifications and logging

Next Steps