MySQL 테이블 복사하기
Backfill 실행하기
구현하려는 ETL
가상의 프로덕션 MySQL(OLTP)의 레코드를 Redshift(OLAP)로 복사
AWS 관련 권한 설정
Airflow DAG에서의 S3 접근 권한(쓰기)
Redshift의 S3 접근 권한(읽기)
복사할 MySQL 테이블
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
created_at TIMESTAMP,
score SMALLINT
);
Redshift 테이블
CREATE TABLE schema_name.nps (
id INT NOT NULL PRIMARY KEY,
created_at TIMESTAMP,
score SMALLINT
);
MySQL_to_Redshift DAG의 태스크 구성
SqlToS3Operator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
mysql_to_s3_nps = SqlToS3Operator(
task_id = "mysql_to_s3_nps",
query = "SELECT * FROM prod.nps",
s3_bucket = "s3_bucket_name",
s3_key = "s3_key_name",
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs = {"index": False, "header": False},
dag = dag
)
S3ToRedshiftOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = "s3_to_redshift_nps",
s3_bucket = "s3_bucket_name",
s3_key = "s3_key_name",
schema = "schema_name",
table = "table_name",
copy_options ["csv"],
method = "REPLACE",
sql_conn_id = "redshift_dev_id",
aws_conn_id = "aws_conn_id",
dag = dag
)
Incremental Update Version
MySQL / PostgreSQL 테이블이라면 다음을 만족해야 함
created(TIMESTAMP): 생성시각, Optional
modified(TIMSTAMP): 변경시각
deleted(BOOLEAN): 레코드를 삭제하는 것이 아닌, deleted 파라미터를 True로 설정
ROW_NUMBER로 직접 구현
Redshift의 A 테이블 내용을 temp_A에 복사
MySQL의 A테이블 레코드 중 modified의 날짜가 이미 지난(execution_date) 모든 레코드를 temp_A에 복사
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
temp_A의 레코드들을 PK 기준으로 파티셔닝한 다음, modified 기준으로 역순 정렬해, 일련번호가 1인 것들만 원본 A테이블로 복사
S3TORedshiftOperator로 구현
query = “SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)”
method = “UPSERT”
upsert_keys 파라미터로 Primary Key 지정
upsert_keys = [”id”]
CLI에서 Backfill 실행
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
start_date는 포함, end_date는 미포함
기본적으로 시간 순이 아닌 랜덤이며, depends_on_past = True로 설정해 시간 순으로 실행 가능
DAG의 Backfill
모든 DAG가 backfill을 필요로 하지는 않음
Full Refresh로 구현했다면 backfill이 필요 X
마지막 업데이트 시간을 기준으로 backfill을 한다면, execution_date을 사용한 backfill이 필요 X
데이터의 크기가 매우 커질 경우, backfill 기능 구현이 필수적
airflow가 큰 도움이 됨
데이터 소스의 도움 없이는 불가능
backfill 구현 조건
가장 중요한 것은 데이터 소스가 backfill 방식을 지원해야 함
execution_date를 사용해 업데이트할 데이터 결정
start_date / end_date로 backfill하려는 날짜 설정
catchup = True로 설정
DAG 구현 시 execution_date를 고려해야 하며, idempotent해야 함