AIRFLOW 3.0, 클라우드 객체 스토리지를 Xcom 저장소로 설정하기(xcom backend)

NewNewDaddy·2025년 6월 12일
0

AIRFLOW

목록 보기
4/4
post-thumbnail

0. INTRO

  • Airflow의 XCom(Cross Communication)은 DAG 내의 태스크 간 데이터를 공유하기 위한 기능입니다. Airflow 시스템이 실행되면 기본 메타데이터 저장소인 PostgreSQL에 다양한 메타 정보가 저장되며, XCom 데이터 역시 이 PostgreSQL 데이터베이스에 기록됩니다.
  • 하지만 XCom은 어디까지나 경량 메시지 전달 용도로 설계된 기능이기 때문에, 사용 시 몇 가지 권장사항을 반드시 고려해야 합니다.
    • str, int, float, bool, list, dict 등의 단순하고 직렬화 가능한 데이터 타입만 저장
    • 수 MB 이하 수준의 작고 가벼운 데이터만 저장
    • 실제 데이터 전달보다는 상태 또는 신호 전달 용도로 사용 (경로, 처리 여부, 간단한 통계치 등)
  • Python 함수 간 데이터 전달은 같은 세션의 메모리를 공유하기 때문에 pandas.DataFrame, numpy.ndarray, 이미지 등 대용량 복합 객체도 무리 없이 전달이 가능합니다. 하지만 XCom은 메타DB를 공유하므로, 대용량 데이터를 그대로 저장하면 DB 성능과 안정성에 악영향을 줄 수 있습니다.
  • 데이터 파이프라인에서는 함수 간 DataFrame 객체 전달이 자주 발생하는데 이런 경우는 어떻게 해야 할까요? Airflow는 이를 위해 Xcom 데이터를 PostgreSQL이 아닌 AWS S3, Google GCS, Azure Blob Storage 같은 클라우드 객체 저장소에 저장할 수 있는 XComObjectStorageBackend 설정을 제공합니다.
  • 이번 글에서는 Airflow 3.0 버전에서 Xcom을 AWS S3와 Google Cloud Storage로 설정하는 방법을 각각 다뤄보도록 하겠습니다. (Airflow 3.0 버전을 Docker Compose로 설치 후 진행하므로 설치 관련해서는 공식 문서 참고 바랍니다.)

1. Airflow 3.0 설정 파일 확인

  • Airflow에는 각종 세팅들을 저장해놓는 airflow.cfg 파일이 있습니다. 2.10 버전까지는 Airflow의 도커 컨테이너 내부 /opt/airflow/airflow.cfg 경로에 존재했었는데 3 버전이 되면서 /opt/airflow/config/airflow.cfg 경로로 위치가 바뀌었습니다.
  • 사용자의 로컬 디렉토리와 볼륨 매핑이 되어있는 경로로 위치가 바뀌어 예전이라면 도커 컨테이너 내부에 들어가서 수정해야 했었던 것이 이제 로컬 디렉토리에서 바로 수정이 가능해졌습니다.
  • XComObjectStorageBackend 설정시 airflow.cfg 파일을 수정하고 docker compose restart 명령으로 시스템 재시작을 해주어야 제대로 적용이 됩니다.

2. Connections 설정

  • Airflow UI에서 AWS와 GCP에 대한 connections 설정을 해주도록 하겠습니다. (Admin > Connections)

1) AWS Connection

  • Connection Type : aws
  • AWS Access Key ID : IAM USER로 발급받은 Key의 ID 부분
  • AWS Secret Access Key : IAM USER로 발급받은 Key의 Secret 부분
  • Extra Fields : {"region_name" : "ap-northeast-2"}

2) GCP Connection

  • Connection Type : google_cloud_platform
  • Project Id : GCP 프로젝트 ID
  • Keyfile Path : 서비스 계정 JSON 키 경로

