Airflow 간단한 DAG 작성: DB의 테이블을 Parquet 파일로 변환하여 AWS S3에 업로드하기

GarionNachal·2025년 4월 27일
0

airflow

목록 보기
8/8
post-thumbnail

개요

데이터 파이프라인에서 가장 흔한 작업 중 하나는 데이터베이스에서 데이터를 추출하여 분석에 적합한 형식으로 변환한 후 저장하는 것입니다. Parquet은 컬럼 기반 저장 방식으로 효율적인 압축과 빠른 쿼리 성능을 제공하는 데이터 형식입니다. 이번 글에서는 Apache Airflow를 사용하여 데이터베이스의 테이블을 Parquet 파일로 변환하고 AWS S3에 저장하는 방법을 알아보겠습니다.

전제 조건

이 DAG를 실행하기 위해서는 다음과 같은 준비가 필요합니다:

  1. Apache Airflow 설치 (2.0 이상 권장)

  2. 필요한 Python 패키지 설치:

    Copypip install apache-airflow[amazon]
    pip install pandas pyarrow
    
  3. AWS S3 버킷 및 접근 권한 설정

  4. Airflow에 AWS 연결 구성 (Admin > Connections)

DAG 구성

우리가 만들 DAG는 다음과 같은 단계로 구성됩니다:

  1. DB 연결 및 데이터 추출: SQL 쿼리를 통해 DB에서 데이터 추출
  2. 데이터 변환: 추출한 데이터를 Pandas DataFrame으로 변환하여 처리
  3. Parquet 변환: DataFrame을 Parquet 형식으로 변환
  4. S3 업로드: 변환된 Parquet 파일을 AWS S3에 업로드
  5. 검증: 업로드 완료 및 파일 존재 확인

코드 작성하기

이제 실제 DAG 코드를 작성해 보겠습니다:

Copyfrom datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from airflow.sensors.s3 import S3KeySensor

import pandas as pd
import io
import logging

# DAG 기본 인자 설정
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': datetime(2025, 4, 27),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
dag = DAG(
    'db_to_parquet_s3',
    default_args=default_args,
    description='DB 테이블을 Parquet으로 변환하여 S3에 업로드하는 DAG',
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=['db', 'parquet', 's3'],
)

# 설정 변수
DB_CONN_ID = 'postgres_conn'       # Airflow에 설정된 PostgreSQL 연결 ID
S3_CONN_ID = 'aws_default'         # Airflow에 설정된 AWS 연결 ID
TABLE_NAME = 'source_table'        # 추출할 DB 테이블명
S3_BUCKET = 'my-data-bucket'       # S3 버킷명
S3_PREFIX = 'parquet-data/'        # S3 경로 prefix

def extract_transform_load(**context):
    """
    PostgreSQL DB에서 데이터를 추출하여 Parquet으로 변환 후 S3에 업로드
    """
    # 실행 날짜 정보 가져오기
    execution_date = context['execution_date']
    formatted_date = execution_date.strftime('%Y-%m-%d')

    # 로그 설정
    logging.info(f"작업 시작: {TABLE_NAME} 테이블 데이터 추출 및 변환")

    # PostgreSQL 연결 및 데이터 추출
    postgres_hook = PostgresHook(postgres_conn_id=DB_CONN_ID)
    conn = postgres_hook.get_conn()

    # 1. SQL 쿼리 실행하여 데이터 추출
    query = f"SELECT * FROM {TABLE_NAME}"
    df = postgres_hook.get_pandas_df(query)

    logging.info(f"데이터 추출 완료: {len(df)} 행 추출됨")

    # 2. 필요한 데이터 전처리 (필요시)
    # 예: df['created_at'] = pd.to_datetime(df['created_at'])

    # 3. 파일명 생성
    file_name = f"{TABLE_NAME}_{formatted_date}.parquet"
    s3_key = f"{S3_PREFIX}{file_name}"

    # 4. DataFrame을 Parquet으로 변환하여 메모리 버퍼에 저장
    buffer = io.BytesIO()
    df.to_parquet(buffer, engine='pyarrow', compression='snappy', index=False)
    buffer.seek(0)  # 버퍼 포인터를 시작점으로 이동 (중요!)

    # 5. S3에 업로드
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
    s3_hook.load_file_obj(
        file_obj=buffer,
        key=s3_key,
        bucket_name=S3_BUCKET,
        replace=True
    )

    logging.info(f"파일 업로드 완료: s3://{S3_BUCKET}/{s3_key}")

    # 다음 태스크에서 사용할 정보를 XCom에 저장
    context['ti'].xcom_push(key='s3_key', value=s3_key)
    return s3_key

