[Airflow] XCom & Variable

minyeamer·2025년 6월 3일
0

Apache Airflow 배우기

목록 보기
6/13
post-thumbnail

XCom

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html

  • Cross Communication이란 의미로, Airflow DAG 내 Task 간 데이터 공유를 위해 사용되는 기술
  • 주로 작은 규모의 데이터 공유를 위해 사용 (XCom 내용은 메타 DB의 xcom 테이블에 값이 저장)
    • 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요

XCom 사용법

  • keyword arguments로 전달되는 ti (task_instance) 객체를 활용
@task(task_id="task1")
def task1(**kwargs):
	ti = kwargs["ti"]
	ti.xcom_push(key="key1", value="value1")

@task(task_id="task2")
def task2(**kwargs):
	ti = kwargs["ti"]
	value1 = ti.xcom_pull(key="key1")
    print(value1)
  • 만약 서로 다른 Task 에서 동일한 키값을 push 한 후, 단순히 해당 키값을 pull로 꺼낼 때, 가장 마지막에 push된 키값이 반환
  • 안전하게 키값을 꺼내오기 위해서는 대상 Task를 가리키는 task_ids 파라미터를 명시할 수 있음

주의) Airflow 3.0 버전에서는 task_ids 가 반드시 명시되어야 함

@task(task_id="task2")
def task2(**kwargs):
	ti = kwargs["ti"]
	value1 = ti.xcom_pull(key="key1", task_ids="task1")
    print(value1)

return 값 활용

  • @task 데코레이터 사용 시 return 값은 자동으로 XCom에 return_value 키로 저장
  • 다음 단계의 Task에서 이전 단계의 return 값을 꺼낼 수 있음
@task(task_id="task1")
def task1(**kwargs):
	return "value1"

@task(task_id="task2")
def task2(**kwargs):
	ti = kwargs["ti"]
	value1 = ti.xcom_pull(key="return_value", task_ids="task1")
    print(value1)

task1() >> task2()
  • 또는, 데코레이터 사용 시 함수의 출력값을 다음 함수의 입력값으로 직접 전달하는 표현을 통해 return 값을 인수로 전달할 수도 있음
@task(task_id="task1")
def task1(**kwargs):
	return "value1"

@task(task_id="task2")
def task2(value1, **kwargs):
	print(value1)

task2(task1())

PythonOperator 1

XCom 활용

  • 앞서 서술한 코드를 DAG 안에서 Task로 구현
  • 두 개의 xcom_push_task 에서 동일한 키값을 XCom에 push하고, xcom_pull_task 에서 Xcom으로부터 키값을 pull하여 출력
# dags/python_xcom1.py

from airflow.sdk import DAG, task
from airflow.models.taskinstance import TaskInstance
import pendulum

with DAG(
    dag_id="python_xcom1",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "xcom"],
) as dag:
    @task(task_id="xcom_push_task1")
    def xcom_push_task1(ti: TaskInstance, **kwargs):
        ti.xcom_push(key="key1", value="value1")
        ti.xcom_push(key="key2", value=[1,2,3])

    @task(task_id="xcom_push_task2")
    def xcom_push_task2(ti: TaskInstance, **kwargs):
        ti.xcom_push(key="key1", value="value2")
        ti.xcom_push(key="key2", value=[4,5,6])

    @task(task_id="xcom_pull_task")
    def xcom_pull_task(ti: TaskInstance, **kwargs):
        value1 = ti.xcom_pull(key="key1")
        value2 = ti.xcom_pull(key="key2", task_ids="xcom_push_task1")
        print(value1)
        print(value2)

    xcom_push_task1() >> xcom_push_task2() >> xcom_pull_task()

DAG 실행

  • 두 개의 xcom_push_task 실행 내역의 XCom 탭에서 key1key2 에 대한 값이 지정됨을 확인
  • xcom_pull_task 에서는 task_ids 를 지정하지 않았을 때 마지막으로 push된 "value2"가 출력될 것을 기대했지만, Airflow 3.0에서 발생한 업데이트로 인해 None 값이 출력

