1. MySQL 테이블을 Redshift로 복사
2. Airflow의 Backfill 실행
3. Airflow와 데이터 파이프라인 정리
[프로덕션 데이터베이스 (OLTP)] MySQL에서 [데이터 웨어하우스 (OLAP)] Redshift로 테이블을 복사를 해보겠습니다.

복사할 레코드가 적은 경우
INSERT INTO를 통해 Redshift로 직접 추가
복사할 레코드가 많은 경우
S3에 적재 후 COPY 명령을 통해 Redshift에 추가
Airflow DAG에서 S3에 접근할 수 있도록
Redshift가 S3에 접근할 수 있도록

MySQL -> Redshift로 복제하는 과정에서
"ModuleNotFoundError: No module named 'MySQLdb'" 에러가 발생할 수 있습니다.이를 해결하기 위해 Airflow Scheduler Docker Container에 root 유저로 로그인을 해서 몇 가지 명령어를 실행해줘야 합니다.
# scheduler의 container에 root 유저로 접근
docker exec --user root -it container_id sh
sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
CREATE TABLE schema_name.nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
아래 2개의 Operator를 사용해서 구현
SqlToS3Operator
S3ToRedshiftOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24),
schedule = '0 9 * * *',
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "skqltldnjf77"
table = "nps"
# bucket 이름
s3_bucket = "grepp-data-engineering"
# s3_key, 즉 s3에 저장할 파일 이름
s3_key = schema + "-" + table
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
# Redshift table 생성
def create_redshift_table():
cur = get_Redshift_connection()
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);""")
create_redshift_table = PythonOperator(
task_id = 'create_redshift_table',
python_callable = create_redshift_table,
dag = dag
)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
# mysql prod.nps 테이블의 데이터를 읽어옴
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
# mysql 연결 정보
sql_conn_id = "mysql_conn_id",
# aws(s3) 연결 정보
aws_conn_id = "aws_conn_id",
verify = False,
# 만약, 지금 저장하려하는 S3 key가 이미 존재하고 있다면 교체할 것인지?
# Fail 처리할 것인지?
replace = True,
# S3에 업로드할 때, 어떤 식으로 할 것인지 정하는 것
# index와 header는 복사하지 않도록 함
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
# redshift 연결 정보
redshift_conn_id = "redshift_dev_db",
# S3 연결 정보
aws_conn_id = "aws_conn_id",
dag = dag
)
create_redshift_table >> mysql_to_s3_nps >> s3_to_redshift_nps
Production DB의 경우,
Incremental Update를 하기 위해 만족해야하는 조건들이 있습니다.
created(timestamp) : Optional
modified(timestamp)
deleted(boolean) : 레코드를 삭제하지 않고 deleted를 True로 설정
레코드가 삭제가 된다면 Incremental Update가 불가능하게 됩니다.
삭제가 된 상태인지 다 읽어오기전까지는 모르기 때문에,
실제로 물리적으로 삭제하는 것 대신에 deleted를 True로 체크하는 방식을 사용합니다.
아래와 같이 상황을 가정해보겠습니다.
Daily Update,
테이블 이름 : A,
MySQL -> Redshift
ROW_NUMBER로 구현하는 경우
- Redshift의 A 테이블의 내용을 임시 테이블에 복사
- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어서 임시 테이블에 추가
ex)SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)- 임시 테이블의 레코드들을 primary key를 기준으로 Partition 후 중복 제거하여 A로 복사
S3ToRedshiftOperator로 구현하는 경우
- query 파라미터로 아래 query문을 지정
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)- method 파라미터로 "UPSERT"를 지정
- upsert_keys 파라미터로 Primary key를 지정
( 지정한 upsert_keys를 기준으로 그 값이 같은 레코드들은 새로 복사되는 레코드들로 교체 -> Redshift에서 제공하는 기능 )
2개의 Operator를 사용해서 구현을 진행할 것입니다.
아래 DAG를 동작시키기 앞서 위의 Full Refresh 코드를 먼저 동작시켜야합니다.
Redshift의 테이블 생성 코드도 없을 뿐더러, execution_date = created가 일치하는 레코드가 없을 수도 있기 때문입니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1),
schedule = '0 9 * * *',
# 동시에 실행하는 DAG의 수 = 1
max_active_runs = 1,
catchup = False,
default_args = {
# 실패시 재시도 1회
'retries': 1,
# 실패 후 3분 후에 실행
'retry_delay': timedelta(minutes=3),
}
)
schema = "skqltdnjf77"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
# mysql에서 S3로 적재할 때는
# created, 즉 MySQL에서 생성된 날짜와
# Airflow의 execution_date가 동일한 날짜들만 추출해서 적재
# Airflow의 경우 '{{...}}`와 같은 형태로 사용하여,
# Airflow가 제공하는 시스템 변수 값을 파이썬 코드로 호출
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
# S3에 적재된 csv를 copy
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
# UPSERT를 진행
method = "UPSERT",
# nps테이블의 Primary key인 id를 선택
upsert_keys = ["id"],
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Airflow에서 추천하는 방식을 통해 Incremental Update를 구현했다면 Backfill이 용이
방법 1 : 일일히 execution_date를 지정하여 실행 ( 하루씩 31번 실행 )
airflow dags test MySQL_to_Redshift_v2 2023-07-01airflow dags test MySQL_to_Redshift_v2 2023-07-31Airflow가 제공하는 Backfill 기능 중에는 시작 날짜와 끝 날짜를 지정하여 실행해주는 커맨드가 존재합니다. 이를 사용하면 보다 쉽게 Backfill을 실행할 수 있습니다.
방법 2 : 한번에 여러 날짜를 동시에 실행(병렬 실행)
이를 제어해주는 DAG 파라미터가 max_active_runs
문제점
Data Source에 들어가는 overhead가 클 수 있습니다. (Production DB가 멈출 수도 있음)
DAG 구현 방식에 따라 한번에 여러 날짜를 동시에 실행하면 충돌이 발생할 수 있습니다.
COPY가 동시에 실행되면 한 날짜의 데이터만 COPY가 될 수도 있습니다.
따라서, 동시에 실행하기보다는 한번에 하나씩 실행하는 것이 안전합니다.
( max_active_runs = 1 )
airflow dags backfill 'dag_id' -s 'start_date' -e 'end_date'
선행 조건
start_date는 포함되지만, end_date는 포함되지 않습니다.
실행 순서는 날짜/시간 순이 아닌 랜덤으로 실행
날짜 순으로 실행하고 싶다면 default_args를 수정
default_args = {
...
'depends_on_past': True,
}
모든 DAG가 backfill을 필요로 하지는 않음
Backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미
Backfill 조건
가장 중요한 것은 데이터 소스가 Backfill 방식을 지원해야함
"execution_date"를 사용해서 업데이트할 데이터 결정
"catchup" 필드를 True로 설정
start_date/end_date를 backfill하려는 날짜로 설정
DAG 구현이 execution_date를 고려해야하며 indempotent(멱등성)을 보장해야함!!
( UPSERT -> 중복제거 )
-> 두번째로 중요
Airflow는 파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크
가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍
Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
Airflow의 장점
스케일링 방식
데이터 파이프라인에 관한 정보를 수집하는 것이 중요
비지니스 오너와 데이터 리니지에 주의할 것
비지니스 오너 : 해당 데이터 파이프라인의 데이터를 요청한 사람이 누구인지?
데이터 리니지 : 한 데이터가 뒷 단에서 어떻게 쓰이는지 흐름이 모두 파악이 되어야합니다. 작은 수정이 큰 영향을 끼칠 수 있기에..
-> 데이터 카탈로그가 필요하게 됨
데이터 품질 체크가 필요!
가능하면 Full Refresh
주기적인 청소 (데이터, 테이블, Dag)