Python Operator 를 사용한 Airflow DAG 생성
Python Operator
from airflow.operators.python import PythonOperator
load_nps = PythonOperator(
dag = dag,
task_id = 'task_id',
python_callable = python_func,
params = {
'table':'delighted_nps',
'schema':'raw_data'
},
)
- params : 함수(python_func)에 넘기고 싶은 인자 딕셔너리 정의
def python_func(**cst):
table =cxt["params"]["table"]
schema = cst["params"]["schema"]
ex_date = cst["execution_date"]
# 아래로 동작시키고 싶은 Python 코드 작성
## 소스 코드
- 2개의 태스크 구성 데이터 파이프라인(DAG)
- print_hello >> print_goodbye
- 두 개의 태스크가 순차적으로 실행
```py
dag = DAG(
dag_id = "helloWorld"
, start_date = datetime(2023,12,13)
, catchup = False
, tags = ['example']
, shcedule = '0 2 * * *' # 하루에 한 번 2시 0분에 실행
, default_args = default_args
)
- shcedule = '0 2 * * *' # 하루에 한 번 2시 0분에 실행\
- default_args 는 딕셔너리 형태
Connection Id \
Connection Type\
Description\
Run Job Flow Configuration
Key\
Val\
Description
key : csv_url\
Val : https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv\
값 암호화하려면 *
E의 return이 T의 input으로, T의 output이 L의 input으로 들어가게끔
- Task(Operator) to Task of Data
- 한 operator의 Output = 다른 operator 의 Input 으로 전달하는 Airflow의 방식
- But, 중간 전달값들은 모두 Airflow메타데이터에 저장되기 때문에, 용량이 큰 것들에는 사용하지 않고, S3 같은 Storage에 저장시켜야 함
#pulls the xcom variable with key "identifier as string" that was pushed from within task-1\
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
🤔궁금증: 왜 xcom_pull만 있고, xcom_push는 없냐?
답: return 을 통해 자동으로 Push 되기 때문
Airflow Xcom 공식 Document
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
만약 아무런 키도 전달되지 않았다면, 아래처럼 task_id 만 전달해줘도 그 return 값을 사용하게 됨
text = context["task_instance"].xcom_pull(task_ids="extract")
🤔궁금증: Xcom으로 전달 가능한 데이터 사이즈는?
답: DB에 따라 달라짐StackOverflow링크
SQLite : 2GB\
Postgres: 1GB\
MySQL : 64kb
∴ 전송할 데이터 크기가 크다면, DB의 링크를 걸어서 push하고 pull 하는 게 맞음