Xcom(Cross Communication)이란?
- airflow Dag안에서 Task간 데이터 공유를 위해 사용되는 기술
- ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
- 주로 작은 규모의 데이터 공유를 위해 사용
- Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장되기 때문이다.
- 1GB 이상의 대용량 데이터를 공유를 하기 위해서는 외부 솔루션 사용이 필요하다.(AWS S3, HDFS 등)
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='result1', value='value_1')
ti.xcom_push(key='result2', value=[1, 2, 3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='result1', value='value_2')
ti.xcom_push(key='result2', value=[1, 2, 3, 4])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key='result1') # xcom_push2의 value값을 가져온다.(가장 최근것)
value2 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task1')
print(value1)
print(value2)
xcom_push1() >> xcom_push2() >> xcom_pull()
xcom_pull에서 코드를 확인해보자.
value1은 result1이라는 key를 가진 value를 가져오는 것이다. 이때, task 순서는 push_task1 >> push_task2 순서이므로 가장 최근에 수행된 task는 push2이다. 따라서, value2가 출력된다.
value2는 result2라는 key를 가진 value를 가져오는 것이다. 이번에는 task_ids 파라미터에 구체적으로 task 이름을 넣음으로써, [1, 2, 3]이 올바르게 출력되었다.
[2023-08-14, 00:01:02 KST] {logging_mixin.py:150} INFO - value_2
[2023-08-14, 00:01:02 KST] {logging_mixin.py:150} INFO - [1, 2, 3]
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg2",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return "Success"
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print("xcom_pull 메서드로 직접 찾은 리턴 값: "+ value1)
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print("함수 입력값으로 받은 값: " + status)
# task flow
python_xcom_push_by_return = xcom_push_result()
xcom_pull_2(python_xcom_push_by_return) # status는 리턴값인 success가 들어갈 것
python_xcom_push_by_return >> xcom_pull_1()
task flow에 따라, python_xcom_push_by_return에는 xcom_push_result()의 리턴값인 ‘success’가 들어가 있는 상태이다. 물론, python_xcom_push_by_return은 일반적인 함수는 아니고 airflow 객체이다. 해당 return값은 Xcom에 push가 되어있는 상태이다.
xcom_pull2는 status로 success값이 들어가 있는 상태이다. 따라서 이것이 맞다면 print(”함수 입력값으로 받은 값: “ + success)가 출력될 것이다.
그 다음에 python_xcom_push_by_return 다음으로 xcom_pull_1이 수행된다. x_com_pull_1은 task_ids가 python_xcom_push_by_return의 return값을 value1으로 가지고 있다.
따라서, print(”xcom_pull 메서드로 직접 찾은 리턴값": + success)가 출력될 것이다.
# python_xcom_pull_2 로그
[2023-08-14, 00:01:00 KST] {logging_mixin.py:150} INFO - 함수 입력값으로 받은 값: Success
# python_xcom_pull_1 로그
[2023-08-14, 00:01:00 KST] {logging_mixin.py:150} INFO - xcom_pull 메서드로 직접 찾은 리턴 값: Success
Xcom push방법 | Xcom pull 방법 |
---|---|
ti.xcom_push 명시적 사용 | ti.xcom_pull 명시적 사용 |
함수 return | return 값을 input으로 사용 |