[데이터 엔지니어링 데브코스 2기] TIL-10주차-파트06 데이터 파이프라인, Airflow(5)

이재호·2023년 12월 15일
0

1. 프로덕션 DB(MySQL) -> 데이터 웨어하우스(Redshift)


OLTP(MySQL) -> S3 -> OLAP(Redshift)를 Full Refresh 버전과 Incremental Update 버전으로 나눠서 진행.

AWS 관련 권한 설정

MySQL Connection 설정

  • Airflow Admin 메뉴에서 Connections 선택 후, MySQL 설정.
  • Connection Id : mysql_conn_id
  • Connection Type : MySQL
  • host, schema, login, pw, port 입력

S3 Connection 설정

  • Airflow Admin 메뉴에서 Connections 선택 후, S3 설정.
  • Connection Id : aws_conn_id
  • Connection Type : Amazon Web Services
  • Access Key ID, Secret Access Key, Extra에 {"region_name": "ap-northeast-2"} 입력.

2. DAG 생성


  • SqlToS3Operator (airflow에서 제공하는 기본 오퍼레이터)

    • MySQL -> S3.
  • S3ToRedshift (airflow에서 제공하는 기본 오퍼레이터)

    • S3 -> Redshfit.
    • COPY 커맨드 사용 예정.
  • 코드 MySQL_to_Redshift.py

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
    from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
    from airflow.models import Variable
    
    from datetime import datetime
    from datetime import timedelta
    
    import requests
    import logging
    import psycopg2
    import json
    
    dag = DAG(
        dag_id = 'MySQL_to_Redshift',
        start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
        schedule = '0 9 * * *',  # 적당히 조절
        max_active_runs = 1,
        catchup = False,
        default_args = {
            'retries': 1,
            'retry_delay': timedelta(minutes=3),
        }
    )
    
    schema = "jaeho"
    table = "nps"
    s3_bucket = "grepp-data-engineering"
    s3_key = schema + "-" + table
    
    mysql_to_s3_nps = SqlToS3Operator(
        task_id = 'mysql_to_s3_nps',
        query = "SELECT * FROM prod.nps",
        s3_bucket = s3_bucket,
        s3_key = s3_key,
        sql_conn_id = "mysql_conn_id",
        aws_conn_id = "aws_conn_id",
        verify = False,
        replace = True,
        pd_kwargs={"index": False, "header": False},    
        dag = dag
    )
    
    s3_to_redshift_nps = S3ToRedshiftOperator(
        task_id = 's3_to_redshift_nps',
        s3_bucket = s3_bucket,
        s3_key = s3_key,
        schema = schema,
        table = table,
        copy_options=['csv'],
        method = 'REPLACE', # Replace로 적용함으로써 Full Refresh 방식으로 적용.
        redshift_conn_id = "redshift_dev_db",
        aws_conn_id = "aws_conn_id",
        dag = dag
    )
    
    mysql_to_s3_nps >> s3_to_redshift_nps
  • Incremental Update 방식

    • 필요한 필드 리스트 : created, modified, deleted

    • ROW_NUMBER 방식과 Redshift에서 제공하는 UPSERT 방식이 있음. (후자 선택.)

    • MySQL_to_Redshift_v2.py

      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
      from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
      from airflow.models import Variable
      
      from datetime import datetime
      from datetime import timedelta
      
      import requests
      import logging
      import psycopg2
      import json
      
      dag = DAG(
          dag_id = 'MySQL_to_Redshift_v2',
          start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
          schedule = '0 9 * * *',  # 적당히 조절
          max_active_runs = 1,
          catchup = False,
          default_args = {
              'retries': 1,
              'retry_delay': timedelta(minutes=3),
          }
      )
      
      schema = "jaeho"
      table = "nps"
      s3_bucket = "grepp-data-engineering"
      s3_key = schema + "-" + table       # s3_key = schema + "/" + table
      
      			# 동일한 created_at과 execution_date가 동일한 것만 SELECT. {{,}}는 airflow 문법.
      sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
      print(sql)
      mysql_to_s3_nps = SqlToS3Operator(
          task_id = 'mysql_to_s3_nps',
          query = sql,
          s3_bucket = s3_bucket,
          s3_key = s3_key,
          sql_conn_id = "mysql_conn_id",
          aws_conn_id = "aws_conn_id",
          verify = False,
          replace = True,
          pd_kwargs={"index": False, "header": False},    
          dag = dag
      )
      
      s3_to_redshift_nps = S3ToRedshiftOperator(
          task_id = 's3_to_redshift_nps',
          s3_bucket = s3_bucket,
          s3_key = s3_key,
          schema = schema,
          table = table,
          copy_options=['csv'],
          redshift_conn_id = "redshift_dev_db",
          aws_conn_id = "aws_conn_id",    
          method = "UPSERT", # method 파라미터를 UPSERT로 지정함으로써, Incremental 업데이트를 적용함.
          upsert_keys = ["id"], # upsert_key를 id(primary key)로 지정. 만약 겹치는 id가 있다면 해당 레코드를 갱신, 없으면 추가.
          dag = dag
      )
      
      mysql_to_s3_nps >> s3_to_redshift_nps

3. 실습


  • AWS S3 Connections 설정
  • Redshift S3 Connection 설정
  • MySQL 관련 모듈 설치 (Docker)
  • MySQL_to_Redshift DAG 실행
  • MySQL_to_Redshift_v2 DAG 실행

만약 2023-01-01 부터 2023-01-31 까지의 데이터만 필요하다면(backfill),
airflow dags backfill dag_id -s 2023-01-01 -e 2023-02-01로 커맨드 입력.

profile
천천히, 그리고 꾸준히.

0개의 댓글