Airflow DAG 구현 - OLTP 복사와 ELT (TIL 35)

석형원·2024년 5월 29일

TIL

목록 보기
35/52

✏️ 오늘 학습한 내용

1. MySQL 테이블을 Redshift로 복사
2. Airflow의 Backfill 실행
3. Airflow와 데이터 파이프라인 정리


🔎 MySQL 테이블을 Redshift로 복사

[프로덕션 데이터베이스 (OLTP)] MySQL에서 [데이터 웨어하우스 (OLAP)] Redshift로 테이블을 복사를 해보겠습니다.

  • 복사할 레코드가 적은 경우
    INSERT INTO를 통해 Redshift로 직접 추가

  • 복사할 레코드가 많은 경우
    S3에 적재 후 COPY 명령을 통해 Redshift에 추가

AWS 관련 권한 설정

  • Airflow DAG에서 S3에 접근할 수 있도록

    • IAM에서 사용자 생성
      (S3 버킷에 대한 읽기/쓰기 권한 부여)
  • Redshift가 S3에 접근할 수 있도록

    • IAM에서 역할 생성 후 Redshift에 지정]

MySQL Connection 설정 (Airflow - AWS MySQL)

MySQL Connection 설정시 유의사항

MySQL -> Redshift로 복제하는 과정에서
"ModuleNotFoundError: No module named 'MySQLdb'" 에러가 발생할 수 있습니다.

이를 해결하기 위해 Airflow Scheduler Docker Containerroot 유저로 로그인을 해서 몇 가지 명령어를 실행해줘야 합니다.

# scheduler의 container에 root 유저로 접근
docker exec --user root -it container_id sh

sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

AWS S3 Connection

  • IAM을 사용해 별도의 사용자 생성
  • 그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 설정
  • 그 사용자의 Access Key ID와 Secret Access Key를 사용
    • 키가 노출될 경우 해킹 위험이 있으니 주기적으로 변경해야합니다!
      ( 제가 그 해킹을 당했습니다.. )

테이블 상세

  • MySQL 테이블
CREATE TABLE prod.nps (
 id INT NOT NULL AUTO_INCREMENT primary key,
 created_at timestamp,
 score smallint
);
  • Redshift 테이블
CREATE TABLE schema_name.nps (
 id INT NOT NULL primary key,
 created_at timestamp,
 score smallint
);

MySQL_to_Redshift DAG의 Task 구성

아래 2개의 Operator를 사용해서 구현

  • SqlToS3Operator

    • MySQL SQL 결과 -> S3에 적재
    • s3://s3_bucket_name/s3_key
  • S3ToRedshiftOperator

    • S3 -> Redshift 테이블에 적재
    • COPY command

📃 Full Refresh 방식

코드 구현

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json


dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), 
    schedule = '0 9 * * *',  
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "skqltldnjf77"
table = "nps"
# bucket 이름
s3_bucket = "grepp-data-engineering"
# s3_key, 즉 s3에 저장할 파일 이름
s3_key = schema + "-" + table

def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

