Previous practice
If the previous exercise was scheduled with crontab, recreate the same job as an Airflow DAG.
Assignment
1) Generate a parquet file from the DB table built earlier
2) Transfer the generated parquet file to server1
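The DAG below wires the two steps together: a @task-decorated Python function dumps the table to parquet with psycopg2 and pandas, and a BashOperator pushes the file to server1 over scp.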
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.decorators import task
import pendulum
import psycopg2
import pandas as pd

with DAG(
    dag_id="move_file_2_server",
    schedule="* * * * *",  # run every minute
    start_date=pendulum.datetime(2024, 7, 19, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="make_file")
    def make_file():
        # connection info for the PostgreSQL DB from the earlier exercise
        host = '127.0.0.1'
        port = 5432
        user = 'postgres'
        pw = 'postgres'
        db = 'login_db'
        table = 'userlogin_20240527'
        sqlquery = 'SELECT * FROM {}'.format(table)
        save_path = '/home/ubuntu/work/pyscript/data/'
        file_name = table + '.parquet'

        conn = psycopg2.connect(host=host,
                                port=port,
                                dbname=db,
                                user=user,
                                password=pw)
        cur = conn.cursor()
        cur.execute(sqlquery)
        # column names come from the cursor description
        colnames = [element[0] for element in cur.description]
        # fetch all rows into a DataFrame
        data = cur.fetchall()
        df = pd.DataFrame(data, columns=colnames)
        cur.close()
        conn.close()
        # write the table out as a parquet file
        df.to_parquet(path=save_path + file_name)

    # push the parquet file to server1 over scp
    move_file = BashOperator(
        task_id="move_file",
        env={
            'destination_server': 'ubuntu@server1_internal_IP',
            # must match the file name written by make_file
            'source_path': '/home/ubuntu/work/pyscript/data/userlogin_20240527.parquet',
            'destination_path': '/home/ubuntu/work/spark01/data'
        },
        bash_command='scp "$source_path" "$destination_server:$destination_path"',
    )

    make_file() >> move_file
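
For reference, the DB credentials do not have to live in the task body. A minimal sketch of the same make_file step using PostgresHook, assuming the apache-airflow-providers-postgres package is installed and a connection with the hypothetical id login_db_conn has been registered in Airflow (both are assumptions, not part of the original exercise):

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

@task(task_id="make_file")
def make_file():
    # 'login_db_conn' is a hypothetical connection id registered in Airflow
    hook = PostgresHook(postgres_conn_id="login_db_conn")
    table = 'userlogin_20240527'
    # get_pandas_df runs the query and returns the result as a DataFrame
    df = hook.get_pandas_df('SELECT * FROM {}'.format(table))
    df.to_parquet(path='/home/ubuntu/work/pyscript/data/' + table + '.parquet')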
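
Note that the scp in move_file only succeeds non-interactively if the Airflow worker's SSH key is already registered on server1; a password prompt will hang the task. Each task can also be checked on its own before enabling the schedule, e.g. airflow tasks test move_file_2_server make_file 2024-07-19.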