본문 바로가기
최신 IT

로컬 데이터 분석 도구 — Python + Streamlit

by 구라100단 2026. 4. 10.

Python + Streamlit + DuckDB + Plotly로 만든 로컬 앱. CSV, Excel, SQLite, JSON, Parquet 파일을 불러오고, 스키마를 시각화하고, 관계를 자동 추천하고, SQL로 로컬 JOIN 쿼리를 실행할 수 있다.


소개

Streamlit을 탐구하는 일련의 글들 — 초기 시각 컴포넌트부터 더 정교한 대시보드까지 — 을 거치면서, 이 도구가 데이터를 다루는 사람들의 일상적인 문제를 해결하는 데 얼마나 큰 잠재력을 가지고 있는지 점점 더 분명해졌다. 그러나 그 축적된 배움을 진정으로 유용한 것, 어떤 분석가든 바로 열어서 쓸 수 있는 것으로 만들어야 했다. 그 고민에서 로컬 테이블레이터(Tabulador Local) 아이디어가 탄생했다. 다양한 형식의 파일을 불러오고, 테이블을 연결하고, SQL 쿼리를 실행하는 앱으로, 외부 서비스에 전혀 의존하지 않고 내 컴퓨터에서 모두 돌아간다.

이 프로젝트가 생겨난 배경은 데이터를 다루는 누구에게나 익숙한 상황이다. 정보는 항상 흩어져 있다. 재무팀이 이메일로 보낸 Excel 보고서, 시스템에서 내려받은 CSV 파일, 아무도 마이그레이션하지 못한 레거시 SQLite 데이터베이스, API가 뱉어낸 JSON... 이것들을 합치려면 보통 일회용 스크립트, 급조한 ETL, 아니면 라이선스·설정·학습 비용이 드는 무거운 클라우드 도구에 의존하게 된다.

로컬 테이블레이터는 이 문제를 직접적으로 해결한다. 내 컴퓨터에서 완전히 실행되고, 원격 백엔드 없이 여러 형식을 불러오고, 테이블 간 관계를 정의하고, 그 위에서 SQL 분석 쿼리를 실행할 수 있다. 쿼리 엔진은 DuckDB, 인터페이스는 Streamlit이다. 이 조합 덕분에 아키텍처 복잡도와 개발 시간이 크게 줄어든다.


왜 Python + Streamlit인가?

이전 글들을 통해 Streamlit이 내부 데이터 도구 프로젝트를 지연시키거나 불가능하게 만드는 여러 장벽을 없애준다는 게 분명해졌다. 핵심 포인트를 정리하면 다음과 같다.

  • 런타임 하나: Python. 별도의 React, Vite, Electron 프론트엔드를 관리할 필요가 없다.
  • IPC/preload 브릿지 없음. 인터페이스와 비즈니스 로직 간 통신이 같은 프로세스에서 자연스럽게 이뤄진다.
  • TypeScript 빌드 없음. 개발 사이클이 즉각적이다 — 저장하면 바로 결과가 보인다.
  • 네이티브 데이터 생태계. pandas, openpyxl, sqlite3, plotly — 별도 수정 없이 모두 사용 가능하다.

물론 트레이드오프가 있다. 실행 파일로 패키징된 데스크탑 앱 대신, 브라우저에서 localhost로 실행된다. 개인 사용이나 소규모 팀에는 충분하고, 바이너리 배포·업데이트의 번거로움도 없다.


이 글의 목표

이 가이드는 처음부터 프로젝트를 그대로 따라 만들 수 있는 완전한 단계별 안내서다. 다음 내용을 다룬다.

  • MVP의 기술 아키텍처
  • 프로젝트 폴더 구조
  • 각 모듈의 전체 소스 코드
  • 앱 실행 방법
  • 자동 생성된 예시 데이터로 테스트하는 방법

읽고 나면 각 결정의 이유를 이해할 뿐 아니라, 내 컴퓨터에서 실제로 돌아가는 프로젝트를 갖게 된다.


기술 스택

  • Python + Streamlit — 인터페이스
  • DuckDB — 로컬 SQL 쿼리 엔진
  • Pandas — 데이터 처리
  • Plotly — 인터랙티브 차트
  • openpyxl — Excel 파일 처리

최종 프로젝트 구조

tabulador-local/
├── app.py
├── core/
│   ├── __init__.py
│   ├── engine.py
│   ├── sql_builder.py
│   └── suggestions.py
├── ui/
│   ├── __init__.py
│   ├── sidebar.py
│   ├── importer.py
│   ├── source_preview.py
│   ├── relations.py
│   ├── column_selector.py
│   ├── filter_builder.py
│   ├── query_editor.py
│   ├── result_grid.py
│   ├── chart_panel.py
│   ├── query_history.py
│   ├── postgres_connector.py
│   ├── project_actions.py
│   └── relation_suggestions.py
├── scripts/
│   └── generate_demo_data.py
├── sample_data/
├── data/
├── .streamlit/
│   └── config.toml
├── requirements.txt
└── ARTIGO_TABULADOR_LOCAL_MVP.md

core/ui/를 분리한 건 의도적이다. core/ 디렉터리에는 데이터 로직 전체 — 불러오기, SQL 생성, 관계 추천 — 가 들어 있으며 Streamlit에 대한 의존이 전혀 없다. ui/는 인터페이스만 담당한다. 이 분리 덕분에 테스트·유지보수가 쉽고, 나중에 표현 계층을 교체하더라도 비즈니스 로직을 다시 쓸 필요가 없다.


1부 — 설치 및 환경 설정

1) requirements.txt

streamlit>=1.38.0
duckdb>=1.1.0
pandas>=2.2.0
plotly>=5.24.0
openpyxl>=3.1.0

2) .streamlit/config.toml

Streamlit은 설정 파일을 통해 앱의 시각적 테마를 커스터마이징할 수 있다. 여기서는 긴 분석 세션에서 눈의 피로를 줄이고 Plotly 다크 모드 차트와도 잘 어울리는 네이비 계열 다크 테마를 선택했다.

[theme]
primaryColor = "#06b6d4"
backgroundColor = "#0f172a"
secondaryBackgroundColor = "#1e293b"
textColor = "#e2e8f0"
font = "sans serif"

[server]
maxUploadSize = 500

maxUploadSize = 500은 최대 500MB 업로드를 허용하며, 별도 설정 없이 웬만한 크기의 Excel 및 Parquet 파일을 수용한다.

3) 설치 및 실행

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
streamlit run app.py

마지막 명령을 실행하면 Streamlit이 자동으로 브라우저를 열어 http://localhost:8501로 이동한다. 이제 테이블레이터를 사용할 준비가 끝났다.


2부 — 데이터 핵심 (core/)

core/ 디렉터리는 테이블레이터의 심장부다. 세 가지 명확한 역할을 집중적으로 담당한다. 데이터 엔진(불러오기, 쿼리, 내보내기), SQL 빌더(쿼리 프로그래밍 방식 생성), 그리고 추천 모듈(테이블 간 관계를 찾는 휴리스틱)이다.

core/__init__.py

빈 파일이다 — Python이 디렉터리를 패키지로 인식하게 해주는 역할만 한다.

core/engine.py

TabuladorEngine은 프로젝트의 핵심 클래스다. DuckDB와의 모든 상호작용을 캡슐화한다. 불러온 파일에서 테이블을 생성하고, 쿼리를 실행하고, 결과를 내보낸다. 각 파일 형식마다 고유한 불러오기 메서드가 있으며, 모두 동일한 내부 구조 — SQL로 조회 가능한 DuckDB 테이블 — 로 수렴된다.

중요한 점은 테이블 이름 처리다. 이름이 사용자 파일에서 오기 때문에 (한글, 공백, 특수문자가 포함될 수 있다), sanitize_identifier 메서드가 모든 이름을 ASCII 영숫자와 밑줄로 정규화해 SQL 문법 오류를 방지한다.

from __future__ import annotations
import os
import re
import tempfile
import unicodedata
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import duckdb
import pandas as pd

@dataclass
class SourceColumn:
    name: str
    type: str

@dataclass
class DataSource:
    table_name: str
    file_path: str
    source_type: str  # 'csv' | 'xlsx' | 'sqlite' | 'json' | 'parquet' | 'postgres'
    columns: list[SourceColumn] = field(default_factory=list)
    sheet_name: Optional[str] = None

