에어플로우의 개요

0

airflow

목록 보기
1/2
post-thumbnail

Apache Airflow 개요

Apache Airflow는 데이터 작업을 자동화하는 도구입니다. 데이터를 수집하고, 처리하고, 저장하는 등의 작업을 쉽게 작성하고, 예약하고, 모니터링할 수 있게 해줍니다. 주로 데이터 엔지니어들이 사용하는데, 데이터를 가져와서 변환하고 저장하는 ETL 작업에 많이 사용됩니다.

사용 예

Apache Airflow를 이용한 데이터 파이프라인 예시

매일 밤 12시에 데이터를 처리해야 하는 파이프라인이 있다고 가정해봅시다. 이 파이프라인은 다음과 같은 작업(Task)들로 구성됩니다:

  1. Downloading Data: 데이터를 다운로드
  2. Processing Data: 다운로드한 데이터를 처리
    • 데이터 클렌징: 오류와 중복을 제거
    • 데이터 변환: 형식을 변환하고 필요한 값을 계산
    • 데이터 집계: 데이터를 요약하고 합계 등을 계산
    • 데이터 통합: 여러 소스의 데이터를 하나로 통합
  3. Storing Data: 처리된 데이터를 저장

이 작업들은 순서대로 실행되어야 하고, 각각 외부 시스템과 상호작용이 필요합니다.

1. Downloading Data: 데이터를 다운로드

외부 API를 통해 데이터를 가져옵니다는 다음과 같은 의미입니다:

  • API (Application Programming Interface): 소프트웨어 프로그램 간에 상호작용을 할 수 있도록 해주는 규칙과 도구의 모음입니다.
  • 외부 API: 인터넷을 통해 접근할 수 있는 다른 시스템이나 서비스의 API입니다. 예를 들어, 날씨 데이터를 제공하는 웹 서비스나 금융 데이터를 제공하는 서비스가 있습니다.
  • API Request: API를 호출하여 데이터를 요청하는 작업입니다.

따라서 "외부 API를 통해 데이터를 가져옵니다"는 의미는, 우리가 필요한 데이터를 제공하는 다른 서비스에 API를 통해 요청을 보내고, 그 응답으로 데이터를 받아오는 과정을 말합니다.

예시:
날씨 데이터를 다운로드해야 한다고 가정해봅시다. 이를 위해 특정 날씨 데이터 제공 서비스의 API를 사용한다고 할 때, 다음과 같은 과정을 거칩니다:

import requests

url = "https://api.weather.com/v3/weather/forecast"
params = {
    "apiKey": "your_api_key",
    "geocode": "37.7749,-122.4194",  # 위도와 경도
    "format": "json"
}
response = requests.get(url, params=params)
weather_data = response.json()

위 코드에서 requests 라이브러리를 사용하여 API에 요청을 보내고, 응답으로 받은 JSON 형식의 데이터를 weather_data에 저장합니다.

2. Processing Data: 다운로드한 데이터를 처리

데이터를 변환하기 위해 Spark라는 데이터 처리 도구를 사용합니다는 다음과 같은 의미입니다:

  • Spark: Apache Spark는 빠르고 일반적인 클러스터 컴퓨팅 시스템입니다. 대규모 데이터 처리를 위해 설계된 도구로, 데이터 분석, 머신러닝, 데이터 스트리밍 등에 사용됩니다.
  • Spark Job: Spark에서 실행되는 특정 작업을 의미합니다. 데이터 읽기, 처리, 변환, 저장 등의 작업을 포함합니다.

예시:
날씨 데이터를 다운로드한 후, Spark를 사용하여 데이터를 변환한다고 가정해봅시다. 변환 작업은 예를 들어, 섭씨 온도를 화씨로 변환하거나, 날짜 형식을 통일하는 등의 작업일 수 있습니다.

from pyspark.sql import SparkSession

# Spark 세션 생성
spark = SparkSession.builder.appName("WeatherDataProcessing").getOrCreate()

