---
name: eeg-pipeline
description: "Top-level orchestrator: chains eeg-bids → eeg-preprocess → eeg-ica → eeg-epoch → (eeg-erp / eeg-tfr / eeg-connectivity / eeg-microstate / eeg-source) → eeg-stats → eeg-figure → eeg-report → eeg-audit. Reads DATASET_BRIEF.md and ANALYSIS_PLAN.md to decide which branches to run. Use when the user wants the full pipeline end-to-end."
argument-hint: "[project-dir] [— skip: ica,source] [— from: epoch] [— auto-proceed]"
allowed-tools: Bash(*), Read, Write, Edit, Grep, Glob, Skill
---

# eeg-pipeline: full orchestration

## Context: $ARGUMENTS

## Constants

- **AUTO_PROCEED** — read from `~/.claude/checkpoint.json`. If `auto_proceed: true` and the gate is `"auto"`, run gates auto-pass. If `auto_proceed: false` or gate is `"always_wait"`, pause for user OK between stages. If `~/.claude/checkpoint.json` does not exist, default to `true`.
- **FAIL_FAST = `false`** — when `true`, abort on first per-subject failure; otherwise log and continue.
- **STAGES_DEFAULT = `[bids, preprocess, ica, epoch, erp, tfr, stats, figure, report, audit]`** — `connectivity / microstate / source` only added if `DATASET_BRIEF.md > Analyses planned` ticks them.
- **STATE_FILE = `<project-dir>/PIPELINE_STATE.json`** — crash recovery persistence (see Phase B).
- **TIMEOUT_SECONDS** — read from `~/.claude/checkpoint.json`, default `300`.

## Stage dependency graph

Some stages have strict sequential dependencies; others can run in parallel when their inputs are ready:

```
bids ──→ preprocess ──→ ica ──→ epoch ──┬──→ erp ────────┐
                                         ├──→ tfr ────────┤
                                         ├──→ connectivity┤
                                         ├──→ microstate  ├──→ stats ──→ figure ──→ report ──→ audit
                                         └──→ source ─────┘
```

- **Parallel group**: After `epoch` completes, `erp`, `tfr`, `connectivity`, `microstate`, and `source` can all run independently.
- **Sync point**: `stats` waits for ALL analysis branches to finish before starting.
- In practice, invoke parallel branches sequentially via `Skill` tool (no true multiprocessing), but log that they have no data dependency.

## Phases

### A. Pre-flight (mandatory)
1. **Checkpoint config**: Read `~/.claude/checkpoint.json`. Set `AUTO_PROCEED` and `TIMEOUT_SECONDS` from the file. If file missing, default `AUTO_PROCEED=true`, `TIMEOUT_SECONDS=300`.
2. **Auto-brief**: If `DATASET_BRIEF.md` does not exist but `raw/` does, run `python tools/auto_brief.py --raw-dir <project-dir>/raw/ --out <project-dir>/DATASET_BRIEF.md --json <project-dir>/auto_brief_scan.json`. Then ask the user conversationally about remaining `[USER]` fields (paradigm, condition→marker mapping, epoch window, hypothesis). Do NOT dump the full template.
3. Read `DATASET_BRIEF.md`. If any `[USER]` fields remain for critical parameters (paradigm, conditions, epoch window), ask the user. Non-critical fields with sensible defaults (bandpass, reference, stats) can proceed with defaults.
4. Read or generate `ENVIRONMENT.json` via `tools/env/check_env.{sh,ps1}`.
5. Read `ANALYSIS_PLAN.md`. If absent, generate a draft from `DATASET_BRIEF.md` claims and ask the user to confirm. **Do not run analyses without a frozen plan.**
6. Resolve which optional analyses (TFR / connectivity / microstate / source) are ticked in DATASET_BRIEF.
7. **Resume check**: If `PIPELINE_STATE.json` exists AND `— from:` was NOT explicitly passed, read it and offer to resume from the last incomplete stage (or auto-resume if `AUTO_PROCEED=true`).

### B. State persistence (crash recovery)

Maintain `PIPELINE_STATE.json` throughout the run for crash recovery:

