[Airflow] DAG(Directed Acyclic Graph)

jake·2022년 8월 16일
0

Airflow

목록 보기
1/1

1. DAG

DAG을 직역하면 비순환 그래프라는 뜻이다.
일반적으로 DAG 파일은 서로 다른 태스크와 해당 의존성을 기술하는 하나의 DAG에 대해 정의한다.
DAG로 파이프라인 구조를 정의하면, 파이프라인을 언제 실행할 것인지 각각의 DAG의 실행 주기를 설정할 수 있다.

 


그림처럼 하나의 DAG 파일에는 여러 task가 존재한다.

 




2. DAG의 구조

 

기본 구조

로켓 발사 데이터 다운로드 및 처리를 위한 DAG

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify

 

DAG 객체 인스턴스 생성

dag = DAG( # 객채의 인스턴스 생성(구체화), 모든 워크플로의 시작점
    dag_id="download_rocket_launches", # DAG의 이름 : download_rocket_launches
    start_date=airflow.utils.dates.days_ago(14), # DAG의 처음 실행 날짜 
    schedule_interval=None, # DAG의 실행 간격
)

dag(소문자)는 DAG(대문자) 클래스를 구체화한 인스턴스의 이름이고 인스턴스의 이름은 임의로 지정하면 된다. DAG 클래스는 두 개의 인수가 필요하다.
위의 코드를 통해 Airflow UI에 표시되는 DAG의 이름은 download_rocket_launches라는 것을 알 수 있고, start_date를 통해 워크플로가 처음 실행되는 날짜/시간을 알 수 있다.

또한 schedule_interval을 None으로 설정했는데 이는 DAG가 자동으로 실행되지 않음을 의미한다. 밑에서 예약으로 실행하는 방법을 알아보겠다.

 

배시 커맨드를 실행하기 위해 BashOperator 객체 인스턴스 생성

download_launches = BashOperator(
    task_id="download_launches", #태스크 이름
    bash_command="curl -o /tmp/launches.json -L  'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # 실행할 배시 커맨드
    dag=dag, # DAG 변수에 대한 참조
)

각 오퍼레이터는 하나의 태스크를 수행하고 여러 개의 오퍼레이터가 Airflow의 워크플로 또는 DAG를 구성한다.
오퍼레이터는 서로 독립적으로 실행할 수 있지만, 순서를 정의해 실행할 수도 있습니다.
Airflow에서는 이를 의존성(dependency)라고 한다.

 

PythonOperator를 사용한 파이썬 함수 실행

def _get_pictures(): # 호출할 파이썬 함수
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True) # 경로가 없으면 디렉터리 생성

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f: # 이전 단계의 태스크 결과 확인
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url) # 각각의 이미지 다운로드
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content) # 각각의 이미지 저장
                print(f"Downloaded {image_url} to {target_file}") # Airflow 로그에 저장하기 위해 stdout으로 출력
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator( # 파이썬 함수 호출을 위해 PythonOperator 구체화
    task_id="get_pictures", python_callable=_get_pictures, # 실행할 파이썬 함수를 지정 
    dag=dag
)

Airflow의 PythonOperator는 파이썬 코드 실행을 담당한다.
앞에서 사용된 BashOperator와 마찬가지로 모든 오퍼레이터에는 task_id가 필요한데 task_id는 태스크 실행 시에 참조되며,task_id는 태스크 실행 시에 참조 된다.

PythonOperator의 사용 시 다음 두 가지 사항을 항상 지켜야 한다.

  1. 오퍼레이터 자신(get_pictures)을 정의해야 한다.

  2. pythoncollable은 인수에 호출이 가능한 일반 함수(_get_pictures)를 가리킨다

태스크 실행 순서 정의

download_launches >> get_pictures >> notify # 화살표는 태스크 실행 순서를 결정

Airflow에서 오른쪽 시프트 연산자 >>를 사용하여 태스크 간의 의존성을 정의한다.



DAG은 모든 워크플로의 시작점이다.
워크플로 내의 모든 태스크는 DAG 개체를 참조하므로 Airflow는 어떤 태스크가 어떤 DAG에 속하는지 확인할 수 있다.

 

스케줄 간격 정의하기

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
    )

dag는 DAG 클래스를 구체화한 인스턴스 이름이다.
인스턴스 이름은 임의로 지정 가능하고, 모든 오퍼레이터는 변수(dag)를 참조하여 인스턴스가 어떤 DAG에 속해있는지 Airflow에게 알려준다.

