---
name: keboola-data-engineering
description: Expert assistant for Keboola data platform. Builds working data pipelines, not just advice. Use for: data extraction, transformation, validation, orchestration, dashboard creation.
---

# Keboola Data Engineering Skill v4.1

## Quick Start (Copy-Paste Workflow)

**Build a pipeline in 4 steps:**
```yaml
1. Understand: Ask outcome questions → Save context → Track with todos
2. Discover: List data sources → Use agents for complex searches → Map to extractors
3. Propose: Show architecture + diagram → Get explicit approval
4. Build: Generate configs → Test in sandbox → Validate impact → Deploy → Monitor
```

**Tool Pattern**: `Read KNOWLEDGE_MAP → Read component docs → Write config → Bash deploy`

---

## Tool Reference Card

| Task | Tool | Command Pattern |
|------|------|-----------------|
| **Find component** | Read | `Read resources/KNOWLEDGE_MAP.md`, search for name |
| **Complex search** | Task | `Task(subagent_type=Explore, prompt=...)` for multi-file searches |
| **Search docs** | Grep | `Grep pattern in docs-repos/` |
| **Get template** | Read | `Read resources/templates/{name}.md` |
| **Save config** | Write | `Write {name}.json with content` |
| **Save context** | Write | `Write project_context.json` - persist requirements |
| **Track progress** | TodoWrite | Track requirements, validations, decisions |
| **Deploy config** | Bash | `curl -X POST {api_url} -d @{file}` |
| **Test pipeline** | Bash | `curl {queue_api} -d {job_params}` |
| **Check MCP** | - | If `mcp__keboola_*` tools exist, use them |

---

## Core Workflow

### Step 1: Understand Business Problem (5 questions)

**Ask these, nothing more until answered:**
1. "What decision does this enable? Who makes it?"
2. "What's the ONE metric that matters most?"
3. "How often is this needed? (Real-time/Hourly/Daily/Weekly)"
4. "Does data contain PII? (Names/Emails/SSNs/Financial data)"
5. "What does success look like in 30 days?"

**Output**: `{Decision: "X", Metric: "Y", Frequency: "Z", PII: Yes/No, Success: "..."}`

**⭐ NEW: Persist Context** (Feature 1: File-based state tracking)

**Use Write tool** to save `project_context.json`:
```json
{
  "decision": "{what decision this enables}",
  "decision_maker": "{who makes the decision}",
  "metric": "{the ONE key metric}",
  "frequency": "{Real-time/Hourly/Daily/Weekly}",
  "pii": true/false,
  "success_criteria": "{what success looks like in 30 days}",
  "timestamp": "{ISO 8601 timestamp}"
}
```

**⭐ NEW: Track Context with Todos** (Feature 2: TodoWrite context tracking)

**Use TodoWrite tool** to track business requirements:
```json
{
  "todos": [
    {"content": "Business context captured: {metric}, {frequency}, PII={yes/no}", "status": "completed", "activeForm": "Capturing business context"},
    {"content": "Validate architecture includes PII handling (required)", "status": "pending", "activeForm": "Validating PII requirements"},
    {"content": "Ensure {frequency} schedule is implemented", "status": "pending", "activeForm": "Implementing schedule"}
  ]
}
```

**Use Read tool** on `resources/templates/Discovery_Prompt.txt` for 15 more optional questions.

---

### Step 2: Discover Data Sources

**If MCP available**: Call `mcp__keboola_storage_api(endpoint="/buckets")` to list existing data

**If MCP unavailable**: Ask "What systems do you use?" then:

**⭐ NEW: Complex Discovery with Agents** (Feature 5: Multi-agent delegation)

For complex searches (e.g., "Find all extractors for CRM systems"):
```yaml
Use Task tool:
  subagent_type: Explore
  thoroughness: medium
  prompt: |
    Find Keboola components for: {user's data sources}

    Search:
    - resources/KNOWLEDGE_MAP.md for component IDs
    - docs-repos/connection-docs/components/extractors/ for configs

    Return structured list:
    - Component ID (e.g., keboola.ex-salesforce)
    - Doc path
    - Common config patterns (incremental, primaryKey)
    - Typical use cases
```

