[Airflow] custom component

Woong·2026년 2월 9일

Apache Airflow

목록 보기
3/12

PythonOperator로 작업하기

평점 데이터 가져오기

  • _get_session
    • 영화 평점 데이터 서버 세션 생성 함수
  • _get_with_pagination
    • 페이지 처리하는 헬퍼 함수
  • _get_ratings 함수 결합

DAG 구축하기

  • _get_ratings 함수를 PythonOperator로 호출하여 스케줄 간격마다 평점데이터를 가져올수 있음
    • templates_dict에 넣어 전달하면 context variable 을 dict에서 참조
  • 아래 코드는 두 단계를 DAG로 구성
    • 평점 테이터를 가져오기 + 영화 랭킹 생성
import datetime as dt
import logging
import json
import os

import pandas as pd
import requests

from airflow import DAG
from airflow.operators.python import PythonOperator

from custom.ranking import rank_movies_by_rating


MOVIELENS_HOST = os.environ.get("MOVIELENS_HOST", "movielens")
MOVIELENS_SCHEMA = os.environ.get("MOVIELENS_SCHEMA", "http")
MOVIELENS_PORT = os.environ.get("MOVIELENS_PORT", "5000")

MOVIELENS_USER = os.environ["MOVIELENS_USER"]
MOVIELENS_PASSWORD = os.environ["MOVIELENS_PASSWORD"]


# start_date부터 end_date 사이의 평점 데이터를 가져옴.
# 한번에 100개 씩
def _get_ratings(start_date, end_date, batch_size=100):
    session, base_url = _get_session()

    yield from _get_with_pagination(
        session=session,
        url=base_url + "/ratings",
        params={"start_date": start_date, "end_date": end_date},
        batch_size=batch_size,
    )


# 평점 데이터를 가져올 서버의 세션 생성
def _get_session():

    session = requests.Session()
    session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD)

    schema = MOVIELENS_SCHEMA
    host = MOVIELENS_HOST
    port = MOVIELENS_PORT

    base_url = f"{schema}://{host}:{port}"

    return session, base_url


# 주어진 세션, URL, 파라미터를 설정하여 배치 사이즈에 맞게 평점데이터 요청.
def _get_with_pagination(session, url, params, batch_size=100):

    offset = 0
    total = None
    while total is None or offset < total:
        response = session.get(
            url, params={**params, **{"offset": offset, "limit": batch_size}}
        )
        response.raise_for_status()
        response_json = response.json()

        yield from response_json["result"]

        offset += batch_size
        total = response_json["total"]


with DAG(
    dag_id="01_python",
    description="Fetches ratings from the Movielens API using the Python Operator.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:

	# 평점 데이터 다운
    def _fetch_ratings(templates_dict, batch_size=1000, **_):
        logger = logging.getLogger(__name__)

        start_date = templates_dict["start_date"]
        end_date = templates_dict["end_date"]
        output_path = templates_dict["output_path"]

        logger.info(f"Fetching ratings for {start_date} to {end_date}")
        ratings = list(
            _get_ratings(
                start_date=start_date, end_date=end_date, batch_size=batch_size
            )
        )
        logger.info(f"Fetched {len(ratings)} ratings")

        logger.info(f"Writing ratings to {output_path}")

        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        with open(output_path, "w") as file_:
            json.dump(ratings, fp=file_)

    fetch_ratings = PythonOperator(
        task_id="fetch_ratings",
        python_callable=_fetch_ratings,
        templates_dict={
            "start_date": "{{ds}}",
            "end_date": "{{next_ds}}",
            "output_path": "/data/python/ratings/{{ds}}.json",
        },
    )

	# 평점 데이터로 영화 추천
    def _rank_movies(templates_dict, min_ratings=2, **_):
        input_path = templates_dict["input_path"]
        output_path = templates_dict["output_path"]

        ratings = pd.read_json(input_path)
        ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)

        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        ranking.to_csv(output_path, index=True)

    rank_movies = PythonOperator(
        task_id="rank_movies",
        python_callable=_rank_movies,
        templates_dict={
            "input_path": "/data/python/ratings/{{ds}}.json",
            "output_path": "/data/python/rankings/{{ds}}.csv",
        },
    )

    fetch_ratings >> rank_movies

custom hook 빌드

  • Hook : 외부 시스템과 상호작용하는 데 사용되는 추상화 계층
    • 특정한 외부 시스템(예: 데이터베이스, API 등)과 연결하고, 데이터 전송을 돕기 위해 제공
    • BaseHook 클래스의 서브클래스로 생성
    • 훅 연결과 __init__ 메서드를 정의
from airflow.hooks.base_hook import BaseHook


class MovielensHook(BaseHook):
    def __init__(self, conn_id):	# 훅에게 어떤 connection 을 사용하는지 전달.
        super().__init__()
        self._conn_id = conn_id
  • 자격 증명을 안전하게 관리하는 법
    • airflow credentials