class TabuladorEngine:
    """DuckDB와의 모든 상호작용을 캡슐화한다."""
    def __init__(self, db_path: str = "data/tabulador.duckdb"):
        base_dir = os.path.dirname(db_path) or "."
        os.makedirs(base_dir, exist_ok=True)
        self.conn = duckdb.connect(db_path)

    @staticmethod
    def sanitize_identifier(name: str) -> str:
        """SQL에 안전한 테이블 이름을 생성한다."""
        nfkd = unicodedata.normalize("NFKD", name)
        ascii_only = nfkd.encode("ascii", "ignore").decode("ascii")
        clean = re.sub(r"[^a-zA-Z0-9_]", "_", ascii_only)
        clean = re.sub(r"_+", "_", clean).strip("_").lower()
        return clean or "tabela"

    def _describe_table(self, table_name: str) -> list[SourceColumn]:
        result = self.conn.execute(f"DESCRIBE {table_name}").fetchall()
        return [SourceColumn(name=row[0], type=row[1]) for row in result]

    def list_tables(self) -> list[str]:
        rows = self.conn.execute("SHOW TABLES").fetchall()
        return [r[0] for r in rows]

    def _rename_temp_import_table(self, old_table: str, new_table: str) -> None:
        """임시 테이블을 최종 이름으로 변경한다. 필요시 기존 테이블을 덮어쓴다."""
        if old_table == new_table:
            return
        self.conn.execute(f"DROP TABLE IF EXISTS {new_table}")
        self.conn.execute(f"ALTER TABLE {old_table} RENAME TO {new_table}")

    def import_csv(self, file_path: str) -> DataSource:
        table = self.sanitize_identifier(Path(file_path).stem)
        sql_path = file_path.replace("\\", "/")
        self.conn.execute(
            f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_csv_auto('{sql_path}')
            """
        )
        return DataSource(
            table_name=table,
            file_path=file_path,
            source_type="csv",
            columns=self._describe_table(table),
        )

    def import_csv_from_upload(self, uploaded_file) -> DataSource:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
        tmp.write(uploaded_file.getvalue())
        tmp.close()
        source = self.import_csv(tmp.name)
        source.file_path = uploaded_file.name
        new_table = self.sanitize_identifier(Path(uploaded_file.name).stem)
        old_table = source.table_name
        self._rename_temp_import_table(old_table, new_table)
        source.table_name = new_table
        source.columns = self._describe_table(new_table)
        os.unlink(tmp.name)
        return source

    def list_excel_sheets(self, file_path: str) -> list[str]:
        import openpyxl
        wb = openpyxl.load_workbook(file_path, read_only=True)
        names = wb.sheetnames
        wb.close()
        return names

    def list_excel_sheets_from_upload(self, uploaded_file) -> tuple[str, list[str]]:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
        tmp.write(uploaded_file.getvalue())
        tmp.close()
        sheets = self.list_excel_sheets(tmp.name)
        return tmp.name, sheets

    def import_excel(
        self,
        file_path: str,
        sheet_name: Optional[str] = None,
        display_name: Optional[str] = None,
    ) -> DataSource:
        actual_sheet = sheet_name or "Sheet1"
        df = pd.read_excel(file_path, sheet_name=sheet_name or 0)
        base_name = display_name or Path(file_path).stem
        table = self.sanitize_identifier(f"{base_name}_{actual_sheet}")
        self.conn.execute(f"DROP TABLE IF EXISTS {table}")
        self.conn.execute(f"CREATE TABLE {table} AS SELECT * FROM df")
        return DataSource(
            table_name=table,
            file_path=display_name or file_path,
            source_type="xlsx",
            sheet_name=actual_sheet,
            columns=self._describe_table(table),
        )

    def import_json(self, file_path: str) -> DataSource:
        table = self.sanitize_identifier(Path(file_path).stem)
        sql_path = file_path.replace("\\", "/")
        self.conn.execute(
            f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_json_auto('{sql_path}')
            """
        )
        return DataSource(
            table_name=table,
            file_path=file_path,
            source_type="json",
            columns=self._describe_table(table),
        )

    def import_json_from_upload(self, uploaded_file) -> DataSource:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json")
        tmp.write(uploaded_file.getvalue())
        tmp.close()
        source = self.import_json(tmp.name)
        source.file_path = uploaded_file.name
        new_table = self.sanitize_identifier(Path(uploaded_file.name).stem)
        old_table = source.table_name
        self._rename_temp_import_table(old_table, new_table)
        source.table_name = new_table
        source.columns = self._describe_table(new_table)
        os.unlink(tmp.name)
        return source

    def import_parquet(self, file_path: str) -> DataSource:
        table = self.sanitize_identifier(Path(file_path).stem)
        sql_path = file_path.replace("\\", "/")
        self.conn.execute(
            f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_parquet('{sql_path}')
            """
        )
        return DataSource(
            table_name=table,
            file_path=file_path,
            source_type="parquet",
            columns=self._describe_table(table),
        )

    def import_parquet_from_upload(self, uploaded_file) -> DataSource:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
        tmp.write(uploaded_file.getvalue())
        tmp.close()
        source = self.import_parquet(tmp.name)
        source.file_path = uploaded_file.name
        new_table = self.sanitize_identifier(Path(uploaded_file.name).stem)
        old_table = source.table_name
        self._rename_temp_import_table(old_table, new_table)
        source.table_name = new_table
        source.columns = self._describe_table(new_table)
        os.unlink(tmp.name)
        return source

    def import_sqlite(self, file_path: str) -> list[DataSource]:
        sql_path = file_path.replace("\\", "/")
        self.conn.execute("INSTALL sqlite; LOAD sqlite;")
        rows = self.conn.execute(
            f"""
            SELECT name FROM sqlite_scan('{sql_path}', 'sqlite_master')
            WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
            """
        ).fetchall()
        sources: list[DataSource] = []
        for (original_name,) in rows:
            table = self.sanitize_identifier(original_name)
            self.conn.execute(
                f"""
                CREATE OR REPLACE TABLE {table} AS
                SELECT * FROM sqlite_scan('{sql_path}', '{original_name}')
                """
            )
            sources.append(
                DataSource(
                    table_name=table,
                    file_path=file_path,
                    source_type="sqlite",
                    columns=self._describe_table(table),
                )
            )
        return sources

    def import_sqlite_from_upload(self, uploaded_file) -> list[DataSource]:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".sqlite")
        tmp.write(uploaded_file.getvalue())
        tmp.close()
        sources = self.import_sqlite(tmp.name)
        for src in sources:
            src.file_path = uploaded_file.name
        os.unlink(tmp.name)
        return sources

    def connect_postgres(
        self,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
        schema: str = "public",
    ) -> list[str]:
        self.conn.execute("INSTALL postgres; LOAD postgres;")
        conn_str = f"host={host} port={port} dbname={database} user={user} password={password}"
        rows = self.conn.execute(
            f"""
            SELECT table_name
            FROM postgres_scan('{conn_str}', 'information_schema', 'tables')
            WHERE table_schema = '{schema}' AND table_type = 'BASE TABLE'
            ORDER BY table_name
            """
        ).fetchall()
        return [r[0] for r in rows]

    def import_postgres_table(
        self,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
        table_name: str,
        schema: str = "public",
    ) -> DataSource:
        conn_str = f"host={host} port={port} dbname={database} user={user} password={password}"
        local_name = self.sanitize_identifier(f"pg_{table_name}")
        self.conn.execute(
            f"""
            CREATE OR REPLACE TABLE {local_name} AS
            SELECT * FROM postgres_scan('{conn_str}', '{schema}', '{table_name}')
            """
        )
        return DataSource(
            table_name=local_name,
            file_path=f"postgres://{host}:{port}/{database}",
            source_type="postgres",
            columns=self._describe_table(local_name),
        )

    def preview_table(self, table_name: str, limit: int = 50) -> pd.DataFrame:
        return self.conn.execute(f"SELECT * FROM {table_name} LIMIT {limit}").fetchdf()

    def run_query(self, sql: str) -> pd.DataFrame:
        return self.conn.execute(sql).fetchdf()

    def export_csv(self, sql: str, output_path: str) -> None:
        self.conn.execute(f"COPY ({sql}) TO '{output_path}' (HEADER, DELIMITER ',')")

    def export_parquet(self, sql: str, output_path: str) -> None:
        self.conn.execute(f"COPY ({sql}) TO '{output_path}' (FORMAT PARQUET)")

    def export_query_to_bytes(self, sql: str, fmt: str = "csv") -> bytes:
        df = self.run_query(sql)
        if fmt == "csv":
            return df.to_csv(index=False).encode("utf-8")
        if fmt == "parquet":
            import io
            buf = io.BytesIO()
            df.to_parquet(buf, index=False)
            return buf.getvalue()
        raise ValueError(f"지원하지 않는 형식: {fmt}")

core/sql_builder.py

sql_builder 모듈은 사용자가 인터페이스에서 선택한 내용(컬럼 목록, 필터, 관계)을 바탕으로 SQL을 프로그래밍 방식으로 조립한다. 유효한 SQL 문자열을 반환한다.

이 방식 덕분에 사용자가 SQL을 직접 작성하지 않아도 된다(물론 원하면 직접 쓸 수 있다). 생성된 SQL은 텍스트 편집기에 표시되어 실행 전에 수정할 수 있다 — 두 방법의 장점을 모두 취한 구조다.

from __future__ import annotations
from dataclasses import dataclass

@dataclass
class SelectedColumn:
    table: str
    column: str
    aggregate: str = ""  # '', 'COUNT', 'SUM', 'AVG', 'MIN', 'MAX', 'COUNT_DISTINCT'
    alias: str = ""

@dataclass
class Relation:
    left_table: str
    left_column: str
    right_table: str
    right_column: str
    join_type: str = "INNER"  # 'INNER', 'LEFT', 'RIGHT', 'FULL'

@dataclass
class QueryFilter:
    id: str
    table: str
    column: str
    operator: str
    value: str = ""
    value_to: str = ""
    connector: str = "AND"
    enabled: bool = True

def build_select_clause(columns: list[SelectedColumn]) -> str:
    if not columns:
        return "*"
    parts: list[str] = []
    for col in columns:
        expr = f"{col.table}.{col.column}"
        if col.aggregate == "COUNT_DISTINCT":
            expr = f"COUNT(DISTINCT {col.table}.{col.column})"
        elif col.aggregate:
            expr = f"{col.aggregate}({col.table}.{col.column})"
        if col.alias:
            expr += f" AS {col.alias}"
        parts.append(expr)
    return ",\n  ".join(parts)

def _is_number(value: str) -> bool:
    if value is None:
        return False
    text = str(value).strip()
    if not text:
        return False
    if text.lstrip("-").isdigit():
        return True
    if text.count(".") == 1 and text.replace(".", "").lstrip("-").isdigit():
        return True
    return False

def _quote_if_needed(value: str) -> str:
    return value if _is_number(value) else f"'{value}'"

def build_where_clause(filters: list[QueryFilter]) -> str:
    active = [flt for flt in filters if flt.enabled]
    if not active:
        return ""
    parts: list[str] = []
    for idx, flt in enumerate(active):
        ref = f"{flt.table}.{flt.column}"
        if flt.operator in ("IS NULL", "IS NOT NULL"):
            condition = f"{ref} {flt.operator}"
        elif flt.operator in ("IN", "NOT IN"):
            raw_values = [v.strip() for v in flt.value.split(",") if v.strip()]
            values = ", ".join(_quote_if_needed(v) for v in raw_values)
            condition = f"{ref} {flt.operator} ({values})"
        elif flt.operator == "BETWEEN":
            val_from = _quote_if_needed(flt.value)
            val_to = _quote_if_needed(flt.value_to)
            condition = f"{ref} BETWEEN {val_from} AND {val_to}"
        elif flt.operator in ("LIKE", "NOT LIKE"):
            condition = f"{ref} {flt.operator} '%{flt.value}%'"
        else:
            condition = f"{ref} {flt.operator} {_quote_if_needed(flt.value)}"
        if idx == 0:
            parts.append(condition)
        else:
            parts.append(f"{flt.connector} {condition}")
    return "WHERE " + "\n  ".join(parts)

def build_group_by_clause(columns: list[SelectedColumn]) -> str:
    has_agg = any(col.aggregate for col in columns)
    if not has_agg or not columns:
        return ""
    non_agg = [f"{col.table}.{col.column}" for col in columns if not col.aggregate]
    if not non_agg:
        return ""
    return "GROUP BY " + ", ".join(non_agg)

def build_sql(
    sources: list,
    relations: list[Relation],
    selected_columns: list[SelectedColumn],
    filters: list[QueryFilter],
    limit: int = 200,
) -> str:
    if not sources:
        return ""
    base = sources[0].table_name
    select = build_select_clause(selected_columns)
    where = build_where_clause(filters)
    group_by = build_group_by_clause(selected_columns)
    sql = f"SELECT {select}\nFROM {base}"
    for rel in relations:
        sql += (
            f"\n{rel.join_type} JOIN {rel.right_table} ON "
            f"{rel.left_table}.{rel.left_column} = {rel.right_table}.{rel.right_column}"
        )
    if where:
        sql += f"\n{where}"
    if group_by:
        sql += f"\n{group_by}"
    sql += f"\nLIMIT {limit};"
    return sql

core/suggestions.py

이 모듈은 테이블 간 관계를 자동으로 추천하는 메커니즘을 구현한다. 아이디어는 단순하다. 사용자가 여러 데이터 소스를 불러오면 시스템이 컬럼 이름에서 공통 패턴을 분석한다. _id로 끝나는 같은 이름의 컬럼, 테이블명_id 규칙을 따르는 컬럼, _code, _key, _cod 같은 비슷한 접미사를 가진 컬럼들이 탐지 대상이다.

각 추천에는 신뢰도("높음" 또는 "보통")와 이유에 대한 텍스트 설명이 붙는다. 사용자는 각 추천을 개별적으로 수락하거나 무시할 수 있어 쿼리에 사용할 관계를 완전히 통제할 수 있다.

from __future__ import annotations
from dataclasses import dataclass
from core.engine import DataSource, TabuladorEngine

@dataclass
class RelationSuggestion:
    left_table: str
    left_column: str
    right_table: str
    right_column: str
    confidence: str  # 'high' | 'medium'
    reason: str

def suggest_relations(engine: TabuladorEngine, sources: list[DataSource]) -> list[RelationSuggestion]:
    """컬럼 이름 휴리스틱으로 관계를 추천한다."""
    _ = engine
    suggestions: list[RelationSuggestion] = []
    seen: set[str] = set()
    schemas: dict[str, list[dict[str, str]]] = {}
    for src in sources:
        schemas[src.table_name] = [{"name": c.name, "type": c.type} for c in src.columns]
    tables = list(schemas.keys())
    id_suffixes = ("_id", "_code", "_key", "_cod", "_numero")
    for i in range(len(tables)):
        for j in range(i + 1, len(tables)):
            t_a, t_b = tables[i], tables[j]
            cols_a, cols_b = schemas[t_a], schemas[t_b]
            for ca in cols_a:
                for cb in cols_b:
                    key = "|".join(sorted([t_a, ca["name"], t_b, cb["name"]]))
                    if key in seen:
                        continue
                    if ca["name"] == cb["name"] and (ca["name"].endswith("_id") or ca["name"] == "id"):
                        seen.add(key)
                        suggestions.append(
                            RelationSuggestion(
                                left_table=t_a,
                                left_column=ca["name"],
                                right_table=t_b,
                                right_column=cb["name"],
                                confidence="high",
                                reason=f'컬럼 "{ca["name"]}"이 두 테이블 모두에 존재함',
                            )
                        )
                    if ca["name"] == "id" and cb["name"] == f"{t_a}_id":
                        seen.add(key)
                        suggestions.append(
                            RelationSuggestion(
                                left_table=t_a,
                                left_column=ca["name"],
                                right_table=t_b,
                                right_column=cb["name"],
                                confidence="high",
                                reason=f'"{t_b}.{cb["name"]}"이 테이블명_id 패턴으로 "{t_a}.id"를 참조함',
                            )
                        )
                    if cb["name"] == "id" and ca["name"] == f"{t_b}_id":
                        seen.add(key)
                        suggestions.append(
                            RelationSuggestion(
                                left_table=t_a,
                                left_column=ca["name"],
                                right_table=t_b,
                                right_column=cb["name"],
                                confidence="high",
                                reason=f'"{t_a}.{ca["name"]}"이 테이블명_id 패턴으로 "{t_b}.id"를 참조함',
                            )
                        )
                    for suffix in id_suffixes:
                        if (
                            ca["name"].endswith(suffix)
                            and cb["name"].endswith(suffix)
                            and ca["name"] != cb["name"]
                            and key not in seen
                        ):
                            root_a = ca["name"].removesuffix(suffix)
                            root_b = cb["name"].removesuffix(suffix)
                            if root_a in root_b or root_b in root_a or t_a.find(root_b) >= 0 or t_b.find(root_a) >= 0:
                                seen.add(key)
                                suggestions.append(
                                    RelationSuggestion(
                                        left_table=t_a,
                                        left_column=ca["name"],
                                        right_table=t_b,
                                        right_column=cb["name"],
                                        confidence="medium",
                                        reason=(
                                            f'컬럼 "{ca["name"]}"과 "{cb["name"]}"이 '
                                            f'접미사 "{suffix}"를 공유함'
                                        ),
                                    )
                                )
    return suggestions

3부 — 인터페이스 (ui/)

인터페이스 계층은 독립적인 모듈들로 구성되어 있으며, 각각 화면의 한 섹션을 담당한다. 이 세분화 덕분에 다른 컴포넌트에 영향을 주지 않고 원하는 컴포넌트만 수정하거나 교체할 수 있다. 각 파일의 목적이 명확하고 독립적이라 코드 가독성도 높다.

ui/__init__.py

빈 파일 — Python이 디렉터리를 패키지로 인식하게 해주는 역할이다.

ui/sidebar.py

사이드바는 불러온 소스들의 시각적 인덱스 역할을 한다. 각 소스에는 원본 형식을 나타내는 아이콘이 표시되며, 클릭하면 메인 패널에서 미리보기로 선택된다.

import streamlit as st
from core.engine import DataSource

def render_sidebar() -> None:
    """불러온 소스 목록을 사이드바에 렌더링한다."""
    sources: list[DataSource] = st.session_state.get("sources", [])
    st.sidebar.title("📋 소스")
    if not sources:
        st.sidebar.info("불러온 소스가 없습니다.")
        return
    for source in sources:
        icon = {
            "csv": "📄",
            "xlsx": "📊",
            "sqlite": "🗃️",
            "json": "🔖",
            "parquet": "📦",
            "postgres": "🐘",
        }.get(source.source_type, "📁")
        label = f"{icon} {source.table_name}"
        if source.sheet_name:
            label += f" ({source.sheet_name})"
        if st.sidebar.button(label, key=f"src_{source.table_name}", use_container_width=True):
            st.session_state["selected_source"] = source.table_name

ui/importer.py

임포터는 테이블레이터의 데이터 진입점이다. 여러 파일 업로드를 받아 확장자로 각 파일의 형식을 자동으로 인식한다. 여러 시트가 있는 Excel 파일은 특별히 처리된다. 시스템이 사용 가능한 시트 목록을 표시하고 사용자가 불러올 시트를 선택할 수 있다.

중복 불러오기 방지 메커니즘(processed_upload_keys)은 같은 파일이 같은 세션에서 두 번 처리되지 않도록 보장한다. Streamlit이 사용자 상호작용마다 스크립트 전체를 재실행하기 때문에 이 처리가 필요하다.

import os
import streamlit as st
from core.engine import DataSource, TabuladorEngine

def render_importer(engine: TabuladorEngine) -> None:
    """파일 업로드 및 불러오기 컴포넌트."""
    st.subheader("데이터 불러오기")
    uploaded_files = st.file_uploader(
        "파일을 끌어다 놓거나 선택하여 불러오기",
        type=["csv", "xlsx", "json", "parquet", "sqlite", "db"],
        accept_multiple_files=True,
        key="file_uploader",
    )
    if not uploaded_files:
        st.session_state["processed_upload_keys"] = set()
        return
    if "processed_upload_keys" not in st.session_state:
        st.session_state["processed_upload_keys"] = set()
    current_keys = {_upload_key(f) for f in uploaded_files}
    st.session_state["processed_upload_keys"] = st.session_state["processed_upload_keys"].intersection(current_keys)
    for uploaded_file in uploaded_files:
        upload_key = _upload_key(uploaded_file)
        if upload_key in st.session_state["processed_upload_keys"]:
            continue
        ext = uploaded_file.name.rsplit(".", 1)[-1].lower()
        imported = False
        if ext == "csv":
            _add_source(engine.import_csv_from_upload(uploaded_file))
            imported = True
        elif ext == "xlsx":
            imported = _handle_excel_upload(engine, uploaded_file)
        elif ext == "json":
            _add_source(engine.import_json_from_upload(uploaded_file))
            imported = True
        elif ext == "parquet":
            _add_source(engine.import_parquet_from_upload(uploaded_file))
            imported = True
        elif ext in ("sqlite", "db"):
            for source in engine.import_sqlite_from_upload(uploaded_file):
                _add_source(source)
            imported = True
        else:
            st.warning(f"지원하지 않는 형식: .{ext}")
        if imported:
            st.session_state["processed_upload_keys"].add(upload_key)

def _upload_key(uploaded_file) -> str:
    size = getattr(uploaded_file, "size", None)
    if size is None:
        size = len(uploaded_file.getvalue())
    return f"{uploaded_file.name}:{size}"

def _handle_excel_upload(engine: TabuladorEngine, uploaded_file) -> bool:
    tmp_path, sheets = engine.list_excel_sheets_from_upload(uploaded_file)
    try:
        if len(sheets) == 1:
            _add_source(engine.import_excel(tmp_path, sheet_name=sheets[0], display_name=uploaded_file.name))
            return True
        st.info(f"📊 **{uploaded_file.name}**에 시트가 {len(sheets)}개 있습니다. 불러올 시트를 선택하세요:")
        selected_sheets = st.multiselect(
            "사용 가능한 시트",
            sheets,
            default=[sheets[0]],
            key=f"sheets_{uploaded_file.name}",
        )
        if st.button(f"{len(selected_sheets)}개 시트 불러오기", key=f"import_sheets_{uploaded_file.name}"):
            for sheet in selected_sheets:
                _add_source(engine.import_excel(tmp_path, sheet_name=sheet, display_name=uploaded_file.name))
            st.success(f"✅ {uploaded_file.name}에서 {len(selected_sheets)}개 시트를 불러왔습니다.")
            return True
        return False
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)

def _add_source(source: DataSource) -> None:
    if "sources" not in st.session_state:
        st.session_state["sources"] = []
    st.session_state["sources"] = [s for s in st.session_state["sources"] if s.table_name != source.table_name] + [source]
    st.session_state["selected_source"] = source.table_name

ui/source_preview.py

사이드바에서 소스를 선택하면 이 컴포넌트가 테이블 스키마(각 컬럼의 이름과 타입)와 첫 번째 레코드 샘플을 표시한다. 불러오기가 잘 됐는지, DuckDB가 데이터 타입을 올바르게 추론했는지 가장 빠르게 확인할 수 있는 방법이다.

import streamlit as st
from core.engine import DataSource, TabuladorEngine

def render_source_preview(engine: TabuladorEngine) -> None:
    """선택한 소스의 스키마와 미리보기를 표시한다."""
    sources: list[DataSource] = st.session_state.get("sources", [])
    selected = st.session_state.get("selected_source")
    if not selected or not sources:
        return
    source = next((s for s in sources if s.table_name == selected), None)
    if not source:
        return
    st.subheader(f"미리보기: {source.table_name}")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("테이블", source.table_name)
    with col2:
        st.metric("타입", source.source_type.upper())
    st.caption("스키마")
    st.dataframe([{"컬럼": c.name, "타입": c.type} for c in source.columns], use_container_width=True, hide_index=True)
    st.caption("데이터 샘플")
    try:
        df = engine.preview_table(source.table_name, limit=20)
        st.dataframe(df, use_container_width=True, hide_index=True)
    except Exception as exc:
        st.error(f"미리보기 불러오기 오류: {exc}")

ui/relations.py

관계 편집기는 사용자가 테이블 연결 방식을 수동으로 정의할 수 있게 해준다. 인터페이스는 왼쪽 테이블, 왼쪽 컬럼, JOIN 타입, 오른쪽 테이블, 오른쪽 컬럼 — 5개의 선택 박스를 나란히 표시하고 목록에 관계를 추가하는 버튼을 제공한다.

여기서 정의된 관계는 SQL 자동 생성에 직접 반영된다. 각 관계를 개별적으로 제거할 수 있어 실험하기 편리하다. 처음부터 쿼리를 다시 작성하지 않고도 다양한 JOIN 조합을 테스트할 수 있다.

import streamlit as st

from core.engine import DataSource
from core.sql_builder import Relation

def render_relation_editor() -> None:
    """테이블 간 관계 시각적 편집기."""
    sources: list[DataSource] = st.session_state.get("sources", [])
    if "relations" not in st.session_state:
        st.session_state["relations"] = []
    relations: list[Relation] = st.session_state["relations"]
    st.subheader("관계 정의")
    if len(sources) < 2:
        st.info("관계를 정의하려면 소스를 두 개 이상 불러오세요.")
        return
    table_names = [s.table_name for s in sources]
    columns_map = {s.table_name: [c.name for c in s.columns] for s in sources}
    col1, col2, col3, col4, col5 = st.columns(5)
    with col1:
        left_table = st.selectbox("왼쪽 테이블", [""] + table_names, key="rel_left_table")
    with col2:
        left_column = st.selectbox("왼쪽 컬럼", [""] + columns_map.get(left_table, []), key="rel_left_col")
    with col3:
        join_type = st.selectbox("JOIN 타입", ["INNER", "LEFT", "RIGHT", "FULL"], key="rel_join")
    with col4:
        right_table = st.selectbox("오른쪽 테이블", [""] + table_names, key="rel_right_table")
    with col5:
        right_column = st.selectbox("오른쪽 컬럼", [""] + columns_map.get(right_table, []), key="rel_right_col")
    if st.button("➕ 관계 추가", key="add_relation"):
        if left_table and left_column and right_table and right_column:
            relations.append(
                Relation(
                    left_table=left_table,
                    left_column=left_column,
                    right_table=right_table,
                    right_column=right_column,
                    join_type=join_type,
                )
            )
            st.session_state["relations"] = relations
            st.rerun()
        else:
            st.warning("모든 필드를 채워주세요.")
    if not relations:
        st.caption("정의된 관계가 없습니다.")
        return
    for idx, rel in enumerate(relations):
        c1, c2 = st.columns([5, 1])
        with c1:
            st.code(f"{rel.left_table}.{rel.left_column}  {rel.join_type} JOIN  {rel.right_table}.{rel.right_column}")
        with c2:
            if st.button("🗑️", key=f"rm_rel_{idx}"):
                st.session_state["relations"].pop(idx)
                st.rerun()

ui/column_selector.py

컬럼 선택기는 SELECT 절을 시각적으로 구성할 수 있게 해준다. 사용자는 테이블, 컬럼, 선택적으로 집계 함수(COUNT, SUM, AVG, MIN, MAX, COUNT DISTINCT), 그리고 별칭(alias)을 선택한다. 컬럼을 선택하지 않으면 생성된 SQL은 SELECT *를 사용한다.

집계 함수가 포함되면 sql_builder에서 자동으로 GROUP BY 절도 생성된다. 집계 함수가 없는 컬럼들이 자동으로 그룹화 대상이 되어 일반적인 분석 쿼리처럼 동작한다.

import streamlit as st
from core.engine import DataSource
from core.sql_builder import SelectedColumn
AGGREGATES = ["", "COUNT", "SUM", "AVG", "MIN", "MAX", "COUNT_DISTINCT"]

def render_column_selector() -> None:
    """집계를 포함한 시각적 컬럼 선택."""
    sources: list[DataSource] = st.session_state.get("sources", [])
    if "selected_columns" not in st.session_state:
        st.session_state["selected_columns"] = []
    selected: list[SelectedColumn] = st.session_state["selected_columns"]
    st.subheader("컬럼 선택")
    if not sources:
        return
    if selected:
        st.caption(f"{len(selected)}개 컬럼 선택됨")
    else:
        st.caption("선택된 컬럼 없음 — SQL은 SELECT *를 사용합니다")
    table_names = [s.table_name for s in sources]
    columns_map = {s.table_name: [c.name for c in s.columns] for s in sources}
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        table = st.selectbox("테이블", [""] + table_names, key="colsel_table")
    with col2:
        column = st.selectbox("컬럼", [""] + columns_map.get(table, []), key="colsel_col")
    with col3:
        aggregate = st.selectbox("집계", AGGREGATES, key="colsel_agg")
    with col4:
        alias = st.text_input("별칭 (선택사항)", key="colsel_alias")
    btn_col1, btn_col2 = st.columns([1, 1])
    with btn_col1:
        if st.button("➕ 컬럼 추가", key="add_col"):
            if table and column:
                selected.append(SelectedColumn(table=table, column=column, aggregate=aggregate, alias=alias))
                st.session_state["selected_columns"] = selected
                st.rerun()
    with btn_col2:
        if selected and st.button("🗑️ 전체 삭제", key="clear_cols"):
            st.session_state["selected_columns"] = []
            st.rerun()
    for idx, col in enumerate(selected):
        dcol, rcol = st.columns([5, 1])
        with dcol:
            agg = f"{col.aggregate}(" if col.aggregate else ""
            end = ")" if col.aggregate else ""
            alias_str = f" AS {col.alias}" if col.alias else ""
            st.code(f"{agg}{col.table}.{col.column}{end}{alias_str}")
        with rcol:
            if st.button("🗑️", key=f"rm_col_{idx}"):
                st.session_state["selected_columns"].pop(idx)
                st.rerun()

ui/filter_builder.py

필터 빌더는 WHERE 절을 시각적으로 구성할 수 있는 인터페이스를 제공한다. 다양한 연산자 — 동등, 비교, LIKE, IN, BETWEEN, IS NULL — 를 지원하고 AND 또는 OR로 조건을 연결할 수 있다. 각 필터를 개별적으로 활성화, 비활성화, 제거할 수 있어 지금까지 설정한 필터를 잃지 않고 가설을 테스트하기에 매우 편리하다.

import time
import streamlit as st
from core.engine import DataSource
from core.sql_builder import QueryFilter
OPERATORS = [
    ("=", "= (같음)"),
    ("!=", "!= (다름)"),
    (">", "> (초과)"),
    (">=", ">= (이상)"),
    ("<", "< (미만)"),
    ("<=", "<= (이하)"),
    ("LIKE", "LIKE (포함)"),
    ("NOT LIKE", "NOT LIKE"),
    ("IN", "IN (목록)"),
    ("NOT IN", "NOT IN"),
    ("IS NULL", "IS NULL (비어있음)"),
    ("IS NOT NULL", "IS NOT NULL (채워짐)"),
    ("BETWEEN", "BETWEEN (사이)"),
]
NO_VALUE_OPS = {"IS NULL", "IS NOT NULL"}

def render_filter_builder() -> None:
    """WHERE 절을 위한 시각적 필터."""
    sources: list[DataSource] = st.session_state.get("sources", [])
    if "filters" not in st.session_state:
        st.session_state["filters"] = []
    filters: list[QueryFilter] = st.session_state["filters"]
    st.subheader("필터 (WHERE)")
    if not sources:
        return
    table_names = [s.table_name for s in sources]
    columns_map = {s.table_name: [c.name for c in s.columns] for s in sources}
    c1, c2, c3 = st.columns(3)
    with c1:
        table = st.selectbox("테이블", [""] + table_names, key="flt_table")
    with c2:
        column = st.selectbox("컬럼", [""] + columns_map.get(table, []), key="flt_col")
    with c3:
        labels = [f"{op[0]} - {op[1]}" for op in OPERATORS]
        op_idx = st.selectbox("연산자", range(len(OPERATORS)), format_func=lambda i: labels[i], key="flt_op")
        operator = OPERATORS[op_idx][0]
    value = ""
    value_to = ""
    if operator not in NO_VALUE_OPS:
        if operator == "BETWEEN":
            v1, v2 = st.columns(2)
            with v1:
                value = st.text_input("시작 값", key="flt_val_from")
            with v2:
                value_to = st.text_input("끝 값", key="flt_val_to")
        elif operator in ("IN", "NOT IN"):
            value = st.text_input("값 (쉼표로 구분)", key="flt_val_in")
        else:
            value = st.text_input("값", key="flt_val")
    conn_col, btn_col = st.columns([1, 1])
    with conn_col:
        connector = st.selectbox("연결자", ["AND", "OR"], key="flt_connector")
    with btn_col:
        if st.button("➕ 필터 추가", key="add_filter"):
            if table and column and (operator in NO_VALUE_OPS or value.strip()):
                filters.append(
                    QueryFilter(
                        id=f"f_{int(time.time() * 1000)}",
                        table=table,
                        column=column,
                        operator=operator,
                        value=value,
                        value_to=value_to,
                        connector=connector,
                        enabled=True,
                    )
                )
                st.session_state["filters"] = filters
                st.rerun()
    for idx, flt in enumerate(filters):
        fc1, fc2, fc3 = st.columns([5, 1, 1])
        with fc1:
            prefix = f"{flt.connector} " if idx > 0 else ""
            val_str = ""
            if flt.operator not in NO_VALUE_OPS:
                val_str = f" {flt.value}" if flt.operator != "BETWEEN" else f" {flt.value} AND {flt.value_to}"
            status = "✅" if flt.enabled else "⏸️"
            st.code(f"{status} {prefix}{flt.table}.{flt.column} {flt.operator}{val_str}")
        with fc2:
            label = "비활성화" if flt.enabled else "활성화"
            if st.button(label, key=f"toggle_{flt.id}"):
                flt.enabled = not flt.enabled
                st.rerun()
        with fc3:
            if st.button("🗑️", key=f"rm_flt_{flt.id}"):
                st.session_state["filters"] = [x for x in filters if x.id != flt.id]
                st.rerun()

ui/query_editor.py

쿼리 편집기는 모든 것이 수렴되는 곳이다. 두 가지 작업 방식을 제공한다. "SQL 자동 생성" 버튼은 컬럼, 필터, 관계 패널에서 선택한 내용을 바탕으로 쿼리를 조립한다. 또는 사용자가 텍스트 영역에서 SQL을 자유롭게 직접 입력하거나 편집할 수 있다.

모든 실행은 SQL, 반환된 행 수, 실행 시간을 저장하는 기록에 남는다. 디버깅과 쿼리 최적화 모두에 유용한 정보다.

import time
from dataclasses import dataclass
from datetime import datetime

import streamlit as st
from core.engine import TabuladorEngine
from core.sql_builder import build_sql

@dataclass
class QueryHistoryEntry:
    id: str
    sql: str
    executed_at: str
    row_count: int
    duration_ms: int

def render_query_editor(engine: TabuladorEngine) -> None:
    """자동 생성 및 실행이 가능한 SQL 편집기."""
    sources = st.session_state.get("sources", [])
    relations = st.session_state.get("relations", [])
    selected_columns = st.session_state.get("selected_columns", [])
    filters = st.session_state.get("filters", [])
    if "query_sql" not in st.session_state:
        st.session_state["query_sql"] = ""
    if "query_result" not in st.session_state:
        st.session_state["query_result"] = None
    if "query_history" not in st.session_state:
        st.session_state["query_history"] = []
    st.subheader("SQL 쿼리")
    col1, col2 = st.columns([1, 1])
    with col1:
        if st.button("🔄 SQL 자동 생성", key="gen_sql"):
            st.session_state["query_sql"] = build_sql(sources, relations, selected_columns, filters)
    sql = st.text_area(
        "SQL",
        value=st.session_state.get("query_sql", ""),
        height=200,
        key="sql_editor",
        label_visibility="collapsed",
    )
    st.session_state["query_sql"] = sql
    with col2:
        st.empty()
    if st.button("▶️ 실행", key="run_query", type="primary"):
        if not sql.strip():
            st.warning("실행 전에 SQL을 작성하거나 생성하세요.")
            return
        try:
            start = time.perf_counter()
            df = engine.run_query(sql)
            duration = int((time.perf_counter() - start) * 1000)
            st.session_state["query_result"] = df
            entry = QueryHistoryEntry(
                id=f"h_{int(time.time() * 1000)}",
                sql=sql,
                executed_at=datetime.now().isoformat(),
                row_count=len(df),
                duration_ms=duration,
            )
            history = st.session_state.get("query_history", [])
            st.session_state["query_history"] = [entry] + history[:49]
            st.success(f"✅ {len(df)}행, {duration}ms")
        except Exception as exc:
            st.error(f"쿼리 오류: {exc}")

ui/result_grid.py

결과 그리드는 마지막으로 실행된 쿼리가 반환한 DataFrame을 행 수와 컬럼 수 정보와 함께 표시한다. 450픽셀의 고정 높이 덕분에 큰 테이블도 스크롤로 탐색할 수 있으며, 나머지 인터페이스가 아래로 밀리지 않는다.

import streamlit as st

def render_result_grid() -> None:
    """마지막 쿼리의 결과를 표시한다."""
    df = st.session_state.get("query_result")
    if df is None:
        return
    st.subheader("결과")
    st.caption(f"{len(df)}행 × {len(df.columns)}열")
    st.dataframe(df, use_container_width=True, hide_index=True, height=450)

ui/chart_panel.py

시각화 패널은 Plotly를 통해 테이블 결과를 인터랙티브 차트로 변환한다. 막대, 선, 면적, 파이, 산점도 — 5가지 차트 타입을 지원하며 사용자가 결과 컬럼에서 축을 선택한다. 앱의 다크 테마와 시각적 일관성을 유지하기 위해 plotly_dark 템플릿이 적용된다.

import pandas as pd
import plotly.express as px
import streamlit as st

CHART_TYPES = {
    "bar": "막대",
    "line": "선",
    "area": "면적",
    "pie": "파이",
    "scatter": "산점도",
}

def render_chart_panel() -> None:
    """결과에 대한 인터랙티브 시각화."""
    df: pd.DataFrame | None = st.session_state.get("query_result")
    if df is None or df.empty:
        return
    st.subheader("시각화")
    numeric_cols = df.select_dtypes(include="number").columns.tolist()
    all_cols = df.columns.tolist()
    c1, c2, c3 = st.columns(3)
    with c1:
        chart_type = st.selectbox(
            "차트 타입",
            list(CHART_TYPES.keys()),
            format_func=lambda k: CHART_TYPES[k],
            key="chart_type",
        )
    with c2:
        x_col = st.selectbox("X축 / 카테고리", [""] + all_cols, key="chart_x")
    with c3:
        if chart_type == "pie":
            y_single = st.selectbox("값", [""] + numeric_cols, key="chart_y_single")
            y_cols = [y_single] if y_single else []
        else:
            y_cols = st.multiselect("Y축 / 값", numeric_cols, key="chart_y")
    if not x_col or not y_cols:
        st.caption("차트를 생성하려면 축을 선택하세요.")
        return
    try:
        if chart_type == "bar":
            fig = px.bar(df, x=x_col, y=y_cols, barmode="group")
        elif chart_type == "line":
            fig = px.line(df, x=x_col, y=y_cols)
        elif chart_type == "area":
            fig = px.area(df, x=x_col, y=y_cols)
        elif chart_type == "scatter":
            fig = px.scatter(df, x=x_col, y=y_cols[0])
        elif chart_type == "pie":
            fig = px.pie(df, names=x_col, values=y_cols[0])
        else:
            return
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="#0f172a",
            plot_bgcolor="#1e293b",
            height=450,
        )
        st.plotly_chart(fig, use_container_width=True)
    except Exception as exc:
        st.error(f"차트 생성 오류: {exc}")

ui/query_history.py

쿼리 기록은 타임스탬프, 행 수, 처리 시간과 함께 최근 50개의 실행 기록을 저장한다. 과거 쿼리를 한 번의 클릭으로 재사용할 수 있어 같은 분석의 변형을 반복할 때 특히 유용하다.

import streamlit as st

def render_query_history() -> None:
    """재사용 가능한 쿼리 기록."""
    history = st.session_state.get("query_history", [])
    if not history:
        return
    st.subheader(f"쿼리 기록 ({len(history)}개)")
    if st.button("🗑️ 기록 지우기", key="clear_history"):
        st.session_state["query_history"] = []
        st.rerun()
    for entry in history:
        title = f"🕐 {entry.executed_at[:19]}  —  {entry.row_count}행  —  {entry.duration_ms}ms"
        with st.expander(title, expanded=False):
            st.code(entry.sql, language="sql")
            if st.button("♻️ 이 쿼리 재사용", key=f"reuse_{entry.id}"):
                st.session_state["query_sql"] = entry.sql
                st.rerun()

ui/postgres_connector.py

파일 외에도 테이블레이터는 PostgreSQL 데이터베이스에 직접 연결할 수 있다. 이 컴포넌트는 일반적인 필드(호스트, 포트, 데이터베이스, 사용자, 비밀번호, 스키마)가 있는 연결 양식을 표시하고 원격 테이블을 로컬 DuckDB로 가져올 수 있다. 가져온 후에는 다른 테이블과 동일하게 동작한다 — 관계를 정의하고, 필터를 적용하고, SQL로 쿼리할 수 있다.

import streamlit as st

from core.engine import TabuladorEngine

def render_postgres_connector(engine: TabuladorEngine) -> None:
    """테이블 불러오기가 포함된 PostgreSQL 연결 양식."""
    with st.expander("🐘 PostgreSQL 연결", expanded=False):
        c1, c2, c3 = st.columns(3)
        with c1:
            host = st.text_input("호스트", value="localhost", key="pg_host")
        with c2:
            port = st.number_input("포트", value=5432, min_value=1, max_value=65535, key="pg_port")
        with c3:
            database = st.text_input("데이터베이스", key="pg_database")
        c4, c5, c6 = st.columns(3)
        with c4:
            user = st.text_input("사용자", key="pg_user")
        with c5:
            password = st.text_input("비밀번호", type="password", key="pg_password")
        with c6:
            schema = st.text_input("스키마", value="public", key="pg_schema")
        if st.button("🔌 연결", key="pg_connect"):
            if not all([host, database, user, password]):
                st.warning("필수 필드를 모두 채워주세요.")
                return
            try:
                tables = engine.connect_postgres(
                    host=host,
                    port=int(port),
                    database=database,
                    user=user,
                    password=password,
                    schema=schema or "public",
                )
                st.session_state["pg_tables"] = tables
                st.session_state["pg_config"] = {
                    "host": host,
                    "port": int(port),
                    "database": database,
                    "user": user,
                    "password": password,
                    "schema": schema or "public",
                }
                st.success(f"✅ 연결됨! 테이블 {len(tables)}개를 찾았습니다.")
            except Exception as exc:
                st.error(f"연결 오류: {exc}")
        pg_tables = st.session_state.get("pg_tables", [])
        pg_config = st.session_state.get("pg_config", {})
        if not pg_tables:
            return
        st.caption(f"사용 가능한 테이블 ({len(pg_tables)}개)")
        selected_pg = st.multiselect("불러올 테이블 선택", pg_tables, key="pg_sel")
        if st.button("📥 선택 항목 불러오기", key="pg_import"):
            for table_name in selected_pg:
                try:
                    source = engine.import_postgres_table(table_name=table_name, **pg_config)
                    if "sources" not in st.session_state:
                        st.session_state["sources"] = []
                    st.session_state["sources"] = [
                        s for s in st.session_state["sources"] if s.table_name != source.table_name
                    ] + [source]
                    st.success(f"✅ 테이블 `{table_name}`을 `{source.table_name}`으로 불러왔습니다.")
                except Exception as exc:
                    st.error(f"`{table_name}` 불러오기 오류: {exc}")

ui/project_actions.py

프로젝트 작업 모듈은 작업 세션의 완전한 상태를 저장하고 복원할 수 있게 해준다. 프로젝트는 정의된 관계, 선택된 컬럼, 필터, 현재 SQL, 쿼리 기록이 담긴 JSON 파일로 내보내진다. 복원 시 모든 논리적 설정이 재생성된다. 단, 데이터 파일은 수동으로 다시 업로드해야 한다 — 테이블레이터가 원본 파일의 복사본을 저장하지 않기 때문이다.

결과를 CSV와 Parquet으로 내보내는 버튼도 이곳에 있으며, 처리된 데이터를 다른 도구나 파이프라인에서 활용할 수 있다.

import json
from dataclasses import asdict
from datetime import datetime

import streamlit as st
from core.engine import TabuladorEngine
from core.sql_builder import QueryFilter, Relation, SelectedColumn
from ui.query_editor import QueryHistoryEntry

def render_project_actions(engine: TabuladorEngine) -> None:
    """프로젝트 저장/불러오기 및 결과 내보내기."""
    _ = engine
    st.subheader("프로젝트 및 내보내기")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        project_data = _build_project_dict()
        project_json = json.dumps(project_data, indent=2, default=str, ensure_ascii=False)
        st.download_button(
            "💾 프로젝트 저장",
            data=project_json.encode("utf-8"),
            file_name="my_project.tabproj.json",
            mime="application/json",
            key="save_project",
        )
    with col2:
        uploaded_project = st.file_uploader(
            "프로젝트 열기",
            type=["json"],
            key="open_project",
            label_visibility="collapsed",
        )
        if uploaded_project:
            _load_project(uploaded_project)
    with col3:
        sql = st.session_state.get("query_sql", "")
        app_engine: TabuladorEngine = st.session_state.get("engine")
        if sql.strip():
            try:
                csv_bytes = app_engine.export_query_to_bytes(sql, "csv")
                st.download_button(
                    "📄 CSV 내보내기",
                    data=csv_bytes,
                    file_name="result.csv",
                    mime="text/csv",
                    key="export_csv",
                )
            except Exception:
                st.button("📄 CSV 내보내기", disabled=True, key="export_csv_disabled")
    with col4:
        sql = st.session_state.get("query_sql", "")
        app_engine: TabuladorEngine = st.session_state.get("engine")
        if sql.strip():
            try:
                pq_bytes = app_engine.export_query_to_bytes(sql, "parquet")
                st.download_button(
                    "📦 Parquet 내보내기",
                    data=pq_bytes,
                    file_name="result.parquet",
                    mime="application/octet-stream",
                    key="export_parquet",
                )
            except Exception:
                st.button("📦 Parquet 내보내기", disabled=True, key="export_parquet_disabled")

def _build_project_dict() -> dict:
    sources = st.session_state.get("sources", [])
    return {
        "version": 2,
        "saved_at": datetime.now().isoformat(),
        "sources": [asdict(s) for s in sources],
        "relations": [asdict(r) for r in st.session_state.get("relations", [])],
        "query_sql": st.session_state.get("query_sql", ""),
        "selected_columns": [asdict(c) for c in st.session_state.get("selected_columns", [])],
        "filters": [asdict(f) for f in st.session_state.get("filters", [])],
        "query_history": [asdict(h) for h in st.session_state.get("query_history", [])],
    }

def _load_project(uploaded_file) -> None:
    """저장된 프로젝트를 불러와 논리적 상태를 복원한다."""
    try:
        data = json.loads(uploaded_file.read().decode("utf-8"))
        st.session_state["relations"] = [Relation(**r) for r in data.get("relations", [])]
        st.session_state["query_sql"] = data.get("query_sql", "")
        st.session_state["selected_columns"] = [SelectedColumn(**c) for c in data.get("selected_columns", [])]
        st.session_state["filters"] = [QueryFilter(**f) for f in data.get("filters", [])]
        st.session_state["query_history"] = [QueryHistoryEntry(**h) for h in data.get("query_history", [])]
        st.success(f"✅ 프로젝트 불러오기 완료 (소스 {len(data.get('sources', []))}개)")
        st.info("⚠️ 원본 파일은 수동으로 다시 업로드해야 합니다.")
    except Exception as exc:
        st.error(f"프로젝트 불러오기 오류: {exc}")

ui/relation_suggestions.py

이 컴포넌트는 core/의 추천 모듈을 인터페이스에 연결한다. "관계 추천" 버튼을 클릭하면 시스템이 불러온 모든 테이블을 분석해 신뢰도와 함께 발견된 일치 항목을 표시한다. 추천을 수락하면 편집기에서 관계로 자동 변환되어 SQL 생성에 즉시 사용할 수 있다.

import streamlit as st

from core.engine import DataSource, TabuladorEngine
from core.sql_builder import Relation
from core.suggestions import RelationSuggestion, suggest_relations

def render_relation_suggestions(engine: TabuladorEngine) -> None:
    """자동 관계 추천을 검색하고 표시한다."""
    sources: list[DataSource] = st.session_state.get("sources", [])
    if len(sources) < 2:
        return
    if "suggestions" not in st.session_state:
        st.session_state["suggestions"] = []
    col1, col2 = st.columns([3, 1])
    with col1:
        st.caption("컬럼 유사성 기반 자동 추천")
    with col2:
        if st.button("🔍 관계 추천", key="suggest_btn"):
            st.session_state["suggestions"] = suggest_relations(engine, sources)
    suggestions: list[RelationSuggestion] = st.session_state["suggestions"]
    if not suggestions:
        return
    st.caption(f"{len(suggestions)}개 추천 발견")
    for idx, sug in enumerate(suggestions):
        conf_badge = "🟢 높음" if sug.confidence == "high" else "🟡 보통"
        c1, c2 = st.columns([5, 1])
        with c1:
            st.markdown(
                f"**{sug.left_table}.{sug.left_column}** ↔ **{sug.right_table}.{sug.right_column}**  "
                f"&nbsp;&nbsp;{conf_badge}  \n"
                f"_{sug.reason}_"
            )
        with c2:
            if st.button("✅ 수락", key=f"accept_sug_{idx}"):
                if "relations" not in st.session_state:
                    st.session_state["relations"] = []
                st.session_state["relations"].append(
                    Relation(
                        left_table=sug.left_table,
                        left_column=sug.left_column,
                        right_table=sug.right_table,
                        right_column=sug.right_column,
                        join_type="INNER",
                    )
                )
                st.session_state["suggestions"] = [
                    x
                    for x in suggestions
                    if not (
                        x.left_table == sug.left_table
                        and x.left_column == sug.left_column
                        and x.right_table == sug.right_table
                        and x.right_column == sug.right_column
                    )
                ]
                st.rerun()

4부 — 메인 진입점

app.py

app.py는 앱의 시작점이다. 페이지를 설정하고, 엔진을 초기화하고(session_state를 통해 한 번만), 모든 컴포넌트를 화면에 보이는 순서대로 렌더링한다. 구조가 선형적이고 선언적이다 — 각 render_* 호출이 구분선으로 구분된 자신의 섹션을 레이아웃에 추가한다.

import streamlit as st

from core.engine import TabuladorEngine
from ui.chart_panel import render_chart_panel
from ui.column_selector import render_column_selector
from ui.filter_builder import render_filter_builder
from ui.importer import render_importer
from ui.postgres_connector import render_postgres_connector
from ui.project_actions import render_project_actions
from ui.query_editor import render_query_editor
from ui.query_history import render_query_history
from ui.relation_suggestions import render_relation_suggestions
from ui.relations import render_relation_editor
from ui.result_grid import render_result_grid
from ui.sidebar import render_sidebar
from ui.source_preview import render_source_preview

st.set_page_config(
    page_title="로컬 테이블레이터",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded",
)
if "engine" not in st.session_state:
    st.session_state["engine"] = TabuladorEngine()
engine: TabuladorEngine = st.session_state["engine"]
st.markdown(
    """
    <h1 style="text-align: center; color: #ffffff; margin-bottom: 0.25rem;">
        로컬 데이터 테이블레이터
    </h1>
    <p style="text-align: center; color: #e2e8f0; margin-top: 0;">
        DuckDB로 소스를 불러오고, 테이블 관계를 정의하고, SQL 쿼리를 로컬에서 실행한다.
    </p>
    """,
    unsafe_allow_html=True,
)
render_sidebar()
render_importer(engine)
st.divider()
render_project_actions(engine)
st.divider()
render_postgres_connector(engine)
render_source_preview(engine)
st.divider()
render_relation_suggestions(engine)
render_relation_editor()
st.divider()
render_column_selector()
render_filter_builder()
st.divider()
render_query_editor(engine)
render_result_grid()
st.divider()
render_chart_panel()
st.divider()
render_query_history()

5부 — 데모 데이터

실제 데이터 없이도 첫 테스트를 바로 진행할 수 있도록 프로젝트에는 예시 데이터 생성 스크립트가 포함되어 있다. 서로 다른 형식의 4가지 소스를 생성한다 — 주문 CSV, 상품 Excel, 매장 JSON, 고객 SQLite — 이 데이터들은 cliente_id, produto_id, loja_id 같은 키로 서로 관계가 있다.

이 데이터 세트는 기본 소매 시나리오를 시뮬레이션하며 테이블레이터의 모든 기능을 연습하기에 충분하다. 다형식 불러오기, 관계 추천, JOIN 및 집계를 포함한 SQL 생성, 결과 시각화까지.

scripts/generate_demo_data.py

from __future__ import annotations

import json
import sqlite3
from pathlib import Path
import pandas as pd

def main() -> None:
    base = Path("sample_data")
    base.mkdir(parents=True, exist_ok=True)
    pedidos = pd.DataFrame(
        [
            {
                "pedido_id": 1001,
                "cliente_id": 1,
                "produto_id": 501,
                "loja_id": "L1",
                "quantidade": 2,
                "data_pedido": "2026-01-15",
            },
            {
                "pedido_id": 1002,
                "cliente_id": 2,
                "produto_id": 502,
                "loja_id": "L2",
                "quantidade": 1,
                "data_pedido": "2026-01-16",
            },
            {
                "pedido_id": 1003,
                "cliente_id": 1,
                "produto_id": 503,
                "loja_id": "L1",
                "quantidade": 3,
                "data_pedido": "2026-01-17",
            },
            {
                "pedido_id": 1004,
                "cliente_id": 3,
                "produto_id": 501,
                "loja_id": "L3",
                "quantidade": 1,
                "data_pedido": "2026-02-01",
            },
            {
                "pedido_id": 1005,
                "cliente_id": 4,
                "produto_id": 504,
                "loja_id": "L2",
                "quantidade": 2,
                "data_pedido": "2026-02-03",
            },
            {
                "pedido_id": 1006,
                "cliente_id": 2,
                "produto_id": 502,
                "loja_id": "L1",
                "quantidade": 5,
                "data_pedido": "2026-02-10",
            },
        ]
    )
    pedidos.to_csv(base / "pedidos.csv", index=False)
    produtos = pd.DataFrame(
        [
            {"produto_id": 501, "nome_produto": "기본 티셔츠", "categoria": "의류", "preco_unitario": 49.90},
            {"produto_id": 502, "nome_produto": "러너 운동화", "categoria": "신발", "preco_unitario": 299.90},
            {"produto_id": 503, "nome_produto": "어반 백팩", "categoria": "액세서리", "preco_unitario": 189.90},
            {"produto_id": 504, "nome_produto": "스냅백 모자", "categoria": "액세서리", "preco_unitario": 79.90},
        ]
    )
    with pd.ExcelWriter(base / "produtos.xlsx", engine="openpyxl") as writer:
        produtos.to_excel(writer, sheet_name="Catalogo", index=False)
    lojas = [
        {"loja_id": "L1", "nome_loja": "상파울루 센터", "regiao": "남동부"},
        {"loja_id": "L2", "nome_loja": "살바도르 쇼핑", "regiao": "북동부"},
        {"loja_id": "L3", "nome_loja": "포르탈레자 몰", "regiao": "북동부"},
    ]
    (base / "lojas.json").write_text(json.dumps(lojas, ensure_ascii=False, indent=2), encoding="utf-8")
    sqlite_path = base / "crm.sqlite"
    if sqlite_path.exists():
        sqlite_path.unlink()
    conn = sqlite3.connect(sqlite_path)
    try:
        cur = conn.cursor()
        cur.execute(
            """
            CREATE TABLE clientes (
                cliente_id INTEGER PRIMARY KEY,
                nome TEXT NOT NULL,
                cidade TEXT NOT NULL,
                segmento TEXT NOT NULL
            )
            """
        )
        cur.executemany(
            "INSERT INTO clientes (cliente_id, nome, cidade, segmento) VALUES (?, ?, ?, ?)",
            [
                (1, "Ana Silva", "Recife", "Premium"),
                (2, "Bruno Costa", "Salvador", "Standard"),
                (3, "Carla Souza", "Fortaleza", "Premium"),
                (4, "Diego Lima", "Brasília", "Standard"),
            ],
        )
        conn.commit()
    finally:
        conn.close()
    print("sample_data/에 파일 생성 완료:")
    for item in sorted(base.iterdir()):
        print(f"- {item.name}")

if __name__ == "__main__":
    main()

예시 데이터를 생성하려면 다음을 실행한다:

python3 scripts/generate_demo_data.py

6부 — 실제 사용 흐름

환경이 준비되고 예시 데이터가 생성됐다면, 테이블레이터의 사용 흐름은 데이터 분석의 사고 과정을 그대로 따르는 자연스러운 순서를 갖는다.

1단계 — 앱 실행: streamlit run app.py로 시작한다.

2단계 — 파일 불러오기: pedidos.csv, produtos.xlsx, crm.sqlite, lojas.json을 업로드한다. 각 파일이 자동으로 DuckDB 테이블로 변환되어 사이드바에 나타난다.

3단계 — 관계 발견: 관계 섹션에서 "관계 추천"을 클릭한다. 시스템이 공통 키(cliente_id, produto_id, loja_id)를 식별하고 신뢰도와 함께 추천 목록을 제시한다. 적절한 것을 수락한다.

4단계 — 컬럼 및 지표 선택: 컬럼 선택기에서 차원(세그먼트, 지역, 카테고리)과 집계 지표(수량 합계, 주문 수)를 선택한다.

5단계 — 필터 적용: 필터 빌더로 기간, 세그먼트, 카테고리 또는 기타 관련 기준으로 제한한다.

6단계 — SQL 생성 및 실행: "SQL 자동 생성"을 클릭해 선택 항목에서 쿼리를 조립하고, 필요시 조정한 후 실행한다.

7단계 — 결과 분석: 결과 그리드에서 데이터를 탐색하고 차트 패널에서 시각화를 만든다.

8단계 — 내보내기: CSV 또는 Parquet으로 결과를 저장해 다른 도구에서 활용한다.


7부 — SQL 분석 예시

테이블레이터가 자동으로 생성하는 쿼리 유형을 보여주기 위해, 4개의 데이터 소스를 크로스조인하여 고객 세그먼트별, 매장 지역별, 상품 카테고리별 총 항목 수와 주문 수를 반환하는 예시를 살펴보겠다.

SELECT clientes.segmento,
  lojas.regiao,
  produtos_xlsx_catalogo.categoria,
  SUM(pedidos.quantidade) AS total_itens,
  COUNT(pedidos.pedido_id) AS total_pedidos
FROM pedidos
INNER JOIN clientes ON pedidos.cliente_id = clientes.cliente_id
INNER JOIN produtos_xlsx_catalogo ON pedidos.produto_id = produtos_xlsx_catalogo.produto_id
INNER JOIN lojas ON pedidos.loja_id = lojas.loja_id
GROUP BY clientes.segmento, lojas.regiao, produtos_xlsx_catalogo.categoria
LIMIT 200;

produtos_xlsx_catalogo라는 이름은 테이블레이터의 명명 규칙을 반영한다는 점을 주목하자. 파일명(produtos), 형식(xlsx), 시트명(catalogo)을 연결하고 정규화한 것이다. 여러 Excel 파일이 서로 다른 시트 이름을 가진 채로 동시에 불러와져도 이름이 겹치지 않는다.


8부 — 실용적 참고사항

개발 과정에서 프로젝트를 확장하거나 수정하려는 분들이 알아두면 좋을 몇 가지 엔지니어링 결정들이 있다.

Streamlit은 사용자 상호작용마다 스크립트 전체를 재실행한다. 이는 st.session_state 없이는 모든 상태가 매 클릭마다 사라진다는 뜻이다. 그래서 프로젝트는 불러온 소스, 관계, 필터, SQL, 쿼리 기록을 유지하기 위해 session_state를 광범위하게 사용한다.

임포터에는 같은 업로드의 자동 재처리를 막는 장치(processed_upload_keys)가 있다. 이 제어 없이는 Streamlit이 재실행될 때마다 같은 파일을 계속 불러오려 해 테이블이 중복되고 성능이 저하된다.

엔진은 안전한 이름 변경을 사용한다 (DROP TABLE IF EXISTSALTER TABLE). 재실행 중에도 테이블이 일관된 이름을 유지하도록 충돌을 방지한다.

프로젝트 불러오기는 논리만 복원한다 — 관계, 필터, 선택된 컬럼, SQL, 기록. 데이터 파일은 업로드를 통해 수동으로 다시 불러와야 한다. 테이블레이터가 원본 파일의 복사본을 저장하지 않기 때문이다. 이 결정은 프로젝트를 가볍게 유지하고 개인정보 및 저장 공간 문제를 방지한다.


결론

이 MVP 하나로 완전한 로컬 데이터 분석 도구를 갖추게 된다.

  • 다형식 불러오기: CSV, Excel, JSON, Parquet, SQLite
  • 수동 및 자동 관계 정의
  • 시각적 컬럼·필터 빌더
  • SQL 자동 생성 + 자유 편집
  • DuckDB 로컬 쿼리 실행
  • 결과 그리드 및 인터랙티브 차트
  • 쿼리 기록 및 CSV/Parquet 내보내기

모두 Python 하나로, 프론트엔드 빌드 없이, 모듈화된 쉽게 확장 가능한 코드로 구현됩니다.