오늘은 이기종간의 postgres db의 데이터를
etl 해보자
상황은 A라는 서버의 postgres에서 데이터를 읽어
airflow가 설치된 etl 서버에서 csv파일을 내리고
B라는 postgres서버로 데이터를 옮기는 것이다.
pip install apache-airflow-providers-postgres
from datetime import datetime, timedelta
import logging
from textwrap import dedent

import pandas as pd

from airflow import DAG
from airflow.operators.python import PythonOperator
# BUG FIX: PostgresHook lives in the provider's *hooks* module, not
# *operators* — the original import path raises ImportError at DAG parse time.
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Module-level logger; Airflow captures these records into the task logs.
LOGGER = logging.getLogger(__name__)

# Airflow connection id (configured in the metadata DB / environment).
POSTGRES_CONN_ID = "postgres_default"

# Intermediate dump written by the extract task and read by the load task.
# NOTE(review): "/temp" is unusual — confirm this isn't meant to be "/tmp".
FILE_PATH = "/temp/dataset.csv"

# Instantiate the hook directly instead of BaseHook.get_hook(conn_id):
# get_hook() looks the Connection up in the metadata DB at DAG-parse time,
# while direct construction defers any DB access until the hook is used.
PG_HOOK = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

# Field separator used by both the COPY extract and the load parser.
# ("SEPERATOR" [sic] — name kept so sibling functions keep working.)
SEPERATOR = "\t"
def pg_extract(sql):
    """Run a server-side COPY ... TO STDOUT and capture it into FILE_PATH.

    Args:
        sql: A Postgres ``COPY (...) TO STDOUT`` statement producing the
            extract (built by the DAG's extract task).
    """
    # copy_expert streams the COPY output straight into the local dump file.
    PG_HOOK.copy_expert(sql=sql, filename=FILE_PATH)
    LOGGER.info("success extract data")
def pg_load(target_table, fields):
    """Parse the extracted dump file and insert its rows into Postgres.

    Args:
        target_table: Fully-qualified destination table (e.g. "extn.keyword2").
        fields: Column names matching the order of values in the dump file.
    """
    # Iterate the file lazily instead of readlines(): avoids holding the raw
    # text AND the parsed rows in memory simultaneously for large extracts.
    # rstrip("\n") is equivalent to the old replace("\n", "") here, since a
    # newline can only appear at the end of each yielded line.
    with open(FILE_PATH, "r", encoding="utf-8") as file:
        rows = [line.rstrip("\n").split(SEPERATOR) for line in file]
    # File is closed before talking to the database — no reason to hold the
    # handle open across a potentially slow bulk insert.
    PG_HOOK.insert_rows(
        table=target_table,
        rows=rows,
        replace=False,
        target_fields=fields,
    )
    LOGGER.info("success load data")
with DAG(
    "etl",
    description="etl",
    start_date=datetime(2022, 12, 25),
    schedule=timedelta(days=1),  # one run per day
    tags=["etl"],
) as dag:
    # Dead code removed: the original assigned dag.doc_md = __doc__ and then
    # immediately overwrote it on the next statement. dedent keeps the
    # doc_md style consistent with the per-task docs below.
    dag.doc_md = dedent(
        """
        etl test
        """
    )

    # Extract: server-side COPY writes the query result to STDOUT and
    # copy_expert (inside pg_extract) captures it into the local dump file.
    t1 = PythonOperator(
        task_id="extract",
        python_callable=pg_extract,
        op_kwargs={
            "sql": f"""
                COPY (
                    SELECT id
                         , keyword
                         , created_at
                    FROM extn.KEYWORD
                ) TO STDOUT WITH
                DELIMITER '{SEPERATOR}'
            """
        },
    )
    t1.doc_md = dedent(
        """
        # extract
        """
    )

    # Load: reads the dump back and bulk-inserts into the target table.
    # NOTE(review): source table is extn.KEYWORD, target is extn.keyword2 —
    # presumably intentional (copy between tables); confirm.
    t2 = PythonOperator(
        task_id="load",
        python_callable=pg_load,
        op_kwargs={
            "target_table": "extn.keyword2",
            "fields": ["id", "keyword", "created_at"],
        },
    )
    t2.doc_md = dedent(
        """
        # load
        """
    )

    # Extract must finish before load starts.
    t1 >> t2