airflow를 사용하다보면 개발/운영 환경의 분리나 초기 적재/데일리 적재 등 옵션에 따라 다른 프로세스를 수행해야 할 때가 있다. 그때마다 dag나 task를 만들거나 직접 소스 내, config 파일을 바꾸는 것은 너무 비효율적이다. 이런 상황에서 사용할 수 있는 TIP이 있어 정리하고자 한다.
@dag(
default_args=args,
description="파이프라인",
schedule_interval="30 3 * * *",
catchup=False,
tags=["data"],
params={
"mode" : "daily",
"env" : "dev"
}
)
def test_dag():
...
dag 정의 시, params라는 옵션을 입력한다. JSON형태로 key-value는 사용자가 정의 가능함.
airflow dags trigger --conf '{"mode":"dev", "env":"dev"}' test_dag
위와 같이 지정한 params는 operator마다 사용방법이 다름
1. bash operator
test_operator = BashOperator(
task_id="test",
bash_command="bash " + source_path + "/src/script/test.sh {{ params.env }}"
)
def choose_mode(**context):
mode = context['params']['mode']
if mode == 'init':
target = 'init_generate_data'
else:
target = 'daily_generate_data'
return target
task_branch = BranchPythonOperator(
task_id='task_branch',
python_callable=choose_mode
)