Airflow의 Asset은 Airflow 2.4 버전부터 도입된 기능으로, 데이터 중심의 워크플로우를 구축할 수 있도록 지원합니다. Asset은 구체적인 데이터 엔티티(예: S3 버킷의 파일, 데이터베이스 테이블 등) 또는 추상적인 데이터 엔티티를 나타내는 객체입니다.
Asset을 사용하면 특정 데이터가 업데이트될 때 DAG(Directed Acyclic Graph)를 실행하도록 스케줄링할 수 있어, 기존의 시간 기반 스케줄링보다 더 데이터 중심적인 접근이 가능합니다. 이를 통해 데이터 파이프라인의 효율성과 가시성이 향상됩니다.
Airflow Asset을 이해하기 위한 몇 가지 주요 개념은 다음과 같습니다:
@asset
사용 시 하나의 DAG와 하나의 태스크가 생성되며, 성공적으로 완료되면 에셋을 업데이트합니다.outlets
파라미터에 제공된 하나 이상의 에셋을 업데이트하는 태스크로, 성공적으로 완료되면 에셋 이벤트를 생성합니다.&
)와 OR(|
) 연산자를 사용하여 여러 에셋에 대한 업데이트를 기반으로 한 DAG 스케줄을 정의하는 논리적 표현식입니다.Airflow에서 Asset을 사용하면 다음과 같은 이점이 있습니다:
가장 간단한 Asset 스케줄은 하나의 Asset에 대한 업데이트에 기반하여 실행되는 하나의 DAG입니다. 다음 예제는 Asset을 정의하고 태스크의 outlets 파라미터에 제공하는 방법을 보여줍니다:
Copyfrom airflow.sdk import Asset, dag, task
@dag
def my_producer_dag():
@task(outlets=[Asset("my_asset")])
def my_producer_task():
# 작업 로직 구현
pass
my_producer_task()
my_producer_dag()
Asset을 기반으로 스케줄링할 소비자 DAG는 다음과 같이 정의합니다:
Copyfrom airflow.sdk import Asset, dag
from airflow.providers.standard.operators.empty import EmptyOperator
@dag(
schedule=[Asset("my_asset")],
)
def my_consumer_dag():
EmptyOperator(task_id="empty_task")
my_consumer_dag()
이렇게 하면 my_producer_task
가 성공적으로 완료될 때마다 my_consumer_dag
가 트리거됩니다.
런타임에 생성된 Asset 이름을 기반으로 DAG를 스케줄링하기 위해 Asset 별칭을 생성할 수 있습니다. Asset 별칭은 고유한 이름 문자열로 정의되며, 정규 Asset 대신 outlets
및 schedules
에서 사용할 수 있습니다.
Copyfrom airflow.sdk import Asset, AssetAlias, Metadata, task
my_alias_name = "my_alias"
@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
bucket_name = "my-bucket" # 런타임에 결정되는 값
yield Metadata(
asset=Asset(f"updated_{bucket_name}"),
extra={"k": "v"}, # 추가 정보, 빈 딕셔너리도 가능
alias=AssetAlias(my_alias_name),
)
attach_event_to_alias_metadata()
소비자 DAG에서는 일반 Asset 대신 Asset 별칭을 사용할 수 있습니다:
Copyfrom airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator
my_alias_name = "my_alias"
@dag(schedule=[AssetAlias(my_alias_name)])
def my_consumer_dag():
EmptyOperator(task_id="empty_task")
my_consumer_dag()
Asset 스케줄링에는: 여러 방법이 있습니다:
가장 간단한 형태로, 하나의 Asset 업데이트에 기반하여 DAG를 실행할 수 있습니다:
Copy@dag(schedule=[Asset("my_asset")])
def my_consumer_dag():
# 태스크 정의
pass
여러 Asset을 list로 제공하면, 모든 Asset이 적어도 한 번씩 업데이트된 후에 DAG가 실행됩니다:
Copy@dag(
schedule=[
Asset("asset1"),
Asset("asset2"),
Asset("asset3"),
],
)
def multiple_assets_example():
# 태스크 정의
pass
논리 연산자를 사용하여 더 복잡한 Asset 업데이트 조건을 정의할 수 있습니다:
Copy@dag(
schedule=(
Asset("asset1")
| Asset("asset2") # asset1 또는 asset2가 업데이트될 때
), # [] 대신 ()를 사용해야 합니다!
)
def downstream1_on_any():
# 태스크 정의
pass
더 복잡한 조건의 예:
Copy@dag(
schedule=(
(Asset("asset1") | Asset("asset2")) # asset1 또는 asset2가 업데이트되고
& (Asset("asset3") | Asset("asset4")) # asset3 또는 asset4가 업데이트될 때
),
)
def downstream2_one_in_each_group():
# 태스크 정의
pass
AssetOrTimeSchedule
타임테이블을 사용하여 Asset 기반 스케줄링과 시간 기반 스케줄링을 결합할 수 있습니다:
Copyfrom airflow.sdk import Asset, dag, task
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
@dag(
start_date=datetime(2025, 3, 1),
schedule=AssetOrTimeSchedule(
timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"), # 매일 자정
assets=(Asset("asset3") | Asset("asset4")), # asset3 또는 asset4가 업데이트될 때
)
)
def toy_downstream3_asset_and_time_schedule():
# 태스크 정의
pass
실제 사용 사례를 통해 Asset을 이해해보겠습니다. 다음은 칵테일 정보를 가져와 파일에 저장하는 Producer DAG와 해당 파일을 읽는 Consumer DAG의 예입니다:
Copyfrom airflow.sdk import Asset, dag, task
API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")
@dag
def assets_producer_dag():
@task
def get_cocktail(api):
import requests
r = requests.get(api)
return r.json()
@task(outlets=[INSTRUCTIONS])
def write_instructions_to_file(response):
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_instructions = response["drinks"][0]["strInstructions"]
msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"
f = open("include/cocktail_instructions.txt", "a")
f.write(msg)
f.close()
@task(outlets=[INFO])
def write_info_to_file(response):
import time
time.sleep(30)
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_category = response["drinks"][0]["strCategory"]
alcohol = response["drinks"][0]["strAlcoholic"]
msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
f = open("include/cocktail_info.txt", "a")
f.write(msg)
f.close()
cocktail = get_cocktail(api=API)
write_instructions_to_file(cocktail)
write_info_to_file(cocktail)
assets_producer_dag()
Copyfrom airflow.sdk import Asset, dag, task
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")
@dag(schedule=[INSTRUCTIONS, INFO]) # 두 Asset 모두에 대한 스케줄링
def assets_consumer_dag():
@task
def read_about_cocktail():
cocktail = []
for filename in ("info", "instructions"):
with open(f"include/cocktail_{filename}.txt", "r") as f:
contents = f.readlines()
cocktail.append(contents)
return [item for sublist in cocktail for item in sublist]
read_about_cocktail()
assets_consumer_dag()
이 예제에서, assets_producer_dag
의 태스크가 INSTRUCTIONS
와 INFO
Asset을 업데이트할 때마다 assets_consumer_dag
가 트리거됩니다.
Airflow의 Asset은 데이터 중심의 워크플로우를 구축하기 위한 강력한 기능입니다. 시간 기반 스케줄링을 넘어 데이터 업데이트에 기반한 스케줄링을 가능하게 함으로써, 더 효율적이고 직관적인 데이터 파이프라인을 구축할 수 있게 합니다.
Asset을 사용하면:
Airflow UI의 Assets 탭에서 데이터 의존성을 시각적으로 확인하고, 데이터 파이프라인을 더 효과적으로 관리할 수 있습니다.
Asset은 Airflow 2.4 버전 이상에서 사용할 수 있으므로, 데이터 중심의 워크플로우를 구축하고자 한다면 이 기능을 적극 활용해보시기 바랍니다.
참고 자료: