Airflow의 Asset이란? 간단한 사용법

GarionNachal·2025년 4월 25일
0

airflow

목록 보기
5/8
post-thumbnail

Asset이란?

Airflow의 Asset은 Airflow 2.4 버전부터 도입된 기능으로, 데이터 중심의 워크플로우를 구축할 수 있도록 지원합니다. Asset은 구체적인 데이터 엔티티(예: S3 버킷의 파일, 데이터베이스 테이블 등) 또는 추상적인 데이터 엔티티를 나타내는 객체입니다.

Asset을 사용하면 특정 데이터가 업데이트될 때 DAG(Directed Acyclic Graph)를 실행하도록 스케줄링할 수 있어, 기존의 시간 기반 스케줄링보다 더 데이터 중심적인 접근이 가능합니다. 이를 통해 데이터 파이프라인의 효율성과 가시성이 향상됩니다.

Asset의 주요 개념

Airflow Asset을 이해하기 위한 몇 가지 주요 개념은 다음과 같습니다:

  1. Asset: 고유한 이름으로 정의된 데이터 엔티티를 나타내는 객체. 선택적으로 URI를 첨부할 수 있습니다.
  2. @asset: 코드를 간소화해 에셋 중심으로 DAG를 작성할 수 있는 데코레이터. @asset 사용 시 하나의 DAG와 하나의 태스크가 생성되며, 성공적으로 완료되면 에셋을 업데이트합니다.
  3. Asset 이벤트: 특정 에셋에 붙는 이벤트로, Producer 태스크가 에셋을 업데이트할 때 생성됩니다. 에셋 이벤트는 타임스탬프와 함께 정의되며, 선택적으로 추가 정보를 담을 수 있습니다.
  4. Asset 스케줄: 하나 이상의 에셋 이벤트가 생성될 때 트리거되는 DAG의 스케줄입니다.
  5. Producer 태스크outlets 파라미터에 제공된 하나 이상의 에셋을 업데이트하는 태스크로, 성공적으로 완료되면 에셋 이벤트를 생성합니다.
  6. Asset 표현식: AND(&)와 OR(|) 연산자를 사용하여 여러 에셋에 대한 업데이트를 기반으로 한 DAG 스케줄을 정의하는 논리적 표현식입니다.

Asset을 사용하는 이유

Airflow에서 Asset을 사용하면 다음과 같은 이점이 있습니다:

  1. 팀 간 표준화된 커뮤니케이션: Asset은 특정 위치의 데이터가 업데이트되었고 사용할 준비가 되었음을 알리는 API와 같은 역할을 합니다.
  2. 코드 간소화크로스-DAG 의존성을 구현하는 데 필요한 코드 양을 줄여줍니다.
  3. 가시성 향상: Airflow UI의 Assets 그래프에서 DAG와 데이터 간의 연결 관계를 시각적으로 확인할 수 있습니다.
  4. 비용 절감: Sensor나 크로스-DAG 의존성의 다른 구현 방식과 달리 Asset은 워커 슬롯을 사용하지 않습니다.
  5. 복잡한 데이터 기반 스케줄 생성: 조건부 Asset 스케줄링과 결합된 Asset 및 시간 기반 스케줄링을 사용하여 복잡한 데이터 기반 스케줄을 생성할 수 있습니다.
  6. 메시지 큐 기반 이벤트 스케줄링: 이벤트 기반 스케줄링을 통해 메시지 큐의 메시지를 기반으로 DAG를 스케줄링할 수 있습니다.

Asset 정의 방법

기본 Asset 정의

가장 간단한 Asset 스케줄은 하나의 Asset에 대한 업데이트에 기반하여 실행되는 하나의 DAG입니다. 다음 예제는 Asset을 정의하고 태스크의 outlets 파라미터에 제공하는 방법을 보여줍니다:

Copyfrom airflow.sdk import Asset, dag, task

@dag
def my_producer_dag():
    @task(outlets=[Asset("my_asset")])
    def my_producer_task():
        # 작업 로직 구현
        pass

    my_producer_task()

my_producer_dag()

Asset을 기반으로 스케줄링할 소비자 DAG는 다음과 같이 정의합니다:

Copyfrom airflow.sdk import Asset, dag
from airflow.providers.standard.operators.empty import EmptyOperator

@dag(
    schedule=[Asset("my_asset")],
)
def my_consumer_dag():
    EmptyOperator(task_id="empty_task")

my_consumer_dag()

이렇게 하면 my_producer_task가 성공적으로 완료될 때마다 my_consumer_dag가 트리거됩니다.

Asset Alias 사용

런타임에 생성된 Asset 이름을 기반으로 DAG를 스케줄링하기 위해 Asset 별칭을 생성할 수 있습니다. Asset 별칭은 고유한 이름 문자열로 정의되며, 정규 Asset 대신 outlets 및 schedules에서 사용할 수 있습니다.

Copyfrom airflow.sdk import Asset, AssetAlias, Metadata, task

my_alias_name = "my_alias"