For simple lookups:
1. **Use Read tool** on `resources/KNOWLEDGE_MAP.md`
2. **Use Grep tool** to search for system name (e.g., "Salesforce", "MySQL")
3. Note component ID and doc path

**Data Inventory Template**:
```json
{
  "have": [
    {"system": "Salesforce", "component": "keboola.ex-salesforce", "tables": ["Opportunity", "Account"]},
    {"system": "MySQL", "component": "keboola.ex-db-mysql", "tables": ["orders", "customers"]}
  ],
  "need": [
    {"system": "Stripe", "status": "user will provide API key"},
    {"system": "Product events", "status": "missing - defer to Phase 2"}
  ]
}
```

**Use Write tool** to save inventory as `data_inventory.json`

---

### Step 3: Propose Architecture & Get Approval

**⚠️ CONTEXT-AWARE DESIGN** (Feature 1: Read saved context)

**Use Read tool** on `project_context.json` to retrieve requirements, then check:
```yaml
IF pii = true:
  MUST include:
    - PII field identification
    - Masking/hashing/removal strategy
    - Access control notes

IF frequency = "Real-time":
  MUST use:
    - CDC extractors (not batch)
    - Stream processing pattern

IF metric contains revenue/financial/cost:
  MUST include:
    - Impact simulation (current state vs projected)
    - Rollback plan
```

**Use Read tool** on `resources/templates/Design_Brief.md`, then create:

```markdown
## {Problem} Solution

**Outcome**: {What user will get}
**Frequency**: {Daily at 6am}
**Data Sources**: {List from Step 2}

**Pipeline**:
{Source 1} --[Extractor]--> in.c-{source}.{table}
{Source 2} --[Extractor]--> in.c-{source}.{table}
    ↓
[SQL Transform + Validation]
    ↓
out.c-{purpose}.{table}
    ↓
[Dashboard/Writer]

**Data Quality**:
- Freshness: < {X} hours
- Completeness: No NULLs in {key_fields}
- Validation: {What checks will run}

**PII Handling** (if applicable):
- {field}: Masked/Hashed/Removed

**Impact** (if metric-driven):
- Current state: {baseline}
- Projected: {expected change}
- Risk: {potential issues}
```

**⭐ NEW: Visual Architecture Diagram** (Feature 7: Visual diagrams)

Generate mermaid diagram for visual representation:
```mermaid
graph LR
    A[{Source 1}] -->|{Extractor}| B[in.c-{source}.{table}]
    C[{Source 2}] -->|{Extractor}| D[in.c-{source}.{table}]

    B --> E[SQL Transform]
    D --> E

    E --> F[out.c-{purpose}.{table}]

    F --> G[{Writer/Dashboard}]

    style E fill:#f9f,stroke:#333,stroke-width:4px
    style F fill:#bbf,stroke:#333,stroke-width:2px
```

**⚠️ STOP**: Ask "Should I proceed with building this?"
- If NO: Iterate on Step 3
- If YES: Continue to Step 4

**Use Write tool** to save as `architecture_proposal.md`

**Use TodoWrite** to update:
```json
{"content": "Architecture proposal approved by user", "status": "completed", "activeForm": "Getting architecture approval"}
```

---

### Step 4: Build It

#### A. Component Configs

**Pattern**: Find docs → Generate config → Deploy via API

**Example: Salesforce Extractor**

1. **Use Read tool** on path from KNOWLEDGE_MAP (e.g., `docs-repos/connection-docs/components/extractors/marketing-sales/salesforce/index.md`)

2. **Use Write tool** to create `salesforce_config.json`:
```json
{
  "parameters": {
    "objects": [
      {
        "name": "Opportunity",
        "soql": "SELECT Id, Amount, StageName, CloseDate FROM Opportunity WHERE LastModifiedDate >= LAST_N_DAYS:7",
        "output": "in.c-salesforce.opportunities"
      }
    ]
  }
}
```

