airflow에서 필요한 기능을 찾으려다가 2.9 version 이상에서 제공되는 기능이라는 것을 확인했다. ChatGPT와 같은 도구를 무조건적으로 쓸 때 주의할 점은 제안되고 있는 코드가 내가 사용하고 있는 환경에서도 동작할 수 있는지를 정확히 확인해야 한다는 점이다.
https://airflow.apache.org/blog/airflow-2.9.0/
아무튼 찾으려고 했던 것은 Dynamic Task에 내가 원하는 이름을 부여할 수 있는 Custom names for Dynamic Task Mapping
이라는 기능이다.기존에는 이름을 따로 부여하는 것은 가능하지 않았고 0,1,2,...
와 같이 index가 부여되고 있었기 때문에 각각의 하위 task에 어떤 parameter가 돌고 있는지를 시각적으로 확인할 수 없었다. 나는 우리 팀에 airflow를 도입하고 싶기 때문에 airflow를 사용하면 DAG단위 뿐만 아니라 각각의 개별 하위 태스크들에 대한 시각적인 정보를 제공할 수 있어서 우리의 요구사항에 잘 맞을 것이라는 점을 강조하고 싶었다.
airflow 2.9에서는map_index_template
라는 airflow 파라미터를 사용할 수 있게 되었다. 파라미터의 값으로 macro를 사용한 값을 사용할 수 있게 된다는 점이다. 즉
BashOperator.partial(
task_id="hello",
bash_command="echo Hello $NAME",
map_index_template="{{ task.env['NAME'] }}",
).expand(
env=[{"NAME": "John"}, {"NAME": "Bob"}, {"NAME": "Fred"}],
)
와 같이 {{ }}
task macro로 접근하여 내부 변수인 env
에 접근할 수 있게 되었다.
이제 여러개의 변수에 대하여 동시에 수행될 수 있는 Dynamic Task인 hello[]
는 각각 John, Bob, Fred 라는 이름을 Map Index로 보여줄 수 있다.
Task Operator에 정의할 수도 있지만 Decorator 방식으로도 사용할 수 있다. 두 번째 task인 process_data
의 데코레이터에 map_index_template
을 적용할 수 있게 되었다.
@dag(start_date=datetime(2024, 1, 1), schedule_interval=None)
def dynamic_task_dag():
@task
def generate_parameters():
return [
{"table": "users", "date": "2024-01-01"},
{"table": "orders", "date": "2024-01-02"},
{"table": "products", "date": "2024-01-03"}
]
@task(map_index_template="{{ task.parameters['table'] }}_{{ task.parameters['date'] }}")
def process_data(parameters):
print(f"Processing {parameters['table']} for date {parameters['date']}")
parameters = generate_parameters()
process_data.expand(parameters=parameters)
dynamic_task_dag()
Airflow 1.x 까지 써봤고 2.x 로 넘어가는 시점부터 써보지 못했는데 이번에 새로운 기능을 찾아보다가 Airflow가 정말 좋은 기능이 많아졌다는 생각이 들었다. 처음에 사용할 땐 decorator도 없었고, dynamic task도 없었으니 말이다.