2. Airflow DAG

DongbinLim·2023년 12월 14일
0

PythonOperator

PythonOperator Document

1. Hello World 예제 프로그램

Python Operator 를 사용한 Airflow DAG 생성

Python Operator

from airflow.operators.python import PythonOperator

load_nps = PythonOperator(
dag = dag,
task_id = 'task_id',
python_callable = python_func,
params = {
'table':'delighted_nps',
'schema':'raw_data'
},
)

- params : 함수(python_func)에 넘기고 싶은 인자 딕셔너리 정의

def python_func(**cst):
table =cxt["params"]["table"]
schema = cst["params"]["schema"]

ex_date = cst["execution_date"]

# 아래로 동작시키고 싶은  Python 코드 작성
## 소스 코드
- 2개의 태스크 구성 데이터 파이프라인(DAG)
- print_hello >> print_goodbye
- 두 개의 태스크가 순차적으로 실행
```py
dag = DAG(
  dag_id = "helloWorld"
  , start_date  = datetime(2023,12,13)
  , catchup = False
  , tags = ['example']
  , shcedule = '0 2 * * *' # 하루에 한 번 2시 0분에 실행
  , default_args = default_args
)
  • shcedule = '0 2 * * *' # 하루에 한 번 2시 0분에 실행\
  • default_args 는 딕셔너리 형태

2. NameGenderCSVtoRedshift

1) DAG 파라미터 vs. Task 파라미터

  • NameGenderCSVtoRedshift_v2

2) Connections & Variables

Connections :

  • 호스트id 등의 보안 민감 정보 바깥으로 빼놓는 방법

    Connection Id \
    Connection Type\
    Description\
    Run Job Flow Configuration

Variables :

  • CSV 파일 위치 바뀌면 하드코딩 해놓은 것 변경해야 하는데, 따로 빼놓을 수 있는 방법
  • use Airflow like Key-Value
  • Airflow에 세팅해놓고, 그 안에서 변경 가능

    Key\
    Val\
    Description

3) NameGenderCSVtoRedshift_v3

CSV Parameter 넣기 w/ Variables

Xcom 사용해 (E/T/L) 3개의 태스크로 나누기

  • E의 return이 T의 input으로, T의 output이 L의 input으로 들어가게끔

    Xcom (Crosx - Communications)

    - Task(Operator) to Task of Data
    - 한 operator의 Output = 다른 operator 의 Input 으로 전달하는 Airflow의 방식
    - But, 중간 전달값들은 모두 Airflow메타데이터에 저장되기 때문에, 용량이 큰 것들에는 사용하지 않고, S3 같은 Storage에 저장시켜야 함

    #pulls the xcom variable with key "identifier as string" that was pushed from within task-1\
    task_instance.xcom_pull(key="identifier as string", task_ids="task-1")

    🤔궁금증: 왜 xcom_pull만 있고, xcom_push는 없냐?

    답: return 을 통해 자동으로 Push 되기 때문
    Airflow Xcom 공식 Document

text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")

만약 아무런 키도 전달되지 않았다면, 아래처럼 task_id 만 전달해줘도 그 return 값을 사용하게 됨

text = context["task_instance"].xcom_pull(task_ids="extract")

🤔궁금증: Xcom으로 전달 가능한 데이터 사이즈는?

답: DB에 따라 달라짐StackOverflow링크

SQLite : 2GB\
Postgres: 1GB\
MySQL : 64kb
∴ 전송할 데이터 크기가 크다면, DB의 링크를 걸어서 push하고 pull 하는 게 맞음

0개의 댓글