---
name: data-pipeline-gen
description: |
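  Triggers: "data pipeline", "celery task", "kafka consumer", "build me a pipeline", "async job",
  "airflow dag", "build an airflow dag", "batch pipeline", "rabbitmq", "message queue",
  "etl pipeline", "data processing", "build me a scheduler"
  Does: analyze requirements → generate Celery / Kafka / Airflow DAG / RabbitMQ pipeline code → include retries, error handling, DLQ, and monitoring
  Output: complete pipeline code + configuration files + monitoring guide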
---
# Data Pipeline Generator

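## Purpose
Generate production-grade data pipeline code built on Celery, Kafka, Apache Airflow, or RabbitMQ.
Each pipeline ships as a complete implementation, including a retry strategy, error handling, a Dead Letter Queue (DLQ), and monitoring.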

---

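## Choosing a pipeline type

| Requirement | Recommended tool |
|-------------|------------------|
| Asynchronous task queue, simple background jobs | **Celery** |
| High-throughput event streaming (tens of thousands of events per second) | **Kafka** |
| Time-based batch jobs, complex dependency DAGs | **Airflow** |
| Lightweight message queue, asynchronous communication between services | **RabbitMQ** |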

---

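## Procedure

### Step 1: Gather requirements

Confirm the following:
- Pipeline type: Celery / Kafka / Airflow / RabbitMQ
- Data source and destination
- Throughput requirement (messages per second)
- Failure-handling policy (retry count, backoff strategy)
- Schedule requirement (cron expression, for batch jobs)
- Monitoring tool (Flower, Prometheus, Airflow UI, etc.)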
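
The checklist above could be captured in a small structure before any code is generated. The following is only a sketch: the class name `PipelineRequirements`, its field names, and its default values are assumptions made for illustration, not part of this skill's interface.

```python
from dataclasses import dataclass
from typing import Optional

PIPELINE_TYPES = {"celery", "kafka", "airflow", "rabbitmq"}


@dataclass(frozen=True)
class PipelineRequirements:
    """Hypothetical container for the answers gathered in step 1."""
    pipeline_type: str
    source: str
    destination: str
    messages_per_second: int
    max_retries: int = 3
    backoff: str = "exponential"
    cron: Optional[str] = None       # only meaningful for batch pipelines
    monitoring: str = "prometheus"

    def problems(self) -> list:
        """Return human-readable gaps; an empty list means ready to generate."""
        issues = []
        if self.pipeline_type not in PIPELINE_TYPES:
            issues.append(f"unknown pipeline type: {self.pipeline_type}")
        if self.messages_per_second <= 0:
            issues.append("throughput must be a positive number")
        if self.max_retries < 0:
            issues.append("retry count cannot be negative")
        if self.pipeline_type == "airflow" and not self.cron:
            issues.append("batch pipelines need a cron expression")
        return issues


reqs = PipelineRequirements(
    pipeline_type="airflow", source="postgres", destination="warehouse",
    messages_per_second=50,
)
print(reqs.problems())  # ['batch pipelines need a cron expression']
```

Returning a list of gaps instead of raising on the first one lets the generator ask for every missing answer in a single round.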

---

### Step 2: Generate a Celery pipeline

**Project layout:**
```
pipeline/
├── config.py
├── celery_app.py
├── tasks/
│   ├── extract.py
│   ├── transform.py
│   └── load.py
└── monitoring/metrics.py
```

**celery_app.py:**
```python
from celery import Celery
from kombu import Exchange, Queue

app = Celery('data_pipeline')
app.config_from_object('config.CeleryConfig')

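# The x-dead-letter-* arguments are RabbitMQ features; they take effect only
# when the broker is RabbitMQ (amqp://), not with a Redis broker.
app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default',
          queue_arguments={
              'x-dead-letter-exchange': 'dlx',
              # Without this, dead-lettered messages keep routing key 'default'
              # and never match the 'dead' binding below.
              'x-dead-letter-routing-key': 'dead',
          }),
    Queue('high_priority', Exchange('high_priority'), routing_key='high'),
    Queue('dead_letter', Exchange('dlx'), routing_key='dead'),
)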
```

**config.py:**
```python
import os

class CeleryConfig:
    broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    task_acks_late = True
    task_reject_on_worker_lost = True
    # Retry limits are set per task (`max_retries=` on the decorator); Celery
    # has no global `task_max_retries` setting.
    task_soft_time_limit = 300
    task_time_limit = 360
    worker_concurrency = 4
    worker_prefetch_multiplier = 1
    worker_send_task_events = True
    task_send_sent_event = True
```

**tasks/extract.py:**
```python
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(
    bind=True,
    name='pipeline.extract.fetch_data',
    max_retries=3,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    queue='default',
    acks_late=True,
)
def fetch_data(self, source_config: dict) -> dict:
    # ExtractorFactory and send_to_dead_letter_queue are project-specific
    # helpers that are not defined in this file.
    task_id = self.request.id
    logger.info(f"[{task_id}] Extracting from {source_config['type']}")
    extractor = None
    try:
        extractor = ExtractorFactory.create(source_config['type'])
        records = extractor.fetch(source_config['params'])
        logger.info(f"[{task_id}] Extracted {len(records)} records")
        return {'records': records, 'meta': {'count': len(records)}}
    except SoftTimeLimitExceeded:
        # The time limit can fire before the extractor exists.
        if extractor is not None:
            extractor.close()
        raise
    except Exception as exc:
        logger.error(f"[{task_id}] Extraction failed: {exc}", exc_info=True)
        if self.request.retries >= self.max_retries:
            # Retries exhausted: park the message and fail without retrying again.
            send_to_dead_letter_queue(self.name, self.request.args, str(exc))
            raise
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```
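
To make the retry timing concrete, the sketch below computes the delays produced by the `countdown=60 * (2 ** retries)` expression used in the task. The function `retry_delays` and its parameters are hypothetical; the cap of 600 seconds mirrors `retry_backoff_max=600`, and the jitter variant is one common scheme rather than a description of what Celery does internally.

```python
import random


def retry_delays(max_retries=3, base=60, cap=600, jitter=False, rng=random):
    """Delay in seconds before each retry attempt, capped at `cap`."""
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        if jitter:
            # Full jitter: pick a point between 0 and the computed delay.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(retry_delays())               # [60, 120, 240]
print(retry_delays(max_retries=5))  # [60, 120, 240, 480, 600]
```

With three retries, a failing task is therefore attempted four times over roughly seven minutes before its message reaches the dead letter queue.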

**Running the ETL chain:**
```python
from celery import chain

def run_etl_pipeline(source_config, transform_config, load_config):
    pipeline = chain(
        fetch_data.s(source_config),
        process_records.s(transform_config),
        load_records.s(load_config),
    )
    return pipeline.apply_async().id
```
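
In a chain, each task receives the previous task's return value as its first positional argument, followed by the arguments bound with `.s(...)`. The sketch below imitates that hand-off with plain functions so it runs without a broker. The bodies of `process_records` and `load_records`, the `uppercase_field` option, and the `run_locally` helper are assumptions for illustration; the real tasks would be Celery tasks under `tasks/`.

```python
from functools import reduce


def process_records(extracted: dict, transform_config: dict) -> dict:
    """Hypothetical transform step: receives fetch_data's return value first."""
    field = transform_config["uppercase_field"]
    records = [{**r, field: str(r[field]).upper()} for r in extracted["records"]]
    return {"records": records, "meta": {"count": len(records)}}


def load_records(transformed: dict, load_config: dict) -> dict:
    """Hypothetical load step: reports what it would write and where."""
    return {"table": load_config["table"], "loaded": transformed["meta"]["count"]}


def run_locally(first_result, steps):
    """Mimic chain(): feed each result into the next step as its first argument."""
    return reduce(lambda result, step: step[0](result, step[1]), steps, first_result)


extracted = {"records": [{"sku": "a1"}, {"sku": "b2"}], "meta": {"count": 2}}
summary = run_locally(extracted, [
    (process_records, {"uppercase_field": "sku"}),
    (load_records, {"table": "fact_sales"}),
])
print(summary)  # {'table': 'fact_sales', 'loaded': 2}
```

Keeping every step's return value JSON-serializable matters here, because the configuration sets `task_serializer = 'json'`.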

**Run commands:**
```bash
celery -A celery_app worker -Q default,high_priority --loglevel=info
celery -A celery_app flower  # monitoring UI
```

---

### Step 3: Generate a Kafka consumer pipeline

**kafka_consumer.py:**
```python
import asyncio
import json
import signal
import logging
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from prometheus_client import Counter, Histogram, start_http_server

logger = logging.getLogger(__name__)

MESSAGES_CONSUMED = Counter('kafka_messages_total', 'Messages consumed', ['topic', 'status'])
PROCESSING_TIME = Histogram('kafka_processing_seconds', 'Processing time', ['topic'])
DLQ_MESSAGES = Counter('kafka_dlq_total', 'DLQ messages', ['topic', 'error_type'])


class DataPipelineConsumer:
    def __init__(self, config: dict):
        self.config = config
        self.consumer = None
        self.producer = None
        self._running = False

    async def start(self):
        self.consumer = AIOKafkaConsumer(
            *self.config['topics'],
            bootstrap_servers=self.config['bootstrap_servers'],
            group_id=self.config['group_id'],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            max_poll_records=self.config.get('batch_size', 100),
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        )
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.config['bootstrap_servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            enable_idempotence=True,
        )
        await self.consumer.start()
        await self.producer.start()
        self._running = True

    async def stop(self):
        self._running = False
        if self.consumer: await self.consumer.stop()
        if self.producer: await self.producer.stop()

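    async def run(self):
        # Call start() before run(). The handler is registered on the event loop
        # (Unix only) and just clears the flag; the consume loop then exits on
        # its own and the clients are closed in `finally`.
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, lambda: setattr(self, '_running', False))
        try:
            async for batch in self._consume_batches():
                await self._process_batch(batch)
        finally:
            await self.stop()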

    async def _consume_batches(self):
        while self._running:
            try:
                records = await asyncio.wait_for(
                    self.consumer.getmany(
                        timeout_ms=1000,
                        max_records=self.config.get('batch_size', 100),
                    ),
                    timeout=5.0,
                )
                if records:
                    yield [msg for tp_msgs in records.values() for msg in tp_msgs]
            except asyncio.TimeoutError:
                continue

    async def _process_batch(self, messages: list):
        failed = []
        for msg in messages:
            with PROCESSING_TIME.labels(topic=msg.topic).time():
                try:
                    await self._process_message(msg.value)
                    MESSAGES_CONSUMED.labels(topic=msg.topic, status='success').inc()
                except Exception as e:
                    MESSAGES_CONSUMED.labels(topic=msg.topic, status='error').inc()
                    failed.append((msg, e))

        for msg, error in failed:
            await self._send_to_dlq(msg, error)
        await self.consumer.commit()

    async def _process_message(self, payload: dict):
        raise NotImplementedError

    async def _send_to_dlq(self, msg, error: Exception):
        dlq_topic = f"{msg.topic}.dlq"
        await self.producer.send_and_wait(dlq_topic, value={
            'original_topic': msg.topic,
            'original_offset': msg.offset,
            'payload': msg.value,
            'error_type': type(error).__name__,
            'error_message': str(error),
        })
        DLQ_MESSAGES.labels(topic=msg.topic, error_type=type(error).__name__).inc()
```
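
The ordering inside `_process_batch` is what gives the consumer at-least-once delivery: failures are parked first, and offsets are committed last. The sketch below reproduces that ordering with in-memory stand-ins so it runs without a Kafka cluster. `FakeConsumer`, the list used as a DLQ, and the sample `handler` are all hypothetical.

```python
import asyncio


class FakeConsumer:
    """In-memory stand-in that only records how many times commit() ran."""
    def __init__(self):
        self.commits = 0

    async def commit(self):
        self.commits += 1


async def process_batch(messages, handler, consumer, dlq):
    """Process every message, park failures, and commit once at the end."""
    failed = []
    for msg in messages:
        try:
            await handler(msg)
        except Exception as exc:
            failed.append((msg, exc))
    for msg, exc in failed:
        # The real consumer publishes to "<topic>.dlq"; a list stands in here.
        dlq.append({"payload": msg, "error_type": type(exc).__name__})
    # Offsets are committed only after the failures are safely in the DLQ.
    await consumer.commit()


async def handler(msg):
    if msg["amount"] < 0:
        raise ValueError("negative amount")


async def main():
    consumer, dlq = FakeConsumer(), []
    batch = [{"amount": 10}, {"amount": -1}, {"amount": 5}]
    await process_batch(batch, handler, consumer, dlq)
    return consumer.commits, dlq


commits, dlq = asyncio.run(main())
print(commits, dlq)
# 1 [{'payload': {'amount': -1}, 'error_type': 'ValueError'}]
```

If publishing to the DLQ itself raised, the commit would be skipped and the whole batch would be redelivered, so message handlers should tolerate duplicates.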

---

### Step 4: Generate an Apache Airflow DAG

**Basic ETL DAG:**
```python
# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

with DAG(
    dag_id='etl_sales_pipeline',
    default_args=default_args,
    description='Daily sales data ETL pipeline',
    schedule='0 2 * * *',           # every day at 02:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'sales', 'daily'],
) as dag:

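    start = EmptyOperator(task_id='start')
    # ALL_DONE lets `end` run even when an upstream task fails. Because `end` is
    # the only leaf task, the DAG run is then marked successful despite the failure.
    end = EmptyOperator(task_id='end', trigger_rule=TriggerRule.ALL_DONE)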

    # fetch_from_source, save_to_staging, load_from_staging, transform_record,
    # load_to_warehouse, query_warehouse, and send_slack_notification are
    # project-specific helpers that are not defined in this file.
    def extract_sales(**context):
        """Extract sales data from the external source."""
        execution_date = context['ds']  # 'YYYY-MM-DD'
        # Actual extraction logic
        records = fetch_from_source(date=execution_date)

        # Pass only small metadata to downstream tasks through XCom
        context['task_instance'].xcom_push(key='record_count', value=len(records))

        # Large payloads go through intermediate storage (S3, GCS)
        save_to_staging(records, path=f"s3://bucket/staging/{execution_date}/")
        return f"s3://bucket/staging/{execution_date}/"

    def transform_sales(**context):
        """Clean and transform the data."""
        ti = context['task_instance']
        staging_path = ti.xcom_pull(task_ids='extract_sales')
        records = load_from_staging(staging_path)

        # Transformation logic
        transformed = [transform_record(r) for r in records]
        transformed_path = staging_path.replace('/staging/', '/transformed/')
        save_to_staging(transformed, path=transformed_path)
        # Return the path so load_sales can pull it from XCom.
        return transformed_path

    def load_sales(**context):
        """Load the transformed data into the data warehouse."""
        ti = context['task_instance']
        transformed_path = ti.xcom_pull(task_ids='transform_sales')
        load_to_warehouse(path=transformed_path, table='fact_sales')

    def validate_load(**context):
        """Verify data quality after the load completes."""
        execution_date = context['ds']
        count = query_warehouse(f"SELECT COUNT(*) FROM fact_sales WHERE date='{execution_date}'")
        expected = context['task_instance'].xcom_pull(
            task_ids='extract_sales', key='record_count'
        )
        if count != expected:
            raise ValueError(f"Quality check failed: expected {expected} rows, got {count}")

    def notify_failure(context):
        """Send a Slack alert on failure."""
        dag_run = context['dag_run']
        task = context['task']
        message = f"❌ DAG failed: {dag_run.dag_id} / Task: {task.task_id}"
        send_slack_notification(message)

    t_extract = PythonOperator(
        task_id='extract_sales',
        python_callable=extract_sales,
        on_failure_callback=notify_failure,
    )
    t_transform = PythonOperator(
        task_id='transform_sales',
        python_callable=transform_sales,
        on_failure_callback=notify_failure,
    )
    t_load = PythonOperator(
        task_id='load_sales',
        python_callable=load_sales,
        on_failure_callback=notify_failure,
    )
    t_validate = PythonOperator(
        task_id='validate_load',
        python_callable=validate_load,
        on_failure_callback=notify_failure,
    )

    # Dependency chain
    start >> t_extract >> t_transform >> t_load >> t_validate >> end
```
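
The DAG calls `save_to_staging` and `load_from_staging` without defining them. As a rough idea of their contract, the sketch below implements both against a local directory with JSON Lines files. This is an assumption for illustration only: a real deployment would write to S3 or GCS as the DAG's paths suggest, and the file name `part-0000.jsonl` is made up.

```python
import json
import tempfile
from pathlib import Path


def save_to_staging(records, path):
    """Write records as JSON Lines under `path` (a local directory here)."""
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
    with open(target / "part-0000.jsonl", "w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")


def load_from_staging(path):
    """Read back every JSON Lines file under `path`, in file-name order."""
    records = []
    for part in sorted(Path(path).glob("*.jsonl")):
        with open(part, encoding="utf-8") as fh:
            records.extend(json.loads(line) for line in fh if line.strip())
    return records


with tempfile.TemporaryDirectory() as root:
    staging = f"{root}/staging/2024-01-01/"
    save_to_staging([{"order_id": 1, "amount": 9.5}], path=staging)
    # Same path rewrite that transform_sales applies.
    transformed = staging.replace("/staging/", "/transformed/")
    save_to_staging(load_from_staging(staging), path=transformed)
    round_trip = load_from_staging(transformed)

print(round_trip)  # [{'order_id': 1, 'amount': 9.5}]
```

Only the path string travels through XCom; the records themselves stay in storage, which is the point of the staging step.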

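**Parallel DAG (fan-out / fan-in pattern):**
```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Process several sources in parallel, then merge them.
# extract_source, merge_sources, and load_to_dw are project-specific callables.
with DAG(
    dag_id='parallel_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),  # same values as the basic ETL DAG; adjust as needed
    catchup=False,
) as dag: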

    sources = ['sales', 'inventory', 'customers']

    extract_tasks = [
        PythonOperator(
            task_id=f'extract_{source}',
            python_callable=extract_source,
            op_kwargs={'source': source},
        )
        for source in sources
    ]

    merge = PythonOperator(task_id='merge_all', python_callable=merge_sources)
    load = PythonOperator(task_id='load_warehouse', python_callable=load_to_dw)

    # Fan-in: merge runs only after every extract task has finished
    extract_tasks >> merge >> load
```
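
The same dependency shape can be tried outside Airflow. The sketch below uses a thread pool as a stand-in for the scheduler: the extracts run concurrently, and the merge starts only once all of them have returned. The bodies of `extract_source` and `merge_sources` are placeholders invented for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_source(source):
    """Hypothetical extractor: returns a few fake rows tagged with the source."""
    return [{"source": source, "row": i} for i in range(2)]


def merge_sources(results):
    """Fan-in step: flatten the per-source results into one list."""
    return [row for rows in results for row in rows]


sources = ["sales", "inventory", "customers"]

# Fan-out: one worker per source. map() yields results in input order,
# and leaving the `with` block waits for every extract to finish.
with ThreadPoolExecutor(max_workers=len(sources)) as pool:
    extracted = list(pool.map(extract_source, sources))

merged = merge_sources(extracted)
print(len(merged))                          # 6
print([r["source"] for r in merged[::2]])   # ['sales', 'inventory', 'customers']
```

In the DAG, the merge task would read each source's staged output rather than receive rows in memory.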

**Airflow configuration (docker-compose):**
```yaml
# docker-compose.airflow.yml
# Both services expect a `postgres` service (not shown) reachable at host `postgres`.
services:
  airflow-webserver:
    image: apache/airflow:2.8.0
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__WEBSERVER__SECRET_KEY: 'your-secret-key'
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    command: webserver

  airflow-scheduler:
    image: apache/airflow:2.8.0
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: scheduler
```

---

### Step 5: Generate a RabbitMQ consumer pipeline

**rabbitmq_consumer.py:**
```python
import pika
import json
import logging
import time
from typing import Callable

logger = logging.getLogger(__name__)


class RabbitMQPipeline:
    def __init__(self, config: dict):
        self.config = config
        self.connection = None
        self.channel = None

    def connect(self):
        credentials = pika.PlainCredentials(
            self.config.get('username', 'guest'),
            self.config.get('password', 'guest'),
        )
        parameters = pika.ConnectionParameters(
            host=self.config.get('host', 'localhost'),
            port=self.config.get('port', 5672),
            virtual_host=self.config.get('vhost', '/'),
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300,
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self._setup_queues()

    def _setup_queues(self):
        # Main queue
        self.channel.queue_declare(
            queue=self.config['queue'],
            durable=True,
            arguments={
                'x-dead-letter-exchange': 'dlx',
                'x-message-ttl': 86400000,  # 24 hours
                # The retry limit lives in the consumer callback (x-retry-count
                # header); RabbitMQ has no 'x-max-retries' queue argument.
            }
        )
        # Dead letter exchange + queue (durable, so the binding survives a broker restart)
        self.channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
        self.channel.queue_declare(queue=f"{self.config['queue']}.dlq", durable=True)
        self.channel.queue_bind(
            queue=f"{self.config['queue']}.dlq",
            exchange='dlx',
            routing_key=self.config['queue'],
        )
        # QoS: process one message at a time
        self.channel.basic_qos(prefetch_count=1)

    def consume(self, handler: Callable):
        def callback(ch, method, properties, body):
            try:
                payload = json.loads(body)
                retry_count = (properties.headers or {}).get('x-retry-count', 0)
                logger.info(f"Processing message (retry={retry_count}): {method.delivery_tag}")

                handler(payload)
                ch.basic_ack(delivery_tag=method.delivery_tag)
                logger.info(f"Message acknowledged: {method.delivery_tag}")

            except Exception as e:
                logger.error(f"Processing failed: {e}", exc_info=True)
                retry_count = (properties.headers or {}).get('x-retry-count', 0)

                if retry_count < 3:
                    # Retry with exponential backoff. Sleeping here blocks this
                    # consumer for the duration of the delay.
                    time.sleep(2 ** retry_count)
                    new_headers = {**(properties.headers or {}), 'x-retry-count': retry_count + 1}
                    ch.basic_publish(
                        exchange='',
                        routing_key=self.config['queue'],
                        body=body,
                        properties=pika.BasicProperties(
                            delivery_mode=2,  # persistent
                            headers=new_headers,
                        )
                    )
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                else:
                    # Retries exhausted → move to the DLQ (nack + requeue=False)
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    logger.warning(f"Message sent to DLQ after {retry_count} retries")

        self.channel.basic_consume(
            queue=self.config['queue'],
            on_message_callback=callback,
        )
        logger.info(f"Waiting for messages on queue: {self.config['queue']}")
        self.channel.start_consuming()

    def publish(self, message: dict, queue: str | None = None):
        self.channel.basic_publish(
            exchange='',
            routing_key=queue or self.config['queue'],
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2),
        )


# Usage example
def process_order(payload: dict):
    order_id = payload['order_id']
    logger.info(f"Processing order: {order_id}")
    # Business logic

if __name__ == '__main__':
    pipeline = RabbitMQPipeline(config={
        'host': 'localhost',
        'queue': 'orders',
    })
    pipeline.connect()
    pipeline.consume(handler=process_order)
```
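
The retry branch in the callback boils down to a small decision based on the `x-retry-count` header. Pulling that decision into a pure function makes it testable without a broker. The sketch below is one way to do so; the name `plan_retry` and its return shape are assumptions, while the header name, the limit of 3, and the `2 ** retry_count` delay come from the consumer above.

```python
def plan_retry(headers, max_retries=3):
    """Decide what the consumer should do after a handler failure.

    Returns ("retry", delay_seconds, new_headers)
    or ("dead_letter", None, headers).
    """
    headers = dict(headers or {})
    retry_count = headers.get("x-retry-count", 0)
    if retry_count < max_retries:
        new_headers = {**headers, "x-retry-count": retry_count + 1}
        return "retry", 2 ** retry_count, new_headers
    return "dead_letter", None, headers


# Walk one message through its whole retry life cycle.
headers = None
while True:
    action, delay, headers = plan_retry(headers)
    print(action, delay, headers)
    if action == "dead_letter":
        break
# retry 1 {'x-retry-count': 1}
# retry 2 {'x-retry-count': 2}
# retry 4 {'x-retry-count': 3}
# dead_letter None {'x-retry-count': 3}
```

A message is therefore republished three times, with delays of 1, 2, and 4 seconds, before the consumer rejects it and RabbitMQ routes it to the DLQ.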

---

### Step 6: Set up monitoring

**docker-compose.monitoring.yml:**
```yaml
version: '3.8'
services:
  flower:
    image: mher/flower:2.0
    # Celery options such as --broker go before the `flower` subcommand.
    command: celery --broker=redis://redis:6379/0 flower --port=5555
    ports: ["5555:5555"]

  prometheus:
    image: prom/prometheus:latest
    volumes: [./prometheus.yml:/etc/prometheus/prometheus.yml]
    ports: ["9090:9090"]

  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    environment: [GF_SECURITY_ADMIN_PASSWORD=admin]

  rabbitmq:
    image: rabbitmq:3-management
    ports: ["5672:5672", "15672:15672"]  # 15672: management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
```

---

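## Output format

```
## Data pipeline generation result

### Pipeline type: [Celery / Kafka / Airflow / RabbitMQ]

### Generated files
- Pipeline code (main)
- Configuration files
- docker-compose.yml

### Key design decisions
- Retries: up to 3, exponential backoff
- Failure handling: Dead Letter Queue
- Commit strategy: manual commit after processing completes
- Monitoring: [Flower / Prometheus+Grafana / Airflow UI / RabbitMQ Management]

### How to run
[Start command for each pipeline]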
```

---

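## Notes

- **Celery**: set `task_acks_late=True` together with `task_reject_on_worker_lost=True` so that messages are not lost when a worker crashes. Tasks may then run more than once, so they should be idempotent.
- **Kafka**: set `enable_auto_commit=False` and commit manually after processing completes
- **Airflow**: use XCom only for small metadata; route large data through S3/GCS
- **Airflow**: `catchup=False` is the recommended default (prevents backfill runs for past dates)
- **RabbitMQ**: `basic_qos(prefetch_count=1)` distributes messages fairly across consumers
- Tune the batch size against processing time and memory usage
- DLQ messages need periodic reprocessing or an alerting hook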
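
As a starting point for the DLQ follow-up mentioned in the last note, the sketch below groups DLQ records by error type and flags the types that cross a threshold. The function name `summarize_dlq` and the threshold value are assumptions; the `original_topic` and `error_type` fields match the envelope that the Kafka consumer writes to its DLQ topic.

```python
from collections import Counter


def summarize_dlq(records, alert_threshold=2):
    """Count DLQ records per error type and list the types that need an alert."""
    counts = Counter(r["error_type"] for r in records)
    alerts = sorted(t for t, n in counts.items() if n >= alert_threshold)
    return dict(counts), alerts


dlq_records = [
    {"original_topic": "orders", "error_type": "ValueError"},
    {"original_topic": "orders", "error_type": "TimeoutError"},
    {"original_topic": "orders", "error_type": "ValueError"},
]
counts, alerts = summarize_dlq(dlq_records)
print(counts)  # {'ValueError': 2, 'TimeoutError': 1}
print(alerts)  # ['ValueError']
```

Transient error types (timeouts, connection errors) are usually candidates for automatic replay, while validation errors tend to need a fix in the producer or the handler before reprocessing helps.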
