DevCourse TIL Day6 Week10

김태준·2023년 6월 12일
0

Data Enginnering DevCourse

목록 보기
46/93
post-thumbnail

✅ MySQL -> Redshift

프로덕션 DB로 가장 많이 사용되는 MySQL(OLTP)로부터 DW인 Redshift로 데이터가 이동하는 ETL 과정에 대해 실습해보고자 한다.
우선 MySQL DB의 데이터를 파일로 저장하여 Airflow server에 저장한 후 S3와 같은 클라우드 스토리지에 업로드, 이후 Redshift로 copy 커맨드로 벌크 업데이트 진행하는 3단계 절차를 거치게 된다.

결국 MySQL 과 Airflow의 커넥션과 Airflow와 S3간 커넥션, S3와 Redshift간 커넥션 작업이 필요로 해진다.

< Airflow 서버에 S3, MySQL, Redshift connection 결과 >

🎈 MySQL-To-Redshift DAG의 Task 구성

  • SqlToS3Operator
    : MySQL 결과 -> S3 (s3://s3_bucket/s3_key)
  • S3ToRedshiftOperator
    : S3 -> Redshift (COPY 커맨드 사용, s3://grepp-data-engineering/{본인ID}-nps)

2개의 operator를 활용하여 DAG를 구성하였고 코드는 다음과 같다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
import requests
import logging
import psycopg2
import json
from datetime import datetime, timedelta

dag = DAG(
    dag_id='MySQL_to_Redshift',
    start_date=datetime(2023, 6, 10),
    schedule='0 9 * * *',
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries':1,
        'retry_delay':timedelta(minutes=3),
    }
)

# default 변수 미리 지정
schema = 'taejun3305'
table = 'nps'
s3_bucket = 'grepp-data-engineering'
s3_key = schema + "-" + table
# 모든 쿼리 조회하기 위해 IAM - S3FullAccess 권한 필요
# jinja 탬플릿 format 활용해 airflow가 넘겨주는 system valuable 값을 파이썬 코드로 사용
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs = {"index": False, "header": False},
    dag= dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options = ['csv'],
    redshift_conn_id = 'redshift_dev_db',
    aws_conn_id = 'aws_conn_id',
    method = 'UPSERT',
    upsert_keys = ['id'],
    dag = dag
)

# task flow
mysql_to_s3_nps >> s3_to_redshift_nps

🎈 MySQL 테이블 Incremental Update

  1. create(timestamp) - 생성 modified(timestamp) - 수정 deleted (boolean) 형태로 진행해 시점 저장, 삭제되는 경우 Incremental Update 불가능
  2. ROW_NUMBER로 구현 - execution_date 사용하여 문제 발생 줄여 데이터 정합성 확보
  3. Redshift에서 제공하는 UPSERT 방식 사용
    -> query 파라미터 : SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    -> method 파라미터 : UPSERT
    -> upsert_keys 파라미터 : Primary key로 지정

추가적인 실습과정은 다음과 같다.
1. AWS S3 Connections 설정 (IAM User 설정 - Policy 생성필요 - json 업로드)
2. Redshift S3 Connections 설정 (IAM Role 설정)
3. MySQL 관련 모듈 설치 (Docker)
-> 아래 명령 container에 root user로 로그인해 실행 필요

# 그전에 docker 킨 상태로 docker ps로 containerID 저장하기.
docker exec --user root -it containerID sh
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
  1. MySQL_to_Redshift DAG 실행
  2. MySQL_to_Redshift_v2 DAG 실행

✅ Airflow Backfill 실행하기

앞서 학습한 대로 1년치 데이터를 Incremental Update 해야 한다면 반복 루프를 돌면서 여러 번 작업해야 하는 환경이 발생한다. 이를 해결하는 방법은 다음과 같다
-> 한번에 여러 날짜를 동시에 진행하는 방법 : DAG 파라미터 : max_active_runs

  • 모든 DAG가 backfill필요로 하진 않음 (Full Refresh의 경우 backfill 의미 X)
  • backfill은 일별, 시간 별로 업데이트 진행
  • 데이터 크기가 커질수록 backfill기능은 선택이 아닌 필수

-> 커맨드라인에서 구현방식은 다음과 같다.
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
(start_date부터 시작하지만 end_date는 포함 X, catchup이 반드시 True, default_args = { 'depends_on_past': True}로 설정할 것.

-> Airflow에서 구현방식은 다음과 같다.

  1. 데이터 소스가 backfill을 지원하도록 구현
  2. execution_date 사용해 업데이트할 데이터 결정
  3. catchup 변수를 True로 설정하여 날짜 지난 것에 대해 작업 실행 구현
  4. start_date, end_date를 backfill하려는 날짜로 지정
  5. dag구현이 execution_date를 고려해야 하며 idempotent(멱등성 보장)해야 함.
profile
To be a DataScientist

0개의 댓글