Xcom으로 task간 데이터 공유하기
SQLite : 2GB
Postgres : 1GB
MySQL : 64 KB
Operator가 작동하면 task 인스턴스가 생성됨
ti : task 인스턴스
ti 파라미터를 이용하는데 task 인스턴스 오브젝트 xcom 메서드로 접근하게해줌
def _t1(ti):
ti.xcom_push(key='my_key', value=1234)
def _t2(ti):
ti.xcom_pull(key='my_key', task_id='t1')
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _t1(ti):
ti.xcom_push(key='my_key', value=1234)
def _t2(ti):
print('value : ', ti.xcom_pull(key='my_key', task_id='t1'))
with DAG("xcom_dag", start_date=datetime(2022, 9, 1),
schedule_interval='@daily', catchup=False) as dag:
t1 = PythonOperator(
task_id='t1',
python_callable=_t1
)
t2 = PythonOperator(
task_id='t2',
python_callable=_t2
)
t1 >> t2
Dag를 실행시키고 상단 메뉴 Admin - XComs에가면 t1에 my_key : 1234를 확인할 수 있다.
t2의 로그를 보면
value가 출력되었다
value가 짝수이면 t2로, 아니면 t3이 실행되도록하였다.
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def _t1(ti):
ti.xcom_push(key='my_key', value=1234)
def _t2(ti):
ti.xcom_pull(key='my_key', task_ids='t1')
def _branch(ti):
value = ti.xcom_pull(key='my_key', task_ids='t1')
if (value % 2) == 0:
return 't2'
return 't3'
with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
schedule_interval='@daily', catchup=False) as dag:
t1 = PythonOperator(
task_id='t1',
python_callable=_t1
)
branch = BranchPythonOperator(
task_id='branch',
python_callable=_branch
)
t2 = PythonOperator(
task_id='t2',
python_callable=_t2
)
t3 = BashOperator(
task_id='t3',
bash_command="echo ''"
)
t4 = BashOperator(
task_id='t4',
bash_command="echo ''",
)
선택되지 않은 task t3는 skip 된다(분홍)
t4는 t2, t3이 선행되어야 실행되기때문에 t3이 skip되었으므로 t4도 skip되었다.
t4를 t2,t3상태에 따라 실행시킬 수 있는데
trigger_rule 옵션을 사용하면 된다.
trigger_rule 옵션갑
all_success: (default) 모든 부모 노드 성공
all_failed: 모든 부모 노드 failed 혹은 upstream_failed
all_done: 모든 부모 노드 실행 완료
all_skipped : 모든 부모 노드 skipped 상태
one_failed: 부모 최소 1개 failed. 모든 부모 노드 완료시까지 가디리지 않음
one_success: 부모 최소 1개 성공. 모든 부모 노드 완료시까지 가디리지 않음
none_failed: 모든 부모 노드 실패(failed 혹은 upstream_failed) 아닌 경우. == 모든 부모 노드 성공 혹은 skip
none_failed_min_one_success : 모든 upstream tasks
failed 혹은 upstream_failed 가 아니고, 그리고 최소 1 upstream task 성공
none_skipped: 모든 부모 노드 skip 아닌 경우 == 모든 부모 노드 성공 / failed / upstream_failed state
always : 의존성 x. 항상
t4에 트리거 룰을 설정하면 t3이 스킵되었는데도 실행되었다.
t4 = BashOperator(
task_id='t4',
bash_command="echo ''",
trigger_rule='none_failed_min_one_success'
)