프로덕션 DB로 가장 많이 사용되는 MySQL(OLTP)로부터 DW인 Redshift로 데이터가 이동하는 ETL 과정에 대해 실습해보고자 한다.
우선 MySQL DB의 데이터를 파일로 저장하여 Airflow server에 저장한 후 S3와 같은 클라우드 스토리지에 업로드, 이후 Redshift로 copy 커맨드로 벌크 업데이트 진행하는 3단계 절차를 거치게 된다.결국 MySQL 과 Airflow의 커넥션과 Airflow와 S3간 커넥션, S3와 Redshift간 커넥션 작업이 필요로 해진다.
< Airflow 서버에 S3, MySQL, Redshift connection 결과 >
🎈 MySQL-To-Redshift DAG의 Task 구성
- SqlToS3Operator
: MySQL 결과 -> S3 (s3://s3_bucket/s3_key)- S3ToRedshiftOperator
: S3 -> Redshift (COPY 커맨드 사용, s3://grepp-data-engineering/{본인ID}-nps)2개의 operator를 활용하여 DAG를 구성하였고 코드는 다음과 같다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
import requests
import logging
import psycopg2
import json
from datetime import datetime, timedelta
dag = DAG(
dag_id='MySQL_to_Redshift',
start_date=datetime(2023, 6, 10),
schedule='0 9 * * *',
max_active_runs=1,
catchup=False,
default_args={
'retries':1,
'retry_delay':timedelta(minutes=3),
}
)
# default 변수 미리 지정
schema = 'taejun3305'
table = 'nps'
s3_bucket = 'grepp-data-engineering'
s3_key = schema + "-" + table
# 모든 쿼리 조회하기 위해 IAM - S3FullAccess 권한 필요
# jinja 탬플릿 format 활용해 airflow가 넘겨주는 system valuable 값을 파이썬 코드로 사용
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs = {"index": False, "header": False},
dag= dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options = ['csv'],
redshift_conn_id = 'redshift_dev_db',
aws_conn_id = 'aws_conn_id',
method = 'UPSERT',
upsert_keys = ['id'],
dag = dag
)
# task flow
mysql_to_s3_nps >> s3_to_redshift_nps
🎈 MySQL 테이블 Incremental Update
- create(timestamp) - 생성 modified(timestamp) - 수정 deleted (boolean) 형태로 진행해 시점 저장, 삭제되는 경우 Incremental Update 불가능
- ROW_NUMBER로 구현 - execution_date 사용하여 문제 발생 줄여 데이터 정합성 확보
- Redshift에서 제공하는 UPSERT 방식 사용
-> query 파라미터 : SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
-> method 파라미터 : UPSERT
-> upsert_keys 파라미터 : Primary key로 지정
추가적인 실습과정은 다음과 같다.
1. AWS S3 Connections 설정 (IAM User 설정 - Policy 생성필요 - json 업로드)
2. Redshift S3 Connections 설정 (IAM Role 설정)
3. MySQL 관련 모듈 설치 (Docker)
-> 아래 명령 container에 root user로 로그인해 실행 필요
# 그전에 docker 킨 상태로 docker ps로 containerID 저장하기.
docker exec --user root -it containerID sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
앞서 학습한 대로 1년치 데이터를 Incremental Update 해야 한다면 반복 루프를 돌면서 여러 번 작업해야 하는 환경이 발생한다. 이를 해결하는 방법은 다음과 같다
-> 한번에 여러 날짜를 동시에 진행하는 방법 : DAG 파라미터 : max_active_runs
- 모든 DAG가 backfill필요로 하진 않음 (Full Refresh의 경우 backfill 의미 X)
- backfill은 일별, 시간 별로 업데이트 진행
- 데이터 크기가 커질수록 backfill기능은 선택이 아닌 필수
-> 커맨드라인에서 구현방식은 다음과 같다.
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
(start_date부터 시작하지만 end_date는 포함 X, catchup이 반드시 True, default_args = { 'depends_on_past': True}로 설정할 것.-> Airflow에서 구현방식은 다음과 같다.
- 데이터 소스가 backfill을 지원하도록 구현
- execution_date 사용해 업데이트할 데이터 결정
- catchup 변수를 True로 설정하여 날짜 지난 것에 대해 작업 실행 구현
- start_date, end_date를 backfill하려는 날짜로 지정
- dag구현이 execution_date를 고려해야 하며 idempotent(멱등성 보장)해야 함.