
- 데이터 파이프라인
- Airflow

데이터를 소스로부터 목적지로 복사하는 작업으로 보통 코딩(Python, Scala) 혹은 SQL을 통해 이뤄짐, 대부분의 경우 목적지는 데이터 웨어하우스가 된다.
Raw Data ETL Jobs
Summary / Report Jobs
Production Data Jobs
데이터가 작을 경우 가능하다면 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
Incremental Update만이 가능하다면, 대상 데이터 소스가 갖춰야할 몇 가지 조건이 있다.
멱등성(Idempotency)을 보장하는 것이 중요
실패한 데이터 파이프라인을 재실행하는 것이 쉬워야 하고, 과거 데이터를 다시 채우는 과정(Backfill)도 쉬워야 한다.
데이터 파이프라인의 입력과 출력을 명확히 하고 문서화해야 한다.
데이터 파이프라인 사고시 마다 사고 리포트(post-mortem) 작성
중요 데이터 파이프라인의 입력과 출력을 체크하기
%sql postgresql://ID:PW@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
DROP TABLE IF EXISTS jwa4610.name_gender;
CREATE TABLE jwa4610.name_gender (
name varchar(32) primary key,
gender varchar(8)
);
import psycopg2
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "ID"
redshift_pass = "PW"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
extract, transform, load)import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO jwa4610.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
%%sql
-- 총 레코드 개수
SELECT COUNT(1)
FROM jwa4610.name_gender;
-- 레코드 확인
SELECT *
FROM jwa4610.name_gender
LIMIT 10;
-- 성별 별 레코드 개수
SELECT gender, COUNT(1) count
FROM jwa4610.name_gender
GROUP BY gender;


Apache Airflow는 워크플로우와 데이터 파이프라인을 자동화하고 모니터링하기 위한 오픈 소스 플랫폼입니다.
DAG(Directed Acyclic Graph)를 사용하여 작업 간의 의존성을 정의하고 스케줄링할 수 있습니다. 이를 통해 복잡한 데이터 처리 작업을 효율적으로 관리하고 오류를 쉽게 추적할 수 있습니다.
Airflow는 총 5개의 컴포넌트로 구성되어 있다.
웹 서버 (Web Server)
스케줄러 (Scheduler)
워커 (Worker)
메타 데이터 데이터베이스
큐



[장점]
[단점]