---
name: rag-pipeline-gen
description: |
  트리거: "RAG 파이프라인", "벡터 검색", "문서 임베딩", "RAG 만들어줘", "retrieval augmented generation"
  수행: 문서 청킹 전략 설계 → 임베딩 → 벡터 DB 저장 → 검색 파이프라인 코드 생성
  출력: 완전한 RAG 파이프라인 코드 (LangChain 또는 LlamaIndex 기반) + 설정 가이드
---

# RAG Pipeline Generator

## 목적

문서 청킹, 임베딩, 벡터 DB 저장, 시맨틱 검색까지 포함한 완전한 RAG(Retrieval-Augmented Generation) 파이프라인을 생성한다. LangChain 또는 LlamaIndex 기반으로 프로덕션 수준의 코드를 제공한다.

## 실행 절차

### 1단계: 요구사항 파악

다음을 확인한다.
- 문서 유형: PDF, Markdown, HTML, 코드, 데이터베이스 레코드
- 벡터 DB: Chroma(로컬), Pinecone(클라우드), pgvector(PostgreSQL), Weaviate
- 임베딩 모델: OpenAI text-embedding-3-small/large, Cohere, HuggingFace
- LLM: GPT-4o, Claude 3.5, Llama 3
- 검색 전략: 시맨틱, 하이브리드(BM25+벡터), MMR

### 2단계: 청킹 전략 선택

| 문서 유형 | 권장 전략 | 청크 크기 |
|----------|-----------|---------|
| 일반 텍스트 | RecursiveCharacterTextSplitter | 512-1024 토큰 |
| 마크다운 | MarkdownHeaderTextSplitter | 섹션 단위 |
| 코드 | Language-aware splitter | 함수/클래스 단위 |
| HTML | HTMLHeaderTextSplitter | 헤더 계층 기준 |
| 테이블/CSV | 행 단위 or 의미 단위 | 5-20행 |

**청크 크기 선택 원칙:**
- 청크가 너무 작으면: 컨텍스트 유실, 검색 정밀도 저하
- 청크가 너무 크면: 관련 없는 내용 포함, LLM 컨텍스트 낭비
- `chunk_overlap`은 청크 크기의 10-20% 권장 (문맥 연속성 보장)

### 3단계: LangChain 기반 파이프라인

**프로젝트 구조:**
```
rag/
├── ingest.py          # 문서 수집·청킹·임베딩·저장
├── retriever.py       # 검색 전략 (시맨틱/하이브리드)
├── chain.py           # RAG 체인 구성
├── config.py          # 설정 관리
└── evaluate.py        # 검색 품질 평가
```

**config.py:**
```python
from pydantic_settings import BaseSettings
from typing import Literal

class RAGConfig(BaseSettings):
    # 임베딩
    embedding_model: str = "text-embedding-3-small"
    embedding_dimensions: int = 1536

    # 청킹
    chunk_size: int = 512
    chunk_overlap: int = 64

    # 벡터 DB
    vector_db: Literal["chroma", "pinecone", "pgvector"] = "chroma"
    chroma_persist_dir: str = "./chroma_db"
    collection_name: str = "documents"

    # 검색
    top_k: int = 5
    search_type: Literal["similarity", "mmr", "hybrid"] = "mmr"
    mmr_lambda: float = 0.5     # 다양성 vs 관련성 균형

    # LLM
    llm_model: str = "gpt-4o-mini"
    temperature: float = 0.1
    max_tokens: int = 1024

    class Config:
        env_file = ".env"
```

**ingest.py:**
```python
import logging
from pathlib import Path
from typing import List

from langchain_community.document_loaders import (
    PyPDFLoader,
    DirectoryLoader,
    UnstructuredMarkdownLoader,
    WebBaseLoader,
)
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
)
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document

from config import RAGConfig

logger = logging.getLogger(__name__)


class DocumentIngestor:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.embeddings = OpenAIEmbeddings(
            model=config.embedding_model,
            dimensions=config.embedding_dimensions,
        )
        self.vectorstore = Chroma(
            collection_name=config.collection_name,
            embedding_function=self.embeddings,
            persist_directory=config.chroma_persist_dir,
        )

    def load_documents(self, source: str | Path) -> List[Document]:
        """소스 유형을 자동 감지해 문서를 로드한다."""
        source = str(source)

        if source.startswith(('http://', 'https://')):
            loader = WebBaseLoader(source)
        elif source.endswith('.pdf'):
            loader = PyPDFLoader(source)
        elif source.endswith('.md'):
            loader = UnstructuredMarkdownLoader(source)
        elif Path(source).is_dir():
            loader = DirectoryLoader(
                source,
                glob="**/*.{pdf,md,txt}",
                show_progress=True,
                use_multithreading=True,
            )
        else:
            raise ValueError(f"Unsupported source: {source}")

        docs = loader.load()
        logger.info(f"Loaded {len(docs)} documents from {source}")
        return docs

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """문서를 청크로 분할하고 메타데이터를 보강한다."""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=["\n\n", "\n", ".", "!", "?", ",", " "],
            length_function=len,
            add_start_index=True,   # 원본 문서 내 위치 추적
        )
        chunks = splitter.split_documents(documents)

        # 청크 번호 메타데이터 추가
        for i, chunk in enumerate(chunks):
            chunk.metadata['chunk_index'] = i
            chunk.metadata['chunk_count'] = len(chunks)

        logger.info(f"Split into {len(chunks)} chunks")
        return chunks

    def ingest(self, source: str | Path, batch_size: int = 100) -> int:
        """문서를 로드·분할·임베딩·저장하는 전체 파이프라인."""
        docs = self.load_documents(source)
        chunks = self.split_documents(docs)

        # 배치 처리로 API 레이트 리밋 방지
        total = 0
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            self.vectorstore.add_documents(batch)
            total += len(batch)
            logger.info(f"Ingested {total}/{len(chunks)} chunks")

        return total
```

