- Python Operator에서 return 값은 자동으로 XCom에 저장됨 (key는 `return_value`)
- Bash Operator에서 ti.xcom_pull/push는 `env`와 `bash_command` 파라미터에서만 가능함
- Bash Operator에서는 마지막으로 출력된 한 줄이 return 값으로 간주되어 XCom에 저장됨
- ti 객체의 xcom_pull에 key와 task_ids 파라미터를 지정해 값을 가져온 뒤, 그 value를 출력하는 구조
from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# Demonstrates XCom exchange in both directions between Python tasks and
# BashOperator tasks:
#   python_push -> bash_pull   (Python return value read from Bash templates)
#   bash_push   -> python_pull (Bash-side push / last echoed line read in Python)
with DAG(
    dag_id="dags_bash_python_with_xcom",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz='Asia/Seoul'),
    catchup=False,
) as dag:

    @task(task_id='python_push')
    def python_push_xcom():
        """Return a dict; the return value is what 'bash_pull' reads via XCom."""
        result_dict = {'status': 'Good', 'data': [1, 2, 3], 'options_cnt': 100}
        return result_dict

    # Each env value is a Jinja template that pulls the dict returned by
    # 'python_push' and selects one entry; bash_command just echoes them.
    bash_pull = BashOperator(
        task_id='bash_pull',
        env={
            'STATUS': "{{ ti.xcom_pull(task_ids='python_push')['status']}}",
            'DATA': "{{ti.xcom_pull(task_ids='python_push')['data']}}",
            'OPTIONS_CNT': "{{ ti.xcom_pull(task_ids='python_push')['options_cnt']}}"
        },
        bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
    )

    python_push_xcom() >> bash_pull

    # The template calls ti.xcom_push while the command is rendered, storing
    # 200 under the key "bash_pushed"; the command itself then echoes two lines.
    bash_push = BashOperator(
        task_id="bash_push",
        bash_command='echo PUSH_START '
                     '{{ ti.xcom_push(key="bash_pushed", value=200) }} && '
                     'echo PUSH_COMPLETE'
    )

    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        """Print the value pushed by 'bash_push' and that task's return value.

        Args:
            **kwargs: Task context; only ``kwargs['ti']`` is used.
        """
        ti = kwargs['ti']
        # Scope the keyed lookup to the producing task so a same-named key
        # from another task cannot be picked up instead.
        status_value = ti.xcom_pull(key='bash_pushed', task_ids='bash_push')
        return_value = ti.xcom_pull(task_ids='bash_push')
        print("status_value: " + str(status_value))
        # str() keeps this from raising TypeError if the XCom is missing (None).
        print("return_value: " + str(return_value))

    # 'python_pull' consumes XComs written by 'bash_push', so it must run
    # after that task (the original wired it after 'bash_pull', leaving
    # 'bash_push' unordered relative to its consumer).
    bash_push >> python_pull_xcom()


[2023-08-14, 01:15:47 KST] {subprocess.py:86} INFO - Output:
[2023-08-14, 01:15:47 KST] {subprocess.py:93} INFO - Good
[2023-08-14, 01:15:47 KST] {subprocess.py:93} INFO - [1, 2, 3]
[2023-08-14, 01:15:47 KST] {subprocess.py:93} INFO - 100
[2023-08-14, 01:15:48 KST] {logging_mixin.py:150} INFO - status_value: 200
[2023-08-14, 01:15:48 KST] {logging_mixin.py:150} INFO - return_value: PUSH_COMPLETE