3. **Use Bash tool** to deploy:
```bash
curl -X POST "https://connection.keboola.com/v2/storage/components/keboola.ex-salesforce/configs" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  --form 'name="Salesforce Opportunities"' \
  --form "configuration=@salesforce_config.json" \
  | tee response.json

CONFIG_ID=$(jq -r '.id' response.json)
echo "Extractor config ID: $CONFIG_ID"
```

**Repeat for each data source** from Step 2 inventory.

#### B. SQL Transformations (Validation MANDATORY)

**Pattern**: Business logic + Validation + Abort if fail

**⭐ NEW: Agent-Assisted SQL Generation** (Feature 5: Multi-agent delegation)

For complex transformations (joins, calculations, ML features):
```yaml
Use Task tool:
  subagent_type: general-purpose
  prompt: |
    Generate Snowflake SQL transformation for: {business requirement}

    Context from project_context.json:
    - Metric: {metric from context}
    - PII: {yes/no from context}
    - Frequency: {frequency from context}

    Apply DA/DE concepts:
    - Use Read tool on resources/Keboola_Data_Enablement_Guide.md
    - Apply relevant patterns (aggregation, window functions, etc.)

    MUST include:
    1. Business logic SQL (CREATE TABLE with calculations)
    2. PII handling (if PII=true): mask/hash/remove sensitive fields
    3. Validation SQL (freshness, volume, schema, completeness)
    4. SET ABORT_TRANSFORMATION pattern (fail fast on issues)
    5. Comments explaining DA/DE concepts applied

    Return: Complete SQL ready to test in sandbox
```

For simple transformations, manually write:

**Use Write tool** to create `transform.sql`:
```sql
-- 1. Business Logic
CREATE OR REPLACE TABLE "out.c-analytics.{output_table}" AS
SELECT
  {columns},
  {calculated_fields}
FROM "in.c-{source}.{table}"
{joins}
{where_clauses};

-- 2. Validation (REQUIRED - DO NOT SKIP)
CREATE OR REPLACE TABLE "_validation" AS
SELECT
  COUNT(*) as row_count,
  COUNT(DISTINCT {primary_key}) as unique_keys,
  COUNT(*) - COUNT({critical_field}) as null_count,
  DATEDIFF('hour', MAX({timestamp_field}), CURRENT_TIMESTAMP) as hours_old,
  CASE
    WHEN COUNT(*) = 0 THEN 'FAIL: No data'
    WHEN null_count > 0 THEN 'FAIL: NULLs in {critical_field}'
    WHEN hours_old > {max_hours} THEN 'FAIL: Data too old'
    WHEN row_count != unique_keys THEN 'FAIL: Duplicate keys'
    ELSE 'PASS'
  END as status
FROM "out.c-analytics.{output_table}";

-- 3. Abort if validation fails
SET ABORT_TRANSFORMATION = (
  SELECT CASE WHEN status != 'PASS' THEN status ELSE '' END FROM "_validation"
);
```

**⭐ NEW: Sandbox Testing** (Feature 4: Sandbox testing)

Before deploying to production:
```bash
# 1. Create temporary workspace for testing
curl -X POST "https://connection.keboola.com/v2/storage/workspaces" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"backend":"snowflake"}' \
  | tee workspace.json

WORKSPACE_ID=$(jq -r '.id' workspace.json)

# 2. Load sample data (last 7 days or 1000 rows)
curl -X POST "https://connection.keboola.com/v2/storage/workspaces/$WORKSPACE_ID/load" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  -d "input=in.c-{source}.{table}&days=7"

# 3. Run SQL in workspace
# (Use workspace credentials from workspace.json to connect and test SQL)

# 4. Verify results
echo "Check: Did SQL complete without errors?"
echo "Check: Are output tables created?"
echo "Check: Do row counts make sense?"

# 5. If tests pass, continue to deployment
# 6. Cleanup workspace after testing
curl -X DELETE "https://connection.keboola.com/v2/storage/workspaces/$WORKSPACE_ID" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN"
```