@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
    bucket_name = "my-bucket"  # 런타임에 결정되는 값
    yield Metadata(
        asset=Asset(f"updated_{bucket_name}"),
        extra={"k": "v"},  # 추가 정보, 빈 딕셔너리도 가능
        alias=AssetAlias(my_alias_name),
    )

attach_event_to_alias_metadata()

소비자 DAG에서는 일반 Asset 대신 Asset 별칭을 사용할 수 있습니다:

Copyfrom airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator

my_alias_name = "my_alias"

@dag(schedule=[AssetAlias(my_alias_name)])
def my_consumer_dag():
    EmptyOperator(task_id="empty_task")

my_consumer_dag()

Asset 스케줄링 방법

Asset 스케줄링에는: 여러 방법이 있습니다:

기본 Asset 스케줄링

가장 간단한 형태로, 하나의 Asset 업데이트에 기반하여 DAG를 실행할 수 있습니다:

Copy@dag(schedule=[Asset("my_asset")])
def my_consumer_dag():
    # 태스크 정의
    pass

다중 Asset 스케줄링

여러 Asset을 list로 제공하면, 모든 Asset이 적어도 한 번씩 업데이트된 후에 DAG가 실행됩니다:

Copy@dag(
    schedule=[
        Asset("asset1"),
        Asset("asset2"),
        Asset("asset3"),
    ],
)
def multiple_assets_example():
    # 태스크 정의
    pass

조건부 Asset 스케줄링

논리 연산자를 사용하여 더 복잡한 Asset 업데이트 조건을 정의할 수 있습니다:

Copy@dag(
    schedule=(
        Asset("asset1")
        | Asset("asset2")  # asset1 또는 asset2가 업데이트될 때
    ),  # [] 대신 ()를 사용해야 합니다!
)
def downstream1_on_any():
    # 태스크 정의
    pass

더 복잡한 조건의 예:

Copy@dag(
    schedule=(
        (Asset("asset1") | Asset("asset2"))  # asset1 또는 asset2가 업데이트되고
        & (Asset("asset3") | Asset("asset4"))  # asset3 또는 asset4가 업데이트될 때
    ),
)
def downstream2_one_in_each_group():
    # 태스크 정의
    pass

Asset과 시간 기반 스케줄링 결합

AssetOrTimeSchedule 타임테이블을 사용하여 Asset 기반 스케줄링과 시간 기반 스케줄링을 결합할 수 있습니다:

Copyfrom airflow.sdk import Asset, dag, task
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

@dag(
    start_date=datetime(2025, 3, 1),
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # 매일 자정
        assets=(Asset("asset3") | Asset("asset4")),  # asset3 또는 asset4가 업데이트될 때
    )
)
def toy_downstream3_asset_and_time_schedule():
    # 태스크 정의
    pass

Asset 활용 예제

실제 사용 사례를 통해 Asset을 이해해보겠습니다. 다음은 칵테일 정보를 가져와 파일에 저장하는 Producer DAG와 해당 파일을 읽는 Consumer DAG의 예입니다:

Producer DAG

Copyfrom airflow.sdk import Asset, dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")

@dag
def assets_producer_dag():
    @task
    def get_cocktail(api):
        import requests
        r = requests.get(api)
        return r.json()

    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"
        f = open("include/cocktail_instructions.txt", "a")
        f.write(msg)
        f.close()

    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time
        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
        f = open("include/cocktail_info.txt", "a")
        f.write(msg)
        f.close()

    cocktail = get_cocktail(api=API)
    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)

assets_producer_dag()

Consumer DAG

Copyfrom airflow.sdk import Asset, dag, task

INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")

@dag(schedule=[INSTRUCTIONS, INFO])  # 두 Asset 모두에 대한 스케줄링
def assets_consumer_dag():
    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"include/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)
        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()

assets_consumer_dag()

이 예제에서, assets_producer_dag의 태스크가 INSTRUCTIONS와 INFO Asset을 업데이트할 때마다 assets_consumer_dag가 트리거됩니다.

마무리

Airflow의 Asset은 데이터 중심의 워크플로우를 구축하기 위한 강력한 기능입니다. 시간 기반 스케줄링을 넘어 데이터 업데이트에 기반한 스케줄링을 가능하게 함으로써, 더 효율적이고 직관적인 데이터 파이프라인을 구축할 수 있게 합니다.

Asset을 사용하면:

  • 팀 간 데이터 의존성에 대한 명확한 커뮤니케이션이 가능합니다.
  • 복잡한 데이터 파이프라인의 가시성이 향상됩니다.
  • 여러 데이터 소스 간의 의존성을 명시적으로 정의할 수 있습니다.
  • 크로스-DAG 의존성을 더 효율적으로 구현할 수 있습니다.

Airflow UI의 Assets 탭에서 데이터 의존성을 시각적으로 확인하고, 데이터 파이프라인을 더 효과적으로 관리할 수 있습니다.

Asset은 Airflow 2.4 버전 이상에서 사용할 수 있으므로, 데이터 중심의 워크플로우를 구축하고자 한다면 이 기능을 적극 활용해보시기 바랍니다.


참고 자료:

profile
AI를 꿈꾸는 BackEnd개발자

0개의 댓글