3. DAG 작성

  • 실습 DAG의 경우 아래 두 가지 Task로 구성되어 있습니다.
    • 1번 Task(upload_df) > 컬럼 4개, 행 10개짜리 샘플 DataFrame을 생성하여 이를 Xcom에 저장
    • 2번 Task(read_df) > Xcom에 저장된 DataFrame을 읽어와 로그에 출력
import pendulum
from datetime import timedelta
import datetime
from airflow.sdk import DAG, task, get_current_context, ObjectStoragePath, Param
from airflow.operators.python import PythonOperator
import pandas as pd

## 샘플 데이터프레임 생성
def create_sample_df():
    data = {
        'name': ['John', 'Emma', 'Michael', 'Sarah', 'David', 'Lisa', 'James', 'Emily', 'Daniel', 'Anna'],
        'age': [25, 32, 28, 35, 41, 29, 33, 27, 38, 31],
        'city': ['Seoul', 'Boston', 'London', 'Paris', 'Tokyo', 'Sydney', 'Berlin', 'Toronto', 'Rome', 'Madrid'],
        'score': [85, 92, 78, 95, 88, 83, 91, 87, 94, 89]
    }
    return pd.DataFrame(data)

with DAG(
        dag_id="xcom_backend_dag",
        schedule="@once",
        start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
        catchup=False
) as dag:
	## DataFrame을 Xcom에 저장
    @task(task_id='upload_df')
    def upload_df():
        df = create_sample_df()
        return df
    
    ## 저장된 Xcom을 가져와 로그에 출력
    @task(task_id='read_df')
    def read_df():
        context = get_current_context()
        df = context['task_instance'].xcom_pull(task_ids='upload_df')
        print(df.head())
        return df
    
    upload_df() >> read_df()

4. airflow.cfg 파일 수정 후 적용

1) AWS S3

  • AWS S3를 Xcom Backend로 설정하고자 하는 경우, airflow.cfg 파일의 내용을 아래와 같이 설정하면 됩니다.
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

# 예시) xcom_objectstorage_path = s3://aws_connection@airflow-hyunsoo-bucket/xcom
xcom_objectstorage_path = s3://[AWS Connection 명]@[S3 버킷 경로]

xcom_objectstorage_threshold = 0

xcom_objectstorage_compression = gzip
  • xcom_objectstorage_path : Xcom 데이터가 저장될 객체 스토리지 경로 지정
  • xcom_objectstorage_threshold : Xcom 데이터가 객체 스토리지에 저장되지 위한 최소값 지정
    • threshold 값 미만 : 메타DB에 저장
    • threshold 값 이상 : 객체 스토리지에 저장
    • -1 : 항상 메타DB에 저장
    • 0 : 항상 객체 스토리지에 저장
  • xcom_objectstorage_compression : Xcom 데이터의 압축 방식을 지정

2) Google Cloud Storage

  • 위 설정과 비슷하며 xcom_objectstorage_path만 GCS 경로로 바꾸면 됩니다.
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

# 예시) xcom_objectstorage_path = gs://my_gcp_conn@codeit_sprint/xcom
xcom_objectstorage_path = gs://[GCP Connection 명]@[GCS 버킷 경로]

xcom_objectstorage_threshold = 0

xcom_objectstorage_compression = gzip

3) 설정 내용 적용

  • airflow.cfg 파일의 위 4가지 Key값을 설정하였다면 시스템 재시작이 필요합니다. docker compose로 설치되어 있다면 docker compose restart 명령어를 통해 재시작해주세요.
  • 재시작 후 위의 DAG를 실행하면 아래와 같이 객체 저장소에 버킷 경로/[DAG ID]/[작업 시간]/[TASK ID]/[Xcom 데이터]와 같이 디렉토리 구조가 생성되며 Task에서 return한 데이터가 저장됩니다.
    • AWS S3
    • Google Cloud Storage

5. 참고 자료

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글