์ด์ ์ ์์ฑํ๋ ๊ฒ์๋ฌผ์์๋ data๋ผ๋ ํด๋์ csvํ์ผ๋ก ์ ์ฅ๋ง ํ๋ค๋ฉด
์ด์ ์ ์ฅ๋ csvํ์ผ์ Airflow์ connectionํ MySQL์ ์ ์ฌ๋ฅผ ํด๋ณด๊ฒ ์ต๋๋ค.
์ฒ์ mysql์ ์ ์ฌ๋ฅผ ํ๊ธฐ ์ํด์ ๋ก์ปฌ์ ์๋ mysql์ ์ฐ๋ ค๊ณ ํ๋ค๋ณด๋ ๊ฐ๋จํ๊ฒ ์๊ฐํด HOST์ localhost๋ฅผ ์ฐ๋ฉด ๋ ๊ฒ์ด๋ผ๊ณ ์๊ฐ์ ํ์ต๋๋ค.
ํ์ง๋ง ๊ทธ๋ ๊ฒ ๊ฐ๋จํ๋ค๋ฉด ์ธ์์ด ์ ๋ง ์ฌ๋ฏธ๊ฐ ์์์๋ฏ ํฉ๋๋ค.
์ ๊ฐ ๋ถ์กฑํ๊ณ ์์ง ๋ง์ด ๋ชฐ๋ผ์ ์ด๋ฐ์ผ์ด ๋ฐ์ํ๋์ง๋ ๋ชจ๋ฅด๊ฒ ์ต๋๋ค.
์ฝ๊ฒ ๊ณผ์ ์ ์ค๋ช ํ์๋ฉด airflow์ ์นUI์์ Admin - connections์์ mysql์ ์ถ๊ฐํ๋ฉด ๋ฉ๋๋ค.
ifconfig | grep inet
์ ๋ mac์ ์ฌ์ฉํ๊ณ ์๊ธฐ๋๋ฌธ์ ipconfig์ฐ๋๊ฒ ๋์ ์ ifconfig๋ฅผ ํตํด์ ๋์จ ip์ค inet์ ํตํด ํ์ฌ ์ฐ๊ณ ์๋ ip๋ฅผ ์ฐพ์์ต๋๋ค.
์ ๋ air๋ผ๋ ์ด๋ฆ์ผ๋ก ๊ณ์ ์ ์๋ก ๋ง๋ค์์ต๋๋ค.
MySQL์ ์ ์ํด์ ์ ์ ๋ฅผ ๋ง๋ญ๋๋ค.
CREATE USER air;
๋น๋ฐ๋ฒํธ๋ฅผ ๋ถ์ฌํฉ๋๋ค.
CREATE USER 'air'@'localhost' identified by '๋น๋ฐ๋ฒํธ';
์ธ๋ถ์ ์ ๊ทผ์ ํ์ฉํ๊ธฐ์ํด host๋ฅผ '%'๋ก ๋ฐ๊ฟ์ค๋๋ค.
CREATE USER 'air'@'%' identified by '๋น๋ฐ๋ฒํธ';
์ด๋ ๊ฒ ๋ง๋ค์ด์ง ๊ณ์ ์ผ๋ก ๋ค์ด๊ฐ์ค๋๋ค.
mysql -u air -p
aladin์ด๋ผ๋ ์คํค๋ง๋ฅผ ๋ง๋ค์ด์ฃผ๊ณ ์๋ ํฌ๋กค๋งํ๋ ๋ฐ์ดํฐ๋ค์ ์ปฌ๋ผ๊ณผ ๋ง๊ฒ mytable์ด๋ผ๋ ํ ์ด๋ธ๋ ๋ง๋ค์ด ์ค๋๋ค.
my.cnf์ bind-address๋ฅผ ์ธ๋ถ์์ ๋ค์ด์ฌ์ ์๋๋ก 0.0.0.0์ผ๋ก ๋ฐ๊ฟ์ฃผ์์ต๋๋ค.
ํ์ง๋ง ์ด ๋ฐฉ๋ฒ์ ๋ณด์์ ๋งค์ฐ๋งค์ฐ ์์ข์๊ฑฐ ๊ฐ์ต๋๋ค. ๋ค๋ฅธ๋ฐฉ๋ฒ์ ์ฐพ์๋ด์ผ๊ฒ ์ด์.
Airflow ์นUI๋ฅผ ํตํด๋ค์ด์ Admin - Connections์ ๋ค์ด์ค๊ฒ ๋๋ฉด ์ฌ๋ฌ๊ฐ์ง ํด๋ค์ ์ปค๋ฅ์
ํ ์ ์๊ฒ ๋ฉ๋๋ค.
์ฐ๋ฆฌ๋ ์๋ก์ด MySQL์ ์ฐ๊ฒฐํด์ผํ๊ธฐ ๋๋ฌธ์ ์ผ์ชฝ์ ํ๋๋ฒํผ์ผ๋ก ์๋ + ๋ฒํผ์ ๋๋ฌ์ ์ถ๊ฐํด ์ค์๋ค.
์ด๋ฐ ํ๋ฉด์ด ๋์ค๊ฒ ๋๋๋ฐ ์์ฑํด์ผ ํ ๊ฒ๋ค์ ํ๋์ฉ ์ค๋ช ๋๋ฆฌ๊ฒ ์ต๋๋ค.
์์ ๋ชฉ๋ก๋ค์ ๋ค ์์ฑํ๊ณ ๋๋ฉด ์ปค๋ฅ์ ์ด ์๋ฃ๊ฐ ๋ฉ๋๋ค.
์ ๊ฐ ์์ง ๋ถ์กฑํด์ ์กฐ์ธ์ ๊ตฌํ๊ณ ์ถ์ต๋๋ค.
๐ก ๋ค๋ฅธ๋ถ๋ค์ด ํ๋๊ฑธ ๋ณด๋ฉด Host์ localhost๋ฅผ ์์ฑ ํ์๋๋ฐ ์ ๋ ์ ์๋๋๊ฑธ๊น์??
์๋จ์ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ๋ ์ฝ๋๋ ์์ ๊ฒ์๊ธ๊ณผ ๊ฐ์ต๋๋ค.
from airflow.hooks.base_hook import BaseHook
from sqlalchemy import create_engine
import pymysql
def read_csv_and_store_in_mysql():
# CSV ํ์ผ์ ์ฝ์ด DataFrame์ผ๋ก ๋ณํํฉ๋๋ค.
today = str(date.today()).replace("-", "")
df = pd.read_csv(f"/home/airflow/data/aladin{today}.csv")
connection=BaseHook.get_connection('airsql') #airflow์์ connectionํ mysql id
database_username=connection.login
database_password=connection.password
database_ip = connection.host
database_port = connection.port
database_name = connection.schema
database_connection = f"mysql+pymysql://{database_username}:{database_password}@{database_ip}:{database_port}/{database_name}"
engine = create_engine(database_connection)
# ๋ฐ์ดํฐ๋ฅผ MySQL์ ์ ํ
์ด๋ธ์ ์ ์ฅํฉ๋๋ค. ๊ธฐ์กด์ ์๋ ํ
์ด๋ธ์ด๋ผ๋ฉด, ๋ฐ์ดํฐ๋ฅผ ๋ฎ์ด์ฐ๊ฑฐ๋ ์ถ๊ฐํ ์ ์์ต๋๋ค.
df.to_sql(con=engine, name='mytable', if_exists='append', index=False)
task_read_csv_and_store_in_mysql = PythonOperator(
task_id='read_csv_and_store_in_mysql',
python_callable=read_csv_and_store_in_mysql,
dag=dag,
)
์ ๋ alchemy๋ฅผ ํตํด์ MySQL์๋ฒ์ ์ฐ๊ฒฐํ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ ์๊น connectionํ์๋ ๋ณด์๋ ๊ฒ๋ค์ด ๋ง์๋ฐ์.
BaseHook.get_connection('connectionID')๋ฅผ ํตํด์ connectionํ ๋ด์ฉ์ ๋ถ๋ฌ์์ต๋๋ค.