[TIL] 데이터 파이프라인, Airflow (5)

이원진·2023년 6월 9일
0

데브코스

목록 보기
45/54
post-thumbnail

학습내용


  1. MySQL 테이블 복사하기

  2. Backfill 실행하기

1. MySQL 테이블 복사하기


  • 구현하려는 ETL

    • 가상의 프로덕션 MySQL(OLTP)의 레코드를 Redshift(OLAP)로 복사

      • 레코드 수가 많을 경우 COPY, 별로 없을 경우 INSERT

  • AWS 관련 권한 설정

    • Airflow DAG에서의 S3 접근 권한(쓰기)

      • IAM User를 생성해 S3에 대한 읽기 / 쓰기 권한을 설정하고, access key와 secret key 사용

    • Redshift의 S3 접근 권한(읽기)

      • Redshift에 S3를 접근할 수 있는 Role을 만들고 이를 Redshift에 지정

  • 복사할 MySQL 테이블

    CREATE TABLE prod.nps (
            id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            created_at TIMESTAMP,
            score SMALLINT
    );

  • Redshift 테이블

    CREATE TABLE schema_name.nps (
            id INT NOT NULL PRIMARY KEY,
            created_at TIMESTAMP,
            score SMALLINT
    );

  • MySQL_to_Redshift DAG의 태스크 구성

    • SqlToS3Operator

      • MySQL 결과를 S3에 적재

    from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
    
    mysql_to_s3_nps = SqlToS3Operator(
            task_id = "mysql_to_s3_nps",
            query = "SELECT * FROM prod.nps",
            s3_bucket = "s3_bucket_name",
            s3_key = "s3_key_name",
            sql_conn_id = "mysql_conn_id",
            aws_conn_id = "aws_conn_id",
            verify = False,
            replace = True,
            pd_kwargs = {"index": False, "header": False},
            dag = dag
    )

  • S3ToRedshiftOperator

    • S3 데이터를 Redshift 테이블로 COPY

    from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
    
    s3_to_redshift_nps = S3ToRedshiftOperator(
            task_id = "s3_to_redshift_nps",
            s3_bucket = "s3_bucket_name",
            s3_key = "s3_key_name",
            schema = "schema_name",
            table = "table_name",
            copy_options ["csv"],
            method = "REPLACE",
            sql_conn_id = "redshift_dev_id",
            aws_conn_id = "aws_conn_id",
            dag = dag
    )

  • Incremental Update Version

    • MySQL / PostgreSQL 테이블이라면 다음을 만족해야 함

      • created(TIMESTAMP): 생성시각, Optional

      • modified(TIMSTAMP): 변경시각

      • deleted(BOOLEAN): 레코드를 삭제하는 것이 아닌, deleted 파라미터를 True로 설정

    • ROW_NUMBER로 직접 구현

      1. Redshift의 A 테이블 내용을 temp_A에 복사

      2. MySQL의 A테이블 레코드 중 modified의 날짜가 이미 지난(execution_date) 모든 레코드를 temp_A에 복사

        • SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)

          • 해당 쿼리문의 결과를 S3에 업로드하고 COPY

      3. temp_A의 레코드들을 PK 기준으로 파티셔닝한 다음, modified 기준으로 역순 정렬해, 일련번호가 1인 것들만 원본 A테이블로 복사

    • S3TORedshiftOperator로 구현

      • query = “SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)”

      • method = “UPSERT”

      • upsert_keys 파라미터로 Primary Key 지정

        • upsert_keys = [”id”]

2. Backfill 실행하기


  • CLI에서 Backfill 실행

    • airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

    • start_date는 포함, end_date는 미포함

    • 기본적으로 시간 순이 아닌 랜덤이며, depends_on_past = True로 설정해 시간 순으로 실행 가능

  • DAG의 Backfill

    • 모든 DAG가 backfill을 필요로 하지는 않음

      • Full Refresh로 구현했다면 backfill이 필요 X

      • 마지막 업데이트 시간을 기준으로 backfill을 한다면, execution_date을 사용한 backfill이 필요 X

    • 데이터의 크기가 매우 커질 경우, backfill 기능 구현이 필수적

      • airflow가 큰 도움이 됨

      • 데이터 소스의 도움 없이는 불가능

    • backfill 구현 조건

      • 가장 중요한 것은 데이터 소스가 backfill 방식을 지원해야 함

      • execution_date를 사용해 업데이트할 데이터 결정

      • start_date / end_date로 backfill하려는 날짜 설정

      • catchup = True로 설정

      • DAG 구현 시 execution_date를 고려해야 하며, idempotent해야 함


메모



0개의 댓글