[Airflow]Postgres Operator

포동동·2023년 11월 13일
1

Postgres to S3

최근, 새로운 파이프라인을 짜면서 기존 RDB인 Postgres에서 raw 데이터가 쌓이는 S3로 보내는 DAG를 작성하던 중, operator를 커스텀 하였다. airflow를 정확히는 모르지만, 우선 하고봐야 하는 스타트업의 데엔으로써 얻은 잡지식 하나 공유하려 한다. 한국 블로그에도 많이 없고 외국에서도 postgres hook을 이용한 operator는 많이 공유되어 있지 않아 코드를 적어둔다.


Postgres_Hook

postgres도 다른 rdb처럼 airflow에서는 hook으로써 제공하고 있었다. [Github] [공식문서]

S3_Hook

s3는 모두가 알듯이 hook으로 제공되고 있다. [Github] [공식문서]


이 두 개의 hook을 이용하여 mysql_to_s3_operator를 커스텀 하여 아래와 같은 코드를 작성하였다.

# operators/postgres_to_s3_operator.py

import os

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults

import tempfile
from typing import Optional


class PostgresToS3Operator(BaseOperator):
    template_fields = ("_query", "_s3_key")

    @apply_defaults
    def __init__(
        self, 
        postgres_conn_id, query, 
        s3_conn_id, s3_bucket, s3_key, 
        pd_csv_kwargs: dict = None, 
        index: Optional[bool] = False, 
        *args, **kwargs
    ) -> None : 
    
        super().__init__(*args, **kwargs)
        self._postgres_conn_id = postgres_conn_id
        self._query = query
        self._s3_conn_id = s3_conn_id
        self._s3_bucket = s3_bucket
        self._s3_key = s3_key
        self._pd_csv_kwargs = pd_csv_kwargs

        if not self._pd_csv_kwargs:
            self._pd_csv_kwargs = {}

        if "index" not in self._pd_csv_kwargs:
            self._pd_csv_kwargs["index"] = index

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        s3_hook = S3Hook(aws_conn_id=self._s3_conn_id)
        
        data_df = postgres_hook.get_pandas_df(self._query)
        
        with tempfile.NamedTemporaryFile(mode='r+', suffix='.csv') as tmp_csv:
            tmp_csv.file.write(data_df.to_csv(**self._pd_csv_kwargs))
            tmp_csv.file.seek(0)
            s3_hook.load_file(filename=tmp_csv.name,
                            key=self._s3_key,
                            bucket_name=self._s3_bucket)

        if s3_hook.check_for_key(self._s3_key, bucket_name=self._s3_bucket):
            file_location = os.path.join(self._s3_bucket, self._s3_key)
            self.log.info("File saved correctly in %s", file_location)

# dags/postgres_to_s3.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from operators.postgres_to_s3_operator import PostgresToS3Operator


# Default Arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 7),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Postgres Connection
pg_conn_id = 'wein_mart'
pg_schema = 'winflex'

# S3 Connection
year = datetime.today().year
month = datetime.today().month
day = datetime.today().day

s3_conn_id = ''
s3_bucket = ''
s3_path = f'{경로}/{year}/{month}/{day}'

# Create a PostgresHook for PostgreSQL connection
pg_hook = PostgresHook(postgres_conn_id=pg_conn_id)
tables = pg_hook.get_records(f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{pg_schema}'")

# Define your DAG
with DAG(
    dag_id='postgres_to_s3',
    schedule_interval="@daily",
    default_args=default_args,
    tags=["example"],
    catchup=False
) as dag:
    for table in tables:
        table_name = table[0]
        task_id = f"export_table_{table_name}"
        export_task = PostgresToS3Operator(
            task_id=task_id,
            postgres_conn_id=pg_conn_id,
            query=f"SELECT * FROM {pg_schema}.{table_name}",
            s3_conn_id=s3_conn_id,
            s3_bucket=s3_bucket,
            s3_key=f"{s3_path}/{table_name}.csv",
        )
        
        # Set task dependencies
        export_task

주의 사항

구글링을 하다 보면, operator 만들 때 postgres_hook 이용하는 부분에 data_df = postgres_hook.get_pandas_df(self._query)가 아닌 results = postgres_hook.get_records(self._query)를 쓰는 경우가 있는데, 후자의 경우엔 컬럼명이 따라오지 않는다. 따라서 컬럼이 이미 정해져있고 데이터만 옮길 때는 사용할 수 있지만, 나는 raw 데이터를 s3에 저장하고 다시 컬럼명 중 하나로 날짜 파티셔닝을 해야해서 pandas dataframe으로 변환하여 가져오는 메서드를 이용했다.


이 코드를 짜면서 현재 버전의 airflow 깃헙이 아니라 그 전 버전들을 많이 참고하였는데, 그냥 개인적인 느낌으로 지금 airflow 깃헙보다는 예전 버전이 좀 더 직관적이고 보기 좋았던 것 같다... 지금은 공식문서 옆에 꼬옥 끼고 안 보면 보기 어렵당..

profile
완료주의

0개의 댓글