๐Ÿ“ฆ Airflow ( CSVํŒŒ์ผ์„ MySQL์— ์ ์žฌ )

๋ฌธํ•ดํ”ผยท2023๋…„ 6์›” 3์ผ
1

airflow

๋ชฉ๋ก ๋ณด๊ธฐ
5/5

์ด์ „์— ์ž‘์„ฑํ–ˆ๋˜ ๊ฒŒ์‹œ๋ฌผ์—์„œ๋Š” data๋ผ๋Š” ํด๋”์— csvํŒŒ์ผ๋กœ ์ €์žฅ๋งŒ ํ–ˆ๋‹ค๋ฉด
์ด์ œ ์ €์žฅ๋œ csvํŒŒ์ผ์„ Airflow์— connectionํ•œ MySQL์— ์ ์žฌ๋ฅผ ํ•ด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

์ฒ˜์Œ mysql์— ์ ์žฌ๋ฅผ ํ•˜๊ธฐ ์œ„ํ•ด์„œ ๋กœ์ปฌ์— ์žˆ๋Š” mysql์„ ์“ฐ๋ ค๊ณ  ํ•˜๋‹ค๋ณด๋‹ˆ ๊ฐ„๋‹จํ•˜๊ฒŒ ์ƒ๊ฐํ•ด HOST์— localhost๋ฅผ ์“ฐ๋ฉด ๋ ๊ฒƒ์ด๋ผ๊ณ  ์ƒ๊ฐ์„ ํ–ˆ์Šต๋‹ˆ๋‹ค.
ํ•˜์ง€๋งŒ ๊ทธ๋ ‡๊ฒŒ ๊ฐ„๋‹จํ–ˆ๋‹ค๋ฉด ์ธ์ƒ์ด ์ •๋ง ์žฌ๋ฏธ๊ฐ€ ์—†์—ˆ์„๋“ฏ ํ•ฉ๋‹ˆ๋‹ค.
์ œ๊ฐ€ ๋ถ€์กฑํ•˜๊ณ  ์•„์ง ๋งŽ์ด ๋ชฐ๋ผ์„œ ์ด๋Ÿฐ์ผ์ด ๋ฐœ์ƒํ–ˆ๋Š”์ง€๋Š” ๋ชจ๋ฅด๊ฒ ์Šต๋‹ˆ๋‹ค.

๐Ÿ–‡๏ธ Airflow์— MySQL connectionํ•˜๊ธฐ

์‰ฝ๊ฒŒ ๊ณผ์ •์„ ์„ค๋ช…ํ•˜์ž๋ฉด airflow์˜ ์›นUI์—์„œ Admin - connections์—์„œ mysql์„ ์ถ”๊ฐ€ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

๐Ÿ–‡๏ธ 1. ์ž์‹ ์ด ์“ฐ๊ณ ์žˆ๋Š” IP๋ฅผ ์•Œ์•„๋ณด๊ธฐ

ifconfig | grep inet

์ €๋Š” mac์„ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๊ธฐ๋•Œ๋ฌธ์— ipconfig์“ฐ๋Š”๊ฒƒ ๋Œ€์‹ ์— ifconfig๋ฅผ ํ†ตํ•ด์„œ ๋‚˜์˜จ ip์ค‘ inet์„ ํ†ตํ•ด ํ˜„์žฌ ์“ฐ๊ณ ์žˆ๋Š” ip๋ฅผ ์ฐพ์•˜์Šต๋‹ˆ๋‹ค.

๐Ÿ–‡๏ธ 2. MySQL ํ™˜๊ฒฝ ์กฐ์„ฑ