config = self.get_connection(self._conn_id)
  • get_conn, get_ratings 메서드 추가 결과.
class MovielensHook(BaseHook):

    def __init__(self, conn_id):
        super().__init__()
        self._conn_id = conn_id
	
	# 훅에서 사용할 커넥션 생성
    def get_conn(self):
        
	    if self._session is None:
               # BaseHook 클래스에서 제공하는 메소드.
               config = self.get_connection(self._conn_id)

            if not config.host:
                raise ValueError(f"No host specified in connection {self._conn_id}")

            schema = config.schema or self.DEFAULT_SCHEMA
            port = config.port or self.DEFAULT_PORT

            self._base_url = f"{schema}://{config.host}:{port}"
            self._session = requests.Session()

	    # 로그인 계정이 있으면
            if config.login:
                self._session.auth = (config.login, config.password)

        return self._session, self._base_url

    def get_ratings(self, start_date=None, end_date=None, batch_size=100):
        yield from self._get_with_pagination(
            endpoint="/ratings",
            params={"start_date": start_date, "end_date": end_date},
            batch_size=batch_size,
        )

    def _get_with_pagination(self, endpoint, params, batch_size=100):

        session, base_url = self.get_conn()
        url = base_url + endpoint

        offset = 0
        total = None
        while total is None or offset < total:
            response = session.get(
                url, params={**params, **{"offset": offset, "limit": batch_size}}
            )
            response.raise_for_status()
            response_json = response.json()

            yield from response_json["result"]

            offset += batch_size
            total = response_json["total"]

MovielensHook로 DAG 빌드하기

  • DAG 폴더 안에 패키지를 생성하고 hook.py라는 모듈에 훅을 저장.
chapter08
ㄴdags
   ㄴ custom
       ㄴ __init__.py
       ㄴ hooks.py
   ㄴ01_python.py
   ㄴ02.hook.py
ㄴ docker-compose.yml
ㄴ ...

아래와 같이 호출

from custom.hooks import MovielensHook

hook = MovielensHook(conn_id=conn_id)
ratins = hook.get_ratings(
    start_date=start_date,
	end_date=end_date,
	batch_size=batch_size
)

DAG에서 MovielensHook 사용하기

from custom.hooks import MovielensHook


with DAG(
    dag_id="02_hook",
    description="Fetches ratings from the Movielens API using a custom hook.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:

    def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
        logger = logging.getLogger(__name__)

        start_date = templates_dict["start_date"]
        end_date = templates_dict["end_date"]
        output_path = templates_dict["output_path"]

        logger.info(f"Fetching ratings for {start_date} to {end_date}")
        hook = MovielensHook(conn_id=conn_id)
        ratings = list(
            hook.get_ratings(
                start_date=start_date, end_date=end_date, batch_size=batch_size
            )
        )
        logger.info(f"Fetched {len(ratings)} ratings")

        logger.info(f"Writing ratings to {output_path}")

        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        with open(output_path, "w") as file_:
            json.dump(ratings, fp=file_)

    PythonOperator(
        task_id="fetch_ratings",
        python_callable=_fetch_ratings,
        op_kwargs={"conn_id": "movielens"},		# 사용할 커넥션 지정.
        templates_dict={
            "start_date": "{{ds}}",
            "end_date": "{{next_ds}}",
            "output_path": "/data/custom_hook/{{ds}}.json",
        },
    )

custom operator 빌드하기

  • MovielensHook 에도 시작/종료 날짜정의 파일 저장 같은 반복 코드를 작성해야하는 문제
    • custom operator 를 구현하여 반복적인 코드를 최소화

custom operator 정의하기

  • 모든 오퍼레이터는 BaseOperator 클래스의 서브클래스로 만든다.
  • 커스텀 오퍼레이터에만 사용되는 인수들은 생성자 메서드인 init에 명시적으로 지정 가능
    • 기본적으로 커넥션ID와 작업에 필요한 세부사항(시작/종료 날짜, 쿼리 등)이 포함
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook

class MovielensFetchRatingsOperator(BaseOperator):

    @apply_defaults		# 기본 DAG 인수를 커스텀 오퍼레이터에게 전달하기 위한 데커레이터
    def __init__(self,conn_id,**kwargs):	# BaseOperator 생성자에게 추가 키워드 인수를 전달
        super.__init__(**kwargs)
        self._conn_id = conn_id
		
		
# 오퍼레이터에 기본 인수 적용 (default_args 기본 인수를 사용하여 정의)
default_args = {
  "retires": 1,
  "retry_delay": timedelta(minutes=5)
}

with DAG(
  ...
  default_args=default_args
) as dag:
  MyCustomOperator(
    ...
  )
  
  def execute(self, context):	# 커스텀 오퍼레이터를 실행할때 호출되는 메인 메서드(context는 dict객체)
    ...
  • custom operator 구현
    • template_fields
      • DAG에서 오퍼레이터 인자를 하드코딩해서 호출하는 문제를 해결하기위해
      • 특정 변수를 템플릿으로 만들어서 컨텐스트 변수에 참조할 수 있다.
class MovielensFetchRatingsOperator(BaseOperator):

    template_fields = ("_start_date", "_end_date", "_output_path")

    @apply_defaults
    def __init__(
        self,
        conn_id,
        output_path,
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        batch_size=1000,
        **kwargs,
    ):
        super(MovielensFetchRatingsOperator, self).__init__(**kwargs)

        self._conn_id = conn_id
        self._output_path = output_path
        self._start_date = start_date
        self._end_date = end_date
        self._batch_size = batch_size

    
    def execute(self, context):
        hook = MovielensHook(self._conn_id)

        try:
            self.log.info(
                f"Fetching ratings for {self._start_date} to {self._end_date}"
            )
            ratings = list(
                hook.get_ratings(
                    start_date=self._start_date,
                    end_date=self._end_date,
                    batch_size=self._batch_size,
                )
            )
            self.log.info(f"Fetched {len(ratings)} ratings")
        finally:

            hook.close()

        self.log.info(f"Writing ratings to {self._output_path}")

        # Make sure output directory exists.
        output_dir = os.path.dirname(self._output_path)
        os.makedirs(output_dir, exist_ok=True)

        # Write output as JSON.
        with open(self._output_path, "w") as file_:
            json.dump(ratings, fp=file_)
  • MovielensFetchRatingsOperator 사용 예
from custom.operators import MovielensFetchRatingsOperator

with DAG(
    dag_id="03_operator",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:
    fetch_ratings = MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/data/custom_operator/{{ds}}.json",
    )

custom sensor 빌드하기

  • DAG안에서 다운스트림 태스크 실행하기 전에 특정 조건이 충족될때까지 대기하기 위해 사용
    • 예) 다운스트림 태스크에서 데이터를 사용하기 전에 특정 파일이 사용 가능한지 체크
  • 아래와 같은 방식으로 사용
    • execute 메서드 대신 poke 메서드를 구현