schedule_interval은 실행간격을 나타낸다.
schedule_interval=None 이 설정은 DAG이 자동으로 실행되지 않음을 의미한다.
만약 schedule_interval=@daily 로 설정하면 Airflow가 워크플로를 하루에 한 번 실행하기 때문에 직접 트리거할 필요가 없다.

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily"
    start_date=dt.datetime(2019,1,1),
    end_date=dt.datetime(2019,1,5),
    )

start_date=dt.datetime(2019,1,1),end_date=dt.datetime(2019,1,5)를 추가하였는데 이는 첫 번째 DAG의 실행을 2019년 1월 1일 자정 이후에 하고, 마지막 DAG의 실행을 2019년 1월 5일 자정 이후에 한다는 뜻이다.

DAG의 실행기록을 따지면 2019년 1월 2일 00:00에 최초로 실행되고 이후에 2019년 1월 3일 00:00, 2019년 1월 4일 00:00, 2019년 1월 5일 00:00에 실행하고 end_date에서 지정한 2019년 1월 5일 00:00 이후인 2019년 1월 6일 00:00에 마지막 실행을 하고 종료된다.


Cron 기반의 스케줄 간격 설정하기

앞선 예제에서 DAG을 매일 실행시켰는데 그렇다면 매시간 또는 매주 작업을 실행하거나 매주 토요일 23시 45분에 DAG을 실행하려면 어떻게 해야할까?

이를 지원하기 위해 Cron 구문을 이용하는데 Cron 구문은 5개의 구성 요소가 있고 다음과 같이 정의된다.

*(분, 0 ~ 59) *(시간, 0 ~ 23) *(일, 1 ~ 31) *(월, 1 ~ 12) *(요일, 0 ~ 6)

0 * * * * = 매시간(정시에 실행)
0 0 * * * = 매일(자정에 실행)
0 0 * * 0 = 매주(일요일 자정에 실행)
0 0 1 * * = 매월 1일 자정에 실행
45 23 * * SAT = 매주 토요일 23시 45분에 실행


또한 Cron 식을 사용할 때, 콤마(,)를 사용하여 값의 리스트를 정의하거나 대시(-)를 사용하여 값의 범위를 정의할 수 있다.


0 0 * * MON, WED, FRI = 매주 월, 화, 금요일 자정에 실행
0 0 * * MON-FRI = 매주 월요일부터 금요일 자정에 실행
0 0,12 * * * = 매일 자정 및 오후 12시에 실행


Airflow는 스케줄 간격을 의미하는 약어를 사용한 몇 가지 매크로를 지원하는데 앞에서 본 @daily가 예시다.

@once : 1회만 실행하도록 스케줄
@hourly : 매시간 변경 시 1회 실행
@daily : 매일 자정에 1회 실행
@weekly : 매주 일요일 자정에 1회 실행
@monthly : 매월 1일 자정에 1회 실행
@yearly : 매년 1월 1일 자정에 1회 실행

 

빈도 기반의 스케줄 간격 설정하기

DAG을 3일에 한 번씩 실행하는 Cron 식은 어떻게 정의해야 할까?
매월 1, 4, 7 ...로 표현할 때 이번 달 31일과 다음 달 1일을 포함해 생각하면 , 다음 달에 원하는 결과를 얻을 수 없다.
Airflow는 이런 Cron식의 제약을 스케줄 간격 정의를 통해 극복한다.
빈도기반 스케줄을 사용하려면 timedelta 인스턴스를 사용하면 된다.

dag = DAG(
    dag_id="04_time_delta",
    schedule_interval=dt.timedelta(days=3),
    start_date=dt.datetime(2019,1,1),
    end_date=dt.datetime(2019,1,5),
    )

이렇게 설정하면 DAG가 시작 시간으로부터 3일마다 실행된다.

 

배시 커맨드

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)

task_id="download_launches"는 태스크의 명칭을 download_launches라고 설정한 것이고 밑에 두 줄은 실행할 배시 커맨드를 나타낸다.
dag=dag는 DAG 변수에 대한 참조를 뜻한다.


download_launches >> get_pictures >> notify

화살표(>>)는 태스크 실행 순서를 설정하는데
이를 통해, download_launches가 성공적으로 완료된 후에만 get_pictures 태스크가 실행되고 get_pictures가 성공적으로 완료된 후에만 notify 태스크가 실행된다.

 

데이터 증분 처리하기

