전역 공유 변수 Variable

우상욱·2024년 3월 27일

Airflow Master Class

목록 보기
24/24

1. 전역변수 Variable 이해


  • Xcom: 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유
  • 모든 DAG이 공유할 수 있는 전역 변수는 없을까?
  • Variable 등록하기
    • 실제 Variable의 Key, Value 값은 메타 DB에 저장됨(variable 테이블)

2. 전역변수 사용하기


  • 1안. Varaible 라이브러리 이용, 파이썬 문법을 이용해 미리 가져오기
from airflow.models import Variable

var_value = Variable.get("sample_key")
bash_var_1 = BashOperator(
    task_id="bash_var_1",
    bash_command=f"echo variable:{var_value}"
    )
  • 2안. Jinja 템플릿 이용, 오퍼레이터 내부에서 가져오기
bash_var_2 = BashOperator(
    task_id="bash_var_2",
    bash_command=f"echo variable:{{var.value.sample_key}}"
)

권장하는 방식은 2안입니다. 스케줄러의 주기적 DAG 파싱시 Variable.get 개수만큼 DB 연결을 일으켜서 불필요한 부하가 발생하고, 스케줄러 과부하 원인 중 하나입니다.

  • 그런데 이 전역 변수는 언제 사용하면 좋을까?
    협업환경에서 표준화된 dag를 만들기 위해서 주로 사용
    주로 상수(CONST)로 지정해서 사용할 변수들 셋팅

  • bash_sh_dir = /opt/airflow/plugins/shell

  • base_file_dir = /opt/airflow/plugins/files

  • email, Alert 메시지를 받을 담당자들의 email 주소 정보

실습


from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.models import Variable
import datetime
import pendulum

with DAG(
    dag_id="dags_bash_with_variable",
    schedule="10 9 * * *",
    start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    var_value = Variable.get("sample_key")
    bash_var1 = BashOperator(
        task_id="bash_var_1",
        bash_command=f"echo variable:{var_value}",
    )
    bash_var2 = BashOperator(
        task_id="bash_var2",
        bash_command="echo variable:{{var.value.sample_key}}",
    )

    bash_var1 >> bash_var2
profile
데이터엔지니어

0개의 댓글