**Use Bash tool** to deploy:
```bash
curl -X POST "https://connection.keboola.com/v2/storage/components/keboola.snowflake-transformation/configs" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  --form 'name="{Transform Name}"' \
  --form "configuration={\"queries\": [\"$(cat transform.sql)\"]}" \
  | tee transform_response.json

TRANSFORM_ID=$(jq -r '.id' transform_response.json)
```

**Use Read tool** on `resources/patterns/validation-patterns.md` for 10+ validation examples.

#### C. Orchestrate with Flow

**Flows are UI-based**. **Use Write tool** to create `flow_instructions.md`:

```markdown
# Create Flow in Keboola UI:

1. Go to Flows → Create Flow
2. Name: "{Pipeline Name}"
3. Add components:
   - Step 1 (parallel):
     • Extractor 1 (config: {CONFIG_ID_1})
     • Extractor 2 (config: {CONFIG_ID_2})
   - Step 2: Transformation (config: {TRANSFORM_ID})
   - Step 3: Writer/App (if applicable)
4. Schedule: {cronTab expression}
5. Save and note Flow Config ID

Then schedule via API:
```

**Use Bash tool** after user creates Flow:
```bash
# Create schedule
SCHEDULE=$(curl -X POST "https://connection.keboola.com/v2/storage/components/keboola.scheduler/configs/" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  --form 'name="{Pipeline} Schedule"' \
  --form "configuration={\"schedule\":{\"cronTab\":\"{cron}\",\"timezone\":\"UTC\",\"state\":\"enabled\"},\"target\":{\"componentId\":\"keboola.orchestrator\",\"configurationId\":\"{FLOW_ID}\",\"mode\":\"run\"}}" \
  | jq -r '.id')

# Activate (requires Master Token with scheduler permissions)
curl -X POST "https://scheduler.keboola.com/schedules" \
  -H "X-StorageApi-Token: $MASTER_TOKEN" \
  -H "Content-Type: application/json" \
  -d "{\"configurationId\": \"$SCHEDULE\"}"
```

#### D. Test Pipeline

**Use Bash tool** to run and verify:
```bash
# Queue job
JOB=$(curl -X POST "https://queue.keboola.com/jobs" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d "{\"mode\":\"run\",\"component\":\"keboola.orchestrator\",\"config\":\"{FLOW_ID}\"}" \
  | jq -r '.id')

echo "Job ID: $JOB"
echo "Monitor: https://connection.keboola.com/admin/projects/{PROJECT_ID}/jobs/$JOB"

# Wait for job to complete (poll every 10 seconds)
for i in {1..30}; do
  STATUS=$(curl -s "https://queue.keboola.com/jobs/$JOB" \
    -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
    | jq -r '.status')

  if [ "$STATUS" = "success" ]; then
    echo "✅ Job completed successfully"
    break
  elif [ "$STATUS" = "error" ]; then
    echo "❌ Job failed"
    break
  else
    echo "⏳ Status: $STATUS... (${i}/30)"
    sleep 10
  fi
done
```

**⭐ NEW: Error Recovery** (Feature 6: Error recovery workflows)

If job fails:
```yaml
1. Get error message:
   curl "https://queue.keboola.com/jobs/$JOB" | jq '.result.message'

2. Use decision tree to diagnose:
   - "No data" → Check extractor ran successfully, verify source connectivity
   - "Validation failed" → Check _validation table, review thresholds
   - "SQL error" → Review syntax, test in workspace
   - "Timeout" → Optimize query (add indexes, reduce date range)
   - "Permission denied" → Check API token permissions

3. For complex issues, spawn troubleshooting agent:
   Use Task tool:
     subagent_type: general-purpose
     prompt: |
       Debug Keboola pipeline failure

       Error message: {error from job logs}
       Component: {component_id}

       Steps:
       1. Use Read tool on resources/runbooks/common_issues.md
       2. Search docs-repos/ for error message using Grep
       3. Provide:
          - Root cause diagnosis
          - Fix (SQL change, config change, or API call)
          - Prevention (validation to add, monitoring to set up)

       Return structured fix with code

4. Apply fix and re-test
```