์ €๋Š” air๋ผ๋Š” ์ด๋ฆ„์œผ๋กœ ๊ณ„์ •์„ ์ƒˆ๋กœ ๋งŒ๋“ค์—ˆ์Šต๋‹ˆ๋‹ค.

  1. MySQL์— ์ ‘์†ํ•ด์„œ ์œ ์ €๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

    CREATE USER air;
    
  2. ๋น„๋ฐ€๋ฒˆํ˜ธ๋ฅผ ๋ถ€์—ฌํ•ฉ๋‹ˆ๋‹ค.

    CREATE USER 'air'@'localhost' identified by '๋น„๋ฐ€๋ฒˆํ˜ธ';
  3. ์™ธ๋ถ€์˜ ์ ‘๊ทผ์„ ํ—ˆ์šฉํ•˜๊ธฐ์œ„ํ•ด host๋ฅผ '%'๋กœ ๋ฐ”๊ฟ”์ค๋‹ˆ๋‹ค.

    CREATE USER 'air'@'%' identified by '๋น„๋ฐ€๋ฒˆํ˜ธ';
  4. ์ด๋ ‡๊ฒŒ ๋งŒ๋“ค์–ด์ง„ ๊ณ„์ •์œผ๋กœ ๋“ค์–ด๊ฐ€์ค๋‹ˆ๋‹ค.
    mysql -u air -p

  5. aladin์ด๋ผ๋Š” ์Šคํ‚ค๋งˆ๋ฅผ ๋งŒ๋“ค์–ด์ฃผ๊ณ  ์›๋ž˜ ํฌ๋กค๋งํ–ˆ๋˜ ๋ฐ์ดํ„ฐ๋“ค์˜ ์ปฌ๋Ÿผ๊ณผ ๋งž๊ฒŒ mytable์ด๋ผ๋Š” ํ…Œ์ด๋ธ”๋„ ๋งŒ๋“ค์–ด ์ค๋‹ˆ๋‹ค.

  6. my.cnf์— bind-address๋ฅผ ์™ธ๋ถ€์—์„œ ๋“ค์–ด์˜ฌ์ˆ˜ ์žˆ๋„๋ก 0.0.0.0์œผ๋กœ ๋ฐ”๊ฟ”์ฃผ์—ˆ์Šต๋‹ˆ๋‹ค.
    ํ•˜์ง€๋งŒ ์ด ๋ฐฉ๋ฒ•์€ ๋ณด์•ˆ์ƒ ๋งค์šฐ๋งค์šฐ ์•ˆ์ข‹์€๊ฑฐ ๊ฐ™์Šต๋‹ˆ๋‹ค. ๋‹ค๋ฅธ๋ฐฉ๋ฒ•์„ ์ฐพ์•„๋ด์•ผ๊ฒ ์–ด์š”.

๐Ÿ–‡๏ธ 3. Airflow์— connectionํ•˜๊ธฐ

Airflow ์›นUI๋ฅผ ํ†ตํ•ด๋“ค์–ด์™€ Admin - Connections์— ๋“ค์–ด์˜ค๊ฒŒ ๋˜๋ฉด ์—ฌ๋Ÿฌ๊ฐ€์ง€ ํˆด๋“ค์„ ์ปค๋„ฅ์…˜ ํ• ์ˆ˜ ์žˆ๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
์šฐ๋ฆฌ๋Š” ์ƒˆ๋กœ์šด MySQL์„ ์—ฐ๊ฒฐํ•ด์•ผํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์™ผ์ชฝ์— ํŒŒ๋ž€๋ฒ„ํŠผ์œผ๋กœ ์žˆ๋Š” + ๋ฒ„ํŠผ์„ ๋ˆŒ๋Ÿฌ์„œ ์ถ”๊ฐ€ํ•ด ์ค์‹œ๋‹ค.