**retriever.py:**
```python
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.retrievers import (
    EnsembleRetriever,
    ContextualCompressionRetriever,
)
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_community.retrievers import BM25Retriever
from langchain_openai import ChatOpenAI
from config import RAGConfig


class RAGRetriever:
    def __init__(self, config: RAGConfig, documents=None):
        self.config = config
        embeddings = OpenAIEmbeddings(model=config.embedding_model)
        self.vectorstore = Chroma(
            collection_name=config.collection_name,
            embedding_function=embeddings,
            persist_directory=config.chroma_persist_dir,
        )
        self._documents = documents  # BM25용 원본 문서

    def get_retriever(self):
        """설정에 따라 적절한 검색기를 반환한다."""
        if self.config.search_type == "similarity":
            return self._similarity_retriever()
        elif self.config.search_type == "mmr":
            return self._mmr_retriever()
        elif self.config.search_type == "hybrid":
            return self._hybrid_retriever()
        raise ValueError(f"Unknown search type: {self.config.search_type}")

    def _similarity_retriever(self):
        return self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": self.config.top_k},
        )

    def _mmr_retriever(self):
        """MMR: 관련성과 다양성을 균형 있게 검색한다."""
        return self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={
                "k": self.config.top_k,
                "fetch_k": self.config.top_k * 4,   # 후보 풀
                "lambda_mult": self.config.mmr_lambda,
            },
        )

    def _hybrid_retriever(self):
        """BM25(키워드) + 벡터(시맨틱) 앙상블 검색."""
        if not self._documents:
            raise ValueError("Documents required for hybrid retrieval")

        bm25_retriever = BM25Retriever.from_documents(
            self._documents,
            k=self.config.top_k,
        )
        vector_retriever = self._mmr_retriever()

        return EnsembleRetriever(
            retrievers=[bm25_retriever, vector_retriever],
            weights=[0.4, 0.6],   # BM25 40%, 벡터 60%
        )

    def get_compressed_retriever(self):
        """LLM으로 검색 결과를 압축·재순위화한다 (비용 증가)."""
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        compressor = LLMChainExtractor.from_llm(llm)
        return ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=self.get_retriever(),
        )
```

**chain.py:**
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.messages import HumanMessage, AIMessage
from config import RAGConfig
from retriever import RAGRetriever


RAG_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant that answers questions based on the provided context.

## Instructions
- Answer ONLY based on the provided context
- If the answer is not in the context, say "제공된 문서에서 해당 정보를 찾을 수 없습니다."
- Cite the source document when possible
- Be concise and accurate

