spark, airflow

yoon__0_0·2024년 7월 19일
0

이어드림 수업

목록 보기
97/103

이전 실습
이전 실습은 crontab을 사용하였다면, 이를 airflow DAG으로 생성해보기

과제
1) 이전에 만든 DB table 에서 parquet 파일 생성
2) 만들어진 parquet 파일을 서버1로 전송

postgres-airflow 연결 전

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.decorators import task
import pendulum
import psycopg2
import pandas as pd

with DAG(
    dag_id = "move_file_2_server",
    schedule= "* * * * *",
    start_date=pendulum.datetime(2024,7,19, tz = "Asia/Seoul"),
    catchup=False
) as dag:

    @task(task_id = "make_file")
    def make_file():
        host = '127.0.0.1'
        port = 5432
        user = 'postgres'
        pw  = 'postgres'
        db = 'login_db'
        table = 'userlogin_20240527'
        sqlquery = 'SELECT * FROM {}'.format(table)
        save_path = '/home/ubuntu/work/pyscript/data/'
        file_name = table + '.parquet'

        conn = psycopg2.connect (host = host,
                                port = port,
                                dbname = db,
                                user = user,
                                password = pw)

        cur = conn.cursor()
        cur.execute(sqlquery)

        # column name
        colnames = [element[0] for element in cur.description]

        # dataframe
        data = cur.fetchall()
        df = pd.DataFrame(data, columns = colnames)

        cur.close()
        conn.close()

        df.to_parquet(path = save_path + file_name)

    move_file = BashOperator(
        task_id = "move_file",
        env={
        'destination_server': 'ubuntu@server1_내부IP',
        'source_path': '/home/ubuntu/work/pyscript/data/userlogin_20240717.parquet',
        'destination_path': '/home/ubuntu/work/spark01/data'
    	},
    	bash_command='scp "$source_path" "$destination_server:$destination_path"',
    )

    make_file() >> move_file
    

postgres - airflow 연결 후

profile
신윤재입니다

0개의 댓글