# JSON 파일에서 데이터 읽기
weather_df = spark.read.json("path/to/weather_data.json")

# 데이터 변환: 섭씨를 화씨로 변환
weather_df = weather_df.withColumn("temperature_F", weather_df["temperature_C"] * 9 / 5 + 32)

# 변환된 데이터를 저장
weather_df.write.parquet("path/to/processed_weather_data.parquet")

spark.stop()

위 코드에서 Spark를 사용하여 JSON 형식의 날씨 데이터를 읽어오고, 섭씨 온도를 화씨로 변환한 후, 변환된 데이터를 파케이 형식으로 저장합니다.

3. Storing Data: 처리된 데이터를 저장

변환된 데이터를 데이터베이스(DB)에 저장합니다는 다음과 같은 의미입니다:

  • Insert/Update 작업: 데이터베이스에 새로운 데이터를 삽입하거나 기존 데이터를 업데이트하는 작업입니다.
  • 데이터베이스: 데이터를 체계적으로 저장하고 관리하는 시스템입니다. 예를 들어, MySQL, PostgreSQL, SQLite 등이 있습니다.

예시:
변환된 날씨 데이터를 데이터베이스에 저장한다고 가정해봅시다. 이를 위해 변환된 데이터를 데이터베이스에 삽입하거나 업데이트할 수 있습니다.

import sqlite3
import pandas as pd

# SQLite 데이터베이스 연결
conn = sqlite3.connect('weather_data.db')
cursor = conn.cursor()

# 변환된 데이터 읽기
processed_weather_df = pd.read_parquet("path/to/processed_weather_data.parquet")

# 데이터 삽입/업데이트
for index, row in processed_weather_df.iterrows():
    cursor.execute("""
        INSERT INTO weather (date, location, temperature_F)
        VALUES (?, ?, ?)
        ON CONFLICT(date, location) DO UPDATE SET
        temperature_F = excluded.temperature_F
    """, (row['date'], row['location'], row['temperature_F']))

# 변경 사항 저장 및 연결 종료
conn.commit()
conn.close()

위 코드에서 SQLite 데이터베이스에 연결한 후, 변환된 데이터를 읽어와서 각 행을 데이터베이스에 삽입하거나 업데이트합니다. ON CONFLICT 절을 사용하여 이미 존재하는 데이터의 경우 업데이트를 수행합니다.

Airflow의 정의

  • Apache Airflow는 프로그래밍 방식으로 워크플로우를 작성, 예약 및 모니터링하는 오픈 소스 플랫폼
  • Airflow는 작업을 정확한 시간에, 정확한 방법으로, 정확한 순서대로 실행하게 해주는 오케스트레이터

주요 개념

  1. DAG (Directed Acyclic Graph)

    DAG는 방향성을 가지며 순환하지 않는 그래프를 의미합니다. Airflow에서 DAG는 워크플로우의 기본 구성 요소로, 각 작업(Task)들이 실행되는 순서와 의존성을 정의합니다.

주요 특징

  • 방향성 (Directed):
    각 작업은 특정 방향으로만 진행됩니다. 즉, 한 작업에서 다음 작업으로 흐름이 이어집니다.
  • 비순환성 (Acyclic):
    작업들 사이에 순환이 없습니다. 즉, 어떤 작업도 다시 자신에게 돌아오지 않습니다. 이 특성은 무한 루프를 방지하고 작업의 논리적 순서를 보장합니다.
  • 순차적 실행:
    작업(Task)들이 순차적으로 실행됩니다. 각 작업이 정해진 순서대로 실행되기 때문에 논리적 오류를 방지할 수 있습니다.

예시

예를 들어, 데이터 다운로드, 처리, 저장의 순서가 명확히 정의된 파이프라인을 생각해보세요. 각 작업이 차례로 실행되며, 오류가 발생하지 않도록 보장됩니다.


from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def download_data():
    # 데이터를 다운로드하는 코드
    pass

def process_data():
    # 다운로드한 데이터를 처리하는 코드
    pass

def store_data():
    # 처리된 데이터를 저장하는 코드
    pass

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_pipeline_example',
    default_args=default_args,
    description='A simple data pipeline example',
    schedule_interval='@daily',
)

t1 = PythonOperator(
    task_id='download_data',
    python_callable=download_data,
    dag=dag,
)

t2 = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

t3 = PythonOperator(
    task_id='store_data',
    python_callable=store_data,
    dag=dag,
)

t1 >> t2 >> t3  # 작업의 순서를 정의

위 코드에서 dag 객체는 전체 워크플로우의 구조를 정의하며, t1, t2, t3 작업은 각각 데이터를 다운로드, 처리, 저장하는 역할을 합니다. t1 >> t2 >> t3는 작업들이 순서대로 실행되도록 의존성을 정의합니다.

  1. Operator
    • Operator는 Task를 정의하는 역할을 합니다. 각 Task는 실제로 수행할 작업을 의미

종류

  • Action Operator: 실제 작업을 실행하는 Operator입니다.

    • 예시: Bash 명령어 실행, Python 코드 실행 등.

      
      from airflow.operators.bash_operator import BashOperator
      
      t1 = BashOperator(
          task_id='print_date',
          bash_command='date',
          dag=dag,
      )
      

      이 코드에서 BashOperator는 Bash 명령어를 실행하는 Task를 정의합니다. task_id는 Task의 식별자이며, bash_command는 실행할 명령어를 지정합니다. 이 예제에서는 현재 날짜를 출력하는 date 명령어를 실행합니다.

  • Transfer Operator: 데이터를 한 시스템에서 다른 시스템으로 옮기는 작업을 담당합니다.

    • 예시: FTP에서 파일을 가져오거나 데이터베이스 간 데이터를 이동.

      
      from airflow.operators.mysql_to_hive_operator import MySqlToHiveTransfer
      
      t2 = MySqlToHiveTransfer(
          task_id='transfer_data',
          sql='SELECT * FROM source_table',
          hive_table='destination_table',
          create=True,
          recreate=True,
          delimiter=',',
          dag=dag,
      )
      

      이 코드에서 MySqlToHiveTransfer는 MySQL 데이터베이스에서 데이터를 추출하여 Hive 테이블로 옮기는 Task를 정의합니다. sql은 데이터를 추출할 SQL 쿼리를 지정하고, hive_table은 데이터를 저장할 Hive 테이블을 지정합니다. create와 recreate는 Hive 테이블을 생성하고 재생성할지 여부를 설정합니다.

  • Sensor Operator: 특정 조건이 만족될 때까지 기다렸다가 조건이 충족되면 다음 작업을 실행합니다.

    • 예시: 파일이 특정 위치에 생성될 때까지 기다리기.

      
      from airflow.operators.sensors import FileSensor
      
      t3 = FileSensor(
          task_id='sense_file',
          filepath='/path/to/file',
          poke_interval=30,
          timeout=600,
          dag=dag,
      )
      

      이 코드에서 FileSensor는 특정 파일이 지정된 경로에 존재할 때까지 기다리는 Task를 정의합니다. filepath는 감시할 파일의 경로를 지정하고, poke_interval은 파일의 존재 여부를 확인할 간격(초)이며, timeout은 타임아웃 시간(초)을 설정합니다. 파일이 존재하면 다음 작업이 실행됩니다.

  1. Task & Task Instance
    • Task: 데이터 파이프라인에 있는 작업을 의미합니다.
      예를 들어, 데이터를 다운로드하거나 처리하는 작업이 있습니다.
    • Task Instance: Task가 실제로 실행될 때 생성된 인스턴스입니다. 예를 들어, 매일 밤 12시에 실행되는 데이터 다운로드 작업은 각각의 실행마다 하나의 Task Instance를 갖게 됩니다.

def download_data():
    # 데이터를 다운로드하는 코드
    pass

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval='@daily',
)

t1 = PythonOperator(
    task_id='download_data',
    python_callable=download_data,
    dag=dag,
)
  • t1은 매일 밤 12시에 실행되는 download_data Task입니다. 이 Task가 실행될 때마다 하나의 Task Instance가 생성됩니다.
  1. Workflow
    • 워크플로우는 DAG를 통해 정의된 작업들의 일련의 과정입니다. 각 작업은 Operator로 실행되며, 작업 간의 의존성이 정의됩니다.
t1 = PythonOperator(
    task_id='download_data',
    python_callable=download_data,
    dag=dag,
)

t2 = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

t3 = PythonOperator(
    task_id='store_data',
    python_callable=store_data,
    dag=dag,
)

t1 >> t2 >> t3  # 작업의 순서를 정의

위 예시에서는 t1이 데이터를 다운로드하고, t2가 데이터를 처리하며, t3가 데이터를 저장하는 작업을 수행합니다. t1 >> t2 >> t3는 작업들이 순서대로 실행되도록 의존성을 정의합니다.

Apache Airflow 장단점

장점

  1. 동적 데이터 파이프라인 (Dynamic Data Pipeline)
    • Airflow는 파이썬을 이용해 데이터 파이프라인을 정의합니다. 이를 통해 복잡한 로직을 쉽게 구현할 수 있습니다.
    • 예를 들어, 데이터 클렌징, 변환, 집계, 통합 등의 작업을 파이썬 코드로 간단히 작성할 수 있습니다.
  2. 확장성 (Scalability)
    • Airflow는 매우 확장성이 뛰어납니다. 여러 작업을 병렬로 실행할 수 있으며, 클러스터 환경에서도 잘 동작합니다.
    • Kubernetes와 같은 분산 환경에서 파이프라인을 실행할 수 있어 대규모 데이터 처리가 가능합니다.
  3. 편리한 사용자 인터페이스 (Useful User Interface)
    • Airflow는 웹 인터페이스를 제공하여 파이프라인의 상태를 쉽게 모니터링하고 관리할 수 있습니다.
    • DAG의 실행 상태를 시각적으로 확인할 수 있어 작업 간의 의존성 및 실행 결과를 직관적으로 파악할 수 있습니다.
  4. 높은 확장성 (High Extensibility)
    • 플러그인을 쉽게 추가할 수 있어 새로운 작업 툴이 나올 때마다 플러그인을 개발하여 적용할 수 있습니다.
    • 이를 통해 다양한 작업 환경에 맞춰 쉽게 커스터마이징할 수 있습니다.
  5. 강력한 스케줄링 기능 (Powerful Scheduling)
    • 다양한 스케줄링 옵션을 제공하여 복잡한 스케줄링 요구사항도 쉽게 처리할 수 있습니다.
    • 크론 표현식을 사용해 세밀한 시간 설정이 가능합니다.
  6. 백필 기능 (Backfill)
    • 백필 기능을 사용하면 과거 데이터를 손쉽게 재처리할 수 있습니다.
    • 코드를 변경한 후에도 이전 데이터를 다시 처리할 수 있어 데이터 일관성을 유지할 수 있습니다.

단점

  1. 초기 설정의 복잡성 (Complex Initial Setup)
    • Airflow의 초기 설치와 설정은 복잡할 수 있습니다. 다양한 구성 요소를 설정하고 연결하는 데 시간이 걸릴 수 있습니다.
    • 특히 대규모 환경에서 안정적으로 운영하려면 많은 설정이 필요합니다.
  2. 파이썬 경험의 필요성 (Need for Python Experience)
    • Airflow는 파이썬을 기반으로 하기 때문에 파이썬에 익숙하지 않은 사용자는 학습 곡선이 있을 수 있습니다.
    • DAG와 Operator를 작성하는 데 파이썬 코딩이 필요합니다.
  3. 데이터 스트리밍에 부적합 (Not Suitable for Data Streaming)
    • Airflow는 배치 처리에 최적화되어 있으며, 실시간 데이터 스트리밍 작업에는 적합하지 않습니다.
    • 초단위 이하의 데이터 처리가 필요한 경우 다른 솔루션이 필요합니다.
  4. 데이터 프로세싱 프레임워크로 부적합 (Not a Data Processing Framework)
    • Airflow는 자체적으로 데이터를 처리하는 데 최적화되어 있지 않습니다.
    • Spark와 같은 외부 데이터 프로세싱 프레임워크와 함께 사용해야 합니다.
  5. 관리의 복잡성 (Complexity in Management)
    • 파이프라인의 규모가 커지면 DAG와 Operator의 관리가 복잡해질 수 있습니다.
    • 초기 사용 시점에 엄격한 관리가 필요합니다.
  6. 안정성 문제 (Stability Issues)
    • Airflow는 여러 구성 요소가 상호작용하기 때문에 각 구성 요소의 상태에 따라 문제가 발생할 수 있습니다.
    • 특히 스케줄러와 워커 간의 통신 문제가 발생할 수 있습니다.