DAG가 매일 실행(@daily 스케줄 유지시)되지만, DAG가 매일 사용자 이벤트 카탈로그 전체를 다운로드하고 계산하는 것은 비효율적이다.
또한 지난 30일 동안의 이벤트만 다운로드하기 때문에, 그 이상 과거 특정 시점의 이벤트는 존재하지 않는다.

 

이벤트 데이터 증분 가져오기

이런 문제를 해결하기 위해 데이터를 순차적으로 가져올 수 있도록 DAG를 변경하자. 스케줄 간격에 해당하는 일자의 이벤트만 로드하고 새로운 이벤트만 통계를 계산한다.

curl -0 http://localhost:5000/events?start_date=2019-01-01&end_date=2019-01-02
이 예제에서 start_date는 포함되는 날짜이며, end_date는 포함되지 않는 날짜이다.
즉, 2019-01-01 00:00:00과 2019-01-01 23:59:59 사이에 발생하는 이벤트 데이터를 가져 온다.

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json "
        "http://events_api:5000/events?"
        "start_date=2019-01-01&"
        "end_date=2019-01-02"
    ),
    dag=dag,
)

 

실행 날짜를 사용하여 동적 시간 참조하기

Airflow는 태스크가 실행되는 특정 간격을 정의할 수 있는 추가 매개변수를 제공한다.
이런 매개변수 중 가장 중요한 매개변수는 DAG가 실행되는 날짜와 시간을 나타내는 execution_date이다. execution_date는 DAG를 시작하는 시간의 특정 날짜가 아니라 스케줄 간격으로 실행되는 시작 시간을 나타내는 타임스탬프이다. 스케줄 간격의 종료 시간은 next_execution_date라는 매개변수를 사용한다.

이해 안됨 ;
내용 추가 예정

 


데이터 파티셔닝

새로운 fectch_events 태스크로 새롭게 스케줄한 간격으로 점진적인 이벤트 데이터를 가져오면, 각각의 새로운 태스크가 전일의 데이터를 덮어쓰게 된다.
이 문제를 해결하기 위해 events.json 파일에 새 이벤트를 추가할 수 있는데
이 방법은 특정 날짜의 통계 계산을 하려고 해도 전체 데이터 세트를 로드하는 다운스트림 프로세스 작업이 필요하다는 것이다.

그렇기에 다른 방법을 사용하는게 효율적인데 태스크의 출력을 해당 실행 날짜의 이름이 적힌 파일에 기록함으로써 데이터 세트를 일일 배치로 나누는 것이다.

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events/{{ds}}.json "
        "http://events_api:5000/events?"
        "start_date={{ds}}&"
        "end_date={{next_ds}}"
    ),
    dag=dag,
)

data/events/{{ds}}.json 코드를 통해 2019-01-01 실행 날짜에 다운로드되는 모든 데이터가 data/events/2019-01-01.json 파일에 기록된다.

데이터 세트를 더 작고 관리하기 쉬운 조각으로 나누는 작업은 데이터 저장 및 처리 시스템에서 일반적인 전략이다. 이러한 방법을 일반적으로 파티셔닝(partitioning)이라고 하며, 데이터 세트의 작은 부분을 파티션(partitions)이라고 한다.

코드 추가 예정 이해 안됨 ;

Airflow의 실행 날짜 이해

고정된 스케줄 간격으로 태스크 실행

실행 날짜는 Airflow에서 중요한 부분이다.
Airflow가 시작 날짜, 스케줄 간격, 종료 날짜의 세 가지 매개 변수를 사용하여 DAG를 실행하는 시점을 제어할 수 있다. 실제로 DAG를 예약하기 위해 Airflow는 이 세 가지 매개 변수를 사용하여 시간을 스케줄 간격으로 나눈다. 지정된 시작 날짜부터 시작하여 종료 날짜(선택 사항)에 종료된다.

이러한 방법을 간격 기반 스케줄이라고 하는데 이 방법은 증분 데이터 처리 유형을 수행하는데 적합하다.
작업이 실행되는 현재 시간만 아는 cron과 같은 시점 기반(point-based) 스케줄링 시스템과는 극명한 대조를 이룬다.

task에서 previous_execution_date 및 next_execution_date 매개변수를 사용할 때 주의해야 할 사항은, 이러한 매개변수가 DAG 실행을 통해서만 정의된다는 것이다.
따라서 Airflow UI 또는 CLI를 통해 수동으로 실행하는 경우, Airflow가 다음 또는 이전 스케줄 간격에 대한 정보를 확인할 수 없기 때문에 매개변수 값이 정의되지 않아 사용할 수 없다.

 

 

