Xcom: 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유가 된다.
- 따라서, 어제 수행되었던 Task와 오늘 수행된 Task끼리는 데이터 공유가 안된다.
모든 DAG가 공유할 수 있는 전역 변수는 없을까?
Variable 등록하기
- Admin 탭에 Variable로 들어가기
- Variable도 Xcom처럼 key-value 형태로 값을 넣어주면 된다.
- 해당 값은 메타 DB에 저장된다.
from airflow.models import Variable
get
을 사용해서 해당 키 값에 대응하는 value를 가져올 수 있다.from airflow.operators.bash import BashOperator
from airflow.models import Variable
var_value = Variable.get("sample_key")
bash_var_1 = BashOperator(
task_id="bash_var_1",
bash_command=f"echo variable: {var_value}"
)
var.value
.{key명}을 입력하면 value를 가져올 수 있다. from airflow.operators.bash import BashOperator
bash_var_2 = BashOperator(
task_id="bash_var_2",
bash_command="echo
variable: {{ var.value.sample_key }}"
)
airflow에서는 2안)을 권고한다. (스케줄러의 부하를 줄이기 위함)
- 1안)의 경우, 스케줄러의 주기적 DAG 파싱시, Variable.get 개수만큼 DB연결을 일으켜 불필요한 부하가 발생하기 때문에, 스케줄러 과부하를 발생하기 때문이다.
- Variable은 메타 DB 테이블에 등록되기 때문에, .get을 통해 계속해서 가져오면 변수의 갯수가 많으면 부하가 심해진다.
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.models import Variable
with DAG(
dag_id="dags_bash_with_variable",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 8, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
var_value = Variable.get("sample_key")
bash_var_1 = BashOperator(
task_id="bash_var_1",
bash_command=f"echo variables: {var_value}"
)
bash_var_2 = BashOperator(
task_id="bash_var_2",
bash_command="echo variable: {{ var.value.sample_key }}"
)
# bash_var_1
2023-08-14, 02:11:04 KST] {subprocess.py:93} INFO - variables: sample_value
#bash_var_2
[2023-08-14, 02:11:04 KST] {subprocess.py:93} INFO - variable: sample_value