사전에 등록해둔 두개의 cloud 계정의 Connections 정보를 등록해두었다.
<유의사항 >
IAM 자격증명 내 bucket에 접근 권한 확인 필요.

구성해볼 파이프라인은 BigQuery to S3 ( .csv ) 형태의 daily task다.
S3HOOK 을 이용해서 bucket 명 출력 및 bucket에 .csv파일 넣기 ( read&write) 권한 확인이 필요하다 .
사전에 IAM 에서 arn 설정을 해줬다 하더라도 실제로 접근 가능한지 체크
import pendulum
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def list_s3_buckets_with_hook():
logger = logging.getLogger("airflow.task") # Airflow 로그용 로거
# S3Hook 인스턴스 생성 (aws_conn_id는 Airflow Connection에서 등록한 ID)
s3_hook = S3Hook(aws_conn_id='aws_default')
s3_client = s3_hook.get_conn() # boto3 client 반환
response = s3_client.list_buckets()
logger.info("=== S3 Buckets (Using S3Hook) ===")
for bucket in response.get('Buckets', []):
logger.info(f"- {bucket['Name']}")
with DAG(
dag_id='dags_s3_hook',
start_date=pendulum.today('UTC').add(days=-1),
schedule=None,
catchup=False,
tags=['aws', 's3']
) as dag:
list_buckets_task = PythonOperator(
task_id='list_s3_buckets_with_hook',
python_callable=list_s3_buckets_with_hook
)

LOG 출력을 통해 Bucket LIst 접근 권한 확인
대략적인 과정은 이렇다
Extract : BigQuery ( ODS )
Transformation : BigQuery 데이터를 읽고 record타입과 같은 특정 타입을 column 형태로 변형 (.csv )
Load : ETL 시간을 파일명으로 레이블링해서 업로드
import pendulum
import logging
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from io import StringIO
def export_bigquery_to_s3():
logger = logging.getLogger("airflow.task")
# BigQueryHook 인스턴스 (gcp_conn_id는 Airflow에 설정한 GCP 연결 ID)
bq_hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
# 실행할 쿼리
sql =
# 쿼리 실행 후 결과를 pandas DataFrame으로 받아오기
df = bq_hook.get_pandas_df(sql=sql)
logger.info("BigQuery에서 데이터 조회 완료")
# CSV로 변환 (메모리 상에서 처리)
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
csv_buffer.seek(0)
# S3에 업로드
s3_hook = S3Hook(aws_conn_id='aws_default')
bucket_name =
s3_key =
s3_hook.load_string(
string_data=csv_buffer.getvalue(),
key=s3_key,
bucket_name=bucket_name,
replace=True
)
logger.info(f"S3 업로드 완료: s3://{bucket_name}/{s3_key}")
# DAG 정의
with DAG(
dag_id='dag_export_bigquery_to_s3',
start_date=pendulum.today('UTC').add(days=-1),
schedule=None,
catchup=False,
tags=['bigquery', 's3', 'export']
) as dag:
export_task = PythonOperator(
task_id='export_bigquery_to_s3',
python_callable=export_bigquery_to_s3
)
redshift connection을 위해서 airflow에 DB 정보등록

이로써 3개의 connection이 완성되었다.