프로덕션 데이터베이스 (MySQL) : OLTP
가상의 프로덕션 데이터베이스 prod.nps
to
데이터 웨어하우스 (AWS Redshift) : OLAP
raw_data.nps
mySQL에서 airflow로 데이터 파일로 변환 후 S3 버킷에 저장한다음 S3에서 해당 파일 Redshift로 COPY











docker exec —user root -it 0017662673c3 sh
sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip install "apache-airflow-providers-mysql"

CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
이미 테이블은 MySQL쪽에 만들어져 있고 레코드들이 존재하며 이를 Redshift로 복사하는 것이 목적
CREATE TABLE (본인의 스키마).nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
이 테이블들은 Redshift쪽에 본인 스키마 밑에 별도로 만들고 뒤에서 실습할 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사할 것
SqlToS3Operator
S3ToRedshiftOperator

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "kyongjin1234"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True, #이미 s3에 존재할 경우 overwrite
pd_kwargs={"index": False, "header": False}, #mySql->s3시 헤더는 copy no, 일련번호 copy no
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE', #APPEND, UPSERT도 있는데 REPLACE시 Full refresh, 다 날리고 새로 테이블 생성, upsert는 Primary key 기준으로 존재할 경우 수정, 존재하지 않을 경우 추가
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
태스크를 만드는 두 오퍼레이터 모두 airflow에서 제공해주는 native operator
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "kyongjin1234"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')" # 매일 한번씩 실행될 경우 전날날짜로 바꿔치기 될거임.
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
먼저 모든 DAG가 backfill을 필요로 하지는 않음
여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함
데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수
어떻게 backfill로 구현할 것인가