모든 데이터 엔지니어가 알아야 할 5가지 데이터 수집 방법
데이터가 실제로 파이프라인에 들어오는 방법 — 풀 로드, 증분, CDC, 스트리밍, API를 Python 코드와 함께
변환 코드를 작성하기 전에, 예측 모델을 만들기 전에, Spark 파이프라인을 실행하기 전에, 가장 먼저 해야 할 일은 데이터를 가져오는 것이다.
데이터가 어디에 있든 그곳에서 끌어와 작업할 수 있는 곳에 안착시키는 그 첫 단계가 데이터 수집(Data Ingestion)이다.
단순하게 들린다. 하지만 항상 그렇지는 않다.
변환 로직이 틀려서가 아니라 수집 방법이 맞지 않아서 파이프라인이 무너지는 것을 본 적이 있다. 누군가는 증분 로드가 필요한 상황에서 풀 로드를 사용했다. 누군가는 거의 실시간 처리가 필요한 데이터에 배치 작업을 만들었다.
파이프라인은 기술적으로는 작동했다. 단지 비즈니스가 필요로 하는 일을 하지 않았을 뿐이다.
이 글에서는 다섯 가지 수집 방법을 살펴본다. 풀 로드, 증분 로드, CDC(변경 데이터 캡처), 스트리밍, 그리고 API 기반 수집.
각각에 대해 수집이 무엇인지, 언제 사용해야 하는지, Python으로 어떻게 구현하는지 다룬다. 개념을 설명하기 위해 비유도 사용한다. 글이 좀 길지만, 다 읽으면 무언가를 배웠다고 느낄 것이다.
가장 흔하고 단순한 것부터 더 고급으로 넘어가자. 시작해보자!

Full Load
풀 로드는 말 그대로다. 파이프라인을 트리거할 때마다 소스에서 전체 데이터셋을 가져와서 목적지에 있던 것을 모두 교체한다.
히스토리 추적도 없고, 비교도 없다. 그냥 완전한 덮어쓰기다.
기술적으로 풀 로드는 목적지 테이블을 비우고(truncate) 소스의 모든 행을 다시 삽입한다. 이것은 파이프라인이 마지막 실행 이후 무엇이 바뀌었는지 알 필요가 없다는 의미다. 모든 실행을 새로운 실행처럼 취급한다.
트레이드오프는 구현은 단순하지만 규모가 커지면 비용이 비싸다는 것이다. 소스 테이블에 5,000만 행이 있다면, 매번 5,000만 행을 옮기게 된다.

풀 로드 비유
매일 아침 지워지고 다시 그려지는 화이트보드를 생각해보자.
보드에 항목이 5개 있다면 빠르다. 지우고, 다시 그리고, 끝. 하지만 항목이 10,000개라면, 밤사이 세 가지만 바뀌었어도 모든 것을 다시 그려야 한다. 풀 로드도 같은 방식으로 작동한다. 한 행이 바뀌었든 백만 행이 바뀌었든, 매 실행은 전체를 다시 그리는 것이다.
풀 로드를 사용할 수 있는 실제 상황
- 국가 코드 목록처럼 자주 바뀌지 않고 저렴하게 다시 불러올 수 있을 만큼 작은 소규모 참조 테이블
- 항상 설정 테이블의 완전한 현재 상태를 반영해야 하는 야간 리포트
- 증분 추적을 도입하는 것이 실질적인 이득 없이 복잡성만 더하는, 10,000행 미만의 룩업 테이블
흔히 쓰는 툴
Airbyte, Fivetran, AWS Glue, pandas, SQLAlchemy
소스 데이터베이스에서 전체 국가 목록을 동기화해야 하는 프로젝트를 만들 때, 나는 풀 로드를 선택했다. 테이블이 작았고, 거의 바뀌지 않았고, 워터마크 추적을 설정하는 것이 데이터가 가진 가치보다 더 많은 코드를 필요로 했을 것이다.
샘플 코드는 다음과 같다.
import pandas as pd
from sqlalchemy import create_engine
# 소스: 소스 데이터베이스에서 테이블 전체를 읽는다
source_engine = create_engine("postgresql://user:pass@source-host/source_db")
df = pd.read_sql("SELECT * FROM country_codes", source_engine) # 전체 추출 — 필터 없음
# 목적지: 비우고 다시 로드한다
dest_engine = create_engine("postgresql://user:pass@dest-host/dest_db")
df.to_sql(
"country_codes",
dest_engine,
if_exists="replace", # 매 실행마다 테이블을 삭제하고 다시 생성한다 — 이것이 풀 로드다
index=False
)
print(f"풀 로드 완료: {len(df)} 행이 로드됨")

