---
name: fastapi-async-and-task-integration
description: Use when adding background tasks or event integration to a scaffolded FastAPI service after the broker and contracts are declared in backend-architecture. Adds Celery/RQ/arq or Kafka wiring, a transactional outbox, idempotent consumers, retry/DLQ, and Testcontainers tests. Not for the shell, auth, observability, or performance.
---

# FastAPI Async and Task Integration

## When to use

Invoke when a scaffolded FastAPI service must enqueue background work, publish domain events, or consume from a broker (Celery/RQ/arq on Redis, or Kafka), and the integration needs correct delivery semantics, idempotency, retry/DLQ, and an outbox so a database commit and an event publish cannot diverge.

Do not use for: the service shell, settings, or error tiers (use `fastapi-service-scaffold`), auth or OWASP review (use `fastapi-auth-and-security-review`), OpenTelemetry/metrics/SLO wiring (use `fastapi-observability-readiness`), or pool-sizing/circuit-breaker/load-test gating (use `fastapi-performance-and-resilience`).

## Inputs

Required:

- A service with the `fastapi-service-scaffold` baseline (DI providers, validated settings, logging, ASGI-lifespan shutdown present).
- `backend-architecture.md` declaring the broker, the event/message contracts, and the delivery-semantics and retry strategy — or explicit confirmation a needed decision is intentionally deferred.

Optional:

- Approved `architecture/reliability` for redelivery/DLQ expectations and the consumer SLO.
- The transactional data store (for the outbox table) if `backend-architecture.md` is silent.
- Ordering and partitioning requirements (Kafka key, queue priority) per contract.

## Operating rules

- Never invent the broker, contract, or delivery semantics. Broker choice, message/event schemas, ordering, and at-least-once vs effectively-once belong to `backend-architecture.md`. If it is silent on a decision this skill needs, pause and raise an ADR candidate rather than guessing.
- Extend the scaffold; do not duplicate it. Producers/consumers register in the scaffold DI providers, read connection settings from the validated settings seam, log through the scaffold structlog logger, and close via the scaffold ASGI-lifespan shutdown. Do not re-create any of these.
- Assume at-least-once delivery unless the broker and contract guarantee otherwise, so every consumer/task must be idempotent. Idempotency is explicit: a dedupe key (message id or business key) checked against a store, not "the task is probably safe to re-run".
- A database write that must produce an event uses the transactional outbox: the domain change and the outbox row commit in one transaction; a relay publishes from the outbox. Never publish inside the request path before the transaction commits (dual-write hazard).
- Failure handling is explicit and bounded: a retry policy with backoff and a max attempt count, then a dead-letter destination. A message/task is never retried forever and never silently dropped.
- Workers respect shutdown: on `SIGTERM` the worker stops fetching, finishes in-flight tasks within a bounded timeout, and does not ack work it did not complete. Poison messages go to the DLQ, not an infinite redelivery loop.
- Producers do not block the request path on broker latency beyond a bounded timeout; a broker outage degrades to the outbox (for transactional events) or a clear, handled error — never an unbounded hang or a synchronous broker call on the async path.
- An integration without a real-broker test is not done. Provide Testcontainers-backed tests proving: successful round trip, duplicate delivery handled idempotently, retry then DLQ on poison, and clean shutdown mid-consume.
- A change that does not pass `mypy`, `ruff`, the integration tests, and the boot smoke check is not done. Fix and re-run.
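
The explicit-dedupe rule can be sketched with the standard-library `sqlite3` module standing in for the transactional store. The table names and `handle_credit` are hypothetical, and `INSERT OR IGNORE` is SQLite syntax (PostgreSQL's equivalent is `INSERT ... ON CONFLICT DO NOTHING`). The dedupe key and the side effect commit in one transaction, which only holds when the effect lives in that same store.

```python
import sqlite3

# Hypothetical schema: `processed_messages` is the dedupe-key store and
# `ledger` stands in for the consumer's real side effect.
SCHEMA = """
CREATE TABLE IF NOT EXISTS processed_messages (message_id TEXT PRIMARY KEY);
CREATE TABLE IF NOT EXISTS ledger (account TEXT NOT NULL, cents INTEGER NOT NULL);
"""


def handle_credit(conn: sqlite3.Connection, message_id: str, account: str, cents: int) -> bool:
    """Apply a credit at most once per message_id; return False for a duplicate.

    The dedupe-key insert and the side effect share one transaction, so the key
    is never recorded without the effect, nor the effect applied without the key.
    """
    with conn:  # commits on normal exit, rolls back if a statement raises
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_messages (message_id) VALUES (?)",
            (message_id,),
        )
        if cur.rowcount == 0:
            return False  # key already recorded: duplicate delivery, skip the effect
        conn.execute("INSERT INTO ledger (account, cents) VALUES (?, ?)", (account, cents))
    return True
```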
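
A minimal sketch of the outbox write side, again assuming `sqlite3` as a stand-in for the transactional store. The `orders`/`outbox` schema and `place_order` are hypothetical; what matters is that the domain row and the event row commit atomically and nothing touches the broker inside the transaction.

```python
import json
import sqlite3
import uuid

# Hypothetical schema: `orders` is the domain table; `outbox` is what the relay reads.
SCHEMA = """
CREATE TABLE IF NOT EXISTS orders (id TEXT PRIMARY KEY, total_cents INTEGER NOT NULL);
CREATE TABLE IF NOT EXISTS outbox (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- publish order for the relay
    id TEXT NOT NULL UNIQUE,                -- message id consumers dedupe on
    event_type TEXT NOT NULL,
    payload TEXT NOT NULL,
    sent_at TEXT                            -- NULL until the relay publishes it
);
"""


def place_order(conn: sqlite3.Connection, total_cents: int) -> str:
    """Write the domain row and its event row in one transaction; publish nothing here."""
    order_id = str(uuid.uuid4())
    with conn:  # both inserts commit together or roll back together
        conn.execute(
            "INSERT INTO orders (id, total_cents) VALUES (?, ?)", (order_id, total_cents)
        )
        conn.execute(
            "INSERT INTO outbox (id, event_type, payload) VALUES (?, ?, ?)",
            (
                str(uuid.uuid4()),
                "order.placed",
                json.dumps({"order_id": order_id, "total_cents": total_cents}),
            ),
        )
    return order_id
```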
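
Bounding the producer send can be sketched with `asyncio.wait_for`. `publish_with_timeout` and `PublishTimeout` are hypothetical names, and `send` stands in for whatever async send call the chosen client exposes.

```python
import asyncio
from collections.abc import Awaitable, Callable


class PublishTimeout(Exception):
    """Hypothetical handled error the API layer maps to a clear response."""


async def publish_with_timeout(
    send: Callable[[bytes], Awaitable[None]], message: bytes, timeout_s: float
) -> None:
    """Await the broker send for at most `timeout_s` seconds, then fail cleanly."""
    try:
        await asyncio.wait_for(send(message), timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        # Never hang the request path on broker latency.
        raise PublishTimeout(f"broker send exceeded {timeout_s}s") from exc
```

If the client is synchronous, wrap the call as `asyncio.to_thread(sync_send, message)` inside `wait_for` so it does not block the event loop. The timeout then stops the request from waiting but does not stop the worker thread, so also set the client's own socket/send timeout.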

## Output contract

The async/task integration MUST conform to:

- [api-standards](../../../../../standards/api-standards/README.md) — message/event payloads match the declared contract and are schema-validated (Pydantic) on produce and consume; versioned, explicit envelope.
- [observability-standards](../../../../../standards/observability-standards/README.md) — produce/consume go through the scaffold logger with correlation; retry, DLQ, and lag are observable.
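
A sketch of the versioned envelope, assuming Pydantic v2. The `order.placed` payload model, the `PAYLOAD_MODELS` registry, and `UnknownMessageType` are hypothetical; the real types and versions come from the contracts in `backend-architecture.md`. Keying the registry on `(type, schema_version)` turns an unregistered type or version into an explicit rejection on both produce and consume.

```python
from datetime import datetime, timezone
from typing import Any, Union
from uuid import UUID, uuid4

from pydantic import BaseModel, ConfigDict, Field


class Envelope(BaseModel):
    """Versioned wire envelope; unknown top-level fields are rejected."""

    model_config = ConfigDict(extra="forbid")

    id: UUID
    type: str
    occurred_at: datetime
    schema_version: int = Field(ge=1)
    payload: dict[str, Any]


class OrderPlacedV1(BaseModel):
    """Hypothetical payload model for an `order.placed` v1 event."""

    model_config = ConfigDict(extra="forbid")

    order_id: str
    total_cents: int = Field(ge=0)


# Hypothetical registry mapping (type, schema_version) to its payload model.
PAYLOAD_MODELS: dict[tuple[str, int], type[BaseModel]] = {
    ("order.placed", 1): OrderPlacedV1,
}


class UnknownMessageType(ValueError):
    """Raised for a (type, schema_version) pair with no registered model."""


def encode(message_type: str, schema_version: int, payload: BaseModel) -> str:
    """Produce side: refuse unregistered types and mismatched payload models."""
    model = PAYLOAD_MODELS.get((message_type, schema_version))
    if model is None or not isinstance(payload, model):
        raise UnknownMessageType(f"{message_type} v{schema_version}")
    envelope = Envelope(
        id=uuid4(),
        type=message_type,
        occurred_at=datetime.now(timezone.utc),
        schema_version=schema_version,
        payload=payload.model_dump(mode="json"),
    )
    return envelope.model_dump_json()


def decode(raw: Union[str, bytes]) -> tuple[Envelope, BaseModel]:
    """Consume side: validate the envelope, then the payload against its registered model."""
    envelope = Envelope.model_validate_json(raw)
    model = PAYLOAD_MODELS.get((envelope.type, envelope.schema_version))
    if model is None:
        raise UnknownMessageType(f"{envelope.type} v{envelope.schema_version}")
    return envelope, model.model_validate(envelope.payload)
```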

Upstream contract: `backend-architecture.md` is the source of truth for broker, contracts, ordering, and delivery semantics; `architecture/reliability` is the source of truth for redelivery/DLQ expectations and the consumer SLO. If either is silent on a decision this skill needs, pause and raise an ADR candidate rather than guessing.

## Progressive references

- Read `references/fastapi-async-playbook.md` when implementing any owned area or checking the anti-pattern list.
- Read `references/fastapi-async-quality-rubric.md` before declaring the work complete.
- Use `assets/fastapi-async-and-task-integration.template.md` as the producer, consumer, outbox, and test reference.

## Process

1. Gather context: load `backend-architecture.md` (broker, contracts, ordering, delivery semantics) and `architecture/reliability` (redelivery/DLQ, consumer SLO). Confirm the scaffold baseline and the transactional store. If a needed decision is missing, raise an ADR candidate before proceeding.
2. Extend settings: add broker connection settings, queue/topic names, worker concurrency, retry attempts/backoff, and DLQ target to the scaffold `Settings` model and `.env.example` (placeholders only).
3. Define the message envelope and schemas: a versioned Pydantic envelope (`id`, `type`, `occurred_at`, `schema_version`, `payload`) with a model per message type, validated on both produce and consume; reject unknown types.
4. Implement the producer: a typed publish API registered in the scaffold DI providers, with a bounded send timeout. For events tied to a database write, write to an outbox table in the same transaction instead of publishing inline.
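5. Implement the transactional outbox relay: a poller (or CDC hook) that reads unsent outbox rows, publishes them, and marks them sent only after the publish succeeds. The relay is at-least-once by design: a crash between publish and mark-sent republishes the row on the next pass, and idempotent consumers absorb the duplicate.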
6. Implement the consumer/task: registered in the DI providers, idempotent via an explicit dedupe-key store, with a bounded retry+backoff policy and a max-attempt threshold that routes to the DLQ. Honor the scaffold ASGI-lifespan shutdown (stop fetching, drain in-flight, no ack of incomplete work).
7. Make it observable: produce, consume, retry, DLQ, and consumer lag emit through the scaffold logger with correlation; expose the metrics seam hooks for `fastapi-observability-readiness` to instrument (do not wire the vendor here).
8. Write integration tests with Testcontainers: a real broker container; assert successful round trip, idempotent handling of a duplicate, retry-then-DLQ on a poison message, and clean shutdown mid-consume without losing or double-acking work.
9. Build verification (mandatory): run `mypy`, `ruff check`, the integration test command, and the boot smoke check. Fix and re-run on failure. Validate against the Output contract standards; document any unresolved gap in the service README.
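
The relay in step 5 can be sketched as one polling pass over an outbox table, with `sqlite3` standing in for the transactional store and `publish` for the broker client. The schema and `relay_once` are hypothetical.

```python
import sqlite3
from collections.abc import Callable

# Hypothetical outbox schema; `seq` gives the relay a stable publish order.
SCHEMA = """
CREATE TABLE IF NOT EXISTS outbox (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    id TEXT NOT NULL UNIQUE,
    event_type TEXT NOT NULL,
    payload TEXT NOT NULL,
    sent_at TEXT
);
"""


def relay_once(
    conn: sqlite3.Connection,
    publish: Callable[[str, str, str], None],  # (message_id, event_type, payload); raises on failure
    batch_size: int = 100,
) -> int:
    """Publish one batch of unsent rows in `seq` order and return how many were sent.

    A row is marked sent only after `publish` returns. A crash in between
    republishes it on the next pass (at-least-once), and a publish failure
    stops the batch so later rows are not sent ahead of an earlier one.
    """
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox "
        "WHERE sent_at IS NULL ORDER BY seq LIMIT ?",
        (batch_size,),
    ).fetchall()
    sent = 0
    for message_id, event_type, payload in rows:
        publish(message_id, event_type, payload)
        with conn:
            conn.execute(
                "UPDATE outbox SET sent_at = datetime('now') WHERE id = ?",
                (message_id,),
            )
        sent += 1
    return sent
```

In production, `relay_once` would run on an interval from a task started and stopped by the scaffold lifespan. If more than one relay instance can run, claim rows with a row lock (for example PostgreSQL's `SELECT ... FOR UPDATE SKIP LOCKED`) so two instances do not publish the same batch; when strict ordering matters, keep a single active relay.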
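
The bounded retry in step 6 can be sketched independently of the broker: exponential backoff with full jitter, a hard attempt cap, and a dead-letter callback that fires instead of dropping the message. `RetryPolicy`, `process_with_retry`, and the `non_retryable` tuple are hypothetical; `sleep` is injected so tests need not wait.

```python
import random
from collections.abc import Callable
from dataclasses import dataclass
from typing import TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class RetryPolicy:
    """Hypothetical policy object; the values would come from the scaffold Settings."""

    max_attempts: int = 5  # total attempts, including the first
    base_delay_s: float = 0.5
    max_delay_s: float = 30.0

    def __post_init__(self) -> None:
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")

    def delay_for(self, failed_attempt: int) -> float:
        """Exponential backoff with full jitter, capped at max_delay_s."""
        cap = min(self.max_delay_s, self.base_delay_s * 2 ** (failed_attempt - 1))
        return random.uniform(0.0, cap)


def process_with_retry(
    message: T,
    handler: Callable[[T], None],
    policy: RetryPolicy,
    sleep: Callable[[float], None],
    dead_letter: Callable[[T, BaseException], None],
    non_retryable: tuple[type[BaseException], ...] = (),
) -> bool:
    """Return True on success; False after the message was routed to the DLQ."""
    for attempt in range(1, policy.max_attempts + 1):
        try:
            handler(message)
            return True
        except Exception as exc:
            # Poison (non-retryable) or out of attempts: dead-letter, never drop.
            if isinstance(exc, non_retryable) or attempt == policy.max_attempts:
                dead_letter(message, exc)
                return False
            sleep(policy.delay_for(attempt))
    raise AssertionError("unreachable: the loop always returns")
```

Task libraries and brokers usually provide their own retry and dead-letter primitives; prefer mapping this policy onto them over an in-process loop, which holds the message (and, for Kafka, the partition) while it sleeps. An async worker would use the same structure with `await asyncio.sleep(...)`.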
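
The shutdown behavior in step 6 can be sketched with plain `asyncio`. `Worker` and its `fetch`/`handle`/`ack` callables are hypothetical stand-ins for the broker client; `fetch` is assumed to poll with a short timeout and return `None` when nothing arrived, so the stop flag is re-checked regularly.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Optional


class Worker:
    """Hypothetical consumer loop: stop fetching on request, drain in-flight, ack only completed work."""

    def __init__(
        self,
        fetch: Callable[[], Awaitable[Optional[str]]],  # returns None on a poll timeout
        handle: Callable[[str], Awaitable[None]],
        ack: Callable[[str], None],
        drain_timeout_s: float,
    ) -> None:
        self._fetch = fetch
        self._handle = handle
        self._ack = ack
        self._drain_timeout_s = drain_timeout_s
        self._stopping = asyncio.Event()
        self._in_flight: set[asyncio.Task[None]] = set()

    def request_stop(self) -> None:
        """Call from the lifespan shutdown or a SIGTERM handler."""
        self._stopping.set()

    async def _run_one(self, message: str) -> None:
        await self._handle(message)
        self._ack(message)  # reached only if handle() finished without raising

    async def run(self) -> None:
        while not self._stopping.is_set():  # re-checked between bounded polls
            message = await self._fetch()
            if message is None:
                continue
            task = asyncio.create_task(self._run_one(message))
            self._in_flight.add(task)
            task.add_done_callback(self._in_flight.discard)
        await self._drain()

    async def _drain(self) -> None:
        if not self._in_flight:
            return
        _, pending = await asyncio.wait(set(self._in_flight), timeout=self._drain_timeout_s)
        for task in pending:
            task.cancel()  # cancelled work is never acked, so the broker redelivers it
        await asyncio.gather(*pending, return_exceptions=True)
```

In the API process, call `request_stop()` from the scaffold lifespan shutdown and await the `run()` task; in a standalone worker process on Unix, `loop.add_signal_handler(signal.SIGTERM, worker.request_stop)` wires the same path to `SIGTERM`. The sketch omits the worker-concurrency limit from settings (an `asyncio.Semaphore` around `_run_one` is one option) and leaves handler failures to the retry/DLQ path. Construct the `Worker` inside the running event loop.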

## Outputs

Required:

- Typed producer registered in the scaffold DI providers with a bounded send timeout.
- Transactional outbox table + relay for events tied to a database write (no inline dual write).
- Idempotent consumer/task with an explicit dedupe-key store, bounded retry+backoff, and a DLQ route.
- Versioned Pydantic message envelope with per-type models validated on produce and consume.
- Shutdown-aware worker wired to the scaffold ASGI-lifespan shutdown.
- Testcontainers integration tests: round trip, duplicate, retry→DLQ, shutdown mid-consume.

Output rules:

- The scaffold DI, settings, logging, and lifespan shutdown are extended, not duplicated.
- No event is published inside the request path before its transaction commits (outbox required for transactional events).
- Every consumer/task is idempotent with an explicit dedupe mechanism — not assumed-safe.
- No message is retried unbounded or silently dropped; poison goes to the DLQ.

## Quality checks

- [ ] `mypy`, `ruff check`, the integration tests, and the boot smoke check all pass (or a skip is documented with reason).
- [ ] Broker, contract, ordering, and delivery semantics trace to `backend-architecture.md` (no invented choice).
- [ ] The producer, consumer, and relay are registered in the scaffold DI providers and use the scaffold settings/logger/lifespan.
- [ ] Events tied to a DB write go through the transactional outbox; no inline publish-before-commit exists.
- [ ] The consumer/task is idempotent via an explicit dedupe-key store — a duplicate delivery is test-verified to have no double effect.
- [ ] Retry uses bounded backoff with a max-attempt count; exceeding it routes to the DLQ (test-verified).
- [ ] On `SIGTERM` the worker stops fetching, drains in-flight within the bounded timeout, and does not ack incomplete work (test-verified).
- [ ] Every message is envelope- and schema-validated on produce and consume; unknown types are rejected.
- [ ] Producer send is bounded by a timeout; a broker outage does not hang the request path or block the event loop.
- [ ] Produce/consume/retry/DLQ/lag are logged with correlation; metrics seam hooks are exposed for the observability skill.
- [ ] The service README documents the broker, contracts, retry/DLQ policy, and the outbox relay.

## References

- Upstream: [`architecture/backend-architecture`](../../../../architecture/backend-architecture/SKILL.md), [`architecture/reliability`](../../../../architecture/reliability/SKILL.md).
- Baseline this extends: `fastapi-service-scaffold`. Related: `fastapi-auth-and-security-review`, `fastapi-observability-readiness`, `fastapi-performance-and-resilience`.
- Standards: [`api-standards`](../../../../../standards/api-standards/README.md), [`observability-standards`](../../../../../standards/observability-standards/README.md).
