모든 워크플로우 자동화 도구가 그렇듯, Apache Airflow에서도 테스크(Task) 간에 데이터를 주고받아야 하는 상황이 빈번하게 발생합니다. Airflow에서는 이를 위해 XCom(Cross-Communication)이라는 메커니즘을 제공합니다.
XCom은 Airflow의 메타데이터베이스에 저장되는 작은 데이터 조각으로, 태스크 간에 데이터를 공유할 수 있게 해줍니다. 예를 들어, 한 태스크에서 생성한 파일 경로, 처리된 데이터의 요약 정보, 또는 다음 태스크에 필요한 설정값 등을 전달할 수 있습니다.
XCom을 사용할 때 주의해야 할 중요한 점은 데이터 크기 제한입니다. 표준 XCom 백엔드를 사용할 경우, XCom의 크기 제한은 사용 중인 메타데이터 데이터베이스에 따라 결정됩니다:
| 데이터베이스 | 크기 제한 |
|---|---|
| PostgreSQL | 1 GB |
| SQLite | 2 GB |
| MySQL | 64 KB |
보시다시피, 특히 MySQL의 경우 64KB라는 매우 작은 제한이 있어 큰 데이터를 전달하기에는 부적합합니다. 만약 XCom을 통해 전달하려는 데이터가 메타데이터 데이터베이스의 크기 제한을 초과할 가능성이 있다면, Custom XCom 저장소를 구축하여 활용하는 것을 고려해야 합니다.
**context 파라미터 활용하기전통적인 PythonOperator 방식에서 호출되는 함수에 **context를 인자로 받아 ti(Task Instance) 객체에 접근하는 방법입니다.
def push_to_xcom(**context):
message = "사과"
ti = context["ti"]
ti.xcom_push(
key='message',
value=message
)
return message # 'return_value'라는 키로도 자동 저장됨
def pull_from_xcom(**context):
ti = context["ti"]
xcom_value = ti.xcom_pull(
task_ids='py1',
key='message'
)
print("py1에서 전달받은 결과 : ", xcom_value)
ti.xcom_push를 사용할 때 고유한 key를 지정할 수 있습니다.**context에는 무엇이 들어있을까요?많은 분들이 **context를 프린트했을 때 쏟아지는 방대한 양의 메타데이터에 당황하곤 합니다. 이 데이터들은 현재 실행 중인 DAG과 태스크의 '상태 정보'입니다.
주요 항목들을 표로 정리하면 다음과 같습니다:
| 키(Key) | 설명 | 예시 |
|---|---|---|
ds | 태스크가 실행되는 논리적 날짜 (Date String) | '2024-01-14' |
ds_nodash | 대시(-)가 제거된 날짜 문자열 | '20240114' |
ti / task_instance | 현재 실행 중인 태스크 인스턴스 객체 | <TaskInstance: ...> |
dag | 현재 태스크가 속한 DAG 객체 | <DAG: ...> |
logical_date | 태스크의 논리적 실행 시점 (Pendulum 객체) | 2024-01-14T00:00:00+00:00 |
run_id | 현재 DAG Run의 고유 식별자 | 'scheduled__2024-01-14T00:00:00+00:00' |
print(context) 실행 시 실제 출력 예시{
'ds': '2024-01-14',
'ds_nodash': '20240114',
'logical_date': DateTime(2024, 1, 14, 0, 0, 0, tzinfo=Timezone('UTC')),
'dag': <DAG: xcom01_dag>,
'ti': <TaskInstance: xcom01_dag.py1 [running]>,
'run_id': 'scheduled__2024-01-14T00:00:00+00:00',
'params': {},
... (중략) ...
}
따라서 **context를 사용한다는 것은, "Airflow가 태스크를 실행하면서 들고 있는 모든 보따리(메타데이터)를 다 넘겨줘!"라고 요청하는 것과 같습니다.
get_current_context() 사용하기함수의 인자로 **context를 넘기지 않더라도, get_current_context()를 통해 현재 실행 중인 태스크의 컨텍스트를 동적으로 가져올 수 있습니다.
from airflow.sdk import get_current_context
def push_to_xcom():
message = "사과"
context = get_current_context()
ti = context['ti']
ti.xcom_push(key='message', value=message)
return message
def pull_from_xcom():
context = get_current_context()
ti = context['ti']
xcom_value = ti.xcom_pull(task_ids='py1', key='message')
print("py1에서 전달받은 결과 : ", xcom_value)
return 사용하기 (현대적인 방식)Airflow 2.0 이상에서 권장되는 TaskFlow API를 사용하면, 복잡한 xcom_push/pull 코드 없이 함수의 return 값만으로 데이터를 전달할 수 있습니다.
@task(task_id="first")
def first_func(args):
join_list = ' '.join(args)
return join_list # 자동으로 XCom에 push됨
@task(task_id='second')
def second_func(message):
# 인자로 넘겨받은 message는 이전 태스크의 return 값 (XCom pull)
changed_list = '!' + message + '!'
return changed_list
# DAG 내에서 호출
message = first_func(['FLOWER', 'AIRFLOW'])
second_func(message)
multiple_outputs)태스크가 여러 개의 결과값을 딕셔너리 형태로 반환할 때, multiple_outputs=True 옵션을 주면 각각의 키 값이 개별 XCom 항목으로 저장됩니다.
@task(task_id="first", do_xcom_push=True, multiple_outputs=True)
def first_func(args):
join_list = ' '.join(args)
return {"key1": join_list} # 'key1'이라는 이름으로 XCom에 저장됨
Python이 아닌 다른 오퍼레이터(예: BashOperator)에서 XCom 데이터를 사용하고 싶을 때는 Jinja 템플릿 형식을 사용합니다.
t1 = PythonOperator(
task_id="make_dirname",
python_callable=make_dirname, # 내부에서 ti.xcom_push(key="dir_path", ...) 수행
)
t2 = BashOperator(
task_id="make_dir",
bash_command="mkdir -p {{ ti.xcom_pull(task_ids='make_dirname', key='dir_path') }}"
)
| 방식 | 특징 | 추천 상황 |
|---|---|---|
| `context`** | 명시적 객체 전달 | 복잡한 컨텍스트 제어가 필요할 때 |
get_current_context | 깔끔한 함수 시그니처 | 함수 인자를 단순하게 유지하고 싶을 때 |
TaskFlow return | 가장 간결하고 직관적 | 일반적인 Python 기반 태스크 연결 시 |
multiple_outputs | 딕셔너리 자동 분리 | 여러 결과값을 개별적으로 전달할 때 |
| Jinja 템플릿 | 오퍼레이터 간 통합 | Bash, SQL 등 다른 오퍼레이터로 전달할 때 |