Airflow 2.9 에서 dynamic task의 이름 주기

Kanto(칸토)·2025년 1월 11일
0

프로그래밍

목록 보기
6/6

airflow에서 필요한 기능을 찾으려다가 2.9 version 이상에서 제공되는 기능이라는 것을 확인했다. ChatGPT와 같은 도구를 무조건적으로 쓸 때 주의할 점은 제안되고 있는 코드가 내가 사용하고 있는 환경에서도 동작할 수 있는지를 정확히 확인해야 한다는 점이다.

https://airflow.apache.org/blog/airflow-2.9.0/

아무튼 찾으려고 했던 것은 Dynamic Task에 내가 원하는 이름을 부여할 수 있는 Custom names for Dynamic Task Mapping 이라는 기능이다.기존에는 이름을 따로 부여하는 것은 가능하지 않았고 0,1,2,...와 같이 index가 부여되고 있었기 때문에 각각의 하위 task에 어떤 parameter가 돌고 있는지를 시각적으로 확인할 수 없었다. 나는 우리 팀에 airflow를 도입하고 싶기 때문에 airflow를 사용하면 DAG단위 뿐만 아니라 각각의 개별 하위 태스크들에 대한 시각적인 정보를 제공할 수 있어서 우리의 요구사항에 잘 맞을 것이라는 점을 강조하고 싶었다.

airflow 2.9에서는map_index_template 라는 airflow 파라미터를 사용할 수 있게 되었다. 파라미터의 값으로 macro를 사용한 값을 사용할 수 있게 된다는 점이다. 즉

BashOperator.partial(
    task_id="hello",
    bash_command="echo Hello $NAME",
    map_index_template="{{ task.env['NAME'] }}",
).expand(
    env=[{"NAME": "John"}, {"NAME": "Bob"}, {"NAME": "Fred"}],
)

와 같이 {{ }} task macro로 접근하여 내부 변수인 env에 접근할 수 있게 되었다.

이제 여러개의 변수에 대하여 동시에 수행될 수 있는 Dynamic Task인 hello[] 는 각각 John, Bob, Fred 라는 이름을 Map Index로 보여줄 수 있다.

Task Operator에 정의할 수도 있지만 Decorator 방식으로도 사용할 수 있다. 두 번째 task인 process_data 의 데코레이터에 map_index_template을 적용할 수 있게 되었다.

@dag(start_date=datetime(2024, 1, 1), schedule_interval=None)
def dynamic_task_dag():

    @task
    def generate_parameters():
        return [
            {"table": "users", "date": "2024-01-01"},
            {"table": "orders", "date": "2024-01-02"},
            {"table": "products", "date": "2024-01-03"}
        ]

    @task(map_index_template="{{ task.parameters['table'] }}_{{ task.parameters['date'] }}")
    def process_data(parameters):
        print(f"Processing {parameters['table']} for date {parameters['date']}")

    parameters = generate_parameters()
    process_data.expand(parameters=parameters)

dynamic_task_dag()

Airflow 1.x 까지 써봤고 2.x 로 넘어가는 시점부터 써보지 못했는데 이번에 새로운 기능을 찾아보다가 Airflow가 정말 좋은 기능이 많아졌다는 생각이 들었다. 처음에 사용할 땐 decorator도 없었고, dynamic task도 없었으니 말이다.

profile
통계학으로 사람들의 행동을 이해하고 싶습니다.

0개의 댓글

관련 채용 정보