Scheduler에 관리자로 접속 후 아래의 코드를 입력하자
$ sudo apt-get update
$ sudo apt-get install -y default-libmysqlclient-$ dev
$ sudo apt-get install -y gcc
$ sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
SqlToS3Operator
: Production DB의 데이터를 S3로 적재할 때 사용S3ToRedshiftOperator
: S3 데이터를 Redshift로 적재할 때 사용# SqlToS3Operator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
SqlToS3Operator(
task_id = "task_id",
query = "select * from table", # Reshift에 적재할 MySQL 데이터
s3_bucket = "s3_bucket", # 적재할 S3 Bucket이름
s3_key = "s3_key", # 적재할 S3 Bucket의 위치
sql_conn_id = "mysql_conn_id", # Connections에서 저장한 MySQL Conn id
aws_conn_id = "aws_conn_id", # Connections에서 저장한 S3 Conn id
verify = False,
replace = True, # 이미 데이터가 있다면 True이면 덮어씌우는 것, False이면 에러를 표출
pd_kwargs={"index": False, "header": False}, # header와 index를 copy해오지 않을 것! 내부적으로 Pandas Dataframe을 사용하기 때문에 DF에 설정하는 값
dag = dag
)
# S3ToRedshiftOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = "task_id",
s3_bucket = "s3_bucket", # 데이터를 가져오는 S3 bucket 이름
s3_key = "s3_key", # 데이터를 가져오는 위치
schema = "schema", # 데이터를 적재할 schema
table = "table", # 데이터를 적재할 table
copy_options=['csv'], # S3에서 가져올 file 확장자
redshift_conn_id = "redshift_dev_db", # Connections에서 저장한 redshift Conn id
aws_conn_id = "aws_conn_id", # Connections에서 저장한 S3 Conn id
method = "UPSERT", # "UPSERT" : 기존에 데이터가 있다면 바꿔치기, 없다면 새롭게 적재
# "REPLACE" : 모든 데이터 바꿔치기
# "APPEND" : 기존의 데이터에 새롭게 적재
upsert_keys = ["id"], # "UPSERT"일 경우, PK를 보장하는 field list
dag = dag
)
created
(timestamp) : Optionalmodified
(timestamp) : 생성 및 수정/삭제됐을 때 모두 시간이 업데이트 된다.deleted
(boolean) : 레코드가 삭제되는 테이블의 경우 필수! 레코드를 삭제하지 않고 True로 변경한다.$ airflow dags backfill dag_id -s 시작날짜 -e 종료날짜
=> catchup = True
여야 한다.
=> execution_date를 사용하여 Incremental Update가 구현되어 있어야 한다.
=> 실행순서는 날짜/시간순이 아니라 랜덤이다. 날짜순을 원한다면 default_args={'depends_on_past':True}
를 설정하자.
=> 시작 날짜는 포함하지만, 종료 날짜는 포함하지 않는다.
이론을 공부하고 예시를 보는건 얼마 걸리지 않는데 실습에 시간을 많이 잡아먹는다. 조금더 빠르게 수업에 참여해야할 것 같다. 벌써 절반이나 왔다! 화이팅화이팅!