프로덕션 GenAI의 보이지 않는 문제
정교한 GenAI 또는 에이전트 AI 파이프라인을 만들었다. 노트북이나 레포지토리에서는 완벽하게 작동한다. 그런데 프로덕션에 배포하고 3주 후에 관계자로부터 메시지가 온다.

"지난 화요일 오후 2시에 챗봇이 주문에 관한 간단한 질문에 완전히 틀린 답변을 했는데, 왜 했나요?"
로그를 열어본다. 거기서 발견하는 건... 타임스탬프와 HTTP 상태 코드뿐이다.
익숙한 상황 아닌가? 이것이 GenAI 시스템의 핵심 문제다.
GenAI 시스템은 조용히, 비싸게, 그리고 기존 소프트웨어와는 근본적으로 다른 방식으로 실패한다.
REST API는 200을 반환하거나 반환하지 않거나 둘 중 하나다. LLM은 200을 반환하면서 동시에 자신 있게 틀린 답변을 반환한다.
지연 시간만 추적해서는 모델이 환각(hallucination)을 일으켰는지 알 수 없다. 컨텍스트 없이 프롬프트만 로깅해서는 어떤 검색 단계가 잘못됐는지 알 수 없다.
MLflow 2.x는 바로 이 세계를 위해 설계된 네이티브 추적 시스템을 도입했다. 다중 홉 LLM 호출, RAG 파이프라인, 도구를 사용하는 에이전트를 위한 시스템이다. 이 글에서는 LangGraph와 OpenAI API를 사용해 프로덕션 수준의 Text2SQL + RAG 파이프라인 + 웹 검색을 구축하고, MLflow로 완전히 계측하고, GenAI 시스템에서 진정한 관측 가능성이 어떤 모습인지 살펴본다.
GenAI 관측 가능성이 다른 이유
전통적인 ML 관측 가능성은 모델 드리프트, 데이터 분포 변화, 오프라인에서 계산된 정확도 지표에 집중한다. GenAI 관측 가능성은 완전히 다른 형태를 띤다.
전통적 ML vs GenAI 관측 가능성 비교
| 구분 | 전통적 ML | GenAI |
|---|---|---|
| 실패 방식 | 기술적 오류 (500, 타임아웃) | 의미론적 실패 (200이지만 틀린 답) |
| 추적 단위 | 단일 모델 호출 | 다중 홉 체인 (라우터 → 검색 → LLM → 합성) |
| 품질 측정 | 정확도, F1, RMSE | 관련성, 근거성, 일관성, 충실도 |
| 디버깅 | 입력/출력 비교 | 어느 단계에서 무엇이 잘못됐는가? |
MLflow의 추적 시스템은 세 가지 핵심 기본 요소로 이 모든 것을 해결한다.
- 스팬(Spans): 명명되고 시간이 측정된 작업 단위 (검색 호출, LLM 호출, SQL 실행)
- 트레이스(Traces): 전체 요청 생명주기를 나타내는 스팬의 계층적 트리
- 평가(Evaluations): LLM-as-judge 또는 커스텀 스코러를 사용한 구조화된 지표 로깅
MLflow 추적의 세 가지 핵심 기본 요소
MLflow GenAI 관측 가능성 시스템의 핵심에는 겉으로는 단순해 보이는 세 가지 개념이 있다. 스팬, 트레이스, 평가다.
개별적으로는 단순하다. 함께 사용하면 모든 단일 요청의 완전하고 검사 가능한 실행 스토리를 제공한다.
1. 스팬: 실행의 구성 요소
모든 것은 스팬에서 시작된다.
GenAI 시스템에서 스팬은 개념적 작업에 매핑된다. 벡터 스토어에서의 검색, LLM 완성 호출, SQL 실행, 또는 도구 호출이 그 예다.
스팬은 파이프라인 내부의 단일하고 잘 정의된 작업 단위를 나타낸다.
스팬은 다음을 가진다.
- 이름 (무슨 일이 일어나고 있는가)
- 시작 및 종료 시간 (얼마나 걸렸는가)
- 입력 및 출력 (무엇이 들어가고 무엇이 나왔는가)
- 선택적 메타데이터 (토큰, 모델, 파라미터, 오류)
MLflow의 스팬 유형

