airflow 데이터 공유

yoon__0_0·2024년 6월 17일
0

이어드림 수업

목록 보기
64/103

Xcom

  • cross Communication
  • airflow DAG안 task간 데이터 공유를 위해 사용되는 기술
  • task1의 수행중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우
  • 주로 작은 규모의 데이터 공유를 위해서 사용함.

python operater 에서 사용하기

1) **kwargs에 존재하는 ti(task_instance) 객체 활용

  • ti 객체는 xcom_push 와 xcom_pull 메서드를 활용하여 값을 넣거나 뺄수 있음.
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
  ti = kwargs['ti']
  ti.xcom_push(key="result1", value="value_1") ti.xcom_push(key="result2", value=[1, 2, 3])
  
@task(task_id='python_xcom_pull_task') 
def xcom_pull(**kwargs):
  ti = kwargs['ti']
  value1 = ti.xcom_pull(key="result1")
  # task_ids 까지 적어주면 더 확실하게 가져올 수 있음. 
  value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1') print(value1)
  print(value2)

2) task 데코레이터 활용 & 파이썬 함수의 return 값 활용

  • task 데코레이터 활용시 함수 입출력 관계만으로 데이터 전달이 수행되고 선후행 관계가 정해짐
@task(task_id='xcom_push_by_return') 
def xcom_push_by_return(**kwargs):
  transaction_value = 'status Good' 
  return transaction_value
  
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
	print(status) 

xcom_pull_by_return(xcom_push_by_return())

3) 혼용해서 사용하기

  • task 데코레이터를 쓰면 return 값은 자동으로 xcom에 key = 'return_value', task_ids = task_id로 저장됨
@task(task_id='xcom_push_by_return') 
def xcom_push_return(**kwargs):
	transaction_value = 'status Good' 
    return transaction_value
    
@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
  ti = kwargs['ti']
  pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return') 
  print(pull_value)
  
xcom_push_return() >>̬xcom_pull_return_by_method()

실습

1) ti 객체를 이용해서 실습해보기

  • 같은 key값으로 정의할때 뒤에 정의한 key값으로 덮어짐.
  • 그래서 같은 key값이라면 task_ids를 적어줘야함.
from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1,2,3,4])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)


    xcom_push1() >> xcom_push2() >> xcom_pull()

결과

xcome

2) 두개 같이 사용해보기

from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:

    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'

    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        # 기본적으로 key는 return values이다. 
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print('함수 입력값으로 받은 값:' + status)

    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()
  • 두개의 관계가 어떻게 될까? : pull1, 2 는 함께 실행됨.

bash operater에서 실습

  • template 적용 가능 파라미터면 xcom을 사용할 수 있음.
  • xcom.push, xcom.pull 이용하기
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_push = BashOperator(
    task_id='bash_push',
    bash_command="echo START && "
                 "echo XCOM_PUSHED "
                 "{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
                 "echo COMPLETE"
    )

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
            'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
        do_xcom_push=False
    )

    bash_push >> bash_pull
  • 결과값
    => first_bash_message COMPLETE

    • 기본적으로 echo 값도 리턴값으로 들어가서 xcom의 return_values로 들어가게 됨.
    • do_xcom_push : 는 return 값 (여기선 echo 값을 xcom에 들어가지 않도록 하기 위한 것 )

서로다른 operator 끼리 사용

python <-> bash

from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_python_with_xcom",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:

    @task(task_id='python_push')
    def python_push_xcom():
        result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
        return result_dict

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={
            'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
            'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
            'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'

        },
        bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
    )
    python_push_xcom() >> bash_pull

    bash_push = BashOperator(
    task_id='bash_push',
    bash_command='echo PUSH_START '
                 '{{ti.xcom_push(key="bash_pushed",value=200)}} && '
                 'echo PUSH_COMPLETE'
    )

    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        ti = kwargs['ti']
        status_value = ti.xcom_pull(key='bash_pushed')
        return_value = ti.xcom_pull(task_ids='bash_push')
        print('status_value:' + str(status_value))
        print('return_value:' + return_value)

    bash_push >> python_pull_xcom()
  • 실행 결과
python push xcombash_pull의 결과bash_push xcompython_pull_xcom결과

python <-> email

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
from airflow.operators.email import EmailOperator

with DAG(
    dag_id="dags_python_email_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='something_task')
    def some_logic(**kwargs):
        from random import choice 
        return choice(['Success','Fail'])


    send_email = EmailOperator(
        task_id='send_email',
        to='email',
        subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
        html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
                    {{ti.xcom_pull(task_ids="something_task")}} 했습니다 <br>'
    )

    some_logic() >> send_email

variables

  • airflow의 전경변수로써 key, value 로 저장하고 다른 스케쥴과 DAG에서 접근 가능한 공유방법

  • 저장한 변수는 메타 DB의 variable 테이블에 저장됨.

  • admin -> variavles

값을 가져오기

  • airflow.models에 있는 Variable을 활용해서 get 활용
  • jinja 템플릿 이용 : {{var.value.key_name}}
  • 하지만 airflow는 jinja 를 활용하기를 권장함.
    • airflow는 주기적으로 DAG을 parsing 하며 변경 사항과 이상 유무를 점검하며 비록 실제 실행하지 않아도 내부적으로 구문을 실행함
    • 따라서 첫번째와 같이 주기적으로 DB연결하여 수행한다면 부하증가의 원인이 됨
    • 하지만 템플릿을 이용하면 run 할때 실행되기 때문에 부하가 안됨.

bashoperator 실습

  • 코드
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.models import Variable

with DAG(
    dag_id="dags_bash_with_variable",
    schedule="10 9 * * *",
    start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    var_value = Variable.get("sample_key")
	
    # 1번방법 
    bash_var_1 = BashOperator(
    task_id="bash_var_1",
    bash_command=f"echo variable:{var_value}"
    )
	
    # 2번방법
    bash_var_2 = BashOperator(
    task_id="bash_var_2",
    bash_command="echo variable:{{var.value.sample_key}}"
    )
  • 결과값 (1번 2번 둘다 동일하게 뜸)
profile
신윤재입니다

0개의 댓글