_get_session과 _get_with_pagination을 결합한 _get_ratings 함수를 PythonOperator로 호출하여
스케줄 간격마다 평점 데이터를 가져올 수 있음.
templates_dict에 넣어 전달하면 context variable을 dict에서 참조.

import datetime as dt
import logging
import json
import os
import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from custom.ranking import rank_movies_by_rating
# Connection settings for the MovieLens API, read from the environment.
# Host, schema and port fall back to defaults; user and password are
# required and raise KeyError at import time when missing.
MOVIELENS_HOST = os.environ.get("MOVIELENS_HOST", "movielens")
MOVIELENS_SCHEMA = os.environ.get("MOVIELENS_SCHEMA", "http")
MOVIELENS_PORT = os.environ.get("MOVIELENS_PORT", "5000")
MOVIELENS_USER = os.environ["MOVIELENS_USER"]
MOVIELENS_PASSWORD = os.environ["MOVIELENS_PASSWORD"]
# Fetch the ratings between start_date and end_date, 100 records per request.
def _get_ratings(start_date, end_date, batch_size=100):
    """Yield rating records between *start_date* and *end_date*.

    Records are fetched lazily from the MovieLens API in pages of
    *batch_size* items per request.
    """
    api_session, api_base = _get_session()
    date_params = {"start_date": start_date, "end_date": end_date}
    yield from _get_with_pagination(
        session=api_session,
        url=api_base + "/ratings",
        params=date_params,
        batch_size=batch_size,
    )
# Build an authenticated requests session for the ratings server.
def _get_session():
    """Return a ``(session, base_url)`` pair for the MovieLens API.

    Credentials and endpoint details come from the module-level
    MOVIELENS_* environment settings.
    """
    api_session = requests.Session()
    api_session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD)
    base_url = f"{MOVIELENS_SCHEMA}://{MOVIELENS_HOST}:{MOVIELENS_PORT}"
    return api_session, base_url
# 주어진 세션, URL, 파라미터를 설정하여 배치 사이즈에 맞게 평점데이터 요청.
def _get_with_pagination(session, url, params, batch_size=100):
offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()
yield from response_json["result"]
offset += batch_size
total = response_json["total"]
with DAG(
    dag_id="01_python",
    description="Fetches ratings from the Movielens API using the Python Operator.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:

    # Download the ratings for the schedule interval.
    def _fetch_ratings(templates_dict, batch_size=1000, **_):
        """Fetch ratings between the templated dates and write them as JSON.

        ``templates_dict`` supplies the Jinja-rendered ``start_date``,
        ``end_date`` and ``output_path`` values.
        """
        logger = logging.getLogger(__name__)
        start_date = templates_dict["start_date"]
        end_date = templates_dict["end_date"]
        output_path = templates_dict["output_path"]
        logger.info(f"Fetching ratings for {start_date} to {end_date}")
        ratings = list(
            _get_ratings(
                start_date=start_date, end_date=end_date, batch_size=batch_size
            )
        )
        logger.info(f"Fetched {len(ratings)} ratings")
        logger.info(f"Writing ratings to {output_path}")
        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "w") as file_:
            json.dump(ratings, fp=file_)

    fetch_ratings = PythonOperator(
        task_id="fetch_ratings",
        python_callable=_fetch_ratings,
        # templates_dict values are templated: {{ds}}/{{next_ds}} are
        # rendered from the run's execution context before the call.
        templates_dict={
            "start_date": "{{ds}}",
            "end_date": "{{next_ds}}",
            "output_path": "/data/python/ratings/{{ds}}.json",
        },
    )

    # Rank movies using the downloaded ratings.
    def _rank_movies(templates_dict, min_ratings=2, **_):
        """Rank movies by rating (custom.ranking helper) and write a CSV."""
        input_path = templates_dict["input_path"]
        output_path = templates_dict["output_path"]
        ratings = pd.read_json(input_path)
        ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)
        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)
        ranking.to_csv(output_path, index=True)

    rank_movies = PythonOperator(
        task_id="rank_movies",
        python_callable=_rank_movies,
        templates_dict={
            "input_path": "/data/python/ratings/{{ds}}.json",
            "output_path": "/data/python/rankings/{{ds}}.csv",
        },
    )

    fetch_ratings >> rank_movies
BaseHook 클래스의 서브클래스로 생성하고 __init__ 메서드를 정의.

from airflow.hooks.base_hook import BaseHook
class MovielensHook(BaseHook):
    """Skeleton hook: stores the id of the Airflow connection to use."""

    def __init__(self, conn_id):  # Tell the hook which connection to use.
        super().__init__()
        self._conn_id = conn_id
        # get_connection() is inherited from BaseHook and looks the
        # connection up by id.
        # NOTE(review): doing this in __init__ hits the metastore on every
        # instantiation; the fuller implementation defers it to get_conn().
        config = self.get_connection(self._conn_id)
