Airflow Pipeline 만들기 - API를 활용한 Image 다운로드

bradley·2022년 8월 29일


Data Pipelines with Apache Airflow 책에 나오는 내용 중 관련 종사자가 아니면 잘 보기 힘든 재밌는 데이터이기도 하면서, 기록을 위해 가져온 Pipeline이다. 포스팅할 Code는 샘플 코드에서 일부 커스텀한 Code이다.
이 샘플 코드를 통해 Airflow에서 API 데이터를 가져올 때 어떻게 Pipeline을 구성하는지 감을 잡을 수 있다.

데이터 소개

Launch Library 2라는 곳에서는 로켓 발사, 우주 관련 Event 데이터들을 오픈 API를 통해 제공하고 있다.

Launch Library 2

이 중 우리가 가져올 데이터는 예정된 10개의 로켓 발사에 대한 데이터와 로켓 이미지에 대한 URL을 가져오는 Launches 데이터이다.

Launches API Endpoint


Architecture는 다음과 같다.

1) API로부터 JSON 데이터를 내려받는다.
2) JSON 데이터에서 Image URL을 추출한다.
3) Image URL에 접속한다.
4) Image를 내려받는다.
5) 모든 Image를 download 받으면 E-mail로 성공 메세지를 보낸다.

DAG 만들기

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from import EmailOperator

import pathlib # 경로를 문자열이 아닌 객체로 처리하도록 해줌
import json
import requests
import requests.exceptions as requests_exceptions

JSON_PATH = '/tmp/launches.json'
TARGET_DIR = '/tmp/images'

def _get_pictures():
    # Path() : Path 객체 생성
    # mkdir() - exist_ok=True : 폴더가 없을 경우 자동으로 생성 
    pathlib.Path(TARGET_DIR).mkdir(parents=True, exist_ok=True)

    # launches.json 파일에 있는 모든 그림 파일 download
    with open(JSON_PATH) as f:
            launches = json.load(f)
            image_urls = [launch['image'] for launch in launches['results']]

            for i, image_url in enumerate(image_urls):
                    response = requests.get(image_url)
                    image_filename = image_url.split('/')[-1]
                    target_file = f'{TARGET_DIR}/{image_filename}'

                    with open(target_file, 'wb') as f:
                    print(f'Downloaded {image_url} to {target_file}')
                except requests_exceptions.MissingSchema: 
                    print(f'{image_url} appears to be an invalid URL.')
                except requests_exceptions.ConnectionError:
                    print(f'Could not connect to {image_url}.')
        except KeyError as e:
            with open(JSON_PATH) as f:
                print(json.load(f)) # ex : {'detail': 'Request was throttled. Expected available in 766 seconds.'}

with DAG(
    start_date = airflow.utils.dates.days_ago(14),
    schedule_interval = None
) as dag:
    download_launches = BashOperator(
        task_id = 'download_launches',
        bash_command = f'curl -o {JSON_PATH} -L {ROCKET_LAUNCHES_URL}'

    get_pictures = PythonOperator(
        task_id = 'get_pictures',
        python_callable = _get_pictures

    notify = EmailOperator(
        task_id = 'send_email',
        to = '메일 주소',
        subject = 'Rocket Launches Data Ingestion Completed.',
        html_content = """
            <h2>Rocket Launches Data Ingestion Completed.</h2>
            Date : {{ ds }}

    download_launches >> get_pictures >> notify

DAG 구조

실행 결과

정상적으로 Images 파일들이 download된 것을 볼 수 있다.


E-mail도 정상적으로 온 것을 확인할 수 있다.

