Airflow 8장 커스텀 컴포넌트 빌드

snooby·2022년 8월 4일
1

🌌 Airflow

목록 보기
5/6
post-thumbnail

앞선 7장에서 배웠듯이 Airflow는 여러 유형의 시스템들 간의 적업을 조율할 때 쉽게 확장할 수 있습니다.

만일 Airflow가 지원하지 않는 시스템에서 태스크를 실행해야할 경우 혹은 단순 반복적인 코드여서 여러 DAG에서 재사용하기 힘들 수 있습니다.
이럴경우 어떻게 해야할까요?

커스텀 오퍼레이션을 직접 구현해 사용해야합니다.

1. 커스텀 오퍼레이션

Airflow에서 지원하지 않는 시스템의 태스크 실행이 가능하며
여러 DAG에서 공용으로 만들어서 단순화시킬 수 있습니다.

이번 7장은 자신만의 커스텀 컴포넌트를 파이썬 패키지로 패키징하여 여러 환경에 설치하거나 재사용할 때 편리하게 하는 방법을 배웁니다.

영화 평점 데이터를 가져와 평점 기반으로 영화를 추천하는 예시를 통해 알아봅시다.

이미 마련된 REST API를 통해 데이터를 가져오겠습니다.
http://localhost:5000/ratings?offset=100
http://localhost:5000/ratings?limit=1000
http://localhost:5000/ratings?start_date=2019-01-01&end_date=2019-01-02

이런식으로 데이터를 가져올 수 있습니다.
API 파라미터로 사용된 값에 대해 알아보겠습니다.

  • offset : 결과 중 몇 번째 결과부터 가져오는지
  • limit : 결과 중 한 번에 가져올 수 있는 레코드 개수
  • start_date : 시작 날짜
  • end_date : 종료 날짜

API로 데이터 수집하기

이제 이렇게 API를 사용하여 데이터를 수집하는 프로그램이 어떻게 되는지 알아봅시다.

파라미터를 환경 변수 설정으로 하여 세션과 기본 URL을 구성하였습니다.
또한 API로 정보를 가져올 수 있게끔 인증 정보를 같이 포함해야합니다.
그래서 인증 정보를 session에 포함시켰습니다.

페이지 처리

위에서 만든 _get_session 메소드로 데이터를 가져올 수 있는 API를 가지고 데이터를 가져오고 가져온 데이터를 반환하도록 하겠습니다.

  • while total is None or offset < total:
    👉 데이터를 끝까지 모두 가져오도록 합니다.
  • response_json = response.json()
    👉 가져온 데이터를 json으로 파싱합니다.
  • response_json = response.json()
    👉 결과를 반환할 때 yield from을 사용하여 각 평점 레코드들의 제너레이터를 효과적으로 전달합니다.

    왜 제너레이터를 효과적으로 전달한다는 것일까?

    while문을 써서 데이터를 모두 가져옵니다.
    데이터가 엄청엄청 많을 수도 있는데 가져온 데이터를 한번에 메모리에 올려두고 있으면 터질 수 있겠죠.
    따라서 return이 아니라 yield를 써서 함수가 끝나지 않은 상태여도 값을 바깥으로 전달하게 하여 메모리를 효율적으로 사용할 수 있습니다.
    또한 return은 실행되고 나면 함수가 끝나는데 yield를 쓰면 값 반환 후 함수를 계속해서 진행하기 때문에 사용자가 원하는 대로 데이터를 끝까지 다 가져올 수 있는 겁니다.

    파이썬의 강력한 기능인 Generator에 대해서 더 알고 싶으시면 여기를 참조해주세요.

이렇게 만든 두 기능을 하나로 합쳐 필요한 기간만큼의 데이터를 가져오도록 날짜 param을 넘겼습니다.

DAG 구축하기

1) 데이터를 가져와 json으로 저장하는 fetch_ratings 태스크

  • 데이터를 넘어온 인자 날짜 사이만큼의 데이터만 가져오고
  • 출력 디렉토리에 json 형태로 저장시킵니다.
    2) 평점 데이터로 영화 랭킹을 만드는 rank_movies 태스크
  • 영화의 평균 평점과 평점 총 개수를 계산하고
  • 최소 평점 개수 기준으로 영화를 필터링합니다.
  • 평균 평점을 기준으로 정렬
    3) 태스크를 airflow dag로 생성

💡 커스텀 오퍼레이션

원하는 태스크를 진행하기 위해 직접 python 구현을 통해 태스크를 생성하고 DAG를 구축할 수 있습니다.

2. 커스텀 훅