def validate_s3_upload(**context):
    """
    S3에 파일이 정상적으로 업로드되었는지 확인
    """
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
    s3_key = context['ti'].xcom_pull(task_ids='extract_transform_load', key='s3_key')

    if s3_hook.check_for_key(s3_key, bucket_name=S3_BUCKET):
        file_size = s3_hook.get_key(s3_key, bucket_name=S3_BUCKET).content_length
        logging.info(f"파일 검증 완료: s3://{S3_BUCKET}/{s3_key}, 크기: {file_size} 바이트")
        return True
    else:
        raise ValueError(f"파일을 찾을 수 없음: s3://{S3_BUCKET}/{s3_key}")

# 태스크 정의
extract_load_task = PythonOperator(
    task_id='extract_transform_load',
    python_callable=extract_transform_load,
    provide_context=True,
    dag=dag,
)

validate_task = PythonOperator(
    task_id='validate_s3_upload',
    python_callable=validate_s3_upload,
    provide_context=True,
    dag=dag,
)

# 선후관계 설정
extract_load_task >> validate_task

실행 결과

이 DAG를 Airflow에 배포하고 실행하면 다음과 같은 작업이 순차적으로 진행됩니다:

  1. extract_transform_load 태스크가 데이터베이스에서 테이블 데이터를 추출하고, Parquet 파일로 변환한 후 S3에 업로드합니다.
  2. validate_s3_upload 태스크가 S3에 파일이 정상적으로 업로드되었는지 확인합니다.

성공적으로 실행되면 Airflow UI에서 다음과 같은 그래프가 표시됩니다:

Airflow UI의 XCom 탭에서 s3_key 값을 확인할 수 있으며, S3 버킷에서 생성된 Parquet 파일을 직접 확인할 수 있습니다.

성능 최적화 팁

대용량 데이터를 처리해야 할 경우 다음과 같은 방법으로 성능을 개선할 수 있습니다:

1. 청크 단위 처리

대용량 테이블의 경우 메모리 문제를 피하기 위해 청크 단위로 데이터를 가져와서 처리할 수 있습니다:

Copydef extract_transform_load_chunks(**context):
    # ... 기존 코드 ...

    # 청크 단위로 데이터 가져오기
    chunk_size = 100000  # 적절한 크기로 조정
    for i, chunk_df in enumerate(postgres_hook.get_pandas_df_iter(query, chunksize=chunk_size)):
        # 각 청크를 개별 파일로 저장
        chunk_file_name = f"{TABLE_NAME}_{formatted_date}_part{i}.parquet"
        chunk_s3_key = f"{S3_PREFIX}{chunk_file_name}"

        buffer = io.BytesIO()
        chunk_df.to_parquet(buffer, engine='pyarrow', compression='snappy', index=False)
        buffer.seek(0)

        s3_hook.load_file_obj(
            file_obj=buffer,
            key=chunk_s3_key,
            bucket_name=S3_BUCKET,
            replace=True
        )

        logging.info(f"청크 {i} 업로드 완료: s3://{S3_BUCKET}/{chunk_s3_key}")

    # ... 나머지 코드 ...

2. SqlToS3Operator 사용

Airflow의 제공자 패키지는 SQL 데이터를 S3로 직접 전송하는 특수 연산자를 제공합니다:

Copyfrom airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

sql_to_s3_task = SqlToS3Operator(
    task_id='sql_to_s3_task',
    sql_conn_id=DB_CONN_ID,
    query=f"SELECT * FROM {TABLE_NAME}",
    s3_bucket=S3_BUCKET,
    s3_key=f"{S3_PREFIX}{TABLE_NAME}_{{ ds }}.parquet",
    replace=True,
    file_format='parquet',
    compression='snappy',
    dag=dag,
)

3. 병렬 처리

여러 테이블을 동시에 처리하거나, 대용량 테이블을 여러 파티션으로 나누어 병렬 처리할 수 있습니다.

결론

이 글에서는 Airflow를 사용하여 데이터베이스의 테이블을 Parquet 파일로 변환하고 AWS S3에 업로드하는 방법을 살펴보았습니다. Airflow의 강력한 워크플로우 관리 기능과 함께 Pandas와 PyArrow 라이브러리를 활용하여 효율적인 ETL 파이프라인을 구축할 수 있습니다.

특히 Parquet 형식은 컬럼 기반 저장 방식으로 데이터 분석에 최적화되어 있어, S3에 저장된 Parquet 파일은 Amazon Athena, AWS Glue, Redshift Spectrum 등 여러 AWS 분석 서비스와 원활하게 통합됩니다.

이러한 데이터 파이프라인을 일단 구축해 두면 데이터 엔지니어와 분석가는 최신 데이터에 항상 접근할 수 있으며, 분석 워크플로우를 자동화하여 효율성을 높일 수 있습니다.


참고 자료:

profile
AI를 꿈꾸는 BackEnd개발자

0개의 댓글