์ด๋Ÿฐ ํ™”๋ฉด์ด ๋‚˜์˜ค๊ฒŒ ๋˜๋Š”๋ฐ ์ž‘์„ฑํ•ด์•ผ ํ•  ๊ฒƒ๋“ค์„ ํ•˜๋‚˜์”ฉ ์„ค๋ช…๋“œ๋ฆฌ๊ฒ ์Šต๋‹ˆ๋‹ค.

  • Connection Id : ์ปค๋„ฅ์…˜์˜ ์ด๋ฆ„๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. ์ถ”ํ›„์— ์Šคํฌ๋ฆฝํŠธ์— ์ด ID๋ฅผ ์‚ฌ์šฉํ•ด์•ผํ•ฉ๋‹ˆ๋‹ค.
  • Connection Type : ์—ฐ๊ฒฐํ•  ์ปค๋„ฅ์…˜์˜ ์ข…๋ฅ˜๋ฅผ ๋งํ•ฉ๋‹ˆ๋‹ค. (์˜ˆ : mysql, aws ๋“ฑ๋“ฑ)
  • Host : ์‚ฌ์šฉํ•˜๋Š” IP๋‚˜ endpoint๋ฅผ ์—ฌ๊ธฐ์— ์ž…๋ ฅํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.
  • Schema : ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ์ด๋ฆ„์„ ๋งํ•ฉ๋‹ˆ๋‹ค.
  • Login : ๊ณ„์ • ์ด๋ฆ„์ž…๋‹ˆ๋‹ค.
  • Password : ๊ณ„์ •์˜ ๋น„๋ฐ€๋ฒˆํ˜ธ์ž…๋‹ˆ๋‹ค.

์œ„์˜ ๋ชฉ๋ก๋“ค์„ ๋‹ค ์ž‘์„ฑํ•˜๊ณ  ๋‚˜๋ฉด ์ปค๋„ฅ์…˜์ด ์™„๋ฃŒ๊ฐ€ ๋ฉ๋‹ˆ๋‹ค.

์ œ๊ฐ€ ์•„์ง ๋ถ€์กฑํ•ด์„œ ์กฐ์–ธ์„ ๊ตฌํ•˜๊ณ  ์‹ถ์Šต๋‹ˆ๋‹ค.
๐Ÿ’ก ๋‹ค๋ฅธ๋ถ„๋“ค์ด ํ•˜๋Š”๊ฑธ ๋ณด๋ฉด Host์— localhost๋ฅผ ์ž‘์„ฑ ํ•˜์‹œ๋˜๋ฐ ์ €๋Š” ์™œ ์•ˆ๋˜๋Š”๊ฑธ๊นŒ์š”??


โš™๏ธ DAG ์ž‘์„ฑ

์•ž๋‹จ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜๋Š” ์ฝ”๋“œ๋Š” ์•ž์˜ ๊ฒŒ์‹œ๊ธ€๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

from airflow.hooks.base_hook import BaseHook
from sqlalchemy import create_engine
import pymysql

def read_csv_and_store_in_mysql():
    # CSV ํŒŒ์ผ์„ ์ฝ์–ด DataFrame์œผ๋กœ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
    today = str(date.today()).replace("-", "")
    df = pd.read_csv(f"/home/airflow/data/aladin{today}.csv")

    connection=BaseHook.get_connection('airsql') #airflow์—์„œ connectionํ•œ mysql id
    database_username=connection.login
    database_password=connection.password
    database_ip = connection.host
    database_port = connection.port
    database_name = connection.schema
    
    database_connection = f"mysql+pymysql://{database_username}:{database_password}@{database_ip}:{database_port}/{database_name}"

    engine = create_engine(database_connection)

    # ๋ฐ์ดํ„ฐ๋ฅผ MySQL์˜ ์ƒˆ ํ…Œ์ด๋ธ”์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค. ๊ธฐ์กด์— ์žˆ๋Š” ํ…Œ์ด๋ธ”์ด๋ผ๋ฉด, ๋ฐ์ดํ„ฐ๋ฅผ ๋ฎ์–ด์“ฐ๊ฑฐ๋‚˜ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
    df.to_sql(con=engine, name='mytable', if_exists='append', index=False)
    
task_read_csv_and_store_in_mysql = PythonOperator(
	task_id='read_csv_and_store_in_mysql',
	python_callable=read_csv_and_store_in_mysql,
	dag=dag,
)

