0. INTRO
- Airflow의 XCom(Cross Communication)은 DAG 내의 태스크 간 데이터를 공유하기 위한 기능입니다. Airflow 시스템이 실행되면 기본 메타데이터 저장소인 PostgreSQL에 다양한 메타 정보가 저장되며, XCom 데이터 역시 이 PostgreSQL 데이터베이스에 기록됩니다.
 
- 하지만 XCom은 어디까지나 경량 메시지 전달 용도로 설계된 기능이기 때문에, 사용 시 몇 가지 권장사항을 반드시 고려해야 합니다.
str, int, float, bool, list, dict 등의 단순하고 직렬화 가능한 데이터 타입만 저장 
- 수 MB 이하 수준의 작고 가벼운 데이터만 저장
 
- 실제 데이터 전달보다는 상태 또는 신호 전달 용도로 사용 (경로, 처리 여부, 간단한 통계치 등)
 
 
- Python 함수 간 데이터 전달은 같은 세션의 메모리를 공유하기 때문에 
pandas.DataFrame, numpy.ndarray, 이미지 등 대용량 복합 객체도 무리 없이 전달이 가능합니다. 하지만 XCom은 메타DB를 공유하므로, 대용량 데이터를 그대로 저장하면 DB 성능과 안정성에 악영향을 줄 수 있습니다. 
- 데이터 파이프라인에서는 함수 간 DataFrame 객체 전달이 자주 발생하는데 이런 경우는 어떻게 해야 할까요? Airflow는 이를 위해 Xcom 데이터를 PostgreSQL이 아닌 AWS S3, Google GCS, Azure Blob Storage 같은 클라우드 객체 저장소에 저장할 수 있는 
XComObjectStorageBackend 설정을 제공합니다. 
- 이번 글에서는 Airflow 3.0 버전에서 Xcom을 AWS S3와 Google Cloud Storage로 설정하는 방법을 각각 다뤄보도록 하겠습니다. (Airflow 3.0 버전을 Docker Compose로 설치 후 진행하므로 설치 관련해서는 공식 문서 참고 바랍니다.)
 
1. Airflow 3.0 설정 파일 확인
- Airflow에는 각종 세팅들을 저장해놓는 
airflow.cfg 파일이 있습니다. 2.10 버전까지는 Airflow의 도커 컨테이너 내부 /opt/airflow/airflow.cfg 경로에 존재했었는데 3 버전이 되면서 /opt/airflow/config/airflow.cfg 경로로 위치가 바뀌었습니다. 
- 사용자의 로컬 디렉토리와 볼륨 매핑이 되어있는 경로로 위치가 바뀌어 예전이라면 도커 컨테이너 내부에 들어가서 수정해야 했었던 것이 이제 로컬 디렉토리에서 바로 수정이 가능해졌습니다.

 
XComObjectStorageBackend 설정시 airflow.cfg 파일을 수정하고 docker compose restart 명령으로 시스템 재시작을 해주어야 제대로 적용이 됩니다. 
2. Connections 설정
- Airflow UI에서 AWS와 GCP에 대한 connections 설정을 해주도록 하겠습니다. (
Admin > Connections) 
1) AWS Connection
Connection Type : aws 
AWS Access Key ID : IAM USER로 발급받은 Key의 ID 부분 
AWS Secret Access Key : IAM USER로 발급받은 Key의 Secret 부분 
Extra Fields : {"region_name" : "ap-northeast-2"} 

 
2) GCP Connection
Connection Type : google_cloud_platform 
Project Id : GCP 프로젝트 ID 
Keyfile Path : 서비스 계정 JSON 키 경로

 
3. DAG 작성
- 실습 DAG의 경우 아래 두 가지 Task로 구성되어 있습니다.
1번 Task(upload_df) > 컬럼 4개, 행 10개짜리 샘플 DataFrame을 생성하여 이를 Xcom에 저장 
2번 Task(read_df) > Xcom에 저장된 DataFrame을 읽어와 로그에 출력 
 
import pendulum
from datetime import timedelta
import datetime
from airflow.sdk import DAG, task, get_current_context, ObjectStoragePath, Param
from airflow.operators.python import PythonOperator
import pandas as pd
def create_sample_df():
    data = {
        'name': ['John', 'Emma', 'Michael', 'Sarah', 'David', 'Lisa', 'James', 'Emily', 'Daniel', 'Anna'],
        'age': [25, 32, 28, 35, 41, 29, 33, 27, 38, 31],
        'city': ['Seoul', 'Boston', 'London', 'Paris', 'Tokyo', 'Sydney', 'Berlin', 'Toronto', 'Rome', 'Madrid'],
        'score': [85, 92, 78, 95, 88, 83, 91, 87, 94, 89]
    }
    return pd.DataFrame(data)
with DAG(
        dag_id="xcom_backend_dag",
        schedule="@once",
        start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
        catchup=False
) as dag:
	
    @task(task_id='upload_df')
    def upload_df():
        df = create_sample_df()
        return df
    
    
    @task(task_id='read_df')
    def read_df():
        context = get_current_context()
        df = context['task_instance'].xcom_pull(task_ids='upload_df')
        print(df.head())
        return df
    
    upload_df() >> read_df()
4. airflow.cfg 파일 수정 후 적용
1) AWS S3
- AWS S3를 Xcom Backend로 설정하고자 하는 경우, 
airflow.cfg 파일의 내용을 아래와 같이 설정하면 됩니다. 
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
# 예시) xcom_objectstorage_path = s3://aws_connection@airflow-hyunsoo-bucket/xcom
xcom_objectstorage_path = s3://[AWS Connection 명]@[S3 버킷 경로]
xcom_objectstorage_threshold = 0
xcom_objectstorage_compression = gzip
xcom_objectstorage_path : Xcom 데이터가 저장될 객체 스토리지 경로 지정 
xcom_objectstorage_threshold : Xcom 데이터가 객체 스토리지에 저장되지 위한 최소값 지정
threshold 값 미만 : 메타DB에 저장 
threshold 값 이상 : 객체 스토리지에 저장 
-1 : 항상 메타DB에 저장 
0 : 항상 객체 스토리지에 저장 
 
xcom_objectstorage_compression : Xcom 데이터의 압축 방식을 지정 
2) Google Cloud Storage
- 위 설정과 비슷하며 
xcom_objectstorage_path만 GCS 경로로 바꾸면 됩니다. 
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
# 예시) xcom_objectstorage_path = gs://my_gcp_conn@codeit_sprint/xcom
xcom_objectstorage_path = gs://[GCP Connection 명]@[GCS 버킷 경로]
xcom_objectstorage_threshold = 0
xcom_objectstorage_compression = gzip
3) 설정 내용 적용
airflow.cfg 파일의 위 4가지 Key값을 설정하였다면 시스템 재시작이 필요합니다. docker compose로 설치되어 있다면 docker compose restart 명령어를 통해 재시작해주세요. 
- 재시작 후 위의 DAG를 실행하면 아래와 같이 객체 저장소에 
버킷 경로/[DAG ID]/[작업 시간]/[TASK ID]/[Xcom 데이터]와 같이 디렉토리 구조가 생성되며 Task에서 return한 데이터가 저장됩니다.
- AWS S3

 
- Google Cloud Storage

 
 
5. 참고 자료