Airflow 활용

vencott·2023년 12월 8일

Airflow Connections

우리는 주로 ETL을 위해 Airflow를 활용하므로 외부 서비스와 연결을 해야하는 경우가 많다

Airflow Connection을 통해 이러한 연결 정보들을 쉽게 관리할 수 있다

상단 메뉴 Admin > Connections 화면에서 연결 정보들을 관리한다

Airflow Postgres Operator

  1. docker-compose.yaml 파일에 postgres port 정보를 추가한다
postgres:
    ports:
      - 5432:5432
  1. 아래 명령어를 통해 postgres 컨테이너를 재실행한다

docker-compose up -d --no-deps --build postgres

  1. dbevaer, datagrip 등의 DB 툴로 postgres DB와 연결한 뒤 test 데이터베이스를 생성한다

  2. airflow server에서 새로운 connection을 생성한다

  1. DAG를 작성한다
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    'owner': 'vencott',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='dag_with_postgres_operator_v03',
    default_args=default_args,
    start_date=datetime(2023, 6, 30),
    schedule_interval='@daily'
) as dag:
    task1 = PostgresOperator(
        task_id='create_postgres_table',
        postgres_conn_id='postgres_localhost',
        sql="""
            create table if not exists dag_runs (
                dt date,
                dag_id character varying,
                primary key (dt, dag_id)
            )
        """
    )

    # ds, dag.dag_id 등은 airflow에서 미리 정의된 값(airflow macros)
    task2 = PostgresOperator(
        task_id='insert_into_table',
        postgres_conn_id='postgres_localhost',
        sql="""
            insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id}}')
        """
    )

    # airflow 에서는 데이터 중복을 막기 위해 삭제하는 것을 권장한다
    task3 = PostgresOperator(
        task_id='delete_data_from_table',
        postgres_conn_id='postgres_localhost',
        sql="""
            delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id}}'
        """
    )

    task1 >> task3 >> task2
profile
Backend Developer

0개의 댓글