[데이터 엔지니어링 데브코스] TIL 41일차 - 데이터 파이프라인과 Airflow(5)

박단이·2023년 12월 18일
0

데브코스 TIL

목록 보기
41/56

오늘 배운 것🤓

Production DB의 내용을 Data Warehouse에 적재하기

  • Production DB가 MySQL이고 Data Warehouse는 Redshift라는 전제 하에 정리

적재 방법

  • MySQL의 데이터를 바로 Redshift로 Insert하기
    • 이 방법은 데이터 수가 적을 때 사용한다.
  • MySQL의 데이터를 file형태로 S3에 적재한 후, S3에서 벌크로 Redshift에 복제하기
    • Airflow DAG에서 S3에 접근(쓰기)할 수 있는 권한이 필요
    • Redshift가 S3에 접근할 수 있는 권한이 필요

MySQL Connections 설정시 해야 하는 설치

Scheduler에 관리자로 접속 후 아래의 코드를 입력하자

$ sudo apt-get update
$ sudo apt-get install -y default-libmysqlclient-$ dev
$ sudo apt-get install -y gcc
$ sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

Operator

  • SqlToS3Operator : Production DB의 데이터를 S3로 적재할 때 사용
  • S3ToRedshiftOperator : S3 데이터를 Redshift로 적재할 때 사용
# SqlToS3Operator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

SqlToS3Operator(
    task_id = "task_id",
    query = "select * from table",		# Reshift에 적재할 MySQL 데이터
    s3_bucket = "s3_bucket",	# 적재할 S3 Bucket이름
    s3_key = "s3_key", 			# 적재할 S3 Bucket의 위치
    sql_conn_id = "mysql_conn_id",	# Connections에서 저장한 MySQL Conn id
    aws_conn_id = "aws_conn_id",	# Connections에서 저장한 S3 Conn id
    verify = False,
    replace = True,		# 이미 데이터가 있다면 True이면 덮어씌우는 것, False이면 에러를 표출
    pd_kwargs={"index": False, "header": False},	# header와 index를 copy해오지 않을 것! 내부적으로 Pandas Dataframe을 사용하기 때문에 DF에 설정하는 값
    dag = dag
)
# S3ToRedshiftOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = "task_id",
    s3_bucket = "s3_bucket",	# 데이터를 가져오는 S3 bucket 이름
    s3_key = "s3_key",			# 데이터를 가져오는 위치
    schema = "schema",		# 데이터를 적재할 schema
    table = "table",		# 데이터를 적재할 table
    copy_options=['csv'],	# S3에서 가져올 file 확장자
    redshift_conn_id = "redshift_dev_db",	# Connections에서 저장한 redshift Conn id
    aws_conn_id = "aws_conn_id",    	# Connections에서 저장한 S3 Conn id
    method = "UPSERT",	# "UPSERT" : 기존에 데이터가 있다면 바꿔치기, 없다면 새롭게 적재
    					# "REPLACE" : 모든 데이터 바꿔치기
                        # "APPEND" : 기존의 데이터에 새롭게 적재
    upsert_keys = ["id"],	# "UPSERT"일 경우, PK를 보장하는 field list
    dag = dag
)

Incremental Update일 경우

  • 소스 테이블이 꼭 가지고 있어야 하는 field
    • created (timestamp) : Optional
    • modified (timestamp) : 생성 및 수정/삭제됐을 때 모두 시간이 업데이트 된다.
    • deleted (boolean) : 레코드가 삭제되는 테이블의 경우 필수! 레코드를 삭제하지 않고 True로 변경한다.

Backfill 실행하기

  • 과거의 많은 데이터를 다시 backfill 할 때
    • 하루에 1개씩 매일 실행하는 방법
    • 한번에 여러 날짜를 동시에 실행하는 방법
      • 구현 방법에 따라 한번에 하나씩 실행하는 것이 더 안전할 수도 있다.
      • 너무 많은 데이터를 load하면 부하가 갈 수도 있다.
      • DAG를 생성할 때 max_active_runs 를 통해 개수를 조절할 수 있다.
$ airflow dags backfill dag_id -s 시작날짜 -e 종료날짜

=> catchup = True 여야 한다.
=> execution_date를 사용하여 Incremental Update가 구현되어 있어야 한다.
=> 실행순서는 날짜/시간순이 아니라 랜덤이다. 날짜순을 원한다면 default_args={'depends_on_past':True}를 설정하자.
=> 시작 날짜는 포함하지만, 종료 날짜는 포함하지 않는다.

Backfill 조건

  1. 모든 DAG가 Backfill을 필요로 하지 않는다.
    • 일별 혹은 시간별로 업데이트할 때, 필요하다.
    • execution_date를 활용해야한다.
  2. 데이터 크기가 굉.장.히 커지면 Backfill 구현은 필수!
    단, 데이터 소스의 도움 없이는 불가능하다.
    backfill 방식을 지원하는 소스를 작성해야한다.
  3. execution_date를 고려해야 하고, idempotent(멱등성)을 만족해야 한다.


느낀 점😊

이론을 공부하고 예시를 보는건 얼마 걸리지 않는데 실습에 시간을 많이 잡아먹는다. 조금더 빠르게 수업에 참여해야할 것 같다. 벌써 절반이나 왔다! 화이팅화이팅!

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글