class MovielensHook(BaseHook):
    """Hook for the MovieLens API.

    Abstracts the connection details behind an Airflow connection and
    lazily creates an authenticated ``requests`` session on first use.
    """

    DEFAULT_SCHEMA = "http"  # Used when the connection defines no schema.
    DEFAULT_PORT = 5000      # Used when the connection defines no port.

    def __init__(self, conn_id):
        super().__init__()
        self._conn_id = conn_id
        # Initialised lazily by get_conn(). Without these assignments the
        # original `if self._session is None` check raised AttributeError
        # on the first call.
        self._session = None
        self._base_url = None

    # Create (or reuse) the connection used by the hook.
    def get_conn(self):
        """Return a ``(session, base_url)`` pair, creating both on first call."""
        if self._session is None:
            # Method provided by BaseHook: fetches the connection settings.
            config = self.get_connection(self._conn_id)
            if not config.host:
                raise ValueError(f"No host specified in connection {self._conn_id}")
            schema = config.schema or self.DEFAULT_SCHEMA
            port = config.port or self.DEFAULT_PORT
            self._base_url = f"{schema}://{config.host}:{port}"
            self._session = requests.Session()
            # Authenticate only when the connection has a login configured.
            if config.login:
                self._session.auth = (config.login, config.password)
        return self._session, self._base_url

    def close(self):
        """Close the cached session and reset state.

        Added because operators/sensors in this file call ``hook.close()``
        in their ``finally`` blocks.
        """
        if self._session is not None:
            self._session.close()
        self._session = None
        self._base_url = None

    def get_ratings(self, start_date=None, end_date=None, batch_size=100):
        """Yield ratings between *start_date* and *end_date*, page by page."""
        yield from self._get_with_pagination(
            endpoint="/ratings",
            params={"start_date": start_date, "end_date": end_date},
            batch_size=batch_size,
        )

    def _get_with_pagination(self, endpoint, params, batch_size=100):
        """Yield every record from *endpoint*, requesting batch_size at a time."""
        session, base_url = self.get_conn()
        url = base_url + endpoint
        offset = 0
        total = None
        while total is None or offset < total:
            response = session.get(
                url, params={**params, **{"offset": offset, "limit": batch_size}}
            )
            response.raise_for_status()
            response_json = response.json()
            yield from response_json["result"]
            offset += batch_size
            total = response_json["total"]
chapter08
ㄴdags
ㄴ custom
ㄴ __init__.py
ㄴ hooks.py
ㄴ01_python.py
ㄴ02.hook.py
ㄴ docker-compose.yml
ㄴ ...
아래와 같이 호출
from custom.hooks import MovielensHook
hook = MovielensHook(conn_id=conn_id)
ratings = hook.get_ratings(
start_date=start_date,
end_date=end_date,
batch_size=batch_size
)
DAG에서 MovielensHook 사용하기
from custom.hooks import MovielensHook
with DAG(
    dag_id="02_hook",
    description="Fetches ratings from the Movielens API using a custom hook.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:

    def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
        """Fetch ratings via MovielensHook and write them to a JSON file.

        The API details now live in the Airflow connection *conn_id*
        instead of environment variables.
        """
        logger = logging.getLogger(__name__)
        start_date = templates_dict["start_date"]
        end_date = templates_dict["end_date"]
        output_path = templates_dict["output_path"]
        logger.info(f"Fetching ratings for {start_date} to {end_date}")
        hook = MovielensHook(conn_id=conn_id)
        ratings = list(
            hook.get_ratings(
                start_date=start_date, end_date=end_date, batch_size=batch_size
            )
        )
        logger.info(f"Fetched {len(ratings)} ratings")
        logger.info(f"Writing ratings to {output_path}")
        # Make sure output directory exists.
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "w") as file_:
            json.dump(ratings, fp=file_)

    PythonOperator(
        task_id="fetch_ratings",
        python_callable=_fetch_ratings,
        op_kwargs={"conn_id": "movielens"},  # Which Airflow connection to use.
        templates_dict={
            "start_date": "{{ds}}",
            "end_date": "{{next_ds}}",
            "output_path": "/data/custom_hook/{{ds}}.json",
        },
    )
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook
class MovielensFetchRatingsOperator(BaseOperator):
    """Skeleton custom operator that will fetch ratings via MovielensHook."""

    @apply_defaults  # Forwards default DAG arguments to the custom operator.
    def __init__(self, conn_id, **kwargs):
        """*conn_id* selects the connection; extra kwargs go to BaseOperator."""
        # Bug fix: the original called `super.__init__(**kwargs)` on the
        # built-in `super` type itself, which raises a TypeError at runtime.
        super().__init__(**kwargs)
        self._conn_id = conn_id