```json
{
  "pipeline_version": "1.0",
  "project_dir": "/path/to/project",
  "started_at": "2026-05-22T10:00:00Z",
  "updated_at": "2026-05-22T10:15:00Z",
  "resolved_stages": ["bids", "preprocess", "ica", "epoch", "erp", "stats", "figure", "report", "audit"],
  "completed_stages": [
    {"stage": "bids", "status": "done", "duration_s": 12.3, "started_at": "...", "finished_at": "..."},
    {"stage": "preprocess", "status": "done", "duration_s": 45.1, "started_at": "...", "finished_at": "..."}
  ],
  "current_stage": "ica",
  "current_stage_started_at": "2026-05-22T10:01:00Z",
  "failed_stages": [],
  "skipped_stages": ["source"],
  "auto_proceed": true,
  "fail_fast": false
}
```

**Update rules**:
- Write `PIPELINE_STATE.json` at the start of the pipeline with `resolved_stages`.
- Before each stage: update `current_stage` and `current_stage_started_at`.
- After each stage: append to `completed_stages` with timing, update `updated_at`.
- On failure: append to `failed_stages` with error message.
- On crash recovery: read the file, skip `completed_stages`, resume from `current_stage`.

### C. Stage chain

For each stage in resolved list, in order:

1. **Update state**: Write `current_stage` to `PIPELINE_STATE.json`.
2. **Record start time**: `stage_start = now()`.
3. **Invoke skill**: Call matching skill via `Skill` tool, passing `[project-dir]` + relevant overrides.
4. **Record end time**: `stage_end = now()`. Compute `duration_s = stage_end - stage_start`.
5. **Progress report**: Print to user:
   ```
   ✓ [stage] completed in [duration_s]s  ([N]/[total] stages done)
   ```
6. **Update state**: Append to `completed_stages` in `PIPELINE_STATE.json`.
7. Read the stage's report; if it reports any subject failure and `FAIL_FAST=true`, abort.
8. **Gate**: If `AUTO_PROCEED=false`, summarize the stage report to the user and wait for "go" before the next stage. If `AUTO_PROCEED=true`, proceed immediately.

### D. Audit + finalize

After all stages:
1. Invoke `eeg-audit`. Surface the audit verdict to the user.
2. Do not declare the pipeline complete unless the audit produces verdict `pass` or `pass-with-caveats`.
3. **Final timing report**: Print a summary table of all stage durations:
   ```
   Stage          Duration    Status
   ─────────────  ──────────  ──────
   bids           12.3s       done
   preprocess     45.1s       done
   ica            120.5s      done
   ...
   TOTAL          312.4s
   ```
4. Update `PIPELINE_STATE.json` with `"pipeline_status": "complete"` and total duration.

## Resume semantics

- `— from: <stage>` skips earlier stages, reading their existing artifacts. Overrides `PIPELINE_STATE.json` resume point.
- `— skip: <stage1>,<stage2>` skips specific stages entirely. They appear in `skipped_stages` in the state file.
- If `PIPELINE_STATE.json` exists and no `— from:` override, auto-resume from the last incomplete stage.
- The pipeline is **idempotent on re-run** for any stage as long as inputs haven't changed.

## Per-stage timing and progress

Every stage invocation is timed. The pipeline emits:
- **Per-stage**: `"[stage] completed in [X]s"` immediately after each stage.
- **Running total**: `"([N]/[total] stages done, elapsed [Y]s)"` after each stage.
- **Final summary**: Table of all stages with durations at pipeline end (see Phase D).

## Failure modes

| Symptom | Action |
|---|---|
| `PIPELINE_STATE.json` corrupt | Delete it, start fresh, warn user. |
| Stage times out (> TIMEOUT_SECONDS) | Log timeout in state file, surface to user, ask whether to continue. |
| `DATASET_BRIEF.md` missing + no `raw/` | Stop. Cannot auto-brief without raw data. |
| `ANALYSIS_PLAN.md` missing | Generate draft, require user confirmation before proceeding. |
| Checkpoint config missing | Default to `auto_proceed: true`, `timeout_seconds: 300`. |

## Cross-references

- See each child skill's own SKILL.md for stage-specific behavior.
- Read `AGENT_GUIDE.md` for the project-layout convention.
- Checkpoint config: `~/.claude/checkpoint.json`.