**Preview output** (first 10 rows):
```bash
curl "https://connection.keboola.com/v2/storage/tables/out.c-analytics.{table}/data-preview" \
  -H "X-StorageApi-Token: $KEBOOLA_API_TOKEN" \
  | head -10
```

---

### ⭐ NEW: Step 4.5 - Validate Business Impact (Feature 3: Business validation)

**Before marking complete, validate the solution meets business requirements.**

**Use Read tool** on `project_context.json` to retrieve original goals.

#### Validation Checklist

```yaml
1. Data Quality Verification:
   - Use Bash: Query _validation table
   - Confirm: "status = 'PASS'"
   - Check: Freshness, completeness, volume meet thresholds

2. Business Impact Analysis (for metric-driven projects):
   IF metric relates to revenue/cost/conversions:
     - Generate impact simulation:
       • Query baseline (current state)
       • Query projection (with new data/model)
       • Calculate % change
       • Identify affected entities (customers, SKUs, etc.)

     Example SQL:
     SELECT
       'Current' as scenario,
       SUM({metric}) as total,
       COUNT(DISTINCT {entity}) as entities_affected
     FROM {baseline_table}
     UNION ALL
     SELECT
       'Projected' as scenario,
       SUM({metric}) as total,
       COUNT(DISTINCT {entity}) as entities_affected
     FROM {new_output_table};

3. Risk Assessment:
   - Low sample size warning: entities with < 30 data points
   - High impact changes: > 20% change in key metrics
   - Data quality issues: validation warnings (not failures)

4. Rollback Plan Documentation:
   Use Write tool to create rollback_plan.md:

   ## Rollback Plan

   **If {metric} drops > {threshold}% in first week:**

   1. Revert to previous config:
      curl -X POST "https://connection.keboola.com/v2/storage/components/{component}/configs/{id}/versions/{version}/rollback"

   2. Disable schedule:
      curl -X DELETE "https://scheduler.keboola.com/schedules/{schedule_id}"

   3. Alert stakeholders:
      - {decision_maker from context}
      - Data team lead

   **Monitoring:**
   - Check {metric} daily for first week
   - Alert if validation fails 2+ times
   - Review impact after 30 days (success criteria: {from context})
```

#### Approval Gate (Feature 3: Structured approval)

Show simulation/validation results, then ask:

```
📊 VALIDATION RESULTS:
- Data quality: {PASS/WARN}
- Impact simulation: {metric} expected to change by {X%}
- Entities affected: {count}
- Risks identified: {list}

Review rollback_plan.md for contingency.

Reply with one of:
1. "deploy" - Deploy to production with {frequency} schedule
2. "test" - Run as one-off test first, review results before scheduling
3. "revise" - Adjust parameters (specify what to change)
```

**Use TodoWrite** to track:
```json
{"content": "Business impact validated and approved", "status": "completed", "activeForm": "Validating business impact"}
```

---

#### E. Document Deliverables

**Use Write tool** to create `DELIVERABLES.md`:

```markdown
## Delivered: {Pipeline Name}

### Components Created:
| Component | ID | Purpose |
|-----------|----|----|
| {Extractor 1} | {ID} | Extract {data} |
| {Transformation} | {ID} | Calculate {metric} |
| {Flow} | {ID} | Orchestrate {frequency} run |

### Output Data:
- **Table**: out.c-analytics.{table}
- **Rows**: {count}
- **Freshness**: {hours} hours old
- **Sample**: {show first 5 rows}

### Data Quality Results:
✅ Validation: PASS
✅ Freshness: {X} hours (target: < {Y})
✅ Completeness: 0 NULLs in critical fields
✅ Uniqueness: No duplicates

### Business Impact:
- **Metric**: {metric from context}
- **Current**: {baseline value}
- **Projected**: {expected value}
- **Change**: {%}

### Schedule:
- Runs: {frequency} at {time}
- Next run: {timestamp}

### Access:
- Keboola UI: {project_url}/flows/{flow_id}
- Table: {project_url}/storage/tables/out.c-analytics.{table}

### Rollback:
- See rollback_plan.md for contingency procedures
- Monitor {metric} for first 30 days
- Success criteria: {from context}
```

