Apache Airflow는 데이터 작업을 자동화하는 도구입니다. 데이터를 수집하고, 처리하고, 저장하는 등의 작업을 쉽게 작성하고, 예약하고, 모니터링할 수 있게 해줍니다. 주로 데이터 엔지니어들이 사용하는데, 데이터를 가져와서 변환하고 저장하는 ETL 작업에 많이 사용됩니다.
매일 밤 12시에 데이터를 처리해야 하는 파이프라인이 있다고 가정해봅시다. 이 파이프라인은 다음과 같은 작업(Task)들로 구성됩니다:
이 작업들은 순서대로 실행되어야 하고, 각각 외부 시스템과 상호작용이 필요합니다.
외부 API를 통해 데이터를 가져옵니다는 다음과 같은 의미입니다:
따라서 "외부 API를 통해 데이터를 가져옵니다"는 의미는, 우리가 필요한 데이터를 제공하는 다른 서비스에 API를 통해 요청을 보내고, 그 응답으로 데이터를 받아오는 과정을 말합니다.
예시:
날씨 데이터를 다운로드해야 한다고 가정해봅시다. 이를 위해 특정 날씨 데이터 제공 서비스의 API를 사용한다고 할 때, 다음과 같은 과정을 거칩니다:
import requests
url = "https://api.weather.com/v3/weather/forecast"
params = {
"apiKey": "your_api_key",
"geocode": "37.7749,-122.4194", # 위도와 경도
"format": "json"
}
response = requests.get(url, params=params)
weather_data = response.json()
위 코드에서 requests
라이브러리를 사용하여 API에 요청을 보내고, 응답으로 받은 JSON 형식의 데이터를 weather_data
에 저장합니다.
데이터를 변환하기 위해 Spark라는 데이터 처리 도구를 사용합니다는 다음과 같은 의미입니다:
예시:
날씨 데이터를 다운로드한 후, Spark를 사용하여 데이터를 변환한다고 가정해봅시다. 변환 작업은 예를 들어, 섭씨 온도를 화씨로 변환하거나, 날짜 형식을 통일하는 등의 작업일 수 있습니다.
from pyspark.sql import SparkSession
# Spark 세션 생성
spark = SparkSession.builder.appName("WeatherDataProcessing").getOrCreate()
# JSON 파일에서 데이터 읽기
weather_df = spark.read.json("path/to/weather_data.json")
# 데이터 변환: 섭씨를 화씨로 변환
weather_df = weather_df.withColumn("temperature_F", weather_df["temperature_C"] * 9 / 5 + 32)
# 변환된 데이터를 저장
weather_df.write.parquet("path/to/processed_weather_data.parquet")
spark.stop()
위 코드에서 Spark를 사용하여 JSON 형식의 날씨 데이터를 읽어오고, 섭씨 온도를 화씨로 변환한 후, 변환된 데이터를 파케이 형식으로 저장합니다.
변환된 데이터를 데이터베이스(DB)에 저장합니다는 다음과 같은 의미입니다:
예시:
변환된 날씨 데이터를 데이터베이스에 저장한다고 가정해봅시다. 이를 위해 변환된 데이터를 데이터베이스에 삽입하거나 업데이트할 수 있습니다.
import sqlite3
import pandas as pd
# SQLite 데이터베이스 연결
conn = sqlite3.connect('weather_data.db')
cursor = conn.cursor()
# 변환된 데이터 읽기
processed_weather_df = pd.read_parquet("path/to/processed_weather_data.parquet")
# 데이터 삽입/업데이트
for index, row in processed_weather_df.iterrows():
cursor.execute("""
INSERT INTO weather (date, location, temperature_F)
VALUES (?, ?, ?)
ON CONFLICT(date, location) DO UPDATE SET
temperature_F = excluded.temperature_F
""", (row['date'], row['location'], row['temperature_F']))
# 변경 사항 저장 및 연결 종료
conn.commit()
conn.close()
위 코드에서 SQLite 데이터베이스에 연결한 후, 변환된 데이터를 읽어와서 각 행을 데이터베이스에 삽입하거나 업데이트합니다. ON CONFLICT
절을 사용하여 이미 존재하는 데이터의 경우 업데이트를 수행합니다.
DAG (Directed Acyclic Graph)
DAG는 방향성을 가지며 순환하지 않는 그래프를 의미합니다. Airflow에서 DAG는 워크플로우의 기본 구성 요소로, 각 작업(Task)들이 실행되는 순서와 의존성을 정의합니다.
예를 들어, 데이터 다운로드, 처리, 저장의 순서가 명확히 정의된 파이프라인을 생각해보세요. 각 작업이 차례로 실행되며, 오류가 발생하지 않도록 보장됩니다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def download_data():
# 데이터를 다운로드하는 코드
pass
def process_data():
# 다운로드한 데이터를 처리하는 코드
pass
def store_data():
# 처리된 데이터를 저장하는 코드
pass
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'data_pipeline_example',
default_args=default_args,
description='A simple data pipeline example',
schedule_interval='@daily',
)
t1 = PythonOperator(
task_id='download_data',
python_callable=download_data,
dag=dag,
)
t2 = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag,
)
t3 = PythonOperator(
task_id='store_data',
python_callable=store_data,
dag=dag,
)
t1 >> t2 >> t3 # 작업의 순서를 정의
위 코드에서 dag
객체는 전체 워크플로우의 구조를 정의하며, t1
, t2
, t3
작업은 각각 데이터를 다운로드, 처리, 저장하는 역할을 합니다. t1 >> t2 >> t3
는 작업들이 순서대로 실행되도록 의존성을 정의합니다.
Action Operator: 실제 작업을 실행하는 Operator입니다.
예시: Bash 명령어 실행, Python 코드 실행 등.
from airflow.operators.bash_operator import BashOperator
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
이 코드에서 BashOperator는 Bash 명령어를 실행하는 Task를 정의합니다. task_id는 Task의 식별자이며, bash_command는 실행할 명령어를 지정합니다. 이 예제에서는 현재 날짜를 출력하는 date 명령어를 실행합니다.
Transfer Operator: 데이터를 한 시스템에서 다른 시스템으로 옮기는 작업을 담당합니다.
예시: FTP에서 파일을 가져오거나 데이터베이스 간 데이터를 이동.
from airflow.operators.mysql_to_hive_operator import MySqlToHiveTransfer
t2 = MySqlToHiveTransfer(
task_id='transfer_data',
sql='SELECT * FROM source_table',
hive_table='destination_table',
create=True,
recreate=True,
delimiter=',',
dag=dag,
)
이 코드에서 MySqlToHiveTransfer는 MySQL 데이터베이스에서 데이터를 추출하여 Hive 테이블로 옮기는 Task를 정의합니다. sql은 데이터를 추출할 SQL 쿼리를 지정하고, hive_table은 데이터를 저장할 Hive 테이블을 지정합니다. create와 recreate는 Hive 테이블을 생성하고 재생성할지 여부를 설정합니다.
Sensor Operator: 특정 조건이 만족될 때까지 기다렸다가 조건이 충족되면 다음 작업을 실행합니다.
예시: 파일이 특정 위치에 생성될 때까지 기다리기.
from airflow.operators.sensors import FileSensor
t3 = FileSensor(
task_id='sense_file',
filepath='/path/to/file',
poke_interval=30,
timeout=600,
dag=dag,
)
이 코드에서 FileSensor는 특정 파일이 지정된 경로에 존재할 때까지 기다리는 Task를 정의합니다. filepath는 감시할 파일의 경로를 지정하고, poke_interval은 파일의 존재 여부를 확인할 간격(초)이며, timeout은 타임아웃 시간(초)을 설정합니다. 파일이 존재하면 다음 작업이 실행됩니다.
def download_data():
# 데이터를 다운로드하는 코드
pass
dag = DAG(
'example_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval='@daily',
)
t1 = PythonOperator(
task_id='download_data',
python_callable=download_data,
dag=dag,
)
t1
은 매일 밤 12시에 실행되는 download_data
Task입니다. 이 Task가 실행될 때마다 하나의 Task Instance가 생성됩니다.t1 = PythonOperator(
task_id='download_data',
python_callable=download_data,
dag=dag,
)
t2 = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag,
)
t3 = PythonOperator(
task_id='store_data',
python_callable=store_data,
dag=dag,
)
t1 >> t2 >> t3 # 작업의 순서를 정의
위 예시에서는 t1
이 데이터를 다운로드하고, t2
가 데이터를 처리하며, t3
가 데이터를 저장하는 작업을 수행합니다. t1 >> t2 >> t3
는 작업들이 순서대로 실행되도록 의존성을 정의합니다.
Airflow는 초단위 이하의 데이터 처리를 위해 설계되지 않았습니다. 초단위 이하 데이터 처리란 아주 짧은 시간 간격으로 데이터를 실시간으로 처리하는 것을 말합니다. 스트리밍 데이터 처리에는 적합하지 않습니다.
Apache Airflow는 배치 처리 및 예약된 워크플로우를 관리하는 데 최적화된 도구입니다. 즉, 일정 시간 간격으로 작업을 수행하거나, 데이터 파이프라인을 주기적으로 실행하는 데 매우 유용합니다. 그러나 초단위 이하의 데이터 처리, 즉 매우 짧은 시간 간격으로 데이터를 실시간으로 처리하는 데는 적합하지 않습니다. 그 이유는 다음과 같습니다:
실시간 데이터 처리가 필요한 경우, 다음과 같은 도구를 고려할 수 있습니다:
Airflow는 자체적으로 데이터를 처리하는데 최적화되지 않았습니다. 대신 Spark 같은 외부 프레임워크를 이용해 데이터를 처리하고, Airflow는 이러한 작업을 오케스트레이션하는 데 사용됩니다. 오케스트레이션이란 여러 작업을 조율해서 순서대로 실행되도록 관리하는 것을 의미합니다.
Apache Airflow는 주로 워크플로우 오케스트레이션 도구로 설계되었으며, 자체적으로 데이터를 처리하는 데 최적화되지 않았습니다. 대신, Airflow는 Spark와 같은 외부 데이터 프로세싱 프레임워크와 함께 사용됩니다. 그 이유는 다음과 같습니다: