앞선 7장에서 배웠듯이 Airflow는 여러 유형의 시스템들 간의 적업을 조율할 때 쉽게 확장할 수 있습니다.
만일 Airflow가 지원하지 않는 시스템에서 태스크를 실행해야할 경우 혹은 단순 반복적인 코드여서 여러 DAG에서 재사용하기 힘들 수 있습니다.
이럴경우 어떻게 해야할까요?
커스텀 오퍼레이션을 직접 구현해 사용해야합니다.
Airflow에서 지원하지 않는 시스템의 태스크 실행이 가능하며
여러 DAG에서 공용으로 만들어서 단순화시킬 수 있습니다.
이번 7장은 자신만의 커스텀 컴포넌트를 파이썬 패키지로 패키징하여 여러 환경에 설치하거나 재사용할 때 편리하게 하는 방법을 배웁니다.
영화 평점 데이터를 가져와 평점 기반으로 영화를 추천하는 예시를 통해 알아봅시다.
이미 마련된 REST API를 통해 데이터를 가져오겠습니다.
http://localhost:5000/ratings?offset=100
http://localhost:5000/ratings?limit=1000
http://localhost:5000/ratings?start_date=2019-01-01&end_date=2019-01-02
이런식으로 데이터를 가져올 수 있습니다.
API 파라미터로 사용된 값에 대해 알아보겠습니다.
- offset : 결과 중 몇 번째 결과부터 가져오는지
- limit : 결과 중 한 번에 가져올 수 있는 레코드 개수
- start_date : 시작 날짜
- end_date : 종료 날짜
이제 이렇게 API를 사용하여 데이터를 수집하는 프로그램이 어떻게 되는지 알아봅시다.
파라미터를 환경 변수 설정으로 하여 세션과 기본 URL을 구성하였습니다.
또한 API로 정보를 가져올 수 있게끔 인증 정보를 같이 포함해야합니다.
그래서 인증 정보를 session에 포함시켰습니다.
위에서 만든 _get_session 메소드로 데이터를 가져올 수 있는 API를 가지고 데이터를 가져오고 가져온 데이터를 반환하도록 하겠습니다.
왜 제너레이터를 효과적으로 전달한다는 것일까?
while문을 써서 데이터를 모두 가져옵니다.
데이터가 엄청엄청 많을 수도 있는데 가져온 데이터를 한번에 메모리에 올려두고 있으면 터질 수 있겠죠.
따라서 return이 아니라 yield를 써서 함수가 끝나지 않은 상태여도 값을 바깥으로 전달하게 하여 메모리를 효율적으로 사용할 수 있습니다.
또한 return은 실행되고 나면 함수가 끝나는데 yield를 쓰면 값 반환 후 함수를 계속해서 진행하기 때문에 사용자가 원하는 대로 데이터를 끝까지 다 가져올 수 있는 겁니다.파이썬의 강력한 기능인 Generator에 대해서 더 알고 싶으시면 여기를 참조해주세요.
이렇게 만든 두 기능을 하나로 합쳐 필요한 기간만큼의 데이터를 가져오도록 날짜 param을 넘겼습니다.
1) 데이터를 가져와 json으로 저장하는 fetch_ratings 태스크
💡 커스텀 오퍼레이션
원하는 태스크를 진행하기 위해 직접 python 구현을 통해 태스크를 생성하고 DAG를 구축할 수 있습니다.
앞에서 원하는 테스크를 직접구현해 보았습니다.
코드가 길고 복잡합니다. 똑같은 API로 데이터를 가져오는 태스크를 다른 DAG에서 진행한다면 매번 이 코드를 생산하는 것은 비효율적입니다.
따라서, 코드를 캡슐화하고 재활용 가능한 Airflow 훅으로 만들어 보겠습니다.
이 작업으로 모든 API 전용 코드를 한 곳에 보관하고 DAG의 여러 부분에서는 이 훅을 간단하게 사용할 수 있습니다.
훅을 사용하면 Airflow의 데이터베이스와 UI를 통한 자격증명과 연결된 관리 기능을 사용할 수 있습니다. 이것을 사용하면 앞처럼 API 자격증명 정보를 수동으로 넣지 않아도 됩니다.
Airflow 훅은 외부 시스템과의 연결 설정을 책임지는 get_conn 메서드를 정의합니다.
외부시스템 연결 메소드 get_conn를 알아보기 전에 API 연결을 위해서는 인증정보가 필요하다고 했습니다. 자격 증명 정보는 공개되어서는 안되고 안전하게 관리되어야합니다.
따라서, 자격 증명 정보를 보다 안전하고 쉽게 관리하려면 하드코딩하여 넘기는 것보다 Airflow 자격 인증 저장소에서 가져오는 것이 좋습니다.
메타스토어
이러한 자격 증명 정보를 Airflow 메타스토어에 추가합니다.
Airflow UI를 통해 추가가능하며 UI의 Admin > Connection 항목에서 작업 가능합니다.
if self._session is None:
👉 세션을 생성하기 전에 연결된 세션이 있는지 체크
처음 get_conn 함수가 호출될 때는 session이 None이므로 메타스토어 연결 세부 사항을 가져와서 기본 URL과 세션을 인스턴스 내부에 저장합니다.
이후에 호출될 때는 session과 base_url을 캐싱하여 캐싱된 값이 반환됩니다.
config = self.get_connection(self._conn_id)
👉 주어진 커넥션 id를 가지고 커넥션 설정 정보 가져옴.
schema = config.schema or self.DEFAULT_SCHEMA
👉 커넥션 정보와 기본값을 사용하여 기본 URL 구성
self._session = requests.Session()
self._session.auth = (config.login, config.password)
👉 커넥션 설정정보의 login/pwd 정보를 사용하여 요청 세션 생성
이렇게 생성한 데이터를 가져오는 단계를 캡슐화한 훅을 사용하면 DAG를 더 간결하게 만들 수 있습니다.
커스텀 훅을 임포트하고 hook에 정의된 메소드 기능을 사용하면 원하는 기능이 잘 수행하게됩니다.
💡 커스텀 훅
커스텀 훅을 사용하여 필요한 로직을 단일 클래스에 캡슐화하여 제공하였습니다.
DAG의 복잡한 부분을 훅으로 많이 캡슐화하고 간결화시켰지만 여전히 시작/종료일, 데이터 파일 저장 등 반복적(수작업)으로 코드를 작성해줘야하는 부분이 있습니다. 🥲
이는 커스텀 오퍼레이터를 직접 구현하여 코드 반복(중복)을 최소화할 수 있습니다.
👉 default_args를 사용하면 특정 인수를 전체 DAG의 기본 인수로 정의할 수 있습니다.
여기서 만든 커스텀 오퍼레이터는 1. 커스텀 오퍼레이션에서의 데이터 추출 & 영화 평점 랭킹 두개의 함수를 모두 객체화 한것으로 이번에 만든 커스텀 오퍼레이터를 훅으로 적용하면 한번의 적용으로 모든 기능이 수행되게 됩니다.
따라서 DAG 생성을 보면 다음처럼 커스텀 오퍼레이터를 불러와 필요한 인자만 넘겨주고 넘겨받은 인자를 가지고 커스텀 오퍼레이터가 데이터를 추출하고 영화 평점 랭킹까지 모두 하게되는 것입니다.
DAG에서 start_date, end_date, output_path에 {{ds}} 이런식의 윈도우 시작 값을 사용하는 인자가 있습니다.
이렇게 사용한 이유가 무엇일까요?
우리가 커스텀 오퍼레이터를 쓰고싶은 이유는 날짜, 데이터명 등 매번 바뀌는 값을 하드코딩이 아닌 인자형식으로 변환하고자 했던 것입니다.
거기까지는 저희가 커스텀 오퍼레이터를 만들 때 kwargs 등 인자로 받아 처리하도록 해두었습니다.
그런데 DAG를 실행할 때마다 인자를 넣어줘야한다는 것 또한 반복 작업입니다.
따라서 날짜(윈도우 날짜값)을 사용하여 날짜 기준 컷이나 데이터명에 날짜를 사용하여 데이터 고유성, 기존 데이터에 대한 침범을 막을 수 있고 반복작업을 줄일 수 있습니다.**
💡 커스텀 오퍼레이터
앞선 방식처럼 날짜, 데이터 등을 하드코딩하는 것이 아닌 인자로 변환하여 재사용성을 높였습니다.
앞 단원에서 배운 센서라는 기능을 특정 상황(조건)에 적용한 것이 커스텀 센서입니다.
기억이 안나거나 보지 않은 분을 위해 센서를 간략히 설명하고 넘어가겠습니다.
센서
DAG 안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하기 위해 사용
이렇게 생성한 커스텀 센서를 적용하면 센서 기능을 앞선 훅을 사용해 간결하게 작성할 수 있습니다.
지금까지는 커스텀 컴포넌트를 생성하면 DAG 티렉토리 내에 있는 서브 패키지까지 포함했습니다.
이는 사실 여러 사람과 프로젝트를 공유할 경우에는 올바른 경우가 아닙니다.
컴포넌트를 배포하는 나은 방법은 파이썬 패키지에 코드를 넣는 것입니다.
파이썬 패키지 부트스트랩 작업하기
다음처럼 소스파일을 준비한 후
패키지를 설치합니다.python -m pip install ./airflow-movielens
설치가 잘 되었습니다.패키지를 배포하는 방법은 2가지입니다.
1. 깃헙 레포에서 직접 설치python -m pip install git+htpps://github.com/..
- pip 패키지 피드
python -m pip install airflow_movielens