Airflow의 잘못된 용도

  • 데이터 스트리밍 솔루션
    • Airflow는 초단위 이하의 데이터 처리를 위해 설계되지 않았습니다. 초단위 이하 데이터 처리란 아주 짧은 시간 간격으로 데이터를 실시간으로 처리하는 것을 말합니다. 스트리밍 데이터 처리에는 적합하지 않습니다.

      Apache Airflow는 배치 처리 및 예약된 워크플로우를 관리하는 데 최적화된 도구입니다. 즉, 일정 시간 간격으로 작업을 수행하거나, 데이터 파이프라인을 주기적으로 실행하는 데 매우 유용합니다. 그러나 초단위 이하의 데이터 처리, 즉 매우 짧은 시간 간격으로 데이터를 실시간으로 처리하는 데는 적합하지 않습니다. 그 이유는 다음과 같습니다:

  1. 스케줄링 지연 (Scheduling Latency)
    • Airflow의 스케줄러는 주기적으로 DAG 파일을 스캔하고 Task를 스케줄링합니다. 이 과정에는 일정한 지연이 있을 수 있습니다. 초단위 이하의 정밀한 스케줄링을 요구하는 경우, 이러한 지연은 문제가 될 수 있습니다.
  2. 오버헤드 (Overhead)
    • Airflow는 각 Task를 실행하기 위해 여러 구성 요소 간의 통신과 상태 관리를 수행합니다. 이 과정에서 발생하는 오버헤드는 매우 짧은 간격으로 작업을 실행할 때 효율성을 떨어뜨립니다.
    • 예를 들어, Task가 시작되기 전후에 발생하는 데이터베이스 업데이트, 로그 작성 등의 부가 작업이 많습니다.
  3. 상태 관리 (State Management)
    • Airflow는 Task의 상태를 지속적으로 추적하고 업데이트합니다. Task의 성공, 실패, 재시도 등 상태 정보를 데이터베이스에 저장하고 관리합니다. 이 상태 관리 과정은 실시간 처리 환경에서는 과부하가 될 수 있습니다.
  4. 스케줄링 빈도 제한 (Scheduling Frequency Limitations)
    • Airflow의 스케줄링은 보통 분 단위 이상의 간격으로 설정됩니다. 초단위 이하의 스케줄링을 요구하는 경우, Airflow의 기본 설정으로는 이를 처리하기 어렵습니다.
    • 크론 표현식을 사용해도 초단위 이하의 정밀한 시간 설정은 불가능합니다.
  5. 실시간 처리 요구 사항 (Real-time Processing Requirements)
    • 실시간 데이터 스트리밍 작업은 보통 메시지 큐 시스템이나 스트리밍 플랫폼을 통해 처리됩니다. 예를 들어, Apache Kafka, Apache Flink, Apache Storm 같은 도구들은 실시간 데이터를 효율적으로 처리하기 위해 설계되었습니다.
    • 이러한 도구들은 낮은 지연 시간과 높은 처리량을 제공하며, 실시간 데이터 흐름을 관리하는 데 최적화되어 있습니다.

