OLTP(MySQL) -> S3 -> OLAP(Redshift)를 Full Refresh 버전과 Incremental Update 버전으로 나눠서 진행.
AWS 관련 권한 설정
- Airflow DAG에서 S3 접근 (쓰기 권한)
- Redshift가 S3 접근 (읽기 권한)
- IAM을 사용해 별도의 사용자 생성 -> 해당 사용자에게 권한 부여 -> 액세스 Key ID와 Secret 액세스 Key 복사(시크릿 액세스 키의 경우, 생성된 첫 화면에서만 보여 주니 잘 저장해야 함.).
- https://docs.google.com/document/d/1FArSdUmDWHM9zbgEWtmYSJnxPXDX-LB7HT33AYJlWIA/edit#heading=h.9u82ph29nth9
MySQL Connection 설정
- Airflow Admin 메뉴에서 Connections 선택 후, MySQL 설정.
- Connection Id : mysql_conn_id
- Connection Type : MySQL
- host, schema, login, pw, port 입력
S3 Connection 설정
- Airflow Admin 메뉴에서 Connections 선택 후, S3 설정.
- Connection Id : aws_conn_id
- Connection Type : Amazon Web Services
- Access Key ID, Secret Access Key, Extra에 {"region_name": "ap-northeast-2"} 입력.
SqlToS3Operator (airflow에서 제공하는 기본 오퍼레이터)
S3ToRedshift (airflow에서 제공하는 기본 오퍼레이터)
코드 MySQL_to_Redshift.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "jaeho"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE', # Replace로 적용함으로써 Full Refresh 방식으로 적용.
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Incremental Update 방식
필요한 필드 리스트 : created, modified, deleted
ROW_NUMBER 방식과 Redshift에서 제공하는 UPSERT 방식이 있음. (후자 선택.)
MySQL_to_Redshift_v2.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "jaeho"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
# 동일한 created_at과 execution_date가 동일한 것만 SELECT. {{,}}는 airflow 문법.
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT", # method 파라미터를 UPSERT로 지정함으로써, Incremental 업데이트를 적용함.
upsert_keys = ["id"], # upsert_key를 id(primary key)로 지정. 만약 겹치는 id가 있다면 해당 레코드를 갱신, 없으면 추가.
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
- AWS S3 Connections 설정
- Redshift S3 Connection 설정
- MySQL 관련 모듈 설치 (Docker)
- MySQL_to_Redshift DAG 실행
- MySQL_to_Redshift_v2 DAG 실행
만약 2023-01-01 부터 2023-01-31 까지의 데이터만 필요하다면(backfill),
airflow dags backfill dag_id -s 2023-01-01 -e 2023-02-01
로 커맨드 입력.