전통적인 시스템에서 이것은 함수 호출처럼 느껴질 수 있다. GenAI 시스템에서 스팬은 훨씬 더 중요해진다. 각 단계가 기술적으로만이 아니라 의미론적으로도 독립적으로 실패할 수 있기 때문이다.
2. 트레이스: 단일 요청의 이야기
트레이스는 단일 사용자 요청에 대한 스팬의 완전한 트리다.
스팬이 "이 단계에서 무슨 일이 있었나?"라고 답한다면, 트레이스는 "이 대화 턴에서 무슨 일이 있었나?"라고 답한다.
사용자가 쿼리를 보낼 때마다 MLflow는 모든 것을 하나로 묶는 하나의 트레이스를 생성한다.
스팬은 독립적으로 존재하지 않는다. 연결되어 있다.
트레이스: user_query_request
│
├── 스팬: parse_query_llm
├── 스팬: retrieve_context
├── 스팬: generate_sql_llm
├── 스팬: execute_sql
└── 스팬: summarize_response_llm
이 구조는 다음을 제공한다.
- 엔드투엔드 지연 시간
- 실행 순서
- 부모-자식 관계
- 전체 컨텍스트 전파
3. 평가: 규모에서의 품질
스팬과 트레이스는 무슨 일이 일어났는지를 알려준다. 평가는 그것이 좋았는지를 알려준다. 이것이 GenAI 관측 가능성이 전통적인 시스템과 완전히 갈라지는 지점이다.
왜냐하면 GenAI에서는:
- 요청이 기술적으로 성공할 수 있고
- 동시에 의미론적으로 실패할 수 있기 때문이다
MLflow는 두 가지 평가 모드를 지원한다.
온라인 (트레이스별) 피드백
개별 트레이스에 연결된 실시간 요청 수준 평가를 캡처한다. 피드백은 사람(예: 평점, 주석)이나 자동화된 LLM 판사로부터 올 수 있으며, 각 상호작용이 발생할 때 품질을 평가하는 데 도움이 된다.
오프라인 (배치) 평가
집계 지표를 계산하기 위해 데이터셋 전체에 걸쳐 구조화된 평가를 실행한다. 회귀 테스트, 벤치마킹, 그리고 규모에서 다양한 모델이나 프롬프트 전략을 비교하는 데 주로 사용된다.
이 글의 구현에서는 평가를 범위 밖으로 둔다.
이제 실제 시스템으로 이것들을 실제로 살펴보겠다.
케이스 스터디: 이커머스 대화형 분석
다음과 같은 이커머스 대화형 에이전트를 구축한다.
- 각 사용자 질문을 최적의 도구로 라우팅하고
- 로컬 SQLite 데이터베이스에서 분석적 SQL 질문에 답하고
- FAISS 벡터 검색을 사용해 업로드된 PDF에서 관련 텍스트를 검색하고
- 외부 정보가 필요할 때 Serper를 사용해 실시간 웹 검색을 수행하고
- 모든 단계에 대해 MLflow로 관측 가능성 트레이스를 로깅한다
에이전트는 라우팅, 도구, 합성, 대화 기록을 위한 명확히 분리된 노드를 가진 LangGraph 상태 머신으로 구현된다.
1. 아키텍처 개요: 관측 가능한 에이전트
에이전트는 명확하고 관측 가능한 상태 머신 패턴을 따른다.
사용자 질문
│
▼
┌─────────────────────────┐
│ 라우터 노드 (LLM) │ 분류: sql, rag, 또는 web_search
└──────────┬──────────────┘
│
┌──────┼──────┐
│ │ │
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌──────────┐
│ SQL │ │ RAG │ │ 웹 │
│노드 │ │노드 │ │ 검색 │
└──┬──┘ └──┬──┘ └────┬─────┘
│ │ │
└───────┼────────┘
▼
┌──────────────────┐
│ 합성 노드 │ 결과 + 컨텍스트 결합
└────────┬─────────┘
▼
┌──────────────────┐
│ 대화 기록 │ 다음 턴을 위해 기록 업데이트
│ 노드 │
└────────┬─────────┘
▼
최종 답변
이 그래프의 각 노드는 MLflow에서 트레이스 스팬이 되어 계층적 가시성을 제공한다.
이제 시작한다.
전체 코드는 오픈 소스로 공개되어 있다.
2. 사전 요구사항
시작하기 전에 다음이 필요하다.
- Python 3.11 이상 설치
pip사용 가능- 데이터 다운로드 및 API 호출을 위한 인터넷 연결
- OpenAI API 키
- 웹 검색을 위한 Serper API 키
레포지토리를 클론할 수 있다.
레포지토리에는 이미 주요 소스 파일과 설정이 포함되어 있다.
├── .env
├── .gitignore
├── README.md
├── agent
│ ├── __init__.py
│ ├── graph.py
│ ├── nodes.py
│ ├── router.py
│ └── state.py
├── config.py
├── data
│ ├── ecommerce.db
│ ├── faiss_index
│ │ ├── index.faiss
│ │ └── metadata.pkl
│ └── raw
│ ├── df_Customers.csv
│ ├── df_OrderItems.csv
│ ├── df_Orders.csv
│ ├── df_Payments.csv
│ └── df_Products.csv
├── data_loader.py
├── main.py
├── medium_tutorial.md
├── mlflow.db
├── observability.py
├── pdf_docs
│ └── shopbr_return_policy.pdf
├── requirements.txt
├── session.py
├── setup.py
└── tools
├── __init__.py
├── rag_tool.py
├── sql_tool.py
└── web_search_tool.py
.csv와 .db 파일을 제외한 모든 파일이 레포지토리에 포함되어 있다. .csv 파일은 글 후반부에 설명하는 setup.py 파일을 사용해 다운로드할 수 있다.
3. 단계별 설정
1단계: 가상 환경 생성 및 활성화
macOS/Linux:
python3 -m venv .venv
source .venv/bin/activate
Windows (명령 프롬프트):
python -m venv .venv
.venv\Scripts\activate
2단계: Python 의존성 설치
requirements.txt에서 필요한 라이브러리를 설치한다.
pip install -r requirements.txt
다음 패키지들이 설치된다.
- openai
- langgraph
- faiss-cpu
- sentence-transformers
- pypdf
- pandas
- kagglehub
- mlflow
- python-dotenv
3단계: API 키 설정
.env 파일을 추가하고 아래 환경 변수를 업데이트한다.
OPENAI_API_KEY=your-open-api-key
SERPER_API_KEY=your-serper-api-key
MLFLOW_TRACKING_URI=http://localhost:5001
MLFLOW_EXPERIMENT=ecommerce-agent
4. 데이터 로드
4.1: 이커머스 데이터
출처: Kaggle — Ecommerce Order & Supply Chain Dataset
이 프로젝트에서 사용하는 데이터셋은 Kaggle에서 제공하는 E-commerce Order Dataset이다. 주문, 고객, 상품, 결제, 물류를 포함한 엔드투엔드 이커머스 시스템의 다중 테이블 뷰를 제공한다.
이 데이터셋은 고객이 주문하는 순간부터 최종 배송까지 주문의 전체 생명주기를 시뮬레이션한다.
데이터셋의 주요 테이블
1. Orders (주문)
핵심 주문 생명주기 정보를 담고 있다.
- 주문 ID, 고객 ID
- 주문 상태 (배송완료, 취소 등)
- 구매, 승인, 배송 타임스탬프
- 예상 배송 날짜
배송 지연, 주문 흐름, 생명주기 추적 분석에 유용하다.
2. Order Items (주문 항목)
각 주문 내 항목을 나타낸다.
- 상품 ID 및 판매자 ID
- 가격 및 배송비
- 주문당 여러 항목 지원
매출 분석과 장바구니 수준 인사이트에 핵심적이다.
3. Customers (고객)
고객 수준 메타데이터다.
- 고객 ID
- 위치 (도시, 주, 우편번호)
지역별 분석과 세분화에 활용된다.
4. Payments (결제)
결제 거래 세부 정보다.
- 결제 방법 (신용카드 등)
- 할부
- 결제 금액
결제 행동 및 성공률 이해에 유용하다.
5. Products (상품)
상품 카탈로그 정보다.
- 상품 카테고리
- 물리적 속성 (무게, 치수)
카테고리 수준 인사이트와 물류 분석에 활용된다.
# data_loader.py
# ─────────────────────────────────────────────────────────────────────────────
# Kaggle 이커머스 데이터셋을 다운로드하고 train/ CSV를 로컬 SQLite 데이터베이스에 수집한다.
#
# 테이블 매핑 (Kaggle train/ 폴더 → SQLite 테이블):
# orders_dataset.csv → orders
# order_items_dataset.csv → order_items
# customers_dataset.csv → customers
# payments_dataset.csv → payments (olist_order_payments_dataset.csv)
# products_dataset.csv → products
#
# 사용법:
# python data_loader.py
# ─────────────────────────────────────────────────────────────────────────────
import os
import sqlite3
import glob
import shutil
import pandas as pd
import kagglehub
from config import DB_PATH, RAW_DATA_DIR
# ── 각 테이블을 감지하기 위한 파일명 힌트 ─────────────────────────────────────
TABLE_HINTS = {
"orders": ["orders_dataset", "orders.csv"],
"order_items": ["order_items_dataset", "orderitems.csv"],
"customers": ["customers_dataset", "customers.csv"],
"payments": ["payments_dataset", "payment"],
"products": ["products_dataset", "products.csv"],
}
def _find_csv(train_dir: str, hints: list[str]) -> str | None:
"""train_dir에서 힌트 중 하나를 포함하는 첫 번째 CSV를 반환한다."""
for f in glob.glob(os.path.join(train_dir, "*.csv")):
fname = os.path.basename(f).lower()
if any(h.lower() in fname for h in hints):
return f
return None
def _resolve_delivery_col(df: pd.DataFrame) -> pd.DataFrame:
"""Kaggle orders 파일은 배송 날짜 컬럼명이 다를 수 있다."""
if "order_delivered_timestamp" in df.columns:
return df
for candidate in ("order_delivered_customer_date", "order_delivered_carrier_date"):
if candidate in df.columns:
df = df.rename(columns={candidate: "order_delivered_timestamp"})
return df
return df
def _copy_raw_csvs(source_dir: str, target_dir: str) -> None:
"""다운로드된 데이터셋에서 로컬 raw 데이터 폴더로 원본 CSV를 복사한다."""
csv_files = glob.glob(os.path.join(source_dir, "*.csv"))
if not csv_files:
print(f"[data_loader] 경고: {source_dir}에서 복사할 CSV 파일을 찾을 수 없습니다.")
return
for csv_path in csv_files:
target_path = os.path.join(target_dir, os.path.basename(csv_path))
shutil.copy2(csv_path, target_path)
def download_and_ingest(force: bool = False) -> str:
"""
Kaggle 데이터셋을 다운로드하고 (첫 실행 후 캐시됨) 모든 테이블을
DB_PATH의 SQLite 데이터베이스에 작성한다.
DB_PATH를 반환한다.
"""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
os.makedirs(RAW_DATA_DIR, exist_ok=True)
if os.path.exists(DB_PATH) and not force:
print(f"[data_loader] SQLite DB가 이미 {DB_PATH}에 존재합니다. 수집을 건너뜁니다.")
print(" force=True를 전달하면 재수집합니다.")
return DB_PATH
print("[data_loader] Kaggle 데이터셋 다운로드 중 …")
kaggle_root = kagglehub.dataset_download("bytadit/ecommerce-order-dataset")
print(f"[data_loader] 데이터셋 루트: {kaggle_root}")
# train/ 하위 디렉터리 찾기
train_dir = None
for root, dirs, _ in os.walk(kaggle_root):
if "train" in dirs:
train_dir = os.path.join(root, "train")
break
if os.path.basename(root).lower() == "train":
train_dir = root
break
if train_dir is None:
# 폴백: train 하위 폴더가 없으면 루트를 직접 사용
train_dir = kaggle_root
print(f"[data_loader] CSV 디렉터리 사용: {train_dir}")
print(f"[data_loader] {RAW_DATA_DIR}에 원본 CSV 복사 중 …")
_copy_raw_csvs(train_dir, RAW_DATA_DIR)
conn = sqlite3.connect(DB_PATH)
for table_name, hints in TABLE_HINTS.items():
csv_path = _find_csv(train_dir, hints)
if csv_path is None:
print(f"[data_loader] 경고: '{table_name}' 테이블용 CSV를 찾을 수 없습니다. 건너뜁니다.")
continue
print(f"[data_loader] {os.path.basename(csv_path)} → '{table_name}' 테이블로 로드 중 …")
df = pd.read_csv(csv_path, low_memory=False)
# 컬럼명 정규화: 소문자, 공백 제거
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
# orders 테이블의 배송 컬럼명 수정
if table_name == "orders":
df = _resolve_delivery_col(df)
df.to_sql(table_name, conn, if_exists="replace", index=False)
print(f" → {len(df):,}행 로드 완료.")
conn.close()
print(f"[data_loader] ✓ 모든 테이블이 {DB_PATH}에 작성되었습니다.")
return DB_PATH
def get_table_schema(table_name: str) -> str:
"""주어진 테이블의 CREATE TABLE SQL을 반환한다 (프롬프트 구성에 유용)."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
(table_name,),
)
row = cur.fetchone()
conn.close()
return row[0] if row else f"-- 테이블 '{table_name}'을 찾을 수 없습니다."
def list_tables() -> list[str]:
"""SQLite 데이터베이스의 모든 테이블명을 반환한다."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [r[0] for r in cur.fetchall()]
conn.close()
return tables
if __name__ == "__main__":
download_and_ingest()
print("\nDB의 테이블:", list_tables())
for t in list_tables():
print(f"\n── {t} ──")
print(get_table_schema(t))
4.2: PDF 데이터
레포 경로: pdf_docs/shopbr_return_policy.pdf (이 PDF는 합성적으로 생성된 파일이다.)
이커머스 플랫폼의 여러 상품 카테고리에 걸친 반품, 환불, 교환 규칙을 정리한 구조화된 LLM 생성 정책 문서다. 전자제품, 패션, 식료품 등 카테고리별 세부 정책(반품 기간, 조건, 예외 사항 포함)이 들어 있으며, RAG 파이프라인을 위한 비정형 지식 소스 역할을 하여 정책 관련 사용자 쿼리에 답할 수 있게 한다.
5. 설정 및 관측 가능성 함수
5.1 프로젝트 설정
config.py 파일은 전체 GenAI 파이프라인의 중앙 제어 계층 역할을 하여 모든 구성 요소가 일관된 설정과 공유 설정으로 동작하도록 보장한다.
- API 키, 모델 파라미터, 임베딩 설정 관리
- 데이터베이스, PDF, FAISS 인덱스 저장을 위한 데이터 경로 정의
- 청크 크기, 오버랩, top-k 검색 등 RAG 동작 설정
- SQL, RAG, 웹 검색 도구에 걸친 라우팅 옵션 지정
# config.py
# ─────────────────────────────────────────────────────────────────────────────
# 이커머스 대화형 에이전트를 위한 중앙 설정
# ─────────────────────────────────────────────────────────────────────────────
import os
from dotenv import load_dotenv
# .env 파일에서 환경 변수 로드
load_dotenv()
# ── API 키 ────────────────────────────────────────────────────────────────────
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
SERPER_API_KEY = os.getenv("SERPER_API_KEY", "")
# ── 모델 ──────────────────────────────────────────────────────────────────────
LLM_MODEL = "gpt-4o-mini"
LLM_TEMPERATURE = 0.0
LLM_MAX_TOKENS = 1024
# ── 임베딩 모델 (오픈소스, 로컬 실행) ──────────────────────────────────────────
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# ── 경로 ──────────────────────────────────────────────────────────────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, "data", "ecommerce.db")
RAW_DATA_DIR = os.path.join(BASE_DIR, "data", "raw")
FAISS_INDEX_DIR = os.path.join(BASE_DIR, "data", "faiss_index")
PDF_DIR = os.path.join(BASE_DIR, "pdf_docs")
# ── RAG 설정 ──────────────────────────────────────────────────────────────────
RAG_TOP_K = 4 # 쿼리당 검색할 청크 수
CHUNK_SIZE = 500 # 청크당 문자 수
CHUNK_OVERLAP = 80 # 청크 간 문자 오버랩
# ── 라우팅 ────────────────────────────────────────────────────────────────────
# 라우터 LLM 호출이 정확히 이 라우트 이름 중 하나를 선택한다
ROUTES = ["sql", "rag", "web_search"]
5.2 관측 가능성 계층 (MLflow 추적 래퍼)
observability.py 모듈은 파이프라인의 모든 단계를 MLflow로 계측하는 재사용 가능한 추적 계층을 제공하여, 실행, 성능, 비용에 대한 깊은 가시성을 제공한다.
- 도구, LLM 호출, 파이프라인에 대한 스팬을 자동으로 생성하기 위해
@trace데코레이터로 함수를 감싼다 - 수동 로깅 없이 모든 스팬의 입력, 출력, 실행 시간, 오류를 캡처한다
- 효율성과 확장성을 모니터링하기 위해 토큰 사용량, 예상 비용, 데이터 크기를 추적한다
- 구조화된 메타데이터로 트레이스를 풍부하게 만들어 디버깅과 근본 원인 분석을 크게 쉽게 한다
스팬 및 트레이스에서 캡처되는 핵심 속성
이 속성들이 관측 가능성을 실제로 유용하게 만드는 것들이다 (단순 추적이 아닌).
핵심 실행 속성
duration_ms→ 각 스팬 소요 시간error,error.message,error.type→ 실패 추적func.name,func.module→ 실행 소스
LLM 및 비용 추적
model.name→ 사용된 모델 (예: gpt-4o-mini)tokens.input,tokens.output,tokens.total→ 토큰 사용량cost.usd→ 스팬당 예상 비용
데이터 및 페이로드 추적
bytes.input,bytes.output→ 입력/출력 크기request→ 사용자 쿼리 (자동 추출)response→ 모델/도구 출력
검색 특화 속성 (RAG)
retrieval.model→ 사용된 임베딩 모델retrieval.top_k→ 검색된 청크 수retrieval.chunks→ 반환된 청크 수retrieval.sources→ 소스 문서 수
데이터베이스 / SQL 속성
db.type,db.path→ 데이터베이스 메타데이터db.rows_returned→ 가져온 행 수sql→ 생성된 SQL 쿼리
웹 검색 속성
api.provider,api.endpoint→ 외부 API 세부 정보search.top_links_count→ 반환된 링크 수
트레이스 수준 컨텍스트
request_preview→ 사용자 쿼리 단축 버전response_preview→ 최종 출력 단축 버전
# observability.py
# ─────────────────────────────────────────────────────────────────────────────
# 자동 비용 계산 및 크기 추적이 포함된 MLflow 추적.
#
# 자동 캡처 속성:
# - cost.usd : 토큰 기반 예상 비용
# - tokens.input : 입력 토큰 수 (추정)
# - tokens.output : 출력 토큰 수 (추정)
# - bytes.input : 입력 크기 (바이트)
# - bytes.output : 출력 크기 (바이트)
# - duration_ms : 실행 시간
#
# 사용법:
# @trace(span_type="TOOL", model="gpt-4o-mini") # 비용 추적 활성화
# def run_sql_tool(...): ...
# ─────────────────────────────────────────────────────────────────────────────
import os
import functools
import time
import json
from contextlib import contextmanager
from typing import Any
import mlflow
# ── 가격 (100만 토큰당) ────────────────────────────────────────────────────
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4-turbo": {"input": 10.00, "output": 30.00},
"text-embedding-3-small": {"input": 0.02, "output": 0.00},
"text-embedding-3-large": {"input": 0.13, "output": 0.00},
}
# 대략적인 토큰 추정 (영어 기준 1 토큰 ≈ 4 문자)
def estimate_tokens(text: str) -> int:
if not text:
return 0
return len(text) // 4
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
pricing = MODEL_PRICING.get(model, {"input": 0.15, "output": 0.60})
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
return round(input_cost + output_cost, 6)
# ── 설정 ─────────────────────────────────────────────────────────────────────
ENABLED = os.getenv("MLFLOW_ENABLED", "true").lower() == "true"
_initialized = False
def _init():
global _initialized
if not _initialized and ENABLED:
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment(os.getenv("MLFLOW_EXPERIMENT", "ecommerce-agent"))
_initialized = True
# ── 크기 계산 ─────────────────────────────────────────────────────────────────
def calculate_size(obj: Any) -> int:
"""객체의 대략적인 바이트 크기를 계산한다."""
try:
if isinstance(obj, str):
return len(obj.encode('utf-8'))
elif isinstance(obj, (list, dict)):
return len(json.dumps(obj).encode('utf-8'))
else:
return len(str(obj).encode('utf-8'))
except:
return 0
# ── 데코레이터 (비용 및 크기 추적 포함) ──────────────────────────────────────
def trace(name=None, span_type="CHAIN", attributes=None, model=None):
"""
자동 비용 및 크기 추적으로 모든 함수를 추적한다.
Args:
name: 스팬 이름 (기본값: 함수명)
span_type: CHAIN, TOOL, RETRIEVER, PARSER, LLM
attributes: 정적 속성 딕셔너리
model: 비용 추적을 위한 모델명 (예: "gpt-4o-mini")
"""
static_attrs = attributes or {}
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not ENABLED:
return func(*args, **kwargs)
_init()
span_name = name or func.__name__
# 입력 구성 및 입력 크기 계산
inputs = {}
input_text_parts = []
arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
for i, arg_name in enumerate(arg_names):
if i < len(args):
val = args[i]
elif arg_name in kwargs:
val = kwargs[arg_name]
else:
continue
# 표시용으로 잘린 버전 저장
display_val = val
if isinstance(val, str):
input_text_parts.append(val)
if len(val) > 500:
display_val = val[:500] + "..."
elif isinstance(val, (list, dict)):
input_text_parts.append(json.dumps(val))
inputs[arg_name] = display_val
# "request"를 첫 번째 문자열 인수로 명시적 설정
request_arg = None
for i, arg_name in enumerate(arg_names):
if i < len(args):
val = args[i]
elif arg_name in kwargs:
val = kwargs[arg_name]
else:
continue
if isinstance(val, str) and any(keyword in arg_name.lower() for keyword in ['question', 'message', 'prompt', 'query', 'input']):
request_arg = val
break
# 폴백: 명확한 요청 인수가 없으면 첫 번째 문자열 인수 사용
if not request_arg:
for i, arg_name in enumerate(arg_names):
if i < len(args):
val = args[i]
elif arg_name in kwargs:
val = kwargs[arg_name]
else:
continue
if isinstance(val, str):
request_arg = val
break
if request_arg:
inputs["request"] = request_arg
input_size = calculate_size(args) + calculate_size(kwargs)
start = time.perf_counter()
with mlflow.start_span(name=span_name, span_type=span_type) as span:
# 입력 설정
if inputs:
span.set_inputs(inputs)
# 정적 속성
for key, value in static_attrs.items():
span.set_attribute(key, value)
# 함수 메타데이터
span.set_attribute("func.name", func.__name__)
span.set_attribute("func.module", func.__module__)
# 입력 크기 추적
span.set_attribute("bytes.input", input_size)
# 모델이 지정된 경우 입력 토큰 추정
if model:
input_text = " ".join(input_text_parts)
input_tokens = estimate_tokens(input_text)
span.set_attribute("tokens.input", input_tokens)
span.set_attribute("model.name", model)
try:
result = func(*args, **kwargs)
# 타이밍
elapsed = round((time.perf_counter() - start) * 1000, 2)
span.set_attribute("duration_ms", elapsed)
# 출력 크기
output_size = calculate_size(result)
span.set_attribute("bytes.output", output_size)
# 출력 토큰 추정 및 비용 계산
if model and isinstance(result, dict):
output_text = json.dumps(result) if result else ""
output_tokens = estimate_tokens(output_text)
span.set_attribute("tokens.output", output_tokens)
total_tokens = span.get_attribute("tokens.input") + output_tokens
cost = calculate_cost(model, total_tokens, 0) # 단순화됨
span.set_attribute("cost.usd", cost)
span.set_attribute("tokens.total", total_tokens)
# 스마트 출력 추출
if isinstance(result, dict):
outputs = {
"response": result # 전체 응답 명시적 캡처
}
if "error" in result:
outputs["error"] = result["error"]
span.set_attribute("error", True)
if "rows" in result:
outputs["row_count"] = len(result["rows"])
span.set_attribute("db.rows_returned", len(result["rows"]))
if "sql" in result:
outputs["sql"] = result["sql"] # 전체 SQL 캡처
if "table_md" in result:
outputs["table_md"] = result["table_md"] # 형식화된 결과 캡처
if "chunks" in result:
outputs["chunk_count"] = len(result["chunks"])
outputs["chunks"] = result["chunks"] # 전체 청크 세부 정보 캡처
span.set_attribute("retrieval.chunks", len(result["chunks"]))
if "sources" in result:
outputs["sources"] = result["sources"] # 소스 캡처
span.set_attribute("retrieval.sources", len(result["sources"]))
if "results" in result:
outputs["result_count"] = len(result["results"])
# 웹 검색 결과에서 top-k 링크 추출
if result["results"] and isinstance(result["results"], list):
top_links = [item.get("url") for item in result["results"] if item.get("url")]
if top_links:
outputs["top_links"] = top_links
span.set_attribute("search.top_links_count", len(top_links))
span.set_outputs(outputs)
else:
# dict가 아닌 결과도 response로 캡처
span.set_outputs({"response": result})
# 트레이스 수준 request 및 response 업데이트
request_str = None
for i, arg_name in enumerate(arg_names):
if i < len(args):
val = args[i]
elif arg_name in kwargs:
val = kwargs[arg_name]
else:
continue
if isinstance(val, str) and any(keyword in arg_name.lower() for keyword in ['question', 'message', 'prompt', 'query', 'input']):
request_str = val
break
if not request_str:
for i, arg_name in enumerate(arg_names):
if i < len(args):
val = args[i]
elif arg_name in kwargs:
val = kwargs[arg_name]
else:
continue
if isinstance(val, str):
request_str = val
break
response_str = None
if isinstance(result, dict):
response_str = json.dumps(result)
elif isinstance(result, str):
response_str = result
else:
response_str = str(result)
# request 미리보기와 response 미리보기로 트레이스 업데이트
trace_update_kwargs = {}
if request_str:
trace_update_kwargs["request_preview"] = request_str[:500]
if response_str:
trace_update_kwargs["response_preview"] = response_str[:500]
if trace_update_kwargs:
mlflow.update_current_trace(**trace_update_kwargs)
return result
except Exception as e:
span.set_attribute("error", True)
span.set_attribute("error.message", str(e))
span.set_attribute("error.type", type(e).__name__)
raise
return wrapper
return decorator
# ── 컨텍스트 매니저 (비용/크기 추적 포함) ─────────────────────────────────────
@contextmanager
def trace_span(name, span_type="CHAIN", attributes=None, model=None):
"""전체 추적으로 코드 블록을 추적한다."""
if not ENABLED:
yield None
return
_init()
with mlflow.start_span(name=name, span_type=span_type) as span:
start = time.perf_counter()
if attributes:
for key, value in attributes.items():
span.set_attribute(key, value)
if model:
span.set_attribute("model.name", model)
try:
yield span
elapsed = round((time.perf_counter() - start) * 1000, 2)
span.set_attribute("duration_ms", elapsed)
except Exception as e:
span.set_attribute("error", True)
span.set_attribute("error.message", str(e))
raise
# ── 헬퍼: 수동 속성 업데이트 ──────────────────────────────────────────────────
def set_attr(key: str, value):
"""현재 스팬에 속성을 설정한다."""
if ENABLED:
current = mlflow.get_current_active_span()
if current:
current.set_attribute(key, value)
def set_attrs(attributes: dict):
"""여러 속성을 설정한다."""
if ENABLED:
current = mlflow.get_current_active_span()
if current:
for key, value in attributes.items():
current.set_attribute(key, value)
def log_cost(model: str, input_tokens: int, output_tokens: int):
"""스팬의 비용을 수동으로 로깅한다."""
if ENABLED:
cost = calculate_cost(model, input_tokens, output_tokens)
set_attrs({
"cost.usd": cost,
"tokens.input": input_tokens,
"tokens.output": output_tokens,
"tokens.total": input_tokens + output_tokens,
"model.name": model
})
6. 도구 정의
6.1 RAG 도구
PDF 문서를 수집하고, 오버랩 청크로 분할하고, 로컬 sentence-transformer 모델을 사용해 임베딩으로 변환한다. 효율적인 의미론적 유사도 검색을 위해 FAISS 벡터 인덱스에 임베딩을 저장한다. 쿼리 시 사용자 질문을 기반으로 가장 관련 있는 청크를 검색해 근거가 있는 컨텍스트를 제공한다. 검색된 텍스트와 소스 메타데이터를 반환하고, 최종 답변 생성은 다운스트림 LLM 구성 요소에 맡긴다.
# tools/rag_tool.py
# ─────────────────────────────────────────────────────────────────────────────
# RAG 도구: PDF 수집 → FAISS 벡터 스토어 → 청크 검색
#
# 임베딩 모델: sentence-transformers/all-MiniLM-L6-v2 (로컬 실행, 무료)
# 벡터 스토어: FAISS (CPU)
#
# 주의: 이 도구는 관련 청크만 검색한다. 답변 합성은
# 중앙 합성 노드에서 처리된다.
#
# 사용법:
# # 최초 빌드 (또는 PDF 변경 시):
# python -c "from tools.rag_tool import build_index; build_index()"
#
# # 쿼리:
# result = run_rag_tool("반품 정책이 어떻게 되나요?", history)
# ─────────────────────────────────────────────────────────────────────────────
import os
import json
import pickle
import re
import numpy as np
import faiss
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
from observability import trace # 관측 가능성 임포트
from config import (
PDF_DIR, FAISS_INDEX_DIR, EMBEDDING_MODEL,
CHUNK_SIZE, CHUNK_OVERLAP, RAG_TOP_K,
)
# ── 임베딩 헬퍼 ──────────────────────────────────────────────────────────────
_embedder = None # 전역 임베더 캐시
def _get_embedder() -> SentenceTransformer:
global _embedder
if _embedder is None:
print(f"[rag_tool] 임베딩 모델 로드 중: {EMBEDDING_MODEL} …")
_embedder = SentenceTransformer(EMBEDDING_MODEL)
return _embedder
def embed_texts(texts: list[str]) -> np.ndarray:
"""L2 정규화된 임베딩의 (N, D) float32 배열을 반환한다."""
model = _get_embedder()
vecs = model.encode(texts, show_progress_bar=False, normalize_embeddings=True)
return np.array(vecs, dtype="float32")
# ── PDF → 청크 ────────────────────────────────────────────────────────────────
def _extract_text_from_pdf(pdf_path: str) -> str:
"""PDF 파일에서 모든 텍스트를 추출한다."""
reader = PdfReader(pdf_path)
pages = []
for page in reader.pages:
text = page.extract_text() or ""
pages.append(text)
return "\n".join(pages)
def _chunk_text(text: str, source: str) -> list[dict]:
"""
텍스트를 오버랩 청크로 분할한다.
각 청크는 딕셔너리: {"text": str, "source": str, "chunk_id": int}
"""
text = re.sub(r"\s+", " ", text).strip()
chunks = []
start = 0
idx = 0
while start < len(text):
end = min(start + CHUNK_SIZE, len(text))
chunk = text[start:end].strip()
if chunk:
chunks.append({"text": chunk, "source": source, "chunk_id": idx})
idx += 1
start += CHUNK_SIZE - CHUNK_OVERLAP
return chunks
# ── 인덱스 빌드 및 로드 ──────────────────────────────────────────────────────
_INDEX_FILE = None
_META_FILE = None
def _index_paths():
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)
idx_file = os.path.join(FAISS_INDEX_DIR, "index.faiss")
meta_file = os.path.join(FAISS_INDEX_DIR, "metadata.pkl")
return idx_file, meta_file
def build_index(force: bool = False) -> None:
"""
PDF_DIR의 모든 PDF를 수집하고 FAISS flat IP 인덱스를 빌드한다.
인덱스 + 메타데이터를 FAISS_INDEX_DIR에 저장한다.
"""
idx_file, meta_file = _index_paths()
if os.path.exists(idx_file) and not force:
print(f"[rag_tool] FAISS 인덱스가 이미 {idx_file}에 존재합니다. 빌드를 건너뜁니다.")
print(" build_index(force=True)를 호출하면 재빌드합니다.")
return
pdf_files = [
os.path.join(PDF_DIR, f)
for f in os.listdir(PDF_DIR)
if f.lower().endswith(".pdf")
]
if not pdf_files:
print(f"[rag_tool] 경고: {PDF_DIR}에서 PDF를 찾을 수 없습니다.")
print(" PDF 파일을 추가하고 build_index()를 다시 실행하세요.")
# 나머지 코드가 충돌하지 않도록 비어있지만 유효한 인덱스를 빌드한다
dim = 384 # all-MiniLM-L6-v2 출력 차원
index = faiss.IndexFlatIP(dim)
faiss.write_index(index, idx_file)
with open(meta_file, "wb") as fh:
pickle.dump([], fh)
return
all_chunks: list[dict] = []
for pdf_path in pdf_files:
source = os.path.basename(pdf_path)
print(f"[rag_tool] {source} 파싱 중 …")
text = _extract_text_from_pdf(pdf_path)
chunks = _chunk_text(text, source)
all_chunks.extend(chunks)
print(f" → {len(chunks)} 청크")
texts = [c["text"] for c in all_chunks]
vecs = embed_texts(texts)
dim = vecs.shape[1]
index = faiss.IndexFlatIP(dim) # 정규화된 벡터의 내적 = 코사인 유사도
index.add(vecs)
faiss.write_index(index, idx_file)
with open(meta_file, "wb") as fh:
pickle.dump(all_chunks, fh)
print(f"[rag_tool] ✓ 인덱스 빌드 완료: {len(all_chunks)} 청크, dim={dim}")
def _load_index():
"""FAISS 인덱스와 메타데이터를 디스크에서 로드한다. 없으면 빌드한다."""
idx_file, meta_file = _index_paths()
if not os.path.exists(idx_file):
build_index()
index = faiss.read_index(idx_file)
with open(meta_file, "rb") as fh:
metadata = pickle.load(fh)
return index, metadata
# ── 검색 ──────────────────────────────────────────────────────────────────────
def retrieve_chunks(query: str, k: int = RAG_TOP_K) -> list[dict]:
"""
쿼리를 임베딩하고 가장 유사한 상위 k개 청크를 반환한다.
반환된 각 딕셔너리: {"text": str, "source": str, "chunk_id": int, "score": float}
"""
index, metadata = _load_index()
if index.ntotal == 0:
return []
q_vec = embed_texts([query])
scores, idxs = index.search(q_vec, min(k, index.ntotal))
results = []
for score, i in zip(scores[0], idxs[0]):
if i < 0:
continue
chunk = dict(metadata[i])
chunk["score"] = float(score)
results.append(chunk)
return results
@trace(span_type="RETRIEVER", attributes={ # 추적 추가
"retrieval.model": EMBEDDING_MODEL,
"retrieval.top_k": RAG_TOP_K,
})
def run_rag_tool(user_question: str, conversation_history: list[dict]) -> dict:
"""
RAG 도구의 메인 진입점.
반환 딕셔너리:
{
"chunks": list[dict], # 점수와 텍스트가 포함된 검색된 청크
"sources": list[str], # 고유한 소스 문서명
}
"""
chunks = retrieve_chunks(user_question, k=RAG_TOP_K)
sources = list({c["source"] for c in chunks})
return {
"chunks": chunks,
"sources": sources,
}
6.2 SQL 도구 (Text2SQL 실행 엔진)
자연어 질문을 LLM을 사용해 SQLite SQL 쿼리로 변환한다. 상세한 스키마와 JOIN 관계를 가이드로 사용한다. 생성된 SQL을 로컬 데이터베이스에 실행하고 구조화된 결과를 반환한다. 쿼리 출력을 다운스트림 소비를 위한 읽기 쉬운 마크다운 테이블로 형식화한다. 오류를 우아하게 처리하고 관측 가능성과 디버깅을 위해 생성된 SQL과 실행 결과를 모두 노출한다.
# tools/sql_tool.py
# ─────────────────────────────────────────────────────────────────────────────
# SQL 도구: 로컬 SQLite DB에 대한 SQL 쿼리를 생성하고 실행한다.
#
# 흐름:
# 1. 모든 테이블 스키마 + 샘플 행을 포함하는 시스템 프롬프트 빌드
# 2. GPT-4o-mini를 호출해 사용자 질문을 SQLite SQL로 변환
# 3. SQL 실행; 결과를 마크다운 테이블 문자열로 반환
# ─────────────────────────────────────────────────────────────────────────────
import sqlite3
import json
from openai import OpenAI
from config import DB_PATH, LLM_MODEL, LLM_TEMPERATURE, LLM_MAX_TOKENS, OPENAI_API_KEY
from observability import trace # 관측 가능성 임포트
client = OpenAI(api_key=OPENAI_API_KEY)
# ── 스키마 문서화 (수동 작성, Kaggle 데이터셋을 미러링) ─────────────────────────
SCHEMA_DESCRIPTION = """
다음 테이블을 가진 SQLite 데이터베이스에 접근할 수 있습니다:
TABLE: orders
order_id TEXT (기본 키)
customer_id TEXT
order_status TEXT (delivered, cancelled, invoiced, processing, shipped, unavailable, created, approved)
order_purchase_timestamp TEXT (ISO datetime)
order_approved_at TEXT (ISO datetime)
order_delivered_timestamp TEXT (ISO datetime, NULL일 수 있음)
order_estimated_delivery_date TEXT (ISO date)
TABLE: order_items
order_id TEXT
order_item_id INTEGER (주문 내 항목 시퀀스)
product_id TEXT
seller_id TEXT
price REAL
shipping_charges REAL
TABLE: customers
customer_id TEXT (기본 키)
customer_zip_code_prefix TEXT
customer_city TEXT
customer_state TEXT
TABLE: payments
order_id TEXT
payment_sequential INTEGER
payment_type TEXT (credit_card, boleto, voucher, debit_card)
payment_installments INTEGER
payment_value REAL
TABLE: products
product_id TEXT (기본 키)
product_category_name TEXT
product_weight_g REAL
product_length_cm REAL
product_height_cm REAL
product_width_cm REAL
JOIN 힌트:
orders.customer_id → customers.customer_id
order_items.order_id → orders.order_id
order_items.product_id → products.product_id
payments.order_id → orders.order_id
규칙:
- 원시 SQLite SQL 쿼리만 출력하고, 마크다운이나 설명은 없습니다.
- 가독성을 위해 테이블 별칭을 사용합니다.
- 날짜 필터링: strftime('%Y-%m', order_purchase_timestamp) = '2018-01'
- 사용자가 모든 행을 요청하지 않는 한 항상 LIMIT 50을 추가합니다.
- 적절한 경우 SUM / COUNT / AVG / GROUP BY로 집계합니다.
- SELECT *는 사용하지 않고 항상 컬럼을 명시적으로 지정합니다.
"""
SQL_SYSTEM_PROMPT = (
"당신은 이커머스 분석 데이터베이스를 위한 전문 SQLite 쿼리 작성자입니다.\n"
+ SCHEMA_DESCRIPTION
)
def _generate_sql(user_question: str, conversation_history: list[dict]) -> str:
"""GPT-4o-mini를 호출해 자연어 질문을 SQLite SQL로 변환한다."""
messages = [{"role": "system", "content": SQL_SYSTEM_PROMPT}]
# 멀티턴 컨텍스트를 위한 최근 대화 포함 (최근 6개 메시지 = 3턴)
messages.extend(conversation_history[-6:])
messages.append({"role": "user", "content": user_question})
response = client.chat.completions.create(
model=LLM_MODEL,
messages=messages,
temperature=LLM_TEMPERATURE,
max_tokens=512,
)
return response.choices[0].message.content.strip()
def _execute_sql(sql: str) -> tuple[list[dict], str | None]:
"""
SQLite DB에 sql을 실행한다.
(딕셔너리 리스트로서의 행, 오류_문자열_또는_None)을 반환한다.
"""
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(sql)
rows = [dict(r) for r in cur.fetchall()]
conn.close()
return rows, None
except sqlite3.Error as exc:
return [], str(exc)
def _rows_to_markdown(rows: list[dict], max_rows: int = 20) -> str:
"""딕셔너리 리스트를 간결한 마크다운 테이블 문자열로 변환한다."""
if not rows:
return "_반환된 행이 없습니다._"
headers = list(rows[0].keys())
display = rows[:max_rows]
header_line = "| " + " | ".join(headers) + " |"
sep_line = "| " + " | ".join(["---"] * len(headers)) + " |"
data_lines = [
"| " + " | ".join(str(r.get(h, "")) for h in headers) + " |"
for r in display
]
table = "\n".join([header_line, sep_line] + data_lines)
if len(rows) > max_rows:
table += f"\n\n_… 그리고 {len(rows) - max_rows}개 행 더 있음 (처음 {max_rows}개 표시)_"
return table
@trace(span_type="TOOL", model="gpt-4o-mini", attributes={ # 추적 추가
"db.type": "sqlite",
"db.path": DB_PATH,
})
def run_sql_tool(user_question: str, conversation_history: list[dict]) -> dict:
"""
SQL 도구의 메인 진입점.
반환 딕셔너리:
{
"sql": str, # 생성된 SQL
"rows": list[dict], # 원시 쿼리 결과
"table_md": str, # 마크다운 형식의 결과 테이블
"error": str | None, # 실행 오류 (있는 경우)
}
"""
sql = _generate_sql(user_question, conversation_history)
rows, error = _execute_sql(sql)
return {
"sql": sql,
"rows": rows,
"table_md": _rows_to_markdown(rows) if not error else f"_SQL 오류: {error}_",
"error": error,
}
6.3 웹 검색 도구
Serper API를 사용해 웹에서 실시간 정보를 가져오고 구조화된 결과(제목, URL, 요약)를 반환한다. 내부 데이터나 RAG 컨텍스트로 답할 수 없는 쿼리의 폴백 역할을 한다. 다운스트림 합성과 추론을 위한 깔끔하고 LLM 친화적인 검색 결과를 반환한다.
# tools/web_search_tool.py
# ─────────────────────────────────────────────────────────────────────────────
# 웹 검색 도구: Serper API를 사용해 실시간 웹 결과를 가져온다.
#
# Serper는 구조화된 결과(제목, 링크, 요약)를 반환한다 —
# LLM 소비를 위한 원시 HTML 스크래핑보다 훨씬 깔끔하다.
#
# 주의: 이 도구는 원시 검색 결과만 검색한다. 답변 합성은
# 중앙 합성 노드에서 처리된다.
# ─────────────────────────────────────────────────────────────────────────────
import requests
from config import SERPER_API_KEY
from observability import trace # 관측 가능성 임포트
@trace(span_type="TOOL", attributes={ # 추적 추가
"api.provider": "serper.dev",
"api.endpoint": "google.serper.dev/search",
})
def run_web_search_tool(user_question: str, conversation_history: list[dict]) -> dict:
"""
웹 검색 도구의 메인 진입점.
단계:
1. Serper API를 호출해 웹 검색
2. 합성 노드가 처리할 원시 결과 반환
반환 딕셔너리:
{
"results": list[dict], # 원시 Serper 결과 [{title, url, content}]
"query": str, # Serper에 보낸 쿼리
}
"""
# ── 1. 웹 검색 ──────────────────────────────────────────────────────────────
try:
if not SERPER_API_KEY:
return {
"results": [],
"query": user_question,
}
url = "https://google.serper.dev/search"
headers = {
"X-API-KEY": SERPER_API_KEY,
"Content-Type": "application/json"
}
payload = {"q": user_question, "num": 5}
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code != 200:
return {
"results": [],
"query": user_question,
}
data = response.json()
raw_results = []
# 유기적 결과 추출
for item in data.get("organic", [])[:5]:
raw_results.append({
"title": item.get("title"),
"url": item.get("link"),
"content": item.get("snippet")
})
except Exception as exc:
print(f"[web_search_tool] 오류: {exc}")
return {
"results": [],
"query": user_question,
}
return {
"results": raw_results,
"query": user_question,
}
7. 에이전트 흐름 정의
에이전트 흐름 정의에는 LangGraph를 사용한다.
7.1 에이전트 상태 정의
파이프라인의 모든 노드에 걸쳐 사용자 입력, 라우팅 결정, 도구 출력, 최종 응답을 전달하는 공유 LangGraph 상태 객체를 정의한다.
# agent/state.py
# ─────────────────────────────────────────────────────────────────────────────
# LangGraph 상태 정의.
# 모든 필드는 그래프를 통해 흐른다. 노드는 필요한 것을 읽고
# 업데이트하는 키만 반환한다.
# ─────────────────────────────────────────────────────────────────────────────
from typing import TypedDict, Optional, Any
class AgentState(TypedDict):
# ── 입력 ────────────────────────────────────────────────────────────────────
user_message: str
conversation_history: list[dict] # [{"role": "user"|"assistant", "content": str}]
# ── 라우팅 ─────────────────────────────────────────────────────────────────
route: str # "sql" | "rag" | "web_search"
route_reason: str # 라우터 LLM의 설명
# ── 도구 출력 (턴당 하나만 채워짐) ──────────────────────────────────────────
sql_result: Optional[dict] # {sql, rows, table_md, error}
rag_result: Optional[dict] # {answer, chunks, sources}
web_search_result: Optional[dict] # {answer, results, query}
# ── 최종 합성 답변 ──────────────────────────────────────────────────────────
final_answer: str
# ── 턴 메타데이터 ───────────────────────────────────────────────────────────
turn_number: int
7.2 라우터
LLM 기반 분류기를 사용해 각 사용자 쿼리를 적절한 경로(SQL, RAG, 또는 웹 검색)로 라우팅하고 결정에 대한 이유를 제공한다.
@trace(span_type="PARSER", model="gpt-4o-mini") 데코레이터와 함수 내 스팬 span = mlflow.get_current_active_span()을 로깅에 추가한 것을 주목하자.
# agent/router.py
# ─────────────────────────────────────────────────────────────────────────────
# 라우터: GPT-4o-mini를 사용해 모든 사용자 메시지를 세 가지
# 라우트 중 하나로 분류한 후 적절한 도구 노드로 넘긴다.
#
# 라우트:
# sql → 로컬 SQLite 이커머스 데이터베이스 쿼리
# rag → 로드된 PDF 문서 검색
# web_search → Tavily를 통한 실시간 인터넷 검색
# ─────────────────────────────────────────────────────────────────────────────
import json
import re
import mlflow
from openai import OpenAI
from config import LLM_MODEL, OPENAI_API_KEY, ROUTES
from observability import trace # 관측 가능성 임포트
client = OpenAI(api_key=OPENAI_API_KEY)
ROUTER_SYSTEM = """\
당신은 이커머스 분석 어시스턴트의 라우팅 에이전트입니다.
사용자 메시지와 대화 기록을 보고 어떤 도구를 사용할지 결정해야 합니다.
사용 가능한 도구:
sql — 데이터베이스에 있는 데이터를 묻는 질문에 사용:
주문, 주문 항목, 고객, 결제, 상품, 매출,
집계, 트렌드, 특정 주문/고객, 구조화된 이커머스 데이터에 대한 SQL 스타일 쿼리.
rag — 업로드된 문서의 정보를 묻는 질문에 사용:
정책, 매뉴얼, 문서, FAQ, 사용자가 PDF로 업로드한 모든 것.
PDF가 로드되지 않은 경우에도 적용됨.
web_search — 다음과 같은 질문에 사용:
• 현재 이벤트, 뉴스, 또는 실시간 정보
• 일반 이커머스 산업 트렌드 또는 벤치마크
• 데이터베이스 또는 PDF 문서로 답할 수 없는 질문
• "X가 무엇인가", "X는 어떻게 작동하나", 또는 일반 지식 질문
출력 형식 (유효한 JSON만 응답하고 다른 텍스트는 없음):
{
"route": "<sql|rag|web_search>",
"reason": "<한 문장 설명>"
}
"""
@trace(span_type="PARSER", model="gpt-4o-mini")
def route_question(user_message: str, conversation_history: list[dict]) -> dict:
"""
user_message를 ROUTES 중 하나로 분류한다.
반환:
{
"route": str, # "sql", "rag", "web_search" 중 하나
"reason": str, # 라우팅 근거
}
"""
messages = [{"role": "system", "content": ROUTER_SYSTEM}]
# 간단한 대화 컨텍스트 추가 (최근 4개 메시지 = 2턴)
for msg in conversation_history[-4:]:
messages.append({"role": msg["role"], "content": msg["content"]})
messages.append({"role": "user", "content": user_message})
# 시스템 및 사용자 프롬프트를 스팬 입력으로 캡처
span = mlflow.get_current_active_span()
if span:
span.set_inputs({
"system_prompt": ROUTER_SYSTEM,
"user_prompt": user_message,
"conversation_context": conversation_history[-4:]
})
response = client.chat.completions.create(
model=LLM_MODEL,
messages=messages,
temperature=0.0,
max_tokens=128,
response_format={"type": "json_object"},
)
raw = response.choices[0].message.content.strip()
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
# 폴백: 원시 텍스트에서 라우트 추출
for route in ROUTES:
if route in raw.lower():
return {"route": route, "reason": "잘못된 JSON에서 추출됨"}
return {"route": "web_search", "reason": "라우터 출력을 파싱할 수 없음"}
route = parsed.get("route", "web_search").lower().strip()
if route not in ROUTES:
route = "web_search"
return {
"route": route,
"reason": parsed.get("reason", ""),
}
7.3 LangGraph 노드
nodes.py 모듈은 파이프라인의 핵심 실행 로직을 정의한다. 각 노드는 라우팅, 도구 호출, 응답 합성 등 특정 단계를 수행하면서 그래프를 통해 상태를 전달한다.
router_node():route_question()을 호출해 sql, rag, 또는 web_search를 선택한다.sql_node():run_sql_tool()을 호출한다.rag_node():run_rag_tool()을 호출한다.web_search_node():run_web_search_tool()을 호출한다.synthesise_node(): LLM으로 최종 답변을 작성한다.update_history_node(): 기록에 턴을 추가한다.
# agent/nodes.py
# ─────────────────────────────────────────────────────────────────────────────
# LangGraph 노드 함수. 각 노드는 전체 AgentState 딕셔너리를 받고
# 업데이트하는 키만 포함하는 부분 딕셔너리를 반환한다.
# ─────────────────────────────────────────────────────────────────────────────
import mlflow
from openai import OpenAI
from agent.router import route_question
from tools.sql_tool import run_sql_tool
from tools.rag_tool import run_rag_tool
from tools.web_search_tool import run_web_search_tool
from config import LLM_MODEL, LLM_TEMPERATURE, LLM_MAX_TOKENS, OPENAI_API_KEY
from observability import trace, set_attrs # 관측 가능성 유틸리티 임포트
client = OpenAI(api_key=OPENAI_API_KEY)
# ─────────────────────────────────────────────────────────────────────────────
# 노드 1 — 라우터
# ─────────────────────────────────────────────────────────────────────────────
def router_node(state: dict) -> dict:
"""
사용자 메시지를 분류하고 state["route"]를 설정한다.
"""
routing = route_question(
user_message=state["user_message"],
conversation_history=state["conversation_history"],
)
print(f"\n[router] → {routing['route'].upper()} | {routing['reason']}")
return {
"route": routing["route"],
"route_reason": routing["reason"],
}
# ─────────────────────────────────────────────────────────────────────────────
# 노드 2a — SQL 도구 노드
# ─────────────────────────────────────────────────────────────────────────────
def sql_node(state: dict) -> dict:
"""
SQL 쿼리를 생성하고 실행하며 원시 결과를 상태에 저장한다.
"""
print("[sql_node] SQL 생성 및 실행 중 …")
result = run_sql_tool(
user_question=state["user_message"],
conversation_history=state["conversation_history"],
)
print(f"[sql_node] SQL: {result['sql']}")
if result["error"]:
print(f"[sql_node] 오류: {result['error']}")
else:
print(f"[sql_node] {len(result['rows'])}행 반환됨.")
return {"sql_result": result}
# ─────────────────────────────────────────────────────────────────────────────
# 노드 2b — RAG 도구 노드
# ─────────────────────────────────────────────────────────────────────────────
def rag_node(state: dict) -> dict:
"""
관련 PDF 청크를 검색하고 답변을 합성한다.
"""
print("[rag_node] 문서 청크 검색 중 …")
result = run_rag_tool(
user_question=state["user_message"],
conversation_history=state["conversation_history"],
)
print(f"[rag_node] {len(result['chunks'])}개 청크 검색됨, 소스: {result['sources']}")
return {"rag_result": result}
# ─────────────────────────────────────────────────────────────────────────────
# 노드 2c — 웹 검색 도구 노드
# ─────────────────────────────────────────────────────────────────────────────
def web_search_node(state: dict) -> dict:
"""
Tavily 검색을 실행하고 답변을 합성한다.
"""
print("[web_node] 웹 검색 중 …")
result = run_web_search_tool(
user_question=state["user_message"],
conversation_history=state["conversation_history"],
)
print(f"[web_node] 쿼리 '{result['query']}'에 대한 결과 {len(result['results'])}개")
return {"web_search_result": result}
# ─────────────────────────────────────────────────────────────────────────────
# 노드 3 — 답변 합성기
# ─────────────────────────────────────────────────────────────────────────────
_SYNTHESISE_SYSTEM = """\
당신은 친절하고 정확한 이커머스 분석 어시스턴트입니다.
사용자에게 명확하고 잘 형식화된 최종 답변을 제공하는 것이 당신의 역할입니다.
지침:
- 간결하되 완전하게 답하세요.
- SQL 결과에서 데이터가 온 경우, 핵심 수치를 먼저 요약한 다음 설명하세요.
- 문서에서 데이터가 온 경우, 출처를 인용하세요.
- 웹 검색에서 데이터가 온 경우, URL을 언급하세요.
- 가독성을 높이는 경우 마크다운 형식을 사용하세요 (글머리 기호, 굵은 글씨, 표).
- 도움이 되고 전문적인 어조를 유지하세요.
"""
@trace(span_type="CHAIN", model="gpt-4o-mini") # 추적 추가
def synthesise_node(state: dict) -> dict:
"""
도구 출력을 GPT-4o-mini를 사용해 세련된 최종 답변으로 결합한다.
- SQL: 원시 행에서 내러티브 합성
- RAG: 검색된 문서 청크에서 답변 합성
- 웹 검색: 웹 검색 결과에서 답변 합성
"""
route = state.get("route", "")
# ── SQL: 도구가 원시 행을 반환 — 내러티브 합성 ────────────────────────────
if route == "sql":
sql_result = state.get("sql_result", {})
sql_query = sql_result.get("sql", "")
table_md = sql_result.get("table_md", "_결과 없음_")
error = sql_result.get("error")
if error:
content = (
f"SQL 쿼리에서 오류가 발생했습니다:\n\n"
f"```sql\n{sql_query}\n```\n\n"
f"오류: {error}\n\n"
"질문을 다시 표현해 보세요."
)
else:
content = (
f"실행된 SQL 쿼리:\n```sql\n{sql_query}\n```\n\n"
f"결과:\n{table_md}"
)
messages = [{"role": "system", "content": _SYNTHESISE_SYSTEM}]
messages.extend(state["conversation_history"][-4:])
user_content = (
f"원래 질문: {state['user_message']}\n\n"
f"쿼리 출력:\n{content}\n\n"
"이 결과에 대한 명확하고 비즈니스 친화적인 요약을 제공하세요."
)
messages.append({"role": "user", "content": user_content})
# 프롬프트를 스팬 입력으로 캡처
span = mlflow.get_current_active_span()
if span:
span.set_inputs({
"system_prompt": _SYNTHESISE_SYSTEM,
"user_prompt": user_content,
"synthesis_type": "sql",
"user_message": state['user_message']
})
response = client.chat.completions.create(
model=LLM_MODEL,
messages=messages,
temperature=LLM_TEMPERATURE,
max_tokens=LLM_MAX_TOKENS,
)
answer = response.choices[0].message.content.strip()
# ── RAG: 검색된 문서 청크에서 답변 합성 ─────────────────────────────────
elif route == "rag":
rag_result = state.get("rag_result", {})
chunks = rag_result.get("chunks", [])
sources = rag_result.get("sources", [])
if not chunks:
answer = "로드된 문서에서 관련 정보를 찾을 수 없습니다."
else:
# 검색된 청크로 컨텍스트 블록 구성
context_parts = []
for i, chunk in enumerate(chunks, 1):
context_parts.append(
f"[발췌 {i} — {chunk['source']} (점수: {chunk['score']:.3f})]:\n"
f"{chunk['text']}"
)
context = "\n\n---\n\n".join(context_parts)
messages = [{"role": "system", "content": _SYNTHESISE_SYSTEM}]
messages.extend(state["conversation_history"][-4:])
user_content = (
f"문서 발췌:\n\n{context}\n\n"
f"질문: {state['user_message']}"
)
messages.append({"role": "user", "content": user_content})
# 프롬프트를 스팬 입력으로 캡처
span = mlflow.get_current_active_span()
if span:
span.set_inputs({
"system_prompt": _SYNTHESISE_SYSTEM,
"user_prompt": user_content,
"synthesis_type": "rag",
"retrieval_sources": ", ".join(sources) if sources else "없음",
"user_message": state['user_message']
})
response = client.chat.completions.create(
model=LLM_MODEL,
messages=messages,
temperature=LLM_TEMPERATURE,
max_tokens=LLM_MAX_TOKENS,
)
answer = response.choices[0].message.content.strip()
if sources:
answer += f"\n\n_출처: {', '.join(sources)}_"
# ── 웹 검색: 검색 결과에서 답변 합성 ──────────────────────────────────────
elif route == "web_search":
web_result = state.get("web_search_result", {})
results = web_result.get("results", [])
query = web_result.get("query", "")
if not results:
answer = "웹 검색에서 쿼리에 대한 결과가 반환되지 않았습니다."
else:
# 컨텍스트용 결과 형식화
result_blocks = []
for i, r in enumerate(results, 1):
block = (
f"[결과 {i}]\n"
f"제목: {r.get('title', 'N/A')}\n"
f"URL: {r.get('url', 'N/A')}\n"
f"요약: {r.get('content', '')}"
)
result_blocks.append(block)
formatted_results = "\n\n".join(result_blocks)
messages = [{"role": "system", "content": _SYNTHESISE_SYSTEM}]
messages.extend(state["conversation_history"][-4:])
user_content = (
f"'{query}'에 대한 웹 검색 결과:\n\n{formatted_results}\n\n"
f"원래 질문: {state['user_message']}\n\n"
"이 결과에서 명확하고 정확한 답변을 합성하세요. "
"사용자가 확인할 수 있도록 항상 소스 URL을 언급하세요."
)
messages.append({"role": "user", "content": user_content})
# 프롬프트를 스팬 입력으로 캡처
span = mlflow.get_current_active_span()
if span:
span.set_inputs({
"system_prompt": _SYNTHESISE_SYSTEM,
"user_prompt": user_content,
"synthesis_type": "web_search",
"web_search_query": query,
"user_message": state['user_message']
})
response = client.chat.completions.create(
model=LLM_MODEL,
messages=messages,
temperature=LLM_TEMPERATURE,
max_tokens=LLM_MAX_TOKENS,
)
answer = response.choices[0].message.content.strip()
else:
answer = "질문에 어떻게 답해야 할지 결정할 수 없었습니다."
print(f"[synthesise_node] 답변 준비 완료 ({len(answer)}자).")
return {"final_answer": answer}
# ─────────────────────────────────────────────────────────────────────────────
# 노드 4 — 기록 업데이터
# ─────────────────────────────────────────────────────────────────────────────
def update_history_node(state: dict) -> dict:
"""
현재 턴을 conversation_history에 추가한다.
(턴 번호는 session.ask()에서 이미 증가됨)
"""
history = list(state["conversation_history"])
history.append({"role": "user", "content": state["user_message"]})
history.append({"role": "assistant", "content": state["final_answer"]})
return {
"conversation_history": history,
"turn_number": state.get("turn_number", 0),
}
7.4 LangGraph 그래프
graph.py 모듈은 전체 LangGraph 워크플로우를 정의하며, 모든 노드를 조건부 실행 그래프로 연결하여 사용자 쿼리를 SQL, RAG, 또는 웹 검색 경로를 통해 동적으로 라우팅한 후 최종 응답을 합성한다.
# agent/graph.py
# ─────────────────────────────────────────────────────────────────────────────
# LangGraph 그래프 정의.
#
# 그래프 토폴로지:
#
# [START]
# │
# ▼
# router_node
# │
# ├── "sql" ──▶ sql_node ──┐
# ├── "rag" ──▶ rag_node ──┤
# └── "web_search" ──▶ web_search_node ──┤
# │
# ▼
# synthesise_node
# │
# ▼
# update_history_node
# │
# [END]
# ─────────────────────────────────────────────────────────────────────────────
from langgraph.graph import StateGraph, END
from agent.state import AgentState
from agent.nodes import (
router_node,
sql_node,
rag_node,
web_search_node,
synthesise_node,
update_history_node,
)
def _route_selector(state: dict) -> str:
"""조건부 엣지: state["route"]를 읽고 대상 노드명을 반환한다."""
route = state.get("route", "web_search")
return {
"sql": "sql_node",
"rag": "rag_node",
"web_search": "web_search_node",
}.get(route, "web_search_node")
def build_graph() -> StateGraph:
"""
LangGraph 라우팅 에이전트를 조립하고 컴파일한다.
호출 준비가 된 컴파일된 그래프를 반환한다.
"""
graph = StateGraph(AgentState)
# ── 노드 등록 ─────────────────────────────────────────────────────────────
graph.add_node("router_node", router_node)
graph.add_node("sql_node", sql_node)
graph.add_node("rag_node", rag_node)
graph.add_node("web_search_node", web_search_node)
graph.add_node("synthesise_node", synthesise_node)
graph.add_node("update_history_node", update_history_node)
# ── 진입점 ────────────────────────────────────────────────────────────────
graph.set_entry_point("router_node")
# ── 조건부 라우팅: 라우터 → 세 가지 도구 노드 중 하나 ──────────────────────
graph.add_conditional_edges(
source="router_node",
path=_route_selector,
path_map={
"sql_node": "sql_node",
"rag_node": "rag_node",
"web_search_node": "web_search_node",
},
)
# ── 모든 도구 노드가 합성으로 이어짐 ─────────────────────────────────────
for tool_node in ("sql_node", "rag_node", "web_search_node"):
graph.add_edge(tool_node, "synthesise_node")
# ── 선형 마지막 부분 ──────────────────────────────────────────────────────
graph.add_edge("synthesise_node", "update_history_node")
graph.add_edge("update_history_node", END)
return graph.compile()
def save_graph_visualization(compiled_graph, filepath: str = "agent_graph.png") -> None:
"""
컴파일된 그래프를 Mermaid PNG 시각화로 저장한다.
Args:
compiled_graph: build_graph()의 컴파일된 StateGraph
filepath: PNG 파일 저장 경로 (기본값: agent_graph.png)
"""
try:
graph_obj = compiled_graph.get_graph()
png_data = graph_obj.draw_mermaid_png()
with open(filepath, "wb") as f:
f.write(png_data)
print(f"그래프 시각화가 {filepath}에 저장되었습니다.")
except Exception as e:
print(f"그래프 시각화 저장 오류: {e}")

