섹션5: 데이터 공유(Python Operator)

류홍규·2023년 8월 13일
0

airflow

목록 보기
1/18
post-thumbnail

Python Operator에서 Xcom 사용

Xcom(Cross Communication)이란?

  • airflow Dag안에서 Task간 데이터 공유를 위해 사용되는 기술
    • ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
    • 주로 작은 규모의 데이터 공유를 위해 사용
      • Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장되기 때문이다.
        • 1GB 이상의 대용량 데이터를 공유를 하기 위해서는 외부 솔루션 사용이 필요하다.(AWS S3, HDFS 등)

방법 1) (**kwargs)에 존재하는 ti(task_instance) 객체 활용

from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key='result1', value='value_1')
        ti.xcom_push(key='result2', value=[1, 2, 3])
    
    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key='result1', value='value_2')
        ti.xcom_push(key='result2', value=[1, 2, 3, 4])
    
    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key='result1') # xcom_push2의 value값을 가져온다.(가장 최근것)
        value2 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)
    
    xcom_push1() >> xcom_push2() >> xcom_pull()


xcom_pull에서 코드를 확인해보자.

value1은 result1이라는 key를 가진 value를 가져오는 것이다. 이때, task 순서는 push_task1 >> push_task2 순서이므로 가장 최근에 수행된 task는 push2이다. 따라서, value2가 출력된다.

value2는 result2라는 key를 가진 value를 가져오는 것이다. 이번에는 task_ids 파라미터에 구체적으로 task 이름을 넣음으로써, [1, 2, 3]이 올바르게 출력되었다.

[2023-08-14, 00:01:02 KST] {logging_mixin.py:150} INFO - value_2
[2023-08-14, 00:01:02 KST] {logging_mixin.py:150} INFO - [1, 2, 3]

방법 2-1) 파이썬 함수의 return 값 활용(1안)

  • Task 데커레이터 사용시 함수 입력/출력 관계만으로 Task flow 정의가 된다.
  • return을 하게 되면 airflow는 자동으로 Xcom에 저장하게 된다.

방법 2-2) 파이썬 함수의 return값 활용(2안)

  • return한 값은 자동으로 Xcom에 저장되기 때문에 key를 따로 명시하지 않아도 된다.
from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return "Success"
    
    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print("xcom_pull 메서드로 직접 찾은 리턴 값: "+ value1)
    
    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print("함수 입력값으로 받은 값: " + status)
    
    # task flow
    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return) # status는 리턴값인 success가 들어갈 것
    python_xcom_push_by_return >> xcom_pull_1()


task flow에 따라, python_xcom_push_by_return에는 xcom_push_result()의 리턴값인 ‘success’가 들어가 있는 상태이다. 물론, python_xcom_push_by_return은 일반적인 함수는 아니고 airflow 객체이다. 해당 return값은 Xcom에 push가 되어있는 상태이다.

xcom_pull2는 status로 success값이 들어가 있는 상태이다. 따라서 이것이 맞다면 print(”함수 입력값으로 받은 값: “ + success)가 출력될 것이다.

그 다음에 python_xcom_push_by_return 다음으로 xcom_pull_1이 수행된다. x_com_pull_1은 task_ids가 python_xcom_push_by_return의 return값을 value1으로 가지고 있다.

따라서, print(”xcom_pull 메서드로 직접 찾은 리턴값": + success)가 출력될 것이다.

# python_xcom_pull_2 로그
[2023-08-14, 00:01:00 KST] {logging_mixin.py:150} INFO - 함수 입력값으로 받은 값: Success

# python_xcom_pull_1 로그
[2023-08-14, 00:01:00 KST] {logging_mixin.py:150} INFO - xcom_pull 메서드로 직접 찾은 리턴 값: Success

Xcom push방법Xcom pull 방법
ti.xcom_push 명시적 사용ti.xcom_pull 명시적 사용
함수 returnreturn 값을 input으로 사용
profile
공대생의 코딩 정복기

0개의 댓글