
- MySQL 테이블 복사하기
- Backfill 실행해보기
프로덕션 데이터베이스(MySQL)에서 데이터 웨어하우스(Redshift)로 테이블을 복사하는 방법을 알아보자.

레코드가 적은 경우 #2 방식으로 INSERT INTO 명령어로 적재하면 되고, 레코드가 많은 경우 #1 방식같이 S3에 파일을 올린 뒤 Redshift로 COPY 명령어를 이용하여 벌크 업데이트 하는 방식을 선택하면 된다.
자신의 Mysql Host명과 ID, PW로 Airflow 웹 UI에서 Connection을 만들어주자.

Access Key ID와 Secret Access Key를 사용하여 접근하기 위해 AWS IAM으로 별도의 사용자를 만들고, 그 사용자에게 S3 bucket을 읽고 쓸 수 있는 권한을 제공한다. 그 후 그 사용자의 Access Key ID와 Secret Access Key를 사용해서 S3에 접근하도록 설정.



이 액세스키를 가지고 aws에 연결할 것이기 때문에 두 가지 키를 별도로 저장해두자.


우리가 만들 DAG는 의 TASK는 총 두개로 SqlToS3Operator, S3ToRedshiftOperator를 만들어 보자
SqlToS3Operator
S3ToRedshiftOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "jwa4610"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Incremental Update 방식으로 진행하려면 MySQL / PostgreSQL 테이블이라면 다음을 만족해야한다.
S3ToRedshiftOperator로 구현하는 경우
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)UPSERT를 지정upsert_keys 파라미터로 Primary Key를 지정
전체 코드
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "jwa4610"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
상황을 먼저 가정해보자. Dailly Incremental DAG에서 2018년 7월달 데이터를 다 다시 읽어와야 하는 경우 만약 Airflow에서 추천하는 방식으로 Incremental Update르 구현했다면 Backfill이 쉬워진다.
명령어 : airflow dags backfill dag_id -s 2018- 07- 01 -e 2018- 08- 01