
Background jobs in a SCALR app

Use this when a request handler would otherwise block on slow work (sending email, regenerating a report, calling out to a third-party API). The handler enqueues a task, returns immediately, and a worker process executes the task asynchronously.

TL;DR

# request handler: enqueue and return immediately
from scalr_queue import enqueue

@router.post("/invoices/{id}/send")
async def send_invoice(id: int):
    await enqueue("send_invoice_email", invoice_id=id)
    return {"queued": True}

# backend/src/worker.py: the task, plus the list the worker loads
async def send_invoice_email(ctx, invoice_id: int) -> None:
    ...

tasks = [send_invoice_email]

Two-process model

Apps that use background jobs run two containers:

Container       Process                        Purpose
<app>-backend   uvicorn                        Handles HTTP requests, calls enqueue()
<app>-worker    python -m scalr_queue.worker   Pulls tasks off Redis, runs them

Both share the same image and source code; they differ only in the command. Both connect to the shared Redis at redis://scalr-redis:6379/0 (set via REDIS_URL in .env.shared).

Activating the worker in your app

template-app/docker-compose.yml ships a commented-out worker: service stub. To activate:

  1. Uncomment the worker: block in your app's docker-compose.yml.
  2. Make sure your backend/src/worker.py exports a tasks = [...] list (one is provided in fresh apps as an example).
  3. Run make up. Both backend and worker come up.
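For orientation, the uncommented block usually looks something like the sketch below. The stub in template-app/docker-compose.yml is authoritative; the service name, image, and values here are illustrative assumptions.

```yaml
# Illustrative sketch only -- defer to the stub shipped in
# template-app/docker-compose.yml for the real values.
worker:
  image: <app>-backend                        # same image as the backend service
  command: python -m scalr_queue.worker src.worker
  env_file:
    - .env.shared                             # provides REDIS_URL
  depends_on:
    - backend
```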

Verify with:

docker logs -f <app>-worker

You should see arq's startup banner and redis_version=7.x.x ....

Writing a task

Tasks are async functions whose first argument is ctx (arq's per-job context — has redis pool, job_id, job_try, etc.). The rest are whatever the enqueuer passes:

import logging
from scalr_email import send

logger = logging.getLogger(__name__)

async def send_invoice_email(ctx: dict, invoice_id: int) -> None:
    logger.info("sending invoice %s (job %s)", invoice_id, ctx["job_id"])
    # ... look up invoice, render template, send mail ...
    send(to="customer@example.com", subject="Your invoice", body="...")

tasks = [send_invoice_email]

Add the function to the tasks list; the worker command (python -m scalr_queue.worker src.worker) picks it up automatically.
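Under the hood, all the wrapper needs is a name-to-function map built from that list. A minimal sketch of the lookup (scalr_queue's actual internals may differ):

```python
# Hypothetical sketch of name-based task lookup; scalr_queue's
# real implementation may differ.
async def send_invoice_email(ctx, invoice_id: int) -> None:
    ...

tasks = [send_invoice_email]

# The registry a worker would consult when a job named
# "send_invoice_email" arrives off the queue.
registry = {fn.__name__: fn for fn in tasks}

print("send_invoice_email" in registry)  # → True
```

This is also why enqueuing a misspelled name cannot fail early: the backend never sees this registry, only the worker does.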

Enqueuing from a request handler

from scalr_queue import enqueue

@router.post("/invoices/{id}/send")
async def trigger(id: int):
    job = await enqueue("send_invoice_email", invoice_id=id)
    return {"job_id": job.job_id}

enqueue("name", ...) looks up the function by name in the worker's tasks list. Misspelled names won't fail at enqueue time — they fail when the worker tries to run them. It's worth a unit test asserting that every name your backend enqueues appears in your worker's tasks list.
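That test is a few lines. A sketch — the stand-in task and ENQUEUED_NAMES are illustrative; in a real test you'd import tasks from backend/src/worker.py and list the names your handlers actually pass to enqueue():

```python
# Stand-in task so the example is self-contained; in your app,
# import `tasks` from backend/src/worker.py instead.
async def send_invoice_email(ctx, invoice_id: int) -> None:
    ...

tasks = [send_invoice_email]

# Every name the backend passes to enqueue() -- keep in sync with handlers.
ENQUEUED_NAMES = {"send_invoice_email"}

def test_enqueued_names_are_registered():
    registered = {fn.__name__ for fn in tasks}
    missing = ENQUEUED_NAMES - registered
    assert not missing, f"enqueued but never registered: {missing}"

test_enqueued_names_are_registered()
```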

Retries and timeouts

Defaults from scalr_queue:

  • Concurrency: 10 jobs per worker process simultaneously.
  • Per-job timeout: 300 seconds. Tasks that take longer get killed and retried.
  • Retries: arq defaults — 5 attempts with exponential backoff.

To override, switch from a tasks list to a full arq WorkerSettings class:

class WorkerSettings:
    functions = [send_invoice_email]
    max_jobs = 50
    job_timeout = 1800
    max_tries = 3                  # don't retry forever
    on_startup = warm_caches       # your own async hooks, if you have them
    on_shutdown = drain_in_flight

…and run with python -m arq backend.src.worker.WorkerSettings directly (skip the scalr_queue.worker wrapper). At that point you're using arq vanilla; the wrapper exists for the 90% case where you just want a list of tasks.

Scheduled / cron jobs

arq supports cron-style scheduled tasks. Use a WorkerSettings class:

from arq import cron

async def cleanup_stale_invoices(ctx):
    ...

class WorkerSettings:
    functions = []
    cron_jobs = [
        cron(cleanup_stale_invoices, hour=3, minute=0),  # 03:00 daily
    ]

When NOT to use the queue

Default to synchronous in-request work. Reach for the queue only when the task takes >1s, can fail and should retry, is fire-and-forget, or needs to be scheduled. For fast in-memory work, the Redis round-trip costs more than it saves.

Local debugging

# Tail worker logs
docker logs -f <app>-worker

# See queue depth
docker exec scalr-redis redis-cli llen arq:queue:default

# Inspect a specific job (replace JOB_ID)
docker exec scalr-redis redis-cli get "arq:job:JOB_ID"

# Health check (arq built-in)
docker exec <app>-worker python -m arq --check src.worker

For richer observability, wait for centralized logs (planned, see ~/.claude/plans/lovely-prancing-alpaca.md). Worker logs will flow into Grafana via Loki and a panel will surface queue health.

Failure modes

  • Backend can enqueue, worker doesn't run. Worker container is down or the tasks list is empty/misnamed. Check docker compose ps and the worker logs.
  • Tasks run but fail silently. They almost certainly raised an exception inside the worker — check worker logs. Default behaviour is to retry; after max_tries the job is moved to the "failed" set and you can inspect it via redis-cli.
  • Redis full / OOM. Default Redis has no maxmemory cap. If you're enqueuing faster than processing for hours, set maxmemory and a maxmemory-policy in services/redis/docker-compose.yml.
  • Slow tasks block the queue. A single worker processes 10 jobs in parallel. If all 10 are long-running, new tasks queue up. Either raise max_jobs, run multiple worker replicas, or split onto separate queues (enqueue("name", _queue_name="bulk")).
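The separate-queue option in the last bullet needs a second worker pointed at that queue; arq's WorkerSettings takes a queue_name for this. A sketch under assumed names — the task, limits, and service layout are illustrative, and you'd run this worker with python -m arq directly:

```python
# Hypothetical settings for a second worker dedicated to bulk jobs.
# The queue name must match what the backend passes as _queue_name.
async def rebuild_all_reports(ctx) -> None:
    ...

class BulkWorkerSettings:
    functions = [rebuild_all_reports]
    queue_name = "bulk"   # matches enqueue("rebuild_all_reports", _queue_name="bulk")
    max_jobs = 2          # keep bulk work from starving the box
    job_timeout = 3600    # bulk jobs are allowed to run long
```

The default worker keeps draining the default queue; long bulk jobs pile up behind each other on "bulk" without delaying anything latency-sensitive.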