대안

실시간 데이터 처리가 필요한 경우, 다음과 같은 도구를 고려할 수 있습니다:

  1. Apache Kafka
    • 분산 스트리밍 플랫폼으로, 높은 처리량과 낮은 지연 시간으로 실시간 데이터 스트리밍을 처리합니다.
  2. Apache Flink
    • 실시간 데이터 스트리밍과 배치 처리를 모두 지원하는 강력한 분산 처리 엔진입니다.
  3. Apache Storm
    • 실시간 데이터 스트리밍을 처리하기 위한 분산 실시간 컴퓨팅 시스템입니다.
  • 데이터 프로세싱 프레임워크
    • Airflow는 자체적으로 데이터를 처리하는데 최적화되지 않았습니다. 대신 Spark 같은 외부 프레임워크를 이용해 데이터를 처리하고, Airflow는 이러한 작업을 오케스트레이션하는 데 사용됩니다. 오케스트레이션이란 여러 작업을 조율해서 순서대로 실행되도록 관리하는 것을 의미합니다.

      Apache Airflow는 주로 워크플로우 오케스트레이션 도구로 설계되었으며, 자체적으로 데이터를 처리하는 데 최적화되지 않았습니다. 대신, Airflow는 Spark와 같은 외부 데이터 프로세싱 프레임워크와 함께 사용됩니다. 그 이유는 다음과 같습니다:

  1. 처리 성능 (Processing Performance)
    • Airflow는 병렬 처리 및 대규모 데이터 세트를 효율적으로 처리하는 데 필요한 고성능 컴퓨팅 엔진을 제공하지 않습니다. Spark와 같은 프레임워크는 인메모리 컴퓨팅과 분산 처리 기능을 통해 매우 빠르고 효율적인 데이터 처리를 가능하게 합니다.
    • 예를 들어, Spark는 대규모 데이터 세트를 메모리에 로드하여 빠르게 반복 처리할 수 있는 능력을 가지고 있습니다.
  2. 확장성 (Scalability)
    • Airflow는 수평 확장이나 대규모 분산 처리에 적합하지 않습니다. Spark는 클러스터 환경에서 데이터를 병렬로 처리할 수 있으며, 데이터의 양이 증가하더라도 쉽게 확장할 수 있습니다.
    • Spark의 RDD(Resilient Distributed Dataset)와 같은 데이터 구조는 대규모 데이터 처리에 매우 효율적입니다.
  3. 실시간 데이터 처리 (Real-time Data Processing)
    • Airflow는 배치 처리에 적합하며, 실시간 데이터 처리에는 최적화되지 않았습니다. Spark는 스트리밍 데이터를 실시간으로 처리할 수 있는 Spark Streaming 기능을 제공하여, 실시간 데이터 분석 및 처리에 적합합니다.
    • Spark Streaming은 실시간 데이터 스트림을 처리하고 분석하는 데 필요한 고성능 컴퓨팅을 제공합니다.
  4. 내장 연산 및 라이브러리 (Built-in Operations and Libraries)
    • Spark는 데이터 처리 및 분석을 위한 풍부한 내장 연산 및 라이브러리를 제공합니다. 예를 들어, SQL 쿼리, 머신러닝 라이브러리(MLlib), 그래프 처리 라이브러리(GraphX) 등이 포함되어 있습니다.
    • 이러한 라이브러리는 복잡한 데이터 처리 작업을 쉽게 구현할 수 있도록 도와줍니다.
  5. 오케스트레이션 역할 (Orchestration Role)
    • Airflow의 주 역할은 데이터 처리 작업을 조율하고 관리하는 것입니다. 여러 단계로 구성된 복잡한 워크플로우를 정의하고, 각 단계가 순서대로 실행되도록 관리합니다.
    • Airflow는 Spark 작업을 오케스트레이션하여, 데이터 다운로드, 데이터 처리(Spark 작업), 결과 저장 등의 작업이 순차적으로 실행되도록 합니다.

0개의 댓글