DAG for OLTP(Online Transactional Processing) to OLAP(Online Analytical Processing)

yjbenkang·2024년 11월 18일

프로덕션 데이터베이스 (MySQL) : OLTP
가상의 프로덕션 데이터베이스 prod.nps
to
데이터 웨어하우스 (AWS Redshift) : OLAP
raw_data.nps

mySQL에서 airflow로 데이터 파일로 변환 후 S3 버킷에 저장한다음 S3에서 해당 파일 Redshift로 COPY

S3와 MySQL 정보

  • S3 Bucket Name
    a. 이 S3연결을 위해 별도 사용자를 만들고 그 사용자의 키들을 권한 인증을 위해 사용할 예정
  • MySQL 서버 연결 정보:
    a. Host
    b. Schema
    c. Login
    d. Password
    e: Port: 3306

AWS 관련 권한 설정

  • Airflow DAG에서 S3 접근 (쓰기 권한)
    a. IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정하고 access key와 secret key를 사용



    혹은 특정 s3 버킷에만 접근할 수 있는 권한을 부여하기 위해 정책 생성에서 JSON을 직접하는게 더 안전할 수 있다.

    사용자가 만들어지고 나면 해당 사용자 클릭 후 보안자격증명(Security Credentials)탭에서 액세스키 만들기(Create Access Key)


    생성하면 위와 같이 키값과 시크릿키 확인 가능
  • Redshift가 S3 접근 (읽기 권한)
    a. Redshift에 S3를 접근할 수 있는 역할(Role)을 만들고 이를 Redshift에 지정



    특정 버킷 혹은 특정 기능에만 주고 싶은 경우 정책 생성해서 할 수 있다.

    위와 같이 연결하고자 하는 Redshift 클러스터에 가서 IAM role을 연결해주면 해당 Redshift 클러스터에서 S3 버킷에 접근할 수 있게 된다.

Airflow에서 MySQL Connection 설정 (Prod DB)

MySQL Connections 설정시 유의사항

  • 아래 명령을 Airflow Scheduler Docker Container에 root 유저로 로그인해서 실행
docker exec —user root -it 0017662673c3 sh
sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip install "apache-airflow-providers-mysql"

  • 이 부분은 실행하는 시점에 따라 다른 문제들이 존재할 수 있음. 문제가 생기면 질문채널을 통해 꼭 질문 !
    • 위의 명령들은 "ModuleNotFoundError: No module named "mySQLdb"에러를 해결하기 위함임

AWS S3 접근은 ? (Connections)

  • Access Key ID와 Secret Access Key를 사용하는 걸로 바뀜
    a. 루트 사용자의 키들을 사용하면 해킹시 AWS 자원들을 마음대로 사용 가능 -> 여러번 사고가 남
  • 우리가 사용해볼 Best Practice는:
    a. IAM(Identity and Access Management)을 사용해 별도의 사용자를 만들고
    b. 그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 제공하고
    c. 그 사용자의 Access Key ID와 Secret Access Key를 사용
    • 이 키도 주기적으로 변경해서 해킹이 될 경우의 피해를 최소화

MySQL의 테이블 리뷰 (OLTP, Production Database)

CREATE TABLE prod.nps (
  id INT NOT NULL AUTO_INCREMENT primary key,
  created_at timestamp,
  score smallint
);

이미 테이블은 MySQL쪽에 만들어져 있고 레코드들이 존재하며 이를 Redshift로 복사하는 것이 목적

Redshift(OLAP, Data Warehouse)에 해당 테이블 생성

CREATE TABLE (본인의 스키마).nps (
  id INT NOT NULL primary key,
  created_at timestamp,
  score smallint
);

이 테이블들은 Redshift쪽에 본인 스키마 밑에 별도로 만들고 뒤에서 실습할 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사할 것

