airflow를 사용하다보면 개발/운영 환경의 분리나 초기 적재/데일리 적재 등 옵션에 따라 다른 프로세스를 수행해야 할 때가 있다. 그때마다 dag나 task를 만들거나 직접 소스 내, config 파일을 바꾸는 것은 너무 비효율적이다. 이런 상황에서 사용할 수 있는 TIP이 있어 정리하고자 한다.
@dag(
    default_args=args,
    description="파이프라인",
    schedule_interval="30 3 * * *",
    catchup=False,
    tags=["data"],
    params={
        "mode" : "daily",
        "env" : "dev"
    }
)
def test_dag():
  ...
dag 정의 시, params라는 옵션을 입력한다. JSON형태로 key-value는 사용자가 정의 가능함.

airflow dags trigger --conf '{"mode":"dev", "env":"dev"}' test_dag
위와 같이 지정한 params는 operator마다 사용방법이 다름
1. bash operator 
test_operator = BashOperator(
        task_id="test",
        bash_command="bash " + source_path + "/src/script/test.sh {{ params.env }}"
)
def choose_mode(**context):
    mode = context['params']['mode']
    if mode == 'init':
        target = 'init_generate_data'
    else:
        target = 'daily_generate_data'
    return target
    
task_branch = BranchPythonOperator(
        task_id='task_branch',
        python_callable=choose_mode
)