풀 로드 결과: 새 국가가 추가되지 않으면, 매 실행마다 같은 행 개수가 나온다. 목적지는 항상 소스를 정확히 그대로 반영한다.
Incremental Load
매번 모든 것을 가져오는 대신, 증분 로드는 마지막 실행 이후 새로 생성되거나 업데이트된 레코드만 가져온다. 보통 타임스탬프나 자동 증가 ID인 워터마크를 추적해서, 그 값이 마지막으로 기록된 값보다 늦은 행만 추출하는 방식으로 동작한다.
워터마크 방식은 소스 데이터에 신뢰할 수 있는 타임스탬프나 ID가 있을 때 잘 작동한다. 하지만 updated_at 컬럼을 바꾸지 않고 레코드가 업데이트될 수 있거나, 행이 삭제될 수 있다면, 증분 로드는 그런 변경을 조용히 놓치게 된다.
이 방법에서 가장 흔하게 보는 단 하나의 실수다. 증분 로드가 삭제까지 처리한다고 가정하는 것. 그렇지 않다.

증분 로드 비유
친구가 계속 추가하는 우표 수집품의 우표 개수를 세고 있다고 상상해보자. 방문할 때마다 총합을 적어두고, 다음번에는 떠난 후 추가된 새 우표만 센다.
이것은 완벽하게 작동한다. 친구가 일부를 버리거나, 말하지 않고 하나를 바꿔치기하지 않는 한. 개수가 줄지 않고 아무것도 플래그되지 않는다면, 무언가 바뀌었다는 것을 절대 알 수 없다.
증분 로드가 적합한 실제 상황
- 매일 수천 개의 새 행이 생기는 주문 테이블 — 매일 밤 3년치 히스토리를 옮기고 싶지 않다
updated_at컬럼이 있는 고객 테이블 — 지난 24시간 동안 변경된 행만 가져온다- 레코드가 추가되기만 하고 절대 삭제되지 않는 판매 거래 시스템
흔히 쓰는 툴
Airbyte(커서 필드 사용), Apache Spark, Pandas, SQLAlchemy(풀 로드와 증분 로드 둘 다에 사용 가능)
매일 주문 레코드를 가져오는 파이프라인을 만들었을 때, 400만 행 테이블에 하루 약 2,000개의 새 행이 생겼다. 증분 로드가 명확한 선택이었다.
매일 밤 전체 테이블을 옮기는 것은 느리고 무의미했을 것이다. 패턴은 다음과 같다.
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
last_run = datetime.now() - timedelta(hours=24) # 워터마크: 마지막 성공 실행 타임스탬프
source_engine = create_engine("postgresql://user:pass@source-host/source_db")
# 마지막 실행 이후 업데이트된 행만 추출한다
query = f"""
SELECT *
FROM orders
WHERE updated_at > '{last_run.strftime('%Y-%m-%d %H:%M:%S')}'
"""
df = pd.read_sql(query, source_engine)
dest_engine = create_engine("postgresql://user:pass@dest-host/dest_db")
df.to_sql(
"orders",
dest_engine,
if_exists="append", # 새 행을 추가한다 — 전체 테이블을 덮어쓰지 않는다
index=False
)
print(f"증분 로드 완료: {last_run} 이후 {len(df)} 행이 로드됨")

