Airflow Pipeline 만들기 - API를 활용한 Image 다운로드

bradley·2022년 8월 29일
1

Airflow

목록 보기
8/16

개요


Data Pipelines with Apache Airflow 책에 나오는 내용 중 관련 종사자가 아니면 잘 보기 힘든 재밌는 데이터이기도 하면서, 기록을 위해 가져온 Pipeline이다. 포스팅할 Code는 샘플 코드에서 일부 커스텀한 Code이다.
이 샘플 코드를 통해 Airflow에서 API 데이터를 가져올 때 어떻게 Pipeline을 구성하는지 감을 잡을 수 있다.

데이터 소개


Launch Library 2라는 곳에서는 로켓 발사, 우주 관련 Event 데이터들을 오픈 API를 통해 제공하고 있다.

Launch Library 2
https://thespacedevs.com/llapi

이 중 우리가 가져올 데이터는 예정된 10개의 로켓 발사에 대한 데이터와 로켓 이미지에 대한 URL을 가져오는 Launches 데이터이다.

Launches API Endpoint
https://ll.thespacedevs.com/2.0.0/launch/upcoming/

Architecture


Architecture는 다음과 같다.

1) API로부터 JSON 데이터를 내려받는다.
2) JSON 데이터에서 Image URL을 추출한다.
3) Image URL에 접속한다.
4) Image를 내려받는다.
5) 모든 Image를 download 받으면 E-mail로 성공 메세지를 보낸다.

DAG 만들기


import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

import pathlib # 경로를 문자열이 아닌 객체로 처리하도록 해줌
import json
import requests
import requests.exceptions as requests_exceptions

ROCKET_LAUNCHES_URL = "https://ll.thespacedevs.com/2.0.0/launch/upcoming/"
JSON_PATH = '/tmp/launches.json'
TARGET_DIR = '/tmp/images'

def _get_pictures():
    # Path() : Path 객체 생성
    # mkdir() - exist_ok=True : 폴더가 없을 경우 자동으로 생성 
    pathlib.Path(TARGET_DIR).mkdir(parents=True, exist_ok=True)

    # launches.json 파일에 있는 모든 그림 파일 download
    with open(JSON_PATH) as f:
        try:
            launches = json.load(f)
            image_urls = [launch['image'] for launch in launches['results']]

            for i, image_url in enumerate(image_urls):
                try:
                    response = requests.get(image_url)
                    image_filename = image_url.split('/')[-1]
                    target_file = f'{TARGET_DIR}/{image_filename}'

                    with open(target_file, 'wb') as f:
                        f.write(response.content)
                    
                    print(f'Downloaded {image_url} to {target_file}')
                except requests_exceptions.MissingSchema: 
                    print(f'{image_url} appears to be an invalid URL.')
                except requests_exceptions.ConnectionError:
                    print(f'Could not connect to {image_url}.')
        except KeyError as e:
            with open(JSON_PATH) as f:
                print(json.load(f)) # ex : {'detail': 'Request was throttled. Expected available in 766 seconds.'}
            raise


with DAG(
    'download_rocket_launches',
    start_date = airflow.utils.dates.days_ago(14),
    schedule_interval = None
) as dag:
    
    download_launches = BashOperator(
        task_id = 'download_launches',
        bash_command = f'curl -o {JSON_PATH} -L {ROCKET_LAUNCHES_URL}'
    )

    get_pictures = PythonOperator(
        task_id = 'get_pictures',
        python_callable = _get_pictures
    )

    notify = EmailOperator(
        task_id = 'send_email',
        to = '메일 주소',
        subject = 'Rocket Launches Data Ingestion Completed.',
        html_content = """
            <h2>Rocket Launches Data Ingestion Completed.</h2>
            <br/>
            Date : {{ ds }}
        """
    )

    download_launches >> get_pictures >> notify

DAG 구조


실행 결과


정상적으로 Images 파일들이 download된 것을 볼 수 있다.

space_launch_sy_image_20220331082429.jpeg

E-mail도 정상적으로 온 것을 확인할 수 있다.

profile
데이터 엔지니어링에 관심이 많은 홀로 삽질하는 느림보

1개의 댓글

comment-user-thumbnail
2023년 5월 24일

/tmp/images 위치가 어디있을까요 ㅜ

답글 달기