---

## Decision Trees (Structured)

### Choosing Extractor Type
```yaml
question: "What's your data source?"
answers:
  - condition: "Database (MySQL, PostgreSQL, Snowflake, etc.)"
    component_pattern: "keboola.ex-db-{database}"
    path: "docs-repos/connection-docs/components/extractors/database/"

  - condition: "SaaS API (Salesforce, Stripe, GA, etc.)"
    component_pattern: "keboola.ex-{service}"
    path: "docs-repos/connection-docs/components/extractors/marketing-sales/"

  - condition: "Custom REST API"
    component: "keboola.ex-generic-v2"
    path: "docs-repos/developers-docs/extend/generic-extractor/"

  - condition: "File upload (CSV, JSON)"
    component: "keboola.ex-storage"
    path: "docs-repos/connection-docs/components/extractors/storage/"
```

### Validation Strategy
```yaml
question: "What data quality checks are needed?"
checks:
  freshness:
    when: "Time-sensitive data (orders, events, etc.)"
    sql: "DATEDIFF('hour', MAX(timestamp_col), CURRENT_TIMESTAMP) < {max_hours}"

  completeness:
    when: "Critical fields must exist"
    sql: "COUNT(*) = COUNT({critical_field})"

  uniqueness:
    when: "Primary key must be unique"
    sql: "COUNT(*) = COUNT(DISTINCT {primary_key})"

  volume:
    when: "Expecting consistent row counts"
    sql: "COUNT(*) BETWEEN {min} AND {max}"

  distribution:
    when: "Detecting anomalies in metrics"
    sql: "AVG({metric}) BETWEEN {historical_avg - 3*stddev} AND {historical_avg + 3*stddev}"
```

### Bucket Naming
```yaml
question: "How to name buckets?"
guidance: "Match existing project conventions. Common patterns:"
patterns:
  source_based:
    input: "in.c-{source}.{table}"
    output: "out.c-{purpose}.{table}"
    example: "in.c-salesforce.opportunities → out.c-analytics.revenue"

  layer_based:
    raw: "in.c-bronze.{source}_{table}"
    cleaned: "out.c-silver.{domain}_{entity}"
    analytics: "out.c-gold.{business_metric}"
    example: "bronze.salesforce_opp → silver.sales_pipeline → gold.revenue_daily"

  advice: "Use Read tool on existing project buckets to match convention"
```

---

## Pattern Library

### Pattern 1: CDC to Analytics
```yaml
name: "Change Data Capture to Analytics Dashboard"
use_case: "Real-time operational data → Business metrics"
components:
  - {type: "CDC Extractor", examples: ["MySQL CDC", "PostgreSQL CDC"]}
  - {type: "Stream Transform", tool: "Snowflake transformation"}
  - {type: "Aggregation", sql: "Windowed aggregates"}
  - {type: "Dashboard", tool: "Streamlit/Tableau"}
frequency: "Continuous (5-15min latency)"
template: "Use Read tool on resources/flows/examples/flow_cdc_orders.md"
```

### Pattern 2: Batch ETL
```yaml
name: "Daily Batch Extract-Transform-Load"
use_case: "Nightly data warehouse refresh"
components:
  - {type: "Batch Extractor", schedule: "Daily 2am"}
  - {type: "SQL Transform", layers: ["bronze", "silver", "gold"]}
  - {type: "Data Warehouse Writer", targets: ["Snowflake", "BigQuery"]}
frequency: "Daily"
template: "Use Read tool on resources/flows/examples/flow_sales_kpi.md"
```

### Pattern 3: ML Model Scoring
```yaml
name: "Model Training & Inference Pipeline"
use_case: "Predict churn, score leads, forecast demand"
components:
  - {type: "Feature Extractor", source: "Historical data"}
  - {type: "Python Transform", tool: "scikit-learn/pandas"}
  - {type: "Model Storage", location: "S3/GCS bucket"}
  - {type: "Scoring Transform", schedule: "Hourly"}
frequency: "Train: Weekly, Score: Hourly"
template: "Use Read tool on resources/flows/examples/flow_model_scoring.md"
```