과거 데이터 간격을 메꾸기 위해 백필 사용하기

Airflow를 사용하면 임의의 시작 날짜로부터 스케줄 간격을 정의할 수 있으므로 과거의 시작 날짜부터 과거 간격을 정의할 수도 있다.
이 속성을 이용해 과거 데이터 세트를 로드하거나 분석하기 위해 DAG의 과거 기록을 실행할 수도 있다. 이 프로세스를 보통 백필(backfilling)이라고 한다.

과거 시점의 작업 실행하기

Airflow는 아직 실행되지 않은 과거 스케줄 간격을 예약하고 실행한다.
따라서 과거 시작 날짜를 지정하고 해당 DAG를 활성화하면 현재 시간이 실행되기 전에 경과된 모든 간격이 생성된다.
이 동작은 DAG의 catchip 매개변수에 의해 제어되며 catchup을 false로 설정하여 비활성화할 수 있다.

dag = DAG(
    dag_id="09_no_catchup",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
    catchup=False,
)

이 설정으로 DAG는 과거 모든 스케줄 간격으로 태스크를 실행하는 대신 가장 최근 스케줄 간격에 대해서만 실행한다.

 


태스크 디자인을 위한 모범 사례

Airflow의 백필링과 재실행 태스크는 높은 부하가 가지만 핵심적인 속성이 만족하는지 확인하여 적절한 결과를 내도록 해야 한다.
이를 위해 원자성(atomicity)과 멱등성(idempotency)을 알아보자.

원자성

원자성이라는 용어는 데이터베이스 시스템에서 자주 사용되며, 여기서 원자성 트랜잭션은 모두 발생하거나 전혀 발생하지 않는, 나눌 수 없고 돌이킬 수 없는 일련의 데이터베이스와 같은 작업으로 간주된다.


예를 들어 사용자 이벤트 DAG에 대해 간단한 동작을 확장하여 각 실행이 끝날 때마다 상위 10명의 사용자에게 이메일을 발송하는 기능을 추가하려고 한다.

def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)

    _email_stats(stats, email="user@example.com")

이 접근법의 단점은 더 이상 원자성을 유지하지 못한다는 것이다.
만약 _send_stats 함수가 실패하면 어떤 일이 발생할까.
이 경우, 이미 output_path 경로에 통계에 대한 출력 파일이 저장되어 있기 때문에 통계 발송이 실패했음에도 불구하고 작업이 성공한 것처럼 보이게 된다.

원자성을 유지하는 방식으로 구현하기 위해서는 이메일 기능을 별도의 태스크로 분할하면 된다.

def _send_stats(email, **context):
    stats = pd.read_csv(context["templates_dict"]["stats_path"])
    email_stats(stats, email=email)


send_stats = PythonOperator(
    task_id="send_stats",
    python_callable=_send_stats,
    op_kwargs={"email": "user@example.com"},
    templates_dict={"stats_path": "/data/stats/{{ds}}.csv"},
    dag=dag,
)

이렇게 하면 이메일 전송에 실패해도 더 이상 calculate_stats 작업의 결과에 영향을 주지 않고 send_stats만 실패하므로 두 작업 모두 원자성이 유지된다.

이 예를 보면, 모든 작업을 개별 태스크로 분리하면 모든 태스크의 원자성을 생각할 수 있지만 그렇지는 않다.
대부분의 Airflow 오퍼레이터는 이미 원자성을 유지하도록 설계되어 있지만
좀 더 유연한 파이썬 및 배시(bash) 오퍼레이터 사용 시에 태스크가 원자성을 유지할 수 있도록 주의를 기울여야 한다.


멱등성

Airflow 태스크 작성 시 고려해야 할 또 다른 중요한 속성은 멱등성이다.
동일한 입력으로 동일한 태스크를 여러 번 호춯해도 결과에 효력이 없어야 한다.
즉, 입력 변경 없이 태스크를 다시 실행해도 전체 결과가 변경되지 않아야 한다.

일반적으로 데이터를 쓰는 태스크는 기존 결과를 확인하거나 이전 태스크 결과를 덮어쓸지 여부를 확인하여 멱등성을 유지할 수 있다.
보다 일반적인 응용 프로그램에서는 작업의 모든 과정에서 오류가 발생할 수 있는 모든 상황을 멱등성이 보장되는지 확인해야 한다.

0개의 댓글