## Context
{context}"""),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{question}"),
])


def format_docs(docs) -> str:
    """검색된 문서를 LLM 입력용 텍스트로 포맷한다."""
    return "\n\n---\n\n".join(
        f"[Source: {doc.metadata.get('source', 'unknown')}]\n{doc.page_content}"
        for doc in docs
    )


class RAGChain:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.retriever = RAGRetriever(config).get_retriever()
        self.llm = ChatOpenAI(
            model=config.llm_model,
            temperature=config.temperature,
            max_tokens=config.max_tokens,
        )
        self.memory = ConversationBufferWindowMemory(
            k=5,
            return_messages=True,
            memory_key="chat_history",
        )
        self.chain = self._build_chain()

    def _build_chain(self):
        return (
            RunnableParallel(
                context=self.retriever | format_docs,
                question=RunnablePassthrough(),
                chat_history=lambda _: self.memory.load_memory_variables({})["chat_history"],
            )
            | RAG_PROMPT
            | self.llm
            | StrOutputParser()
        )

    def ask(self, question: str) -> dict:
        """질문에 답하고 소스 문서도 반환한다."""
        # 검색 및 답변 생성
        answer = self.chain.invoke(question)

        # 소스 문서 별도 검색 (투명성)
        source_docs = self.retriever.invoke(question)

        # 대화 기록 저장
        self.memory.save_context(
            {"input": question},
            {"output": answer},
        )

        return {
            "answer": answer,
            "sources": [
                {
                    "content": doc.page_content[:200] + "...",
                    "source": doc.metadata.get("source", "unknown"),
                    "page": doc.metadata.get("page"),
                }
                for doc in source_docs
            ],
        }

    def ask_stream(self, question: str):
        """스트리밍 응답을 생성한다."""
        for chunk in self.chain.stream(question):
            yield chunk
```

### 4단계: 검색 품질 평가

**evaluate.py:**
```python
from ragas import evaluate
from ragas.metrics import (
    answer_relevancy,
    faithfulness,
    context_recall,
    context_precision,
)
from datasets import Dataset


def evaluate_rag_pipeline(rag_chain, test_cases: list[dict]) -> dict:
    """
    RAGAS로 RAG 파이프라인 품질을 평가한다.

    test_cases 형식:
    [
      {
        "question": "질문",
        "ground_truth": "정답",
      },
      ...
    ]
    """
    results = []
    for case in test_cases:
        response = rag_chain.ask(case["question"])
        results.append({
            "question": case["question"],
            "answer": response["answer"],
            "contexts": [s["content"] for s in response["sources"]],
            "ground_truth": case["ground_truth"],
        })

    dataset = Dataset.from_list(results)
    scores = evaluate(
        dataset,
        metrics=[
            answer_relevancy,    # 답변이 질문과 관련 있는가
            faithfulness,        # 답변이 컨텍스트에 근거하는가 (환각 측정)
            context_recall,      # 필요한 컨텍스트가 검색됐는가
            context_precision,   # 검색된 컨텍스트가 관련 있는가
        ],
    )
    return scores
```

### 5단계: 실행 스크립트

```python
# main.py
import asyncio
from config import RAGConfig
from ingest import DocumentIngestor
from chain import RAGChain

async def main():
    config = RAGConfig()

    # 1. 문서 수집
    ingestor = DocumentIngestor(config)
    count = ingestor.ingest("./docs/")
    print(f"Ingested {count} chunks")

    # 2. RAG 체인 초기화
    rag = RAGChain(config)

    # 3. 질문 답변
    result = rag.ask("환불 정책이 어떻게 되나요?")
    print(f"Answer: {result['answer']}")
    print(f"Sources: {[s['source'] for s in result['sources']]}")

if __name__ == "__main__":
    asyncio.run(main())
```

**requirements.txt:**
```
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-chroma>=0.1.0
langchain-community>=0.2.0
chromadb>=0.5.0
pypdf>=4.0.0
ragas>=0.1.0
pydantic-settings>=2.0.0
```

## 출력 형식

```
## RAG 파이프라인 생성 결과

### 아키텍처 결정
- 청킹: RecursiveCharacterTextSplitter (512 토큰, 64 overlap)
- 임베딩: text-embedding-3-small (1536차원)
- 벡터 DB: Chroma (로컬)
- 검색: MMR (k=5, lambda=0.5)
- LLM: gpt-4o-mini

### 생성된 파일
- `ingest.py` - 문서 수집 및 벡터화
- `retriever.py` - 검색 전략
- `chain.py` - RAG 체인
- `config.py` - 설정
- `evaluate.py` - 품질 평가

### 실행 방법
\`\`\`bash
pip install -r requirements.txt
python main.py
\`\`\`
```

## 사용 예시

**입력:**
```
회사 내부 문서 PDF들을 RAG로 검색하는 챗봇 만들어줘.
Pinecone 쓰고 싶고, 한국어 문서야.
```

**출력:** Pinecone 연동 RAG 파이프라인 + 한국어 임베딩 모델 설정 + 전체 코드

## 주의사항

- 임베딩 모델과 청크 크기는 쌍으로 최적화해야 한다. `text-embedding-3-small`은 512 토큰 내외에서 최적 성능을 보인다.
- 한국어 문서는 `HuggingFaceEmbeddings(model_name="jhgan/ko-sroberta-multitask")`를 고려한다.
- 벡터 DB 재구축 시 기존 컬렉션 삭제 여부를 명시적으로 처리해야 중복 문서 문제를 방지한다.
- RAGAS 평가에는 LLM 호출 비용이 발생한다. 개발 초기엔 소규모 테스트셋(10-20개)으로 시작한다.
- `chunk_overlap`이 없으면 청크 경계에서 중요한 정보가 잘릴 수 있다.