# 오퍼레이터에 기본 인수 적용 (default_args 기본 인수를 사용하여 정의)
default_args = {
"retires": 1,
"retry_delay": timedelta(minutes=5)
}
with DAG(
...
default_args=default_args
) as dag:
MyCustomOperator(
...
)
def execute(self, context): # 커스텀 오퍼레이터를 실행할때 호출되는 메인 메서드(context는 dict객체)
...
class MovielensFetchRatingsOperator(BaseOperator):
    """Operator that fetches ratings from the MovieLens API and writes JSON.

    Parameters
    ----------
    conn_id : str
        Airflow connection to use for the API.
    output_path : str
        Where to write the fetched ratings. Templated.
    start_date, end_date : str
        Date range to fetch; templated (defaults ``{{ds}}``/``{{next_ds}}``).
    batch_size : int
        Number of records requested per API call.
    """

    # Attributes listed here are rendered with Jinja before execute() runs.
    template_fields = ("_start_date", "_end_date", "_output_path")

    @apply_defaults
    def __init__(
        self,
        conn_id,
        output_path,
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        batch_size=1000,
        **kwargs,
    ):
        super(MovielensFetchRatingsOperator, self).__init__(**kwargs)
        self._conn_id = conn_id
        self._output_path = output_path
        self._start_date = start_date
        self._end_date = end_date
        self._batch_size = batch_size

    def execute(self, context):
        """Main entry point called when the task runs (*context* is a dict)."""
        hook = MovielensHook(self._conn_id)
        try:
            self.log.info(
                f"Fetching ratings for {self._start_date} to {self._end_date}"
            )
            ratings = list(
                hook.get_ratings(
                    start_date=self._start_date,
                    end_date=self._end_date,
                    batch_size=self._batch_size,
                )
            )
            self.log.info(f"Fetched {len(ratings)} ratings")
        finally:
            # Always release the hook's session, even when fetching failed.
            hook.close()
        self.log.info(f"Writing ratings to {self._output_path}")
        # Make sure output directory exists.
        output_dir = os.path.dirname(self._output_path)
        os.makedirs(output_dir, exist_ok=True)
        # Write output as JSON.
        with open(self._output_path, "w") as file_:
            json.dump(ratings, fp=file_)
from custom.operators import MovielensFetchRatingsOperator
with DAG(
    dag_id="03_operator",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:
    # One task: the custom operator replaces the PythonOperator + callable
    # pairing used by the 01_python DAG.
    fetch_ratings = MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/data/custom_operator/{{ds}}.json",
    )
poke 메서드를 구현.

from airflow.sensors.base import BaseSensorOperator
class MyCustomSensor(BaseSensorOperator):
    """Sensor skeleton: subclasses implement poke() to return True/False."""

    def poke(self, context):
        ...
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import MovielensHook
class MovielensRatingsSensor(BaseSensorOperator):
    """Sensor that waits until ratings exist for a given date range.

    ``poke`` returns True as soon as the MovieLens API yields at least
    one rating between the (templated) start and end dates.
    """

    template_fields = ("_start_date", "_end_date")

    @apply_defaults
    def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date

    def poke(self, context):
        hook = MovielensHook(self._conn_id)
        try:
            # Ask for a single record; StopIteration means none exist yet.
            ratings = hook.get_ratings(
                start_date=self._start_date, end_date=self._end_date, batch_size=1
            )
            next(ratings)
        except StopIteration:
            self.log.info(
                f"Didn't find any ratings for {self._start_date} "
                f"to {self._end_date}, waiting..."
            )
            return False
        else:
            self.log.info(
                f"Found ratings for {self._start_date} to {self._end_date}, continuing!"
            )
            return True
        finally:
            hook.close()
from custom.operators import MovielensFetchRatingsOperator
from custom.sensors import MovielensRatingsSensor
with DAG(
    dag_id="04_sensor",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:
    # Block until at least one rating exists for the interval.
    wait_for_ratings = MovielensRatingsSensor(
        task_id="wait_for_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
    )
    fetch_ratings = MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/data/custom_sensor/{{ds}}.json",
    )
    # Only fetch once the sensor has confirmed data is available.
    wait_for_ratings >> fetch_ratings
$ mkdir -p airflow-movielens
$ cd airflow-movielens
$ mkdir -p src/airflow_movielens
$ touch src/airflow_movielens/__init__.py # 패키지로 만들기 위한 파일.
# 패키지 구조
airflow-movielens
ㄴ setup.py
ㄴ src
ㄴ airflow_movielens
ㄴ __init__.py
ㄴ hooks.py
ㄴ operators.py
ㄴ sensors.py
여기에 setup.py 파일을 만들어서 포함시킴
#!/usr/bin/env python
import setuptools

# Runtime dependencies; the "dev" extra adds the test tooling.
requirements = ["apache-airflow", "requests"]
extra_requirements = {"dev": ["pytest"]}

setuptools.setup(
    name="airflow_movielens",
    version="0.1.0",
    description="Hooks, sensors and operators for the Movielens API.",
    author="Anonymous",
    author_email="anonymous@example.com",
    install_requires=requirements,
    extras_require=extra_requirements,
    # Packages live under src/ (the "src layout"), hence the package_dir map.
    packages=setuptools.find_packages("src"),
    package_dir={"": "src"},
    url="https://github.com/example-repo/airflow_movielens",
    license="MIT license",
)