MySQL_to_Redshift DAG의 Task 구성

  • SqlToS3Operator

    • MySQL SQL 결과 -> S3
    • (s3://grepp-data-engineering/{본인ID}-nps)
    • s3://s3_bucket/s3_key
  • S3ToRedshiftOperator

    • S3 -> Redshift 테이블
    • (s3://grepp-data-engineering/{본인ID}-nps)-> Redshift (본인스키마.nps)
    • COPY command is used

    Bulk Update Sequence - COPY SQL

    • 2개의 Operator를 사용해서 구현
      • SqlTOS3Operator
      • S3ToRedshiftOperator
    • MySQL 있는 테이블 nps를 Redshift내의 스키마 밑의 nps 테이블로 복사
      • S3를 경유해서 COPY 명령으로 복사

Full-refresh DAG 코드 리뷰

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json


dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "kyongjin1234"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True, #이미 s3에 존재할 경우 overwrite
    pd_kwargs={"index": False, "header": False}, #mySql->s3시 헤더는 copy no, 일련번호 copy no    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE', #APPEND, UPSERT도 있는데 REPLACE시 Full refresh, 다 날리고 새로 테이블 생성, upsert는 Primary key 기준으로 존재할 경우 수정, 존재하지 않을 경우 추가
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

태스크를 만드는 두 오퍼레이터 모두 airflow에서 제공해주는 native operator

MySQL 테이블의 Incremental Update 방식

  • MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
    • created (timestamp): Optional
    • modified (timestamp)
    • deleted (boolean) : 레코드를 삭제하지 않고 deleted를 True로 설정
  • Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
  • ROW_NUMBER로 직접 구현하는 경우
    • 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사
    • MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
      • 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
        • SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    • temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사

Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면

  • S3ToRedshiftOperator로 구현하는 경우
    • query 파라미터로 아래를 지정
      • SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    • method 파라미터로 "UPSERT"를 지정
    • upsert_keys 파라미터로 Primary key를 지정
      • 앞서 nps 테이블이라면 "id" 필드를 사용

Incremental Update version 코드 리뷰

  • MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
  • 이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함
  • 권한의 생성은 Redshift에게 위 S3 버킷에 대한 액세스 권한 지정
  • 2개의 Operator를 사용해서 구현
    • SqlToS3Operator:execution_date에 해당하는 레코드만 읽어오게 바뀜
    • S3ToRedshiftOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json

dag = DAG(
    dag_id = 'MySQL_to_Redshift_v2',
    start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "kyongjin1234"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table       # s3_key = schema + "/" + table

sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')" # 매일 한번씩 실행될 경우 전날날짜로 바꿔치기 될거임.
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

Daily Incremental DAG에서 2018년 7월달 데이터를 다 다시 읽어와야 한다면

  • Airflow에서 추천하는 방식으로 Incremental Update를 구현했다면 Backfill이 쉬워짐
  • 하지만 이를 어떻게 실행하나?
    • 하루씩 31번 실행?
      • airflow dags test MySQL_to_Redshift_v2 2023-07-01
      • ...
      • airflow dags test MySQL_to_Redshift_v2 2023-07-31
    • 한번에 여러 날짜를 동시에 실행 가능한가?
      • 구현방법에 따라 한번에 하나씩 실행하는 것이 안전할 수 있음
      • 이를 제어해주는 DAG 파라미터가 max_active_runs

Backfill을 커맨드라인에서 실행하는 방법

airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

  • 다음이 준비되어있어야함 :
    • catchUp이 True로 설정되어 있음
    • execution_date를 사용해서 Incremental Update가 구현이 되어있음
  • start_date부터 시작하지만 end_date는 포함하지 않음
  • 실행순서는 날짜/시간순이 아니고 랜덤. 만일 날짜순으로 하고 싶다면
    • DAG default_args의 depends_on_past를 True로 설정
      default_args = {
      'depends_on_past': True,
      ...
      }

How to Make your DAG Backfill ready

  • 먼저 모든 DAG가 backfill을 필요로 하지는 않음

    • Full Refresh를 한다면 backfill은 의미가 없음
  • 여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함

    • 마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간 기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음
  • 데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수

    • airflow가 큰 도움이 됨
    • 하지만 데이터 소스의 도움 없이는 불가능
  • 어떻게 backfill로 구현할 것인가

    • 제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함
    • "execution_date"을 사용해서 업데이트할 데이터 결정
    • "catchup" 필드를 True로 설정
    • start_date/end_date을 backfill하려는 날짜로 설정
    • 다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함

최종정리

airflow란 무엇인가?

  • Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임워크
    • 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워크
    • Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
    • ELT든 ETL이든 에어플로우 내에서 작성 가능
  • Airflow의 장점
    • 데이터 파이프라인을 세밀하게 제어 가능
    • 다양한 데이터 소스와 데이터 웨어하우스를 지원
    • 백필(Backfill)이 쉬움
    • 다양한 native operator
  • Airflow 관련 중요 용어/개념
    • start_date, execution_date, catchup
    • start_date는 incremental update dag일 경우 받아오는 데이터의 날짜
    • execution_date로 어느날짜 읽어와야하는지 코딩하면 에어플로우가 주는 시스템 정보로 사용됨
  • 스케일링 방식
    • Scale Up vs Scale Out vs 클라우드 버젼 vs k8s 사용

데이터 파이프라인 작성시 기억할 점

  • 데이터 파이프라인에 관한 정보를 수집하는 것이 중요
    • 비즈니스 오너와 데이터 리니지에 주의할 것
    • 결국 데이터 카탈로그가 필요
  • 데이터 품질 체크
    • 입력 데이터와 출력 데이터
  • 코드 실패를 어설프게 복구하려는 것보다 깔끔하게 실패하는 것이 좋음
  • 가능하면 Full Refresh
    • Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 -> Airflow가 필요한 이유
  • 주기적인 청소 (데이터, 테이블, DAG)
profile
keep growing

0개의 댓글