from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
  def poke(self, context):
    ...
  • 센서는 오퍼레이터와 유사하지만 exeucte 메서드와 다르게 poke 메서드는 True/False를 반환함
    • False이면 센서가 상태를 다시 체크할 때까지 몇 초 정도 대기 상태 유지
  • 커스텀 센서 클래스

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

from custom.hooks import MovielensHook


class MovielensRatingsSensor(BaseSensorOperator):

    template_fields = ("_start_date", "_end_date")

    @apply_defaults
    def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date

    def poke(self, context):
        hook = MovielensHook(self._conn_id)

        try:
            next(                                                     # 첫번째 레코드 가져오도록 
                hook.get_ratings(
                    start_date=self._start_date, end_date=self._end_date, batch_size=1
                )
            )
            self.log.info(
                f"Found ratings for {self._start_date} to {self._end_date}, continuing!"
            )
            return True		# next가 성공이면 True
        except StopIteration:
            self.log.info(
                f"Didn't find any ratings for {self._start_date} "
                f"to {self._end_date}, waiting..."
            )
            return False	# 예외 발생이면(레코드가 없으면) False 반환
        finally:
            hook.close()
  • 센서를 적용한 DAG
from custom.operators import MovielensFetchRatingsOperator
from custom.sensors import MovielensRatingsSensor

with DAG(
    dag_id="04_sensor",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:
    wait_for_ratings = MovielensRatingsSensor(
        task_id="wait_for_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
    )

    fetch_ratings = MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/data/custom_sensor/{{ds}}.json",
    )

    wait_for_ratings >> fetch_ratings

component 패키징하기

파이썬 패키지 부트스트랩 작업하기

  • setuptools 사용
$ mkdir -p airflow-movielens
$ cd airflow-movielens

$ mkdir -p src/airflow_movielens
$ touch src/airflow_movielens/__init__.py		# 패키지로 만들기 위한 파일.


# 패키지 구조

airflow-movielens
ㄴ setup.py
ㄴ src
   ㄴ airflow-movielens
      ㄴ __init__.py
	  ㄴ hook.py
	  ㄴ operators.py
	  ㄴ sensors.py

여기에 setup.py 파일을 만들어서 포함시킴

#!/usr/bin/env python

import setuptools

requirements = ["apache-airflow", "requests"]

extra_requirements = {"dev": ["pytest"]}

setuptools.setup(
    name="airflow_movielens",
    version="0.1.0",
    description="Hooks, sensors and operators for the Movielens API.",
    author="Anonymous",
    author_email="anonymous@example.com",
    install_requires=requirements,
    extras_require=extra_requirements,
    packages=setuptools.find_packages("src"),
    package_dir={"": "src"},
    url="https://github.com/example-repo/airflow_movielens",
    license="MIT license",
)

0개의 댓글