앞에서 원하는 테스크를 직접구현해 보았습니다.
코드가 길고 복잡합니다. 똑같은 API로 데이터를 가져오는 태스크를 다른 DAG에서 진행한다면 매번 이 코드를 생산하는 것은 비효율적입니다.
따라서, 코드를 캡슐화하고 재활용 가능한 Airflow 훅으로 만들어 보겠습니다.
이 작업으로 모든 API 전용 코드를 한 곳에 보관하고 DAG의 여러 부분에서는 이 훅을 간단하게 사용할 수 있습니다.
훅을 사용하면 Airflow의 데이터베이스와 UI를 통한 자격증명과 연결된 관리 기능을 사용할 수 있습니다. 이것을 사용하면 앞처럼 API 자격증명 정보를 수동으로 넣지 않아도 됩니다.

커스텀 훅 설계하기

  • from airflow.hooks.base_hook import BaseHook
    class MovielensHook(BaseHook):
    👉 훅은 BaseHook 클래스의 서브클래스로 생성합니다
  • def init(self, conn_id, retry=3):
    👉 훅에 필요한 다른 추가적인 인수를 지정하는 init 메서드를 정의
  • self._conn_id = conn_id
    👉 커넥션 ID는 훅에게 어떤 커넥션을 사용하는지 전달하며 이 값은 꼭 저장해야합니다.
  • init 메서드의
    self._session = None
    self._base_url = None
    👉 세션과 기본 URL을 캐싱하기위한 추가 변수 2개

get_conn 메서드 정의

Airflow 훅은 외부 시스템과의 연결 설정을 책임지는 get_conn 메서드를 정의합니다.

외부시스템 연결 메소드 get_conn를 알아보기 전에 API 연결을 위해서는 인증정보가 필요하다고 했습니다. 자격 증명 정보는 공개되어서는 안되고 안전하게 관리되어야합니다.
따라서, 자격 증명 정보를 보다 안전하고 쉽게 관리하려면 하드코딩하여 넘기는 것보다 Airflow 자격 인증 저장소에서 가져오는 것이 좋습니다.

메타스토어

이러한 자격 증명 정보를 Airflow 메타스토어에 추가합니다.
Airflow UI를 통해 추가가능하며 UI의 Admin > Connection 항목에서 작업 가능합니다.

  • if self._session is None:
    👉 세션을 생성하기 전에 연결된 세션이 있는지 체크

    처음 get_conn 함수가 호출될 때는 session이 None이므로 메타스토어 연결 세부 사항을 가져와서 기본 URL과 세션을 인스턴스 내부에 저장합니다.
    이후에 호출될 때는 session과 base_url을 캐싱하여 캐싱된 값이 반환됩니다.

  • config = self.get_connection(self._conn_id)
    👉 주어진 커넥션 id를 가지고 커넥션 설정 정보 가져옴.

  • schema = config.schema or self.DEFAULT_SCHEMA
    👉 커넥션 정보와 기본값을 사용하여 기본 URL 구성

  • self._session = requests.Session()
    self._session.auth = (config.login, config.password)
    👉 커넥션 설정정보의 login/pwd 정보를 사용하여 요청 세션 생성

커스텀 훅 사용하기

이렇게 생성한 데이터를 가져오는 단계를 캡슐화한 훅을 사용하면 DAG를 더 간결하게 만들 수 있습니다.