# Redshift table 생성
def create_redshift_table():
    cur = get_Redshift_connection()
    cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    id INT NOT NULL primary key,
    created_at timestamp,
    score smallint
);""")

create_redshift_table = PythonOperator(
    task_id = 'create_redshift_table',
    python_callable = create_redshift_table,
    dag = dag
)

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    # mysql prod.nps 테이블의 데이터를 읽어옴
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    # mysql 연결 정보
    sql_conn_id = "mysql_conn_id",
    # aws(s3) 연결 정보
    aws_conn_id = "aws_conn_id",
    verify = False,
    # 만약, 지금 저장하려하는 S3 key가 이미 존재하고 있다면 교체할 것인지? 
    # Fail 처리할 것인지?
    replace = True,
    # S3에 업로드할 때, 어떤 식으로 할 것인지 정하는 것
    # index와 header는 복사하지 않도록 함
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',
    # redshift 연결 정보
    redshift_conn_id = "redshift_dev_db",
    # S3 연결 정보
    aws_conn_id = "aws_conn_id",
    dag = dag
)

create_redshift_table >>  mysql_to_s3_nps >> s3_to_redshift_nps

📃 Incremental Update 방식

MySQL 테이블의 Incremental Update

Production DB의 경우,
Incremental Update를 하기 위해 만족해야하는 조건들이 있습니다.

  • created(timestamp) : Optional

    • 레코드가 처음 생성될 때마다 기록하는 timestamp가 필요합니다.
  • modified(timestamp)

    • 레코드가 변경될 때마다 기록이 되는 timestamp가 필요합니다.
  • deleted(boolean) : 레코드를 삭제하지 않고 deleted를 True로 설정

    레코드가 삭제가 된다면 Incremental Update가 불가능하게 됩니다.
    삭제가 된 상태인지 다 읽어오기전까지는 모르기 때문에,
    실제로 물리적으로 삭제하는 것 대신에 deleted를 True로 체크하는 방식을 사용합니다.

예시 상황

아래와 같이 상황을 가정해보겠습니다.
Daily Update,
테이블 이름 : A,
MySQL -> Redshift

  • ROW_NUMBER로 구현하는 경우

    1. Redshift의 A 테이블의 내용을 임시 테이블에 복사
    2. MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어서 임시 테이블에 추가
      ex) SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    3. 임시 테이블의 레코드들을 primary key를 기준으로 Partition 후 중복 제거하여 A로 복사
  • S3ToRedshiftOperator로 구현하는 경우

    1. query 파라미터로 아래 query문을 지정
      SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    2. method 파라미터로 "UPSERT"를 지정
    3. upsert_keys 파라미터로 Primary key를 지정
      ( 지정한 upsert_keys를 기준으로 그 값이 같은 레코드들은 새로 복사되는 레코드들로 교체 -> Redshift에서 제공하는 기능 )

코드 구현

2개의 Operator를 사용해서 구현을 진행할 것입니다.

  • SqlToS3Operator
    • execution_date에 해당하는 레코드만 읽어오기
  • S3ToRedshiftOperator
    • method : UPSERT
    • upsert_keys = id(Primary key)

아래 DAG를 동작시키기 앞서 위의 Full Refresh 코드를 먼저 동작시켜야합니다.

Redshift의 테이블 생성 코드도 없을 뿐더러, execution_date = created가 일치하는 레코드가 없을 수도 있기 때문입니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json

dag = DAG(
    dag_id = 'MySQL_to_Redshift_v2',
    start_date = datetime(2023,1,1), 
    schedule = '0 9 * * *',
    # 동시에 실행하는 DAG의 수 = 1
    max_active_runs = 1,
    catchup = False,
    default_args = {
    	# 실패시 재시도 1회
        'retries': 1,
        # 실패 후 3분 후에 실행
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "skqltdnjf77"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table

# mysql에서 S3로 적재할 때는
# created, 즉 MySQL에서 생성된 날짜와
# Airflow의 execution_date가 동일한 날짜들만 추출해서 적재
# Airflow의 경우 '{{...}}`와 같은 형태로 사용하여, 
# Airflow가 제공하는 시스템 변수 값을 파이썬 코드로 호출
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    # S3에 적재된 csv를 copy
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    # UPSERT를 진행
    method = "UPSERT",
    # nps테이블의 Primary key인 id를 선택
    upsert_keys = ["id"],
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

🔎 Airflow의 Backfill 실행

예시) Daily Incremental DAG에서 2018년 7월달 데이터를 다시 읽어와야한다면?

  • Airflow에서 추천하는 방식을 통해 Incremental Update를 구현했다면 Backfill이 용이

  • 방법 1 : 일일히 execution_date를 지정하여 실행 ( 하루씩 31번 실행 )

    • airflow dags test MySQL_to_Redshift_v2 2023-07-01
    • ...
    • airflow dags test MySQL_to_Redshift_v2 2023-07-31

    Airflow가 제공하는 Backfill 기능 중에는 시작 날짜와 끝 날짜를 지정하여 실행해주는 커맨드가 존재합니다. 이를 사용하면 보다 쉽게 Backfill을 실행할 수 있습니다.

  • 방법 2 : 한번에 여러 날짜를 동시에 실행(병렬 실행)

    • 이를 제어해주는 DAG 파라미터가 max_active_runs

      • 한 번에 몇 개의 DAG를 동시에 실행할지 지정해주는 파라미터
    • 문제점

      • Data Source에 들어가는 overhead가 클 수 있습니다. (Production DB가 멈출 수도 있음)

      • DAG 구현 방식에 따라 한번에 여러 날짜를 동시에 실행하면 충돌이 발생할 수 있습니다.

      • COPY가 동시에 실행되면 한 날짜의 데이터만 COPY가 될 수도 있습니다.

    • 따라서, 동시에 실행하기보다는 한번에 하나씩 실행하는 것이 안전합니다.
      ( max_active_runs = 1 )

Backfill을 커맨드 라인에서 실행하는 방법

airflow dags backfill 'dag_id' -s 'start_date' -e 'end_date'

  • 선행 조건

    • catchup : True
    • execution_date를 사용한 Incremental Update가 구현되어 있어야 합니다.
  • start_date는 포함되지만, end_date는 포함되지 않습니다.

  • 실행 순서는 날짜/시간 순이 아닌 랜덤으로 실행

    • 날짜 순으로 실행하고 싶다면 default_args를 수정

      default_args = {
      	...
          'depends_on_past': True,
      
      }

Backfill를 쉽게하기 위한 준비

  • 모든 DAG가 backfill을 필요로 하지는 않음

    • Full Refresh는 backfill이 의미가 없음
  • Backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미

  • Backfill 조건

    • 가장 중요한 것은 데이터 소스가 Backfill 방식을 지원해야함

    • "execution_date"를 사용해서 업데이트할 데이터 결정

    • "catchup" 필드를 True로 설정

    • start_date/end_date를 backfill하려는 날짜로 설정

    • DAG 구현이 execution_date를 고려해야하며 indempotent(멱등성)을 보장해야함!!
      ( UPSERT -> 중복제거 )
      -> 두번째로 중요


🔎 Airflow와 데이터 파이프라인 정리

Airflow란 무엇인가?

  • Airflow는 파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크

    • 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍

    • Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름

  • Airflow의 장점

    • 데이터 파이프라인을 세밀하게 제어 가능
    • 다양한 데이터 소스와 데이터 웨어하우스를 지원
    • 백필(Backfill)이 쉬움
  • 스케일링 방식

    • Scale Up vs. Scale Out vs. 클라우드 버전 vs. K8s 사용

데이터 파이프라인 작성시 주의할 점

  • 데이터 파이프라인에 관한 정보를 수집하는 것이 중요

  • 비지니스 오너와 데이터 리니지에 주의할 것

    • 비지니스 오너 : 해당 데이터 파이프라인의 데이터를 요청한 사람이 누구인지?

    • 데이터 리니지 : 한 데이터가 뒷 단에서 어떻게 쓰이는지 흐름이 모두 파악이 되어야합니다. 작은 수정이 큰 영향을 끼칠 수 있기에..

    -> 데이터 카탈로그가 필요하게 됨

  • 데이터 품질 체크가 필요!

    • 입력 데이터와 출력 데이터
  • 가능하면 Full Refresh

    • Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 -> Airflow가 필요한 이유
  • 주기적인 청소 (데이터, 테이블, Dag)

profile
데이터 엔지니어를 꿈꾸는 거북이, 한걸음 한걸음

0개의 댓글