데브코스 45일차 - MySQL복사하기, Backfill

Pori·2023년 12월 17일
0

데엔

목록 보기
38/47

Mysql 복사하기

소개

: Production MySQL Table(OLTP)의 prod.nps 를 AWS Redshift의 raw_data.nps에 전달한다.(OLAP)

AWS 권한 설정

  • Airflow DAG에서 S3 접근 권한 (쓰기권한)
    • IAM User를 생성, acceess key와 secret key를 사용
  • Redshift가 S3 접근 (읽기 권한)
    • S3접근 Role을 만들고 Redshift에 지정해야한다.

MySQL Connection 설정 시 유의 사항

  • MySQLdb에러를 피하기 위해서 다음과 같은 명령어를 적용해야한다.
    • Airflow Scheduler에 다음 명령어들을 통해 설치 진행
      sudo apt-get install -y default-libmysqlclient-dev
      sudo apt-get install -y gcc
      sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

MySQL & Redshfit 테이블 리뷰

-- Mysql
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);

-- Redshift
CREATE TABLE (본인의스키마).nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);

MySQL_to_Redshift DAG의 Task 구성

  • SqlToS3Operator
    • mysql의 결과를 s3에 넣어준다.
  • S3ToRedshiftOperator
    • S3 → Redshfit, COPY 명령어 사용

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

Incremental Update 방식

  • MySQL, PostgreSQL 테이블이면 created→modified→deleted로 수행된다.
    • deleted 시에는 레코드를 삭제하지 않고, deleted를 True로 설정한다.
  • ROW_NUMBER로 구현하기도함.
    • A테이블의 내용을 temp_A로 복사
    • A 테이블의 레코드 중 modified의 날짜가 지난 일에 해당하는 모든 레코드를 읽어다가 temp_A로 복사 (execution_date)
      SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    • temp_A의 레코드들을 pk를 기준으로 파티션 후 modified 값을 기준으로 DESC정렬해서 일련번호가 1인것들만 복사
  • S3ToRedshiftOperator로 구현
    • query parameter : SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    • method : UPSERT → UPSERT KEYs로 지정이된 PK를 기준으로 값이 같은 레코드들은 복사되는 값으로 대체되고, 없는 값은 추가된다.
    • upsert_keys로 pk를 지정, 리스트 형태
# SqlToS3Operator
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)
# S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

Backfill 실행해보기

  • `airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
    • catchUp이 True여야함
    • execution_date를 사용해서 incremental update가 구현되어야한다.
    • 실행순서가 랜덤이기 때문에 DAG defaul_args 의 depends_on_past를 True로 설정해야한다.
    • start_date부터 시작하지만, end_date는 포함되지 않는다.

0개의 댓글