xcom-push-task1

xcom-push-task2

# xcom_pull_task

[2025-06-03, 15:27:41] INFO - None: chan="stdout": source="task"
[2025-06-03, 15:27:41] INFO - [1, 2, 3]: chan="stdout": source="task"

Airflow 3.0 업데이트

  • Airflow 3.0부터는 task_ids 를 반드시 명시하도록 변경됨
    • kwargs["ti"].xcom_pull(key="key") 와 같은 구문은 더 이상 작동하지 않음

In Airflow 2, the xcom_pull() method allowed pulling XComs by key without specifying task_ids, ..., leading to unpredictable behavior.
Airflow 3 resolves this inconsistency by requiring task_ids when pulling by key.

https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#behaviour-change-in-xcom-pull

PythonOperator 2

return 값 활용

  • xcom_return_task 에서 문자열 "Success"를 반환하고, 두 개의 xcom_pull_task 에서 서로 다른 방식으로 return 값을 받아 출력
# dags/python_xcom2

from airflow.sdk import DAG, task
from airflow.models.taskinstance import TaskInstance
import pendulum

with DAG(
    dag_id="python_xcom2",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "xcom"],
) as dag:
    @task(task_id="xcom_return_task")
    def xcom_return_task(**kwargs) -> str:
        return "Success"

    @task(task_id="xcom_pull_task1")
    def xcom_pull_task1(ti: TaskInstance, **kwargs):
        status = ti.xcom_pull(key="return_value", task_ids="xcom_return_task")
        print(f"\"xcom_return_task\" 함수의 리턴 값: {status}")

    @task(task_id="xcom_pull_task2")
    def xcom_pull_task2(status: str, **kwargs):
        print(f"\"xcom_return_task\" 함수로부터 전달받은 값: {status}")

    return_value = xcom_return_task()
    return_value >> xcom_pull_task1()
    xcom_pull_task2(return_value)

DAG 실행

  • 두 개의 xcom_pull_task 에서 모두 정상적으로 return 값을 받아서 동일한 결과가 출력됨을 확인
# xcom_pull_task1

[2025-06-03, 15:36:10] INFO - "xcom_return_task" 함수의 리턴 값: "Success": chan="stdout": source="task"
# xcom_pull_task2

[2025-06-03, 15:36:10] INFO - "xcom_return_task" 함수로부터 전달받은 값: "Success": chan="stdout": source="task"

BashOperator

XCom 및 return 값 활용

  • Jinja 템플릿 문법을 통해 ti.xcom_push 또는 ti.xcom_pull 사용이 가능
  • 마지막 출력문은 자동으로 return_value 로 전달
# dags/bash_xcom.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_xcom",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "xcom"],
) as dag:
    bash_push_task = BashOperator(
        task_id="bash_push_task",
        bash_command="echo START && echo XCOM PUSHED {{ ti.xcom_push(key='bash_pushed', value='bash_message') }} && echo COMPLETE",
    )

    bash_pull_task = BashOperator(
        task_id="bash_pull_task",
        env={
            "PUSHED_VALUE": "{{ ti.xcom_pull(key='bash_pushed', task_ids='bash_push_task') }}",
            "RETURN_VALUE": "{{ ti.xcom_pull(key='return_value', task_ids='bash_push_task') }}"
        },
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
    )

    bash_push_task >> bash_pull_task

DAG 실행

  • bash_push_task 의 실행 내역에서 직접 push한 bash_pushed 가 XCom에 들어있고, 마지막 출력문도 return_value 로 저장되어 있음을 조회
  • bash_pull_task 에서 첫 번째로는 XCom에서 bash_pushed 키를 가지고 꺼낸 PUSHED_VALUE 값을 출력하여, 실행 로그에 "bash_message" 가 출력됨을 확인
  • 두 번째로는 Xcom에서 return_value 키를 가지고 꺼낸 RETURN_VALUE 값을 출력하여, 실행 로그에 bash_push_task 의 마지막 출력문 "COMPLETE" 가 출력됨을 확인

