---
name: oban-thinking
description: This skill should be used when the user asks to "add a background job", "process async", "schedule a task", "retry failed jobs", "add email sending", "run this later", "add a cron job", "unique jobs", "batch process", or mentions Oban, Oban Pro, workflows, job queues, cascades, grafting, recorded values, job args, or troubleshooting job failures.
---

# Oban Thinking

Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.

---

# Part 1: Oban (Non-Pro)

## The Iron Law: JSON Serialization

```
JOB ARGS ARE JSON. ATOMS BECOME STRINGS.
```

This single fact causes most Oban debugging headaches.

```elixir
# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})

# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
  # ...
end
```

## Error Handling: Let It Crash

**Don't catch errors in Oban jobs.** Let them bubble up to Oban for proper handling.

### Why?

1. **Automatic logging**: Oban logs the full error with stacktrace
2. **Automatic retries**: Jobs retry with exponential backoff
3. **Visibility**: Failed jobs appear in Oban Web dashboard
4. **Consistency**: Error states are tracked in the database

### Anti-Pattern

```elixir
# Bad: Swallowing errors
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} ->
      Logger.error("Failed: #{reason}")
      {:ok, :failed}  # Silently marks as complete!
  end
end
```

### Correct Pattern

```elixir
# Good: Let errors propagate
def perform(%Oban.Job{} = job) do
  result = do_work!(job.args)  # Raises on failure
  {:ok, result}
end

# Or return error tuple - Oban treats as failure
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} -> {:error, reason}  # Oban will retry
  end
end
```

### When to Catch Errors

Only catch errors when you need custom retry logic or want to mark a job as permanently failed:

```elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found}  # Don't retry
    {:error, :rate_limited} -> {:snooze, 60}  # Retry in 60 seconds
    {:error, _} -> {:error, :will_retry}  # Normal retry
  end
end
```

## Snoozing for Polling

Use `{:snooze, seconds}` for polling external state instead of manual retry logic:

```elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5}  # Check again in 5 seconds
  end
end
```

## Simple Job Chaining

For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:

```elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)
  # Enqueue next job on success
  NextWorker.new(%{data: result}) |> Oban.insert()
  {:ok, result}
end
```

**Don't reach for Oban Pro Workflows for linear chains.**

## Unique Jobs

Prevent duplicate jobs with the `unique` option:

```elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60]  # Only one job with same args per 60 seconds

# Or scope uniqueness to specific fields
unique: [period: 300, keys: [:user_id]]
```

**Gotcha:** Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.

## High Throughput: Chunking

For millions of records, **chunk work into batches** rather than one job per item:

```elixir
# Bad: One job per contact (millions of jobs = database strain)
Enum.each(contacts, &ContactWorker.new(%{id: &1.id}) |> Oban.insert())

# Good: Chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(&BatchWorker.new(%{contact_ids: Enum.map(&1, fn c -> c.id end)}) |> Oban.insert())
```

Use bulk inserts without uniqueness constraints for maximum throughput.

---

# Part 2: Oban Pro

## Cascade Context: Erlang Term Serialization

Unlike regular job args, **cascade context preserves atoms**:

```elixir
# Creating - atom keys
Workflow.put_context(%{score_run_id: id})

# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
  # ...
end

# Dot notation works too
def later_step(context) do
  context.score_run_id
  context.previous_result
end
```

### Serialization Summary

| | Creating | Processing |
|-----------------|----------|--------------|
| Regular jobs | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok |

## When to Use Workflows

Reserve Workflows for:
- Complex dependency graphs (not just linear chains)
- Fan-out/fan-in patterns
- When you need recorded values across steps
- Conditional branching based on runtime state

**Don't use Workflows for simple A → B → C chains.**

## Workflow Composition with Graft

When you need a parent workflow to wait for a sub-workflow to complete before continuing, use `add_graft` instead of `add_workflow`.

### Key Differences

| Method | Sub-workflow completes before deps run? | Output accessible? |
|--------|----------------------------------------|-------------------|
| `add_workflow` | No - just inserts jobs | No |
| `add_graft` | Yes - waits for all jobs | Yes, via recorded values |

### Pattern: Composing Independent Concerns

Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:

```elixir
# Bad: Notification logic buried in AggregateScores
defmodule AggregateScores do
  def workflow(score_run_id) do
    Workflow.new()
    |> Workflow.add(:aggregate, AggregateJob.new(...))
    |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate)  # Wrong place!
  end
end

# Good: Higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
  def workflow(site_url, opts) do
    notification_opts = build_notification_opts(opts)

    Workflow.new()
    |> Workflow.put_context(%{notification_opts: notification_opts})
    |> Workflow.add_graft(:scoring, &graft_full_run/1)
    |> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
  end

  defp graft_full_run(context) do
    # Sub-workflow doesn't know about notifications
    FullRun.workflow(context.site_url, context.opts)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
```

### Recording Values for Dependent Steps

For a grafted workflow's output to be available to dependent steps, the final job must use `recorded: true`:

```elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  def perform(%Oban.Job{} = job) do
    # Return value becomes available in context
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end
```

## Dynamic Workflow Appending

Add jobs to a running workflow with `Workflow.append/2`:

```elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end
  {:ok, :done}
end
```

**Caveat:** Cannot override context or add dependencies to already-running jobs. For complex dynamic scenarios, check external state in the job itself.

## Fan-Out/Fan-In with Batches

To run a final job after multiple paginated workflows complete, use Batch callbacks:

```elixir
# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"

pages
|> Enum.each(fn page ->
  PageWorkflow.workflow(page)
  |> Batch.from_workflow(batch_id: batch_id)
  |> Oban.insert_all()
end)

# Add completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()
```

**Tip:** Include pagination workers in the batch to prevent premature completion.

## Testing Workflows

**Don't use inline testing mode** - workflows need database interaction.

```elixir
# Use run_workflow/1 for integration tests
assert %{completed: 3} =
  Workflow.new()
  |> Workflow.add(:a, WorkerA.new(%{}))
  |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
  |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
  |> run_workflow()
```

For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.

---

# Red Flags - STOP and Reconsider

**Non-Pro:**
- Pattern matching on atom keys in `perform/1`
- Catching all errors and returning `{:ok, _}`
- Wrapping job logic in try/rescue
- Creating one job per item when processing millions of records

**Pro:**
- Using `add_workflow` when you need to wait for completion
- Coupling notifications/emails to domain workflows
- Not using `recorded: true` when you need output from grafted workflows
- Using Workflows for simple linear job chains
- Testing workflows with inline mode

**Any of these? Re-read the serialization rules.**