---

## Component Quick Reference (Top 20)

### Extractors
| System | Component ID | Common Config |
|--------|--------------|---------------|
| **MySQL** | keboola.ex-db-mysql | incremental: updated_at, primaryKey: id |
| **PostgreSQL** | keboola.ex-db-pgsql | incremental: updated_at |
| **Salesforce** | keboola.ex-salesforce | SOQL with LAST_N_DAYS |
| **Google Analytics** | keboola.ex-google-analytics-v4 | dimensions, metrics, date ranges |
| **Stripe** | keboola.ex-stripe | objects: charges, customers, subscriptions |
| **Snowflake** | keboola.ex-db-snowflake | incremental: timestamp column |
| **BigQuery** | keboola.ex-google-bigquery-v2 | SQL query based |
| **Generic API** | keboola.ex-generic-v2 | REST API with pagination |

**Use Read tool** on `docs-repos/connection-docs/components/extractors/{category}/{name}/index.md` for full config details.

### Transformations
| Backend | Component ID | Best For |
|---------|--------------|----------|
| **Snowflake SQL** | keboola.snowflake-transformation | Large datasets, window functions |
| **BigQuery SQL** | keboola.transformation-bigquery | Google Cloud ecosystem |
| **Python** | keboola.python-transformation-v2 | ML, pandas, custom logic |
| **DBT** | keboola.dbt-transformation | SQL-based modeling |

---

## Troubleshooting Quick Reference

### Issue: "Validation Failed"
```yaml
symptom: "SET ABORT_TRANSFORMATION triggered"
steps:
  1: {action: "Use Bash tool", cmd: "curl queue API to get job logs"}
  2: {action: "Check _validation table", query: "SELECT * FROM _validation"}
  3: {action: "Identify failure", cases: ["FAIL: No data", "FAIL: NULLs", "FAIL: Stale"]}
  4: {action: "Use Read tool on resources/runbooks/common_issues.md for resolution"}
```

### Issue: "Job Failed"
```yaml
symptom: "Flow shows error status"
steps:
  1: "Get job ID from Flow run"
  2: "Use Bash: curl queue.keboola.com/jobs/{id} | jq '.result.message'"
  3: "Common causes:"
     - "API credentials expired → Regenerate in source system"
     - "Schema changed → Update extractor config"
     - "Timeout → Optimize query or increase limits"
  4: "Use Read tool on resources/runbooks/incidents/pipeline_failure.md"
  5: "For complex issues, use Task tool with troubleshooting agent (see Step 4D)"
```

---

## Security & Compliance

**⚠️ NEVER commit API tokens to git**

### Token Management
```bash
# Store in environment (not in code)
export KEBOOLA_API_TOKEN="your-token"
export KEBOOLA_MASTER_TOKEN="master-token"  # Admin permissions, scheduler access

# Rotate every 90 days
# Create at: {project_url}/settings/tokens
```

### PII Handling Checklist
```yaml
before_building:
  - question: "Does data contain PII?"
  - if_yes:
      - "Identify PII fields (name, email, SSN, financial)"
      - "Determine masking strategy:"
          hashing: "SHA256(field) for email, phone"
          masking: "CONCAT(LEFT(field, 3), '***') for partial visibility"
          removal: "Exclude from SELECT"
          tokenization: "Replace with pseudonymous ID"
      - "Document in architecture (Step 3)"
      - "Implement in SQL transform (Step 4B)"
      - "Verify in sandbox testing (Step 4B)"
```

---

## Guidelines

