우리는 주로 ETL을 위해 Airflow를 활용하므로 외부 서비스와 연결을 해야하는 경우가 많다
Airflow Connection을 통해 이러한 연결 정보들을 쉽게 관리할 수 있다
상단 메뉴 Admin > Connections 화면에서 연결 정보들을 관리한다

postgres:
ports:
- 5432:5432
docker-compose up -d --no-deps --build postgres
dbevaer, datagrip 등의 DB 툴로 postgres DB와 연결한 뒤 test 데이터베이스를 생성한다
airflow server에서 새로운 connection을 생성한다

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_args = {
'owner': 'vencott',
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='dag_with_postgres_operator_v03',
default_args=default_args,
start_date=datetime(2023, 6, 30),
schedule_interval='@daily'
) as dag:
task1 = PostgresOperator(
task_id='create_postgres_table',
postgres_conn_id='postgres_localhost',
sql="""
create table if not exists dag_runs (
dt date,
dag_id character varying,
primary key (dt, dag_id)
)
"""
)
# ds, dag.dag_id 등은 airflow에서 미리 정의된 값(airflow macros)
task2 = PostgresOperator(
task_id='insert_into_table',
postgres_conn_id='postgres_localhost',
sql="""
insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id}}')
"""
)
# airflow 에서는 데이터 중복을 막기 위해 삭제하는 것을 권장한다
task3 = PostgresOperator(
task_id='delete_data_from_table',
postgres_conn_id='postgres_localhost',
sql="""
delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id}}'
"""
)
task1 >> task3 >> task2