xcom-push-task

# bash_pull_task

[2025-06-03, 16:05:09] INFO - bash_message: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 16:05:09] INFO - COMPLETE: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

Python → Bash 전달

  • PythonOperator에서 딕셔너리 객체를 반환했을 경우, BashOperator에서 XCom을 통해 딕셔너리 내 특정 값을 꺼낼 수 있음
  • 반대로, BashOperator에서 push한 값 또는 마지막 출력문을 PythonOperator에서 XCom을 통해 꺼낼 수도 있음
@task(task_id="python_push")
def python_push_xcom():
	return {"status":"Success", "data":[1,2,3]}

bash_pull = BashOperator(
	task_id="bash_pull",
    env={
    	"STATUS": "{{ ti.xcom_pull(key=\"return_value\", task_ids=\"python_push\")[\"status\"] }}",
    	"DATA": "{{ ti.xcom_pull(key=\"return_value\", task_ids=\"python_push\")[\"data\"] }}"
	},
    bash_command="echo $STATUS && echo $DATA"
)

Variable

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html

  • 모든 DAG에서 공유하는 전역 변수
  • Airflow UI에서 Admin 메뉴를 통해 접근 및 생성 가능

variable-main

variable-add

Variable 가져오기

  • Variable 라이브러리를 통해 전역 변수를 꺼내는 방법
  • 해당 방법은 DAG를 파싱할 때마다 DB 연결을 발생시켜 불필요한 부하가 발생 (스케줄러 과부하 원인)
from airflow.models import Variable

var = Variable.get("key")
  • Jinja 템플릿을 이용해 Operator 내부에서 가져오는 방법
  • 실제 실행할때만 DB에 접근하기 때문에 상대적으로 부하가 적음 (Airflow에서 권장하는 방법)
# 방법 2

from airflow.operators.bash import BashOperator

bash_task = BashOperator(
	task_id="bash_task",
    bash_command=f"echo {{var.value.key}}"
)

Variable 활용

  • 앞서 서술한 코드를 DAG 안에서 Task로 구현
  • 첫 번째 Task에서는 Variable 라이브러리로 꺼낸 전역 변수를 출력하고, 두 번째 Task에서는 Jinja 템플릿을 이용해 전역 변수를 출력
# dags/bash_variable.py

from airflow.sdk import DAG, Variable
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_variable",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "xcom"],
) as dag:
    var = Variable.get("sample_key")

    bash_var_task1 = BashOperator(
        task_id="bash_var_task1",
        bash_command=f"echo variable: \"{var}\"",
    )

    bash_var_task2 = BashOperator(
        task_id="bash_var_task2",
        bash_command="echo variable: \"{{ var.value.sample_key }}\"",
    )

DAG 실행

  • 기대했던 것과 달리, 실행 로그에서는 전역 변수가 마스킹 처리되어 출력
  • airflow.cfg 설정에서 sensitive_var_conn_names 항목을 확인해보고, Web UI 컨테이너에 들어가서 airflow variables get 명령어로 전역 변수가 마스킹된 채로 저장되어 있는지도 확인해보고, BashOperator 안에서 비교 연산자로 설정한 것과 동일한 값이 가져와지는지도 출력해서 확인해봤는데, 모두 정상적이고 실행 로그에서 전역 변수를 직접 출력할 때만 마스킹 처리됨
  • 어떻게든 출력해보려고 했지만, 모든 사람이 접근할 수 있는 환경 변수를 평문으로 출력시키지 않으려는 의도가 있다고 짐작하고 추가적인 시도를 중지함
# bash_var_task1

[2025-06-03, 16:52:07] INFO - variable: ***: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/mask-sensitive-values.html

profile
데이터의 모든 것을 추구합니다.

0개의 댓글