### DO:
✅ Use Read tool on project_context.json in Step 3 (context-aware design)
✅ Use TodoWrite to track requirements across steps
✅ Use Task tool with agents for complex searches/generation
✅ Test SQL in sandbox before production deployment
✅ Validate business impact before final deployment
✅ Generate visual diagrams (mermaid) for architecture
✅ Create rollback plans for metric-impacting changes
✅ Use Read tool before guessing (KNOWLEDGE_MAP → component docs)
✅ Use Write tool for all configs/SQL (create files, don't echo)
✅ Use Bash tool for API calls (show complete curl with error handling)
✅ Include validation in EVERY transform (SET ABORT_TRANSFORMATION)
✅ Get approval before building (Step 3 stop gate)
✅ Match existing naming conventions (check project first)
✅ Capture IDs from API responses (`jq -r '.id'`)

### DON'T:
❌ Don't skip context persistence (project_context.json is required)
❌ Don't ignore PII requirements from Step 1
❌ Don't deploy to production without sandbox testing
❌ Don't skip business impact validation (Step 4.5)
❌ Don't skip validation (it's mandatory)
❌ Don't use ERROR() function (use SET ABORT_TRANSFORMATION)
❌ Don't hardcode secrets (use env vars: $KEBOOLA_API_TOKEN)
❌ Don't assume real-time (Keboola is batch: 5+ min typical)
❌ Don't recommend Orchestrator (use Flows - modern alternative)
❌ Don't make up data (no time estimates, no performance numbers without basis)

---

## Resources

**Knowledge Base**:
- `resources/KNOWLEDGE_MAP.md` - 85+ extractors, 29+ writers with doc paths
- `resources/Keboola_Data_Enablement_Guide.md` - Dictionary + 7 book extracts
- `docs-repos/connection-docs/` - 252 markdown files (user-facing docs)
- `docs-repos/developers-docs/` - 199 markdown files (API, MCP, automation)

**Templates** (Use Read tool):
- `resources/templates/Design_Brief.md` - Architecture proposal template
- `resources/templates/Discovery_Prompt.txt` - 15 business questions
- `resources/templates/Validation.md` - Data quality patterns

**Examples** (Use Read tool):
- `resources/flows/examples/flow_sales_kpi.md` - Batch ETL pattern
- `resources/flows/examples/flow_cdc_orders.md` - Real-time CDC pattern
- `resources/flows/examples/flow_model_scoring.md` - ML inference pattern

**Troubleshooting** (Use Read tool):
- `resources/runbooks/common_issues.md` - Duplicates, schema drift, freshness
- `resources/runbooks/incidents/pipeline_failure.md` - Debug checklist
- `resources/runbooks/incidents/data_quality_breach.md` - SLA breach response

---

## Setup (One-Time)

**Clone documentation** (use Bash tool):
```bash
cd /home/user/bg/experiments/keboola-skill/
git clone https://github.com/keboola/connection-docs docs-repos/connection-docs
git clone https://github.com/keboola/developers-docs docs-repos/developers-docs
```

**MCP Configuration** (optional - adds live API access):
```json
{
  "mcpServers": {
    "keboola": {
      "command": "uvx",
      "args": ["keboola_mcp_server", "--api-url", "https://connection.keboola.com"],
      "env": {
        "KBC_STORAGE_TOKEN": "<token>",
        "KBC_WORKSPACE_SCHEMA": "<schema>"
      }
    }
  }
}
```

**Stack URLs** (replace in MCP config):
- US Virginia AWS: `https://connection.keboola.com`
- US Virginia GCP: `https://connection.us-east4.gcp.keboola.com`
- EU Frankfurt AWS: `https://connection.eu-central-1.keboola.com`
- EU Ireland Azure: `https://connection.north-europe.azure.keboola.com`
- EU Frankfurt GCP: `https://connection.europe-west3.gcp.keboola.com`

---

**Version**: 4.1.0 - Advanced Claude Features
**Updated**: 2025-10-23
**Key Changes**:
- ⭐ File-based state tracking (project_context.json)
- ⭐ TodoWrite context tracking across steps
- ⭐ Business impact validation (Step 4.5) with approval gates
- ⭐ Sandbox testing before production
- ⭐ Multi-agent delegation (discovery, SQL gen, troubleshooting)
- ⭐ Error recovery workflows with automated diagnosis
- ⭐ Visual architecture diagrams (mermaid)
- 370+ lines added for context awareness, business validation, operational completeness
