Data Pipelines with Apache Airflow 책에 나오는 내용 중 관련 종사자가 아니면 잘 보기 힘든 재밌는 데이터이기도 하면서, 기록을 위해 가져온 Pipeline이다. 포스팅할 Code는 샘플 코드에서 일부 커스텀한 Code이다.
이 샘플 코드를 통해 Airflow에서 API 데이터를 가져올 때 어떻게 Pipeline을 구성하는지 감을 잡을 수 있다.
Launch Library 2라는 곳에서는 로켓 발사, 우주 관련 Event 데이터들을 오픈 API를 통해 제공하고 있다.
Launch Library 2
https://thespacedevs.com/llapi
이 중 우리가 가져올 데이터는 예정된 10개의 로켓 발사에 대한 데이터와 로켓 이미지에 대한 URL을 가져오는 Launches 데이터이다.
Launches API Endpoint
https://ll.thespacedevs.com/2.0.0/launch/upcoming/
Architecture는 다음과 같다.
1) API로부터 JSON 데이터를 내려받는다.
2) JSON 데이터에서 Image URL을 추출한다.
3) Image URL에 접속한다.
4) Image를 내려받는다.
5) 모든 Image를 download 받으면 E-mail로 성공 메세지를 보낸다.
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
import pathlib # 경로를 문자열이 아닌 객체로 처리하도록 해줌
import json
import requests
import requests.exceptions as requests_exceptions
ROCKET_LAUNCHES_URL = "https://ll.thespacedevs.com/2.0.0/launch/upcoming/"
JSON_PATH = '/tmp/launches.json'
TARGET_DIR = '/tmp/images'
def _get_pictures():
# Path() : Path 객체 생성
# mkdir() - exist_ok=True : 폴더가 없을 경우 자동으로 생성
pathlib.Path(TARGET_DIR).mkdir(parents=True, exist_ok=True)
# launches.json 파일에 있는 모든 그림 파일 download
with open(JSON_PATH) as f:
try:
launches = json.load(f)
image_urls = [launch['image'] for launch in launches['results']]
for i, image_url in enumerate(image_urls):
try:
response = requests.get(image_url)
image_filename = image_url.split('/')[-1]
target_file = f'{TARGET_DIR}/{image_filename}'
with open(target_file, 'wb') as f:
f.write(response.content)
print(f'Downloaded {image_url} to {target_file}')
except requests_exceptions.MissingSchema:
print(f'{image_url} appears to be an invalid URL.')
except requests_exceptions.ConnectionError:
print(f'Could not connect to {image_url}.')
except KeyError as e:
with open(JSON_PATH) as f:
print(json.load(f)) # ex : {'detail': 'Request was throttled. Expected available in 766 seconds.'}
raise
with DAG(
'download_rocket_launches',
start_date = airflow.utils.dates.days_ago(14),
schedule_interval = None
) as dag:
download_launches = BashOperator(
task_id = 'download_launches',
bash_command = f'curl -o {JSON_PATH} -L {ROCKET_LAUNCHES_URL}'
)
get_pictures = PythonOperator(
task_id = 'get_pictures',
python_callable = _get_pictures
)
notify = EmailOperator(
task_id = 'send_email',
to = '메일 주소',
subject = 'Rocket Launches Data Ingestion Completed.',
html_content = """
<h2>Rocket Launches Data Ingestion Completed.</h2>
<br/>
Date : {{ ds }}
"""
)
download_launches >> get_pictures >> notify
정상적으로 Images 파일들이 download된 것을 볼 수 있다.
space_launch_sy_image_20220331082429.jpeg
E-mail도 정상적으로 온 것을 확인할 수 있다.
/tmp/images 위치가 어디있을까요 ㅜ