커스텀 훅을 임포트하고 hook에 정의된 메소드 기능을 사용하면 원하는 기능이 잘 수행하게됩니다.

  • from custom.hooks import MovielensHook
    👉 커스텀 훅은 custom 패키지 안에 생성하고 위와 같이 불러서 사용가능합니다.
  • PythonOperator(
    task_id="fetch_ratings",
    python_callable=_fetch_ratings,
    👉 DAG에서 훅을 사용하기 위해서는 훅 호출 코드를 PythonOperator에 래핑해야합니다.

💡 커스텀 훅

커스텀 훅을 사용하여 필요한 로직을 단일 클래스에 캡슐화하여 제공하였습니다.

3. 커스텀 오퍼레이터 빌드하기

DAG의 복잡한 부분을 훅으로 많이 캡슐화하고 간결화시켰지만 여전히 시작/종료일, 데이터 파일 저장 등 반복적(수작업)으로 코드를 작성해줘야하는 부분이 있습니다. 🥲
이는 커스텀 오퍼레이터를 직접 구현하여 코드 반복(중복)을 최소화할 수 있습니다.

커스텀 오퍼레이터 사용하기

  • from airflow.models import BaseOperator
    class MovielensFetchRatingsOperator(BaseOperator):
    👉 Airflow의 모든 오퍼레이터는 BaseOperator 클래스의 서브 클래스로 만듭니다.
  • super(MovielensFetchRatingsOperator, self).init(kwargs)
    👉 오퍼레이터의 일반적인 동작을 정의하는 제네릭 인수들은 많습니다. 가령 task_id, 스케줄 관련 retries, retry_delay 등 이러한 인수를 나열하지 않고
    kwargs 구문으로 전달합니다.

👉 default_args를 사용하면 특정 인수를 전체 DAG의 기본 인수로 정의할 수 있습니다.

  • @apply_defaults
    def init(
    👉 기본 인수들이 정상적으로 적용되었는지 확인하기 위해 apply_defaults라는 데코레이터를 사용할 수 있습니다. init 메소드에서 적용되며 커스텀 오퍼레이터를 정의할 때 apply_defaults를 항상 포함해야합니다.

    👉 execute 메서드는 context라는 하나의 파라미터만 받고 이 파라미터는 Airflow의 모든 콘텍스트 변수를 담고있는 dict 객체입니다.

여기서 만든 커스텀 오퍼레이터는 1. 커스텀 오퍼레이션에서의 데이터 추출 & 영화 평점 랭킹 두개의 함수를 모두 객체화 한것으로 이번에 만든 커스텀 오퍼레이터를 훅으로 적용하면 한번의 적용으로 모든 기능이 수행되게 됩니다.

따라서 DAG 생성을 보면 다음처럼 커스텀 오퍼레이터를 불러와 필요한 인자만 넘겨주고 넘겨받은 인자를 가지고 커스텀 오퍼레이터가 데이터를 추출하고 영화 평점 랭킹까지 모두 하게되는 것입니다.

템플릿 오퍼레이터 변수

DAG에서 start_date, end_date, output_path에 {{ds}} 이런식의 윈도우 시작 값을 사용하는 인자가 있습니다.
이렇게 사용한 이유가 무엇일까요?

우리가 커스텀 오퍼레이터를 쓰고싶은 이유는 날짜, 데이터명 등 매번 바뀌는 값을 하드코딩이 아닌 인자형식으로 변환하고자 했던 것입니다.

거기까지는 저희가 커스텀 오퍼레이터를 만들 때 kwargs 등 인자로 받아 처리하도록 해두었습니다.
그런데 DAG를 실행할 때마다 인자를 넣어줘야한다는 것 또한 반복 작업입니다.
따라서
날짜(윈도우 날짜값)을 사용하여 날짜 기준 컷이나 데이터명에 날짜를 사용하여 데이터 고유성, 기존 데이터에 대한 침범을 막을 수 있고 반복작업을 줄일 수 있습니다.**

💡 커스텀 오퍼레이터

앞선 방식처럼 날짜, 데이터 등을 하드코딩하는 것이 아닌 인자로 변환하여 재사용성을 높였습니다.

4. 커스텀 센서 빌드하기

앞 단원에서 배운 센서라는 기능을 특정 상황(조건)에 적용한 것이 커스텀 센서입니다.

기억이 안나거나 보지 않은 분을 위해 센서를 간략히 설명하고 넘어가겠습니다.

센서

DAG 안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하기 위해 사용

커스텀 센서 생성하기

  • from airflow.sensors.base import BaseSensorOperator
    class MovielensRatingsSensor(BaseSensorOperator):
    👉 BaseSensorOperator 클래스를 상속합니다.
  • def poke(self, context):
    👉 커스텀 센서를 생성하기 위해서는 execute 메서드를 사용했던 앞과 다르게 poke를 구현해야합니다.
    poke는 execute와 다르게 불린값을 반환합니다. 당연한 거겠죠. 센서는 특정 태스크가 완료되었는지 여부를 체크하니깐 True/False겠죠. False이면 몇 초 정도 대기 상태로 들어갑니다.
    👉 앞서 만든 MovielensHook 훅을 사용하여 레코드가 하나라도 존재하는 지 확인합니다.
    존재하면 True 아니면 False를 반혼하는데 제너레이터가 빈값일 경우에는 오류가 발생합니다.
    이러한 예외 상황을 체크하고자 try/except 문을 넣어줍니다.

DAG 생성


이렇게 생성한 커스텀 센서를 적용하면 센서 기능을 앞선 훅을 사용해 간결하게 작성할 수 있습니다.

5. 컴포넌트 패키징하기

지금까지는 커스텀 컴포넌트를 생성하면 DAG 티렉토리 내에 있는 서브 패키지까지 포함했습니다.
이는 사실 여러 사람과 프로젝트를 공유할 경우에는 올바른 경우가 아닙니다.

컴포넌트를 배포하는 나은 방법은 파이썬 패키지에 코드를 넣는 것입니다.

컴포넌트 패키징 장점

  • DAG와는 별도로 코드를 유지함으로써 커스텀 코드에 대한 CI/CD 프로세스를 구성 가능
  • 다른 사람과 이 코드를 더 쉽게 공유하고 협업

파이썬 패키지 부트스트랩 작업하기

다음처럼 소스파일을 준비한 후

패키지를 설치합니다.

python -m pip install ./airflow-movielens


설치가 잘 되었습니다.

패키지를 배포하는 방법은 2가지입니다.
1. 깃헙 레포에서 직접 설치

python -m pip install git+htpps://github.com/..
  1. pip 패키지 피드
python -m pip install airflow_movielens
profile
데이터를 가치있게 다루고 싶은 개발자 🐥

0개의 댓글