증분 로드 결과: 증분 로드를 사용하면, 400만 행 테이블 중 1,842개의 행만 옮기면 됐다. 풀 로드와 비교해서 훨씬 효율적이다.
CDC, Change Data Capture
CDC는 증분 로드보다 한 단계 더 깊이 들어간다.
소스 테이블을 쿼리하고 타임스탬프로 필터링하는 대신, CDC는 데이터베이스의 트랜잭션 로그를 읽는다. 모든 삽입, 업데이트, 삭제를 데이터베이스가 내부적으로 기록하는 로그다.
이것은 모든 것을 잡아낸다는 의미다. 새 행, 변경된 행, 삭제된 행까지.
내부적으로 Debezium 같은 CDC 툴은 데이터베이스의 바이너리 로그에 연결한다. MySQL의 binlog, PostgreSQL의 WAL. 그리고 일어나는 모든 데이터 이벤트를 방출한다.
각 이벤트는 작업 타입(INSERT, UPDATE, DELETE), 행의 이전 상태와 이후 상태, 그리고 타임스탬프를 캡처한다. 파이프라인은 이 이벤트들을 소비해서 목적지에 적용한다.
CDC가 적합한 실제 상황
- 사용자가 계정을 삭제할 수 있는 고객 테이블 — 새 가입자만 추적하는 게 아니라 그 삭제를 웨어하우스에도 전파해야 한다
- 모든 행 수정이 감사되고 역사적으로 추적되어야 하는 금융 시스템 — 변경 전/후 전체 상태가 중요하다
- 모든 테이블에 신뢰할 수 있는
updated_at타임스탬프가 없는 소스 데이터베이스 — CDC는 트랜잭션 로그를 기반으로 동작하므로 애플리케이션이 타임스탬프를 정확하게 쓰는지에 의존하지 않는다
흔히 쓰는 툴
Debezium, AWS DMS, Fivetran(CDC 모드), Airbyte(CDC 모드), Kafka Connect
소스 데이터가 삭제되고 다운스트림 분석이 존재하는 것의 정확한 카운트(단순히 한 번이라도 삽입된 것이 아니라)에 의존한다면, 증분 로드는 조용히 거짓말을 하게 된다. CDC가 그 해결책이다.
CDC 사용 예시는 다음과 같다.
# CDC는 보통 스크립트로 작성되는 것이 아니라 인프라 레벨에서 구성된다.
# 이 예시는 kafka-python을 사용해서 Kafka에서 CDC 이벤트를 소비하는 방법을 보여준다.
from kafka import KafkaConsumer # pip install kafka-python
import json
consumer = KafkaConsumer(
"db.public.customers", # Debezium 토픽 — database.schema.table 형식을 따른다
bootstrap_servers="localhost:9092",
auto_offset_reset="earliest", # 로그의 처음부터 시작한다
value_deserializer=lambda m: json.loads(m.decode("utf-8"))
)
for message in consumer:
event = message.value
operation = event["op"] # 'c' = 생성, 'u' = 업데이트, 'd' = 삭제
after = event.get("after") # 변경 이후 행 상태
before = event.get("before") # 변경 이전 행 상태 (삽입의 경우 None)
if operation == "c":
print(f"삽입: {after}")
elif operation == "u":
print(f"업데이트 — 이전: {before} | 이후: {after}")
elif operation == "d":
print(f"삭제: {before}")

Streaming Ingestion
스트리밍 수집은 데이터가 정해진 배치가 아니라 이벤트 단위로 시스템에 지속적으로 흘러들어온다는 의미다. "새벽 2시에 이 작업을 실행하라" 같은 것은 없다. 데이터가 생성되는 순간 도착하고, 파이프라인이 실시간(또는 거의 실시간)으로 처리한다.
스트리밍 파이프라인은 메시지 브로커를 중심으로 구축된다. Apache Kafka나 AWS Kinesis 같은 툴이 데이터 생산자(앱, 센서, 서비스)와 소비자(파이프라인, 분석 엔진) 사이에 자리한다.
생산자는 토픽에 이벤트를 발행한다. 소비자는 그 토픽을 구독하고 이벤트가 도착하는 즉시 처리한다. 브로커는 이벤트를 버퍼에 보관하기 때문에 소비자가 잠시 다운되더라도 데이터 손실 없이 따라잡을 수 있다.
배치 수집이 하루에 한 번 배달되는 신문 같다면, 스트리밍은 실시간 뉴스 자막과 같다. 신문은 어제 뉴스에는 괜찮다. 자막이 존재하는 이유는 어떤 것들은 아침까지 기다릴 수 없기 때문이다.
스트리밍이 필요한 실제 상황
- 거래가 승인되기 전에 의심스러운 거래를 플래그해야 하는 사기 탐지 시스템 — 24시간 배치 레이턴시는 선택지가 아니다
- 세일 행사 중 실시간 주문 수를 보여주는 이커머스 사이트의 실시간 대시보드
- 즉각적인 이상 탐지가 필요한 공장 장비의 IoT 센서 데이터
흔히 쓰는 툴
Apache Kafka, AWS Kinesis, Google Pub/Sub, Apache Flink, Spark Structured Streaming
Apache Kafka 예시는 다음과 같다.
from kafka import KafkaProducer, KafkaConsumer # pip install kafka-python
import json
import time
# --- 생산자 측: 주문 이벤트를 발행하는 앱을 시뮬레이션 ---
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
order_event = {
"order_id": "ORD-20241101-9921",
"customer_id": 4401,
"amount": 149.99,
"timestamp": "2024-11-01T14:23:01Z"
}
producer.send("orders", value=order_event) # 'orders' 토픽에 이벤트를 발행한다
producer.flush()
# --- 소비자 측: 이벤트가 도착하는 대로 처리하는 파이프라인 ---
consumer = KafkaConsumer(
"orders",
bootstrap_servers="localhost:9092",
auto_offset_reset="latest", # 과거 이벤트가 아니라 새 이벤트만 처리한다
value_deserializer=lambda m: json.loads(m.decode("utf-8"))
)
for message in consumer:
event = message.value
print(f"주문 처리 중 {event['order_id']} — ${event['amount']} ({event['timestamp']})")
# 다운스트림: 실시간 분석 테이블에 쓰거나, 알림을 트리거하는 등의 작업