์ €๋Š” alchemy๋ฅผ ํ†ตํ•ด์„œ MySQL์„œ๋ฒ„์™€ ์—ฐ๊ฒฐํ•˜์˜€์Šต๋‹ˆ๋‹ค.
๊ทธ๋ฆฌ๊ณ  ์•„๊นŒ connectionํ–ˆ์„๋•Œ ๋ณด์•˜๋˜ ๊ฒƒ๋“ค์ด ๋งŽ์€๋ฐ์š”.
BaseHook.get_connection('connectionID')๋ฅผ ํ†ตํ•ด์„œ connectionํ•œ ๋‚ด์šฉ์„ ๋ถˆ๋Ÿฌ์™”์Šต๋‹ˆ๋‹ค.


๐Ÿ—’๏ธ ๊นจ๋‹ณ์€๊ฒƒ

csvํŒŒ์ผ๋กœ ์ €์žฅํ•ด์„œ ์ ์žฌํ•˜๋Š” ์ด์œ 

  • ์ฒ˜์Œ์—๋Š” ์ถ”์ถœํ•œ ๋ฐ์ดํ„ฐ๋“ค์„ ํ•˜๋‚˜์˜ ๋”•์…”๋„ˆ๋ฆฌ์— ๋‹ด์€ํ›„ ๋‹ค์Œ ํ•จ์ˆ˜๋กœ ๋„˜๊ฒจ์„œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ํ•œ row์”ฉ insert ํ•˜๋Š” ๋ฐฉ์‹์„ ํ•˜์˜€์Šต๋‹ˆ๋‹ค.
    ํ•˜์ง€๋งŒ ํ–ฅํ›„ ๋ฐ์ดํ„ฐ์˜ ์–‘์ด ๋งŽ์•„์ง€๊ฒŒ ๋œ๋‹ค๋ฉด sql์ž์ฒด์˜ ์†๋„๊ฐ€ ์—„์ฒญ๋‚˜๊ฒŒ ๋Š๋ ค์ง€๋ฉด์„œ ์—…๋ฌด์— ์ง€์žฅ์„ ์ค„์ˆ˜ ์žˆ๋‹ค๊ณ  ์ƒ๊ฐํ•ด ์ถ”์ถœํ•œ ๋ฐ์ดํ„ฐ๋ฅผ csvํŒŒ์ผ์— ์ €์žฅํ•˜๊ณ  ๊ทธ๊ฒƒ์„ ํ•œ๋ฒˆ์— ์ ์žฌํ•ด์•ผ ์†๋„๊ฐ€ ๋Š๋ ค์ง€๋Š”๊ฒƒ์„ ๋ฐฉ์ง€ํ• ์ˆ˜ ์žˆ๋‹ค๋Š”๊ฒƒ์„ ๋ฐฐ์› ์Šต๋‹ˆ๋‹ค.

๐Ÿ—’๏ธ ํ–ฅํ›„๊ณ„ํš

  • ๊ด€๊ณ„ํ˜• ๋ฐ์ดํ„ฐ ๋ฒ ์ด์Šค๊ฐ€ ์•„๋‹Œ csvํŒŒ์ผ ์—†์ด NoSQL์— ์ ์žฌํ•˜๊ธฐ.
  • NoSQL์„ ์‚ฌ์šฉํ•˜๋ ค๋ฉด ์ „์ฒ˜๋ฆฌ๋ฅผ ํ•ด์•ผํ•˜๋Š”์ง€ ์•ˆํ•ด์•ผํ•˜๋Š”์ง€ ์•Œ์•„๋ณด๊ธฐ.
profile
ํ–‰๋ณตํ•˜๋ ค๊ณ  ๊ฐœ๋ฐœ๊ณต๋ถ€ํ•˜๋Š” ๋ฌธ๊ด‘์‹์˜ ๋กœ๊ทธํŒŒ์ผ์ž…๋‹ˆ๋‹ค.

0๊ฐœ์˜ ๋Œ“๊ธ€