8. setup.py 정의 및 실행
설정 스크립트는 세 가지 작업을 수행한다.
- Kaggle 이커머스 데이터셋 다운로드
- CSV 파일을 SQLite로 수집
- PDF에서 FAISS 인덱스 빌드
전체 setup.py:
#!/usr/bin/env python3
# setup.py
# ─────────────────────────────────────────────────────────────────────────────
# 에이전트를 시작하기 전에 한 번만 실행한다.
#
# 수행 작업:
# 1. Kaggle 데이터셋 다운로드 (bytadit/ecommerce-order-dataset)
# 2. train/ CSV를 로컬 SQLite 데이터베이스로 로드
# 3. pdf_docs/의 모든 PDF에서 FAISS 벡터 인덱스 빌드
#
# 사용법:
# python setup.py
# python setup.py --force # DB/인덱스가 이미 존재해도 재수집
# ─────────────────────────────────────────────────────────────────────────────
import argparse
import os
import sys
# 직접 실행 시 프로젝트 루트가 경로에 있도록 보장
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from data_loader import download_and_ingest, list_tables
from tools.rag_tool import build_index
from config import PDF_DIR, DB_PATH, FAISS_INDEX_DIR
def main(force: bool = False) -> None:
print("=" * 60)
print(" 이커머스 에이전트 — 설정")
print("=" * 60)
# ── 1단계: SQLite DB ──────────────────────────────────────────────────────
print("\n[1/2] Kaggle 데이터셋을 SQLite로 수집 중 …")
download_and_ingest(force=force)
print(f" DB의 테이블: {list_tables()}")
# ── 2단계: FAISS 인덱스 ───────────────────────────────────────────────────
print(f"\n[2/2] '{PDF_DIR}'의 PDF에서 FAISS 인덱스 빌드 중 …")
os.makedirs(PDF_DIR, exist_ok=True)
pdf_count = len([f for f in os.listdir(PDF_DIR) if f.lower().endswith(".pdf")])
if pdf_count == 0:
print(f" {PDF_DIR}에서 PDF를 찾을 수 없습니다.")
print(" PDF 파일을 추가하고 다시 실행하세요: python setup.py --force")
print(" SQL과 웹 검색 라우트에 대해서는 에이전트가 계속 작동합니다.")
build_index(force=force)
print("\n" + "=" * 60)
print(" 설정 완료! 다음 명령으로 에이전트를 실행하세요: python main.py")
print("=" * 60 + "\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="이커머스 에이전트 설정")
parser.add_argument(
"--force", action="store_true",
help="DB/인덱스가 이미 존재해도 데이터를 재수집하고 FAISS 인덱스를 재빌드합니다."
)
args = parser.parse_args()
main(force=args.force)
터미널에서 다음을 실행한다.
python setup.py
이 명령은 레포지토리에 data/ecommerce.db 데이터셋과 벡터 DB용 data/faiss_index/index.faiss를 생성한다.
9. main.py
main.py 파일은 애플리케이션의 진입점 역할을 하며, 세션을 초기화하고 필요한 설정을 검증하면서 GenAI 에이전트와 상호작용하기 위한 대화형 CLI와 데모 모드를 제공한다.
- SQL, RAG, 웹 검색 경로를 테스트하기 위한 대화형 및 스크립트 데모 모드 지원
- LangGraph 파이프라인을 위한 세션 관리, 환경 검증, 그래프 시각화 처리
#!/usr/bin/env python3
# main.py
# ─────────────────────────────────────────────────────────────────────────────
# 이커머스 라우팅 에이전트를 위한 대화형 CLI.
#
# 사전 요구사항:
# 1. python setup.py (데이터 다운로드 + 인덱스 빌드)
# 2. export OPENAI_API_KEY=...
# 3. export SERPER_API_KEY=...
#
# 사용법:
# python main.py # 대화형 모드
# python main.py --demo # 스크립트 멀티턴 데모 실행
# ─────────────────────────────────────────────────────────────────────────────
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from session import EcommerceSession
from config import OPENAI_API_KEY, SERPER_API_KEY
from agent.graph import save_graph_visualization
def _check_env() -> None:
missing = []
if not OPENAI_API_KEY:
missing.append("OPENAI_API_KEY")
if not SERPER_API_KEY:
missing.append("SERPER_API_KEY")
if missing:
print(f"[오류] 누락된 환경 변수: {', '.join(missing)}")
print("실행 전에 설정하세요:")
for var in missing:
print(f" export {var}=your_key_here")
sys.exit(1)
def run_demo(session: EcommerceSession) -> None:
"""
세 가지 라우팅 경로를 모두 포함하는 스크립트 멀티턴 데모를 실행한다.
"""
demo_turns = [
# ── SQL 쿼리 ──────────────────────────────────────────────────────────
"성공적으로 배송된 주문은 몇 개인가?",
# ── 웹 검색 ────────────────────────────────────────────────────────────
"2024년 브라질의 최신 이커머스 트렌드는?",
# ── RAG (PDF 문서) ─────────────────────────────────────────────────────
"전자제품에 대한 반품 정책은 어떻게 되나?",
]
print("\n" + "═" * 60)
print(" 데모 모드 — 스크립트 멀티턴 대화")
print("═" * 60)
for i, question in enumerate(demo_turns, 1):
print(f"\n{'─'*60}")
print(f"[턴 {i}] 사용자: {question}")
print("─" * 60)
answer = session.ask(question)
print(f"\n어시스턴트:\n{answer}")
print("\n" + "═" * 60)
print(" 데모 완료.")
print("═" * 60)
session.print_history()
def run_interactive(session: EcommerceSession) -> None:
"""
REPL 스타일의 대화형 세션.
"""
print("\n" + "═" * 60)
print(" 이커머스 분석 에이전트")
print(" 라우트: SQL | RAG (PDF) | 웹 검색")
print(" 'quit' 또는 'exit'를 입력하면 종료됩니다.")
print(" 'reset'을 입력하면 대화 기록을 지웁니다.")
print(" 'history'를 입력하면 전체 대화를 출력합니다.")
print("═" * 60 + "\n")
while True:
try:
user_input = input("사용자: ").strip()
except (EOFError, KeyboardInterrupt):
print("\n종료합니다!")
break
if not user_input:
continue
if user_input.lower() in ("quit", "exit"):
print("종료합니다!")
break
if user_input.lower() == "reset":
session.reset()
continue
if user_input.lower() == "history":
session.print_history()
continue
print()
answer = session.ask(user_input)
print(f"\n어시스턴트:\n{answer}\n")
def main() -> None:
_check_env()
parser = argparse.ArgumentParser(description="이커머스 라우팅 에이전트")
parser.add_argument(
"--demo", action="store_true",
help="대화형 모드 대신 스크립트 멀티턴 데모를 실행합니다."
)
args = parser.parse_args()
session = EcommerceSession()
print(f"[main] 세션 ID: {session.session_id}")
# 그래프 시각화 저장
save_graph_visualization(session.graph)
if args.demo:
run_demo(session)
else:
run_interactive(session)
if __name__ == "__main__":
main()
10. 애플리케이션 실행
먼저 터미널에서 아래 명령어로 MLflow를 활성화한다.
mlflow server --host 0.0.0.0 --port 5001
이 명령은 http://127.0.0.1:5001/에서 MLflow UI를 연다.

그 다음 터미널에서 아래 명령어를 실행한다.
python main.py --demo
이 명령은 3개의 쿼리를 자동으로 실행하고 모든 결과를 MLflow에 로깅한다. 각 요청이 MLflow UI에서 완전히 추적 가능하며, 모든 결정, 도구 호출, LLM 상호작용이 노출된다.
[턴 1] 사용자: 성공적으로 배송된 주문은 몇 개인가?
→ SQL
[턴 2] 사용자: 2024년 브라질의 최신 이커머스 트렌드는?
→ 웹 검색
[턴 3] 사용자: 전자제품에 대한 반품 정책은 어떻게 되나?
→ RAG
실제 터미널 출력:
Graph visualization saved to agent_graph.png
════════════════════════════════════════════════════════════
DEMO MODE — Scripted Multi-Turn Conversation
════════════════════════════════════════════════════════════
────────────────────────────────────────────────────────────
[Turn 1] USER: How many orders were delivered successfully?
────────────────────────────────────────────────────────────
[router] → SQL | The question asks for a specific count of orders, which is data that resides in the database.
[sql_node] Generating and executing SQL …
[sql_node] SQL: SELECT COUNT(o.order_id) AS delivered_orders_count
FROM orders AS o
WHERE o.order_status = 'delivered'
LIMIT 50;
[sql_node] 1 rows returned.
[synthesise_node] Answer ready (279 chars).
ASSISTANT:
### Summary of Delivered Orders
The total number of orders that were successfully delivered is **87,428**.
This figure indicates the volume of completed transactions where the order status is marked as 'delivered'. If you need further insights or breakdowns, feel free to ask!
────────────────────────────────────────────────────────────
[Turn 2] USER: What are the latest e-commerce trends in Brazil for 2024?
────────────────────────────────────────────────────────────
[router] → WEB_SEARCH | The question asks about current events and trends in the e-commerce industry, specifically for Brazil in 2024.
[web_node] Searching the web …
[web_node] 5 results for query: 'What are the latest e-commerce trends in Brazil for 2024?'
[synthesise_node] Answer ready (1621 chars).
ASSISTANT:
### Latest E-commerce Trends in Brazil for 2024
1. **Market Size and Growth**:
- Brazil's retail e-commerce market is projected to reach **$81.74 billion** in sales, making it the largest in Latin America (Source: [eMarketer](https://www.emarketer.com/content/retail-ecommerce-digital-buyer-trends-2024-brazil)).
- The market is expected to grow at a **CAGR of 30%** through 2024, indicating robust expansion (Source: [Merchant Risk Council](https://merchantriskcouncil.org/learning/resource-center/member-news/blog/2021/brazilian-ecommerce-data)).
2. **Market Share**:
- E-commerce in Brazil is anticipated to represent around **28.5%** of the total market share in the region, driven by increased digitalization and evolving consumer preferences (Source: [MBE Franchise](https://mbefranchise.com/blog/e-commerce-in-brazil-navigating-a-thriving-digital-landscape-in-2024-2/)).
3. **Future Projections**:
- Revenue in the Brazilian e-commerce market is expected to reach **$48.26 billion** by 2025, with a projected annual growth rate of **10.14%** from 2025 to 2029 (Source: [Asendia USA](https://www.asendiausa.com/hubfs/Brazil%20Ecommerce%20Market%20Insights_infographic_Asendia%20USA.pdf)).
- The B2C e-commerce market is forecasted to grow at a **CAGR of 17.7%** from 2024 to 2030 (Source: [Grand View Research](https://www.grandviewresearch.com/horizon/outlook/b2c-e-commerce-market/brazil)).
These trends highlight a thriving e-commerce landscape in Brazil, characterized by significant growth and increasing consumer engagement. For more detailed insights, you can explore the provided sources.
────────────────────────────────────────────────────────────
[Turn 3] USER: What does our return policy say about electronics?
────────────────────────────────────────────────────────────
[router] → RAG | The question asks for specific information about the return policy, which is likely contained in uploaded documents.
[rag_node] Retrieving document chunks …
[rag_tool] Loading embedding model: sentence-transformers/all-MiniLM-L6-v2 …
Loading weights: 100%|█████████████████████████████████| 103/103 [00:00<00:00, 7201.78it/s]
BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key | Status | |
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED | |
Notes:
- UNEXPECTED: can be ignored when loading from different task/architecture; not ok if you expect identical arch.
[rag_node] 4 chunks retrieved from: ['shopbr_return_policy.pdf']
[synthesise_node] Answer ready (1158 chars).
ASSISTANT:
### Return Policy for Electronics at ShopBR
According to the ShopBR return policy, the following points are highlighted regarding electronics:
1. **Quality Standards**:
- Electronics are subject to heightened quality standards due to their complexity and value.
2. **Return Eligibility**:
- Electronics can be returned if they are defective or damaged upon arrival.
- Returns for electronics that have been tampered with, misused, or physically damaged after delivery are ineligible.
3. **Replacement and Refund**:
- If an electronic item is defective or damaged on arrival, customers are eligible for a replacement or a full refund.
4. **Authorized Service Centers**:
- ShopBR partners with authorized service centers to ensure genuine replacements and professional repairs for electronics.
5. **Return and Replacement Windows**:
- Specific return and replacement windows may apply, which are detailed in the policy.
This policy is effective as of **January 1, 2024**, and complies with the Brazilian Consumer Defense Code (CDC). For more detailed information, please refer to the full document.
_Sources: shopbr_return_policy.pdf_
════════════════════════════════════════════════════════════
Demo complete.
════════════════════════════════════════════════════════════
════════════════════════════════════════════════════════════
Session fa83d5ae | 3 turns
════════════════════════════════════════════════════════════
[User]
How many orders were delivered successfully?
[Assistant]
### Summary of Delivered Orders
The total number of orders that were successfully delivered is **87,428**.
This figure indicates the volume of completed transactions where the order status is marked as 'delivered'. If you need further insights or breakdowns, feel free to ask!
[User]
What are the latest e-commerce trends in Brazil for 2024?
[Assistant]
### Latest E-commerce Trends in Brazil for 2024
1. **Market Size and Growth**:
- Brazil's retail e-commerce market is projected to reach **$81.74 billion** in sales, making it the largest in Latin America (Source: [eMarketer](https://www.emarketer.com/content/retail-ecommerce-digital-buyer-trends-2024-brazil)).
- The market is expected to grow at a **CAGR of 30%** through 2024, indicating robust expansion (Source: [Merchant Risk Council](https://merchantriskcouncil.org/learning/resource-center/member-news/blog/2021/brazilian-ecommerce-data)).
2. **Market Share**:
- E-commerce in Brazil is anticipated to represent around **28.5%** of the total market share in the region, driven by increased digitalization and evolving consumer preferences (Source: [MBE Franchise](https://mbefranchise.com/blog/e-commerce-in-brazil-navigating-a-thriving-digital-landscape-in-2024-2/)).
3. **Future Projections**:
- Revenue in the Brazilian e-commerce market is expected to reach **$48.26 billion** by 2025, with a projected annual growth rate of **10.14%** from 2025 to 2029 (Source: [Asendia USA](https://www.asendiausa.com/hubfs/Brazil%20Ecommerce%20Market%20Insights_infographic_Asendia%20USA.pdf)).
- The B2C e-commerce market is forecasted to grow at a **CAGR of 17.7%** from 2024 to 2030 (Source: [Grand View Research](https://www.grandviewresearch.com/horizon/outlook/b2c-e-commerce-market/brazil)).
These trends highlight a thriving e-commerce landscape in Brazil, characterized by significant growth and increasing consumer engagement. For more detailed insights, you can explore the provided sources.
[User]
What does our return policy say about electronics?
[Assistant]
### Return Policy for Electronics at ShopBR
According to the ShopBR return policy, the following points are highlighted regarding electronics:
1. **Quality Standards**:
- Electronics are subject to heightened quality standards due to their complexity and value.
2. **Return Eligibility**:
- Electronics can be returned if they are defective or damaged upon arrival.
- Returns for electronics that have been tampered with, misused, or physically damaged after delivery are ineligible.
3. **Replacement and Refund**:
- If an electronic item is defective or damaged on arrival, customers are eligible for a replacement or a full refund.
4. **Authorized Service Centers**:
- ShopBR partners with authorized service centers to ensure genuine replacements and professional repairs for electronics.
5. **Return and Replacement Windows**:
- Specific return and replacement windows may apply, which are detailed in the policy.
This policy is effective as of **January 1, 2024**, and complies with the Brazilian Consumer Defense Code (CDC). For more detailed information, please refer to the full document.
_Sources: shopbr_return_policy.pdf_

이것으로 MLflow가 복잡한 다단계 시스템을 투명하고, 디버깅 가능하고, 프로덕션 준비가 된 애플리케이션으로 어떻게 변환하는지 보여주는 완전히 관측 가능한 GenAI 파이프라인 구축에 관한 케이스 스터디가 완료된다.
결론
GenAI 시스템 구축은 더 이상 올바른 답변을 얻는 것만이 아니다. 그 답변이 어떻게 만들어졌는지 이해하는 것이다. 파이프라인이 검색, 추론, 외부 도구를 포함하는 다단계 에이전트 워크플로우로 진화함에 따라 전통적인 로깅은 부족해진다. 무슨 일이 일어났는지뿐만 아니라 왜 일어났는지에 대한 가시성이 필요하다.
바로 이곳에서 MLflow의 관측 가능성이 진정으로 빛난다. 스팬, 트레이스, 평가를 결합함으로써 개별 LLM 호출부터 전체 요청 생명주기와 품질 지표에 이르기까지 시스템의 완전하고 구조화된 뷰를 얻을 수 있다. Text2SQL + RAG + 웹 검색 파이프라인에서 이것은 디버깅을 추측에서 정밀하고 데이터 중심적인 프로세스로 변환한다.
궁극적으로 관측 가능성은 GenAI 시스템을 취약한 프로토타입에서 프로덕션 준비가 된 신뢰할 수 있는 애플리케이션으로 전환하는 것이다. 왜냐하면 현실 세계에서는 파이프라인이 작동하는 것만으로는 충분하지 않기 때문이다 — 어떻게 작동하는지 보고, 얼마나 잘 수행하는지 측정하고, 제대로 작동하지 않을 때 수정할 수 있어야 한다.
'최신 AI' 카테고리의 다른 글
| 개발자의 95%보다 AI를 잘 이해하게 해줄 15가지 개념 (1) | 2026.04.27 |
|---|---|
| AI Agents Explained : 지능형 시스템을 형성하는 6가지 개념 (0) | 2026.04.27 |
| 앱을 몇 분 만에 만들기: Google Stitch 3.0의 놀라운 AI 업그레이드 (0) | 2026.04.13 |
| NotebookLM × Discord 활용 가이드 – 협업과 학습의 새로운 흐름 (0) | 2026.03.12 |
| RAG (Retrieval-Augmented Generation) 완벽 가이드 (0) | 2026.03.10 |