스트리밍 수집 결과: 이벤트가 발행되고 처리되기까지 걸린 시간은 38ms다. 이것이 스트리밍이 시간에 민감한 유스케이스를 위해 만들어진 이유다.
API-Based Ingestion
Salesforce, Stripe, HubSpot, Google Analytics 같은 대부분의 현대 SaaS 툴들은 직접적인 데이터베이스 접근을 주지 않는다. 대신 API를 노출한다. 한 번에 하나의 엔드포인트로 프로그래밍 방식으로 데이터를 요청할 수 있는 구조화된 인터페이스다.
API 기반 수집은 파이프라인이 이 엔드포인트들을 호출하고, 결과를 페이지네이션으로 넘기고, 속도 제한을 처리하고, 작업할 수 있는 곳에 데이터를 안착시킨다는 의미다.
API 수집의 기술적 현실은 다른 방법들보다 더 지저분하다. 그것이 이 방법이 마지막에 나오는 이유다.
다루어야 할 것들은 다음과 같다.
- 인증 (OAuth 토큰, API 키)
- 속도 제한 (대부분의 API는 분당 또는 일당 요청 수를 제한한다)
- 페이지네이션 (데이터는 100개나 1,000개씩 페이지로 돌아오므로 전부를 루프로 돌아야 한다)
- 스키마 변경 (API 제공자가 경고 없이 필드를 추가, 이름 변경, 삭제할 수 있다)
잘 만들어진 API 수집 작업은 이 모든 것을 우아하게 처리한다. 처음 API 수집을 만들었을 때, 속도 제한에 쉽게 걸렸고, 그것이 전략적으로 처리하는 법을 배우게 만들었다.

API 기반 수집 비유
한 번에 100개씩만 가져갈 수 있게 해주는 공급업체로부터 창고를 재입고하는 것을 생각해보자. 총 5,000개가 필요하므로 50번의 운행을 계획하고, 매 운행마다 가져온 것을 기록한 목록을 유지한다(페이지네이션).
공급업체가 하루당 방문 제한을 강제하므로 할당량 내에서 머물도록 운행을 계획한다(속도 제한). 그리고 공급업체가 카탈로그를 재구성한다면, 즉 제품 이름을 바꾸거나 라인을 중단한다면, 목록이 더 이상 일치하지 않아서 요청 방식을 업데이트해야 한다(스키마 변경).
자신의 창고에 들어가서 필요한 것을 가져가는 것과 달리, API로 작업하는 것은 다른 사람의 규칙대로 따라야 하는 것이다.
API 수집이 필요한 실제 상황
- 매출 분석을 위해 Stripe 결제 레코드를 웨어하우스로 가져오기
- 마케팅 어트리뷰션 모델을 위해 매일 밤 Google Analytics 페이지 뷰 데이터를 수집하기
- 배송 경로 계획을 보강하기 위해 서드파티 API에서 날씨 데이터를 가져오기
흔히 쓰는 툴
Airbyte(사전 구축된 커넥터), Singer(오픈소스 탭), requests(Python), httpx, dlt(data load tool)
requests는 API 호출을 처리하는 데 도움을 주는 인기 있는 라이브러리다. 프로젝트에서 자주 사용했다. 같은 코드는 다음과 같다.
import requests # pip install requests
import pandas as pd
API_KEY = "your_stripe_api_key"
BASE_URL = "https://api.stripe.com/v1/charges"
all_charges = []
params = {"limit": 100} # Stripe는 페이지당 최대 100개의 레코드를 반환한다
while True:
response = requests.get(
BASE_URL,
auth=(API_KEY, ""), # Stripe는 API 키를 기본 인증 사용자명으로 사용한다
params=params
)
data = response.json()
all_charges.extend(data["data"]) # 이 페이지의 레코드를 추가한다
if not data["has_more"]:
break # 더 이상 페이지가 없으면 — 루프를 종료한다
# 커서 기반 페이지네이션: 마지막 레코드의 ID를 시작점으로 사용한다
params["starting_after"] = data["data"][-1]["id"]
df = pd.DataFrame(all_charges)
print(f"Stripe API에서 {len(df)}개의 결제 내역을 가져왔습니다")
# 목적지에 저장
df.to_csv("stripe_charges.csv", index=False)

API 수집 결과: 1,147개의 레코드를 가져오는 데 12페이지가 걸렸고, 속도 제한 윈도우에 88개의 요청이 남아 있어서 다음 실행을 위한 여유가 충분하다.
흔한 실수: 모든 것에 한 가지 방법만 사용하기
가장 비용이 큰 수집 실수는 잘못된 툴을 고르는 것이 아니다. 수집을 일률적인 결정으로 취급하는 것이다.
엔지니어들은 단순하다는 이유로 풀 로드를 선택하고, 소스 테이블이 8천만 행으로 커졌을 때 왜 파이프라인이 새벽 3시에 타임아웃이 나는지 의아해한다.
또는 레코드가 삭제되는 테이블에 증분 로드를 만들고, 분석이 왜 틀린지 디버깅하느라 일주일을 쓴다.
방법은 데이터를 따라가야 한다. 무언가를 만들기 전에 세 가지 질문을 던진다.
- 이 데이터는 얼마나 자주 바뀌는가?
- 얼마나 큰가?
- 삭제되는가?
이 답변들이 거의 항상 올바른 방법으로 안내해줄 것이다.
올바른 방법 선택하기
각 방법이 언제 가장 잘 맞고, 언제 맞지 않는지에 대한 빠른 참조다.
| 방법 | 적합한 경우 | 부적합한 경우 |
|---|---|---|
| 풀 로드 | 작은 테이블, 자주 바뀌지 않는 데이터 | 대규모 테이블, 빈번한 변경 |
| 증분 로드 | 추가만 되는 대용량 테이블 | 레코드가 삭제될 수 있는 테이블 |
| CDC | 삭제 추적이 필요한 경우, 신뢰할 수 없는 타임스탬프 | 단순한 소규모 참조 데이터 |
| 스트리밍 | 즉각적인 처리가 필요한 시간 민감 데이터 | 배치로 충분한 데이터 |
| API 기반 | 직접 DB 접근이 없는 서드파티 SaaS 데이터 | 직접 데이터베이스 접근이 가능한 경우 |
💡 요약하면: 데이터의 크기, 변경 빈도, 삭제 여부라는 세 가지 질문에 답하면 다섯 가지 방법 중 어느 것이 자신의 상황에 맞는지 거의 항상 알 수 있다.
'최신 IT' 카테고리의 다른 글
| OLAP Tools 3가지 비교 : Databricks, Snowflake, BigQuery (0) | 2026.06.04 |
|---|---|
| 전문가급 대시보드를 만드는 10가지 핵심 Streamlit 디자인 팁 (0) | 2026.05.26 |
| 로컬 데이터 분석 도구 — Python + Streamlit (1) | 2026.04.10 |
| n8n + Google Sheets로 LinkedIn 포스팅을 자동화한 방법 (0) | 2026.03.10 |
| Node.js 에 대한 고찰 (5) | 2026.01.21 |