https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html
ti
(task_instance) 객체를 활용@task(task_id="task1")
def task1(**kwargs):
ti = kwargs["ti"]
ti.xcom_push(key="key1", value="value1")
@task(task_id="task2")
def task2(**kwargs):
ti = kwargs["ti"]
value1 = ti.xcom_pull(key="key1")
print(value1)
task_ids
파라미터를 명시할 수 있음주의) Airflow 3.0 버전에서는
task_ids
가 반드시 명시되어야 함
@task(task_id="task2")
def task2(**kwargs):
ti = kwargs["ti"]
value1 = ti.xcom_pull(key="key1", task_ids="task1")
print(value1)
@task
데코레이터 사용 시 return 값은 자동으로 XCom에 return_value
키로 저장@task(task_id="task1")
def task1(**kwargs):
return "value1"
@task(task_id="task2")
def task2(**kwargs):
ti = kwargs["ti"]
value1 = ti.xcom_pull(key="return_value", task_ids="task1")
print(value1)
task1() >> task2()
@task(task_id="task1")
def task1(**kwargs):
return "value1"
@task(task_id="task2")
def task2(value1, **kwargs):
print(value1)
task2(task1())
xcom_push_task
에서 동일한 키값을 XCom에 push하고, xcom_pull_task
에서 Xcom으로부터 키값을 pull하여 출력# dags/python_xcom1.py
from airflow.sdk import DAG, task
from airflow.models.taskinstance import TaskInstance
import pendulum
with DAG(
dag_id="python_xcom1",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "xcom"],
) as dag:
@task(task_id="xcom_push_task1")
def xcom_push_task1(ti: TaskInstance, **kwargs):
ti.xcom_push(key="key1", value="value1")
ti.xcom_push(key="key2", value=[1,2,3])
@task(task_id="xcom_push_task2")
def xcom_push_task2(ti: TaskInstance, **kwargs):
ti.xcom_push(key="key1", value="value2")
ti.xcom_push(key="key2", value=[4,5,6])
@task(task_id="xcom_pull_task")
def xcom_pull_task(ti: TaskInstance, **kwargs):
value1 = ti.xcom_pull(key="key1")
value2 = ti.xcom_pull(key="key2", task_ids="xcom_push_task1")
print(value1)
print(value2)
xcom_push_task1() >> xcom_push_task2() >> xcom_pull_task()
xcom_push_task
실행 내역의 XCom 탭에서 key1
과 key2
에 대한 값이 지정됨을 확인xcom_pull_task
에서는 task_ids
를 지정하지 않았을 때 마지막으로 push된 "value2"가 출력될 것을 기대했지만, Airflow 3.0에서 발생한 업데이트로 인해 None 값이 출력# xcom_pull_task
[2025-06-03, 15:27:41] INFO - None: chan="stdout": source="task"
[2025-06-03, 15:27:41] INFO - [1, 2, 3]: chan="stdout": source="task"
task_ids
를 반드시 명시하도록 변경됨kwargs["ti"].xcom_pull(key="key")
와 같은 구문은 더 이상 작동하지 않음In Airflow 2, the
xcom_pull()
method allowed pulling XComs by key without specifying task_ids, ..., leading to unpredictable behavior.
Airflow 3 resolves this inconsistency by requiringtask_ids
when pulling by key.
xcom_return_task
에서 문자열 "Success"를 반환하고, 두 개의 xcom_pull_task
에서 서로 다른 방식으로 return 값을 받아 출력# dags/python_xcom2
from airflow.sdk import DAG, task
from airflow.models.taskinstance import TaskInstance
import pendulum
with DAG(
dag_id="python_xcom2",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "xcom"],
) as dag:
@task(task_id="xcom_return_task")
def xcom_return_task(**kwargs) -> str:
return "Success"
@task(task_id="xcom_pull_task1")
def xcom_pull_task1(ti: TaskInstance, **kwargs):
status = ti.xcom_pull(key="return_value", task_ids="xcom_return_task")
print(f"\"xcom_return_task\" 함수의 리턴 값: {status}")
@task(task_id="xcom_pull_task2")
def xcom_pull_task2(status: str, **kwargs):
print(f"\"xcom_return_task\" 함수로부터 전달받은 값: {status}")
return_value = xcom_return_task()
return_value >> xcom_pull_task1()
xcom_pull_task2(return_value)
xcom_pull_task
에서 모두 정상적으로 return 값을 받아서 동일한 결과가 출력됨을 확인# xcom_pull_task1
[2025-06-03, 15:36:10] INFO - "xcom_return_task" 함수의 리턴 값: "Success": chan="stdout": source="task"
# xcom_pull_task2
[2025-06-03, 15:36:10] INFO - "xcom_return_task" 함수로부터 전달받은 값: "Success": chan="stdout": source="task"
ti.xcom_push
또는 ti.xcom_pull
사용이 가능return_value
로 전달# dags/bash_xcom.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="bash_xcom",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "xcom"],
) as dag:
bash_push_task = BashOperator(
task_id="bash_push_task",
bash_command="echo START && echo XCOM PUSHED {{ ti.xcom_push(key='bash_pushed', value='bash_message') }} && echo COMPLETE",
)
bash_pull_task = BashOperator(
task_id="bash_pull_task",
env={
"PUSHED_VALUE": "{{ ti.xcom_pull(key='bash_pushed', task_ids='bash_push_task') }}",
"RETURN_VALUE": "{{ ti.xcom_pull(key='return_value', task_ids='bash_push_task') }}"
},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
)
bash_push_task >> bash_pull_task
bash_push_task
의 실행 내역에서 직접 push한 bash_pushed
가 XCom에 들어있고, 마지막 출력문도 return_value
로 저장되어 있음을 조회bash_pull_task
에서 첫 번째로는 XCom에서 bash_pushed
키를 가지고 꺼낸 PUSHED_VALUE 값을 출력하여, 실행 로그에 "bash_message" 가 출력됨을 확인return_value
키를 가지고 꺼낸 RETURN_VALUE 값을 출력하여, 실행 로그에 bash_push_task
의 마지막 출력문 "COMPLETE" 가 출력됨을 확인# bash_pull_task
[2025-06-03, 16:05:09] INFO - bash_message: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 16:05:09] INFO - COMPLETE: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
@task(task_id="python_push")
def python_push_xcom():
return {"status":"Success", "data":[1,2,3]}
bash_pull = BashOperator(
task_id="bash_pull",
env={
"STATUS": "{{ ti.xcom_pull(key=\"return_value\", task_ids=\"python_push\")[\"status\"] }}",
"DATA": "{{ ti.xcom_pull(key=\"return_value\", task_ids=\"python_push\")[\"data\"] }}"
},
bash_command="echo $STATUS && echo $DATA"
)
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html
from airflow.models import Variable
var = Variable.get("key")
# 방법 2
from airflow.operators.bash import BashOperator
bash_task = BashOperator(
task_id="bash_task",
bash_command=f"echo {{var.value.key}}"
)
# dags/bash_variable.py
from airflow.sdk import DAG, Variable
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="bash_variable",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "xcom"],
) as dag:
var = Variable.get("sample_key")
bash_var_task1 = BashOperator(
task_id="bash_var_task1",
bash_command=f"echo variable: \"{var}\"",
)
bash_var_task2 = BashOperator(
task_id="bash_var_task2",
bash_command="echo variable: \"{{ var.value.sample_key }}\"",
)
airflow.cfg
설정에서 sensitive_var_conn_names
항목을 확인해보고, Web UI 컨테이너에 들어가서 airflow variables get
명령어로 전역 변수가 마스킹된 채로 저장되어 있는지도 확인해보고, BashOperator 안에서 비교 연산자로 설정한 것과 동일한 값이 가져와지는지도 출력해서 확인해봤는데, 모두 정상적이고 실행 로그에서 전역 변수를 직접 출력할 때만 마스킹 처리됨# bash_var_task1
[2025-06-03, 16:52:07] INFO - variable: ***: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/mask-sensitive-values.html