์ด๋ฒ ํฌ์คํ ์ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ์ฌ์ ์ ๊ฐ๋ ฅํ ๋ ๊ฐ๋ฅผ ๋ฌ์์ค Airflow์ ๋ํ ๋ชจ๋ ๊ฒ์ ๋ด์๋ค. ๊ธด ์คํฌ๋กค ์๋ฐ์ด ์์๋์ง๋ง, ๊ทธ๋งํผ ์์ฐฌ ์ ๋ณด๋ก ๊ฐ๋ ์ฑ์ฐ๋ ค๊ณ ๋ ธ๋ ฅํ๋ค! ๐ฅ
LLM ํ์ต์ ๋น ์ ธ ์ด๊ณ ์๋ ์ฃผ๋์ด ๊ฐ๋ฐ์๋ก์, ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๊ณผ์ ์์ ์๋ง์ ์ด๋ ค์์ ๊ฒช์๋ค. ์๋์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฎ๊ธฐ๊ณ , ์คํฌ๋ฆฝํธ๋ฅผ ์คํํ๊ณ , ์ค๋ฅ๋ฅผ ๊ฐ์ํ๋ ์ผ์ ์ ๋ง ๊ณ ๋๋ค. ๐ตโ๐ซ
์ด๋ฐ ๋ณต์กํ๊ณ ๋ฐ๋ณต์ ์ธ ์์ ์ ์๋ํํ ์ ์๋ ๋ฐฉ๋ฒ์ด ์์๊น? ๊ณ ๋ฏผํ๋ ์ค ๋์ ๋ค์ด์จ ๊ฒ์ด ๋ฐ๋ก Airflow!
Airflow๋ ๋ณต์กํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ ์, ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋งํ ์ ์๋ ๊ฐ๋ ฅํ ์คํ์์ค ํ๋ซํผ์ด๋ค. ๋ฐ์ดํฐ ์์ง๋์ด๋ง์ ํ์ ๋๊ตฌ๋ก ์๋ฆฌ๋งค๊นํ๊ณ ์์ผ๋ฉฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์๋ํ์ ํต์ฌ ๊ธฐ์ ์ด๋ผ๊ณ ํ ์ ์์ !!
Airflow๊ฐ ํ์ํ ์ด์ :
Airflow๋ฅผ ์ ๋๋ก ํ์ฉํ๊ธฐ ์ํด์๋ ๋ช ๊ฐ์ง ํต์ฌ ๊ฐ๋ ์ ์ดํดํด์ผ ํ๋ค. ๋ง์น ๋ ๊ณ ๋ธ๋ก์ฒ๋ผ, ์ด ๊ฐ๋ ๋ค์ ์กฐํฉํ์ฌ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๋ง๋ค ์ ์๋ ๊ฒ !
- DAG (Directed Acyclic Graph): Airflow์ ํต์ฌ ๋จ์๋ก, ์์ (task)๋ค์ ์์กด ๊ด๊ณ๋ฅผ ๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ(DAG) ํํ๋ก ํํ. DAG๋ ํ์ดํ๋ผ์ธ์ ๋ ผ๋ฆฌ์ ๊ตฌ์กฐ๋ฅผ ์ ์.
- Task: DAG๋ฅผ ๊ตฌ์ฑํ๋ ๊ฐ๋ณ ์์ ๋จ์๋ฅผ ์๋ฏธ. (๋ฐ์ดํฐ ์ถ์ถ, ๋ณํ, ๋ก๋ ๋ฑ)
- Operator: Task๊ฐ ์ํํ๋ ์ค์ ์์ ์ ์ ์. Airflow๋ ๋ค์ํ ์ข ๋ฅ์ Operator๋ฅผ ์ ๊ณตํ๋ฉฐ, ์ฌ์ฉ์ ์ ์ Operator๋ฅผ ๋ง๋ค ์๋ ์์. (BashOperator, PythonOperator, PostgresOperator ๋ฑ)
- Scheduler: ์ ์๋ DAG๋ฅผ ์ค์ผ์ค์ ๋ฐ๋ผ ์คํํ๋ ์ญํ . Cron ํํ์์ ์ฌ์ฉํ์ฌ ์คํ ์ฃผ๊ธฐ๋ฅผ ์ค์ ํ ์ ์๋ค.
- Executor: Task๋ฅผ ์คํํ๋ ์์ง. (LocalExecutor, CeleryExecutor, KubernetesExecutor ๋ฑ)
- Web UI: Airflow ์น ์ธํฐํ์ด์ค๋ฅผ ํตํด DAG ์คํ ์ํ๋ฅผ ๋ชจ๋ํฐ๋งํ๊ณ , ๋ก๊ทธ๋ฅผ ํ์ธํ๊ณ , ์ค์ผ์ค์ ๊ด๋ฆฌ
Airflow๋ ๋ฐ์ดํฐ ์์ง๋์ด๋ง์ ๋ค์ํ ๋ถ์ผ์์ ํ์ฉ๋ ์ ์๋ค. ๋ช ๊ฐ์ง ์์๋ฅผ ํตํด Airflow์ ํ์ฉ ๊ฐ๋ฅ์ฑ์ ์ดํด๋ณด์ !
- ๋ฐ์ดํฐ ์์ง ๋ฐ ํตํฉ: API, ๋ฐ์ดํฐ๋ฒ ์ด์ค, ํ์ผ ์์คํ ๋ฑ ๋ค์ํ ์์ค์์ ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ณ ํตํฉํ๋ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
- ๋ฐ์ดํฐ ๋ณํ ๋ฐ ์ ์ : ์์ง๋ ๋ฐ์ดํฐ๋ฅผ ์ํ๋ ํํ๋ก ๋ณํํ๊ณ ์ ์ ํ๋ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ ์ ์๋ค. (๋ฐ์ดํฐ ๋๋ฝ ๊ฐ ์ฒ๋ฆฌ, ์ด์์น ์ ๊ฑฐ, ๋ฐ์ดํฐ ํ์ ๋ณํ ๋ฑ์ ์๋ํ)
- ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ ํ์ต: ๋ชจ๋ธ ํ์ต์ ํ์ํ ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ ๊ณผ์ ์ ์๋ํํ๊ณ , ๋ชจ๋ธ ํ์ต ๋ฐ ๋ฐฐํฌ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
- ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค ๊ตฌ์ถ: ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๊ณ ๊ด๋ฆฌํ๋ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
- BI (Business Intelligence) ๋ณด๊ณ ์ ์์ฑ: ๋ฐ์ดํฐ๋ฅผ ๋ถ์ํ์ฌ BI ๋ณด๊ณ ์๋ฅผ ์์ฑํ๊ณ ๋ฐฐํฌํ๋ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
Airflow ํ์ฉ ์์ (Python ์ฝ๋):
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_python_function():
print("Hello from Python!")
with DAG(
dag_id='my_first_airflow_dag',
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
bash_task = BashOperator(
task_id='run_bash_command',
bash_command='echo "Hello from Bash!"'
)
python_task = PythonOperator(
task_id='run_python_function',
python_callable=my_python_function
)
bash_task >> python_task
์ ์ฝ๋๋ Bash ๋ช ๋ น์ด๋ฅผ ์คํํ๋ Task์ Python ํจ์๋ฅผ ์คํํ๋ Task๋ก ๊ตฌ์ฑ๋ ๊ฐ๋จํ DAG๋ค. ์ด ์ฝ๋๋ฅผ Airflow์ ๋ฑ๋กํ๋ฉด, ์ ์๋ ์์๋๋ก Task๊ฐ ์คํ๋๋ค.
pip install apache-airflow
์ด๋ ๊ฒ pip๋ก ์ง์ ์ค์นํ๊ฑฐ๋
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
airflow์์ ์ ๊ณตํ๋ docker-compose ํ์ผ์ ๋ฐ์์๋ ๋๋ค.
์ด๋ฐ๊ฑฐ ๋๋ฌด ๊ท์ฌ์
๋์ ๊ฒฝ์ฐ ๊ธฐ์กด์ ์ฌ์ฉํ๋ MySQL์์ PostgreSQL๋ก ์ด๋ํ๋ ์ฝ๋๋ฅผ ์์ฑํ๋ค.
version: '3.7' services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow ports: - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data mysql: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: root MYSQL_DATABASE: source MYSQL_USER: user MYSQL_PASSWORD: password ports: - "3306:3306" volumes: - mysql_data:/var/lib/mysql redis: image: redis:7.2-bookworm ports: - "6379:6379" airflow-webserver: image: apache/airflow:2.7.1 environment: - AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0 depends_on: - postgres - mysql - redis ports: - "8080:8080" volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins command: > bash -c " airflow db init && airflow users create --username airflow --password airflow --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver" airflow-scheduler: image: apache/airflow:2.7.1 depends_on: - postgres - redis volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins command: > bash -c " airflow scheduler" volumes: postgres_data: mysql_data:
๊ทธ ๋ค์ docker-compose up -d
๋ก ์ฌ๋ฆฌ๋ฉด ๊ธฐ๋ณธ ์ธํ
๋ !
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import mysql.connector import psycopg2 # MySQL์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ def transfer_data(): # MySQL ์ฐ๊ฒฐ mysql_conn = mysql.connector.connect( host="mysql", user="user", password="password", database="db" ) mysql_cursor = mysql_conn.cursor() mysql_cursor.execute("SELECT * FROM your_table LIMIT 100") rows = mysql_cursor.fetchall() # PostgreSQL ์ฐ๊ฒฐ postgres_conn = psycopg2.connect( host="postgres", user="airflow", password="airflow", database="airflow" ) postgres_cursor = postgres_conn.cursor() # PostgreSQL์ ๋ฐ์ดํฐ ์ฝ์ insert_query = """ INSERT INTO my_table (job_title, job_post, company) VALUES (%s, %s, %s) """ postgres_cursor.executemany(insert_query, rows) # ์ปค๋ฐ ๋ฐ ์ฐ๊ฒฐ ์ข ๋ฃ postgres_conn.commit() mysql_cursor.close() mysql_conn.close() postgres_cursor.close() postgres_conn.close() # DAG ์ ์ default_args = { "owner": "airflow", "depends_on_past": False, "email_on_failure": False, "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), } with DAG( "mysql_to_postgres", default_args=default_args, description="Transfer data from MySQL to PostgreSQL", schedule_interval="0 * * * *", # ๋งค์๊ฐ 0๋ถ์ ์คํ start_date=datetime(2025, 1, 1), catchup=False, ) as dag: transfer_task = PythonOperator( task_id="transfer_data", python_callable=transfer_data, )
๋งค ์๊ฐ๋ง๋ค 100๊ฐ์ฉ ์ฎ๊ธฐ๋๋ก ์ค์ ํด๋์๋ค.
yamlํ์ผ์์ ์ค์ ํ http://localhost:8080๋ก ์ ์ํด ๋ก๊ทธ์ธํ๋ฉด ๋ฐฐ์น ์์
์ ํ์ธํ ์ ์๋ค.
์ด๋ฐ์์ผ๋ก !
๊ฐ๋จํ ์์
์ด์ด์ ๋๋ ์กฐ๊ธ ๋ ์จ๋ด์ผ ์๊ฒ ์ง๋ง
ํ์ฉ๋๊ฐ ๋์ ํด์ด๋ค๋ณด๋ ๋ ๊น๊ฒ ๊ณต๋ถํด๋ด์ผ๊ฒ ๋ค๋ ์๊ฐ !!
๋์ค์๋ ๋ฐ์ดํฐ ๋ถ์์ ์ํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ๋ ๋ง๋ค์ด๋ด์ผ๊ฒ ๋ค.
Airflow๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ์๋ํ๋ฅผ ์ํ ๊ฐ๋ ฅํ ๋๊ตฌ๋ค. ๋ณต์กํ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์๋ํํ๊ณ , ํจ์จ์ฑ์ ๋์ด๋ฉฐ, ์์ ์ ์ธ ์์คํ ์ ๊ตฌ์ถํ ์ ์๋๋ก ๋์์ค๋ค. ์ฃผ๋์ด ์ ์ฅ์์ Airflow๋ฅผ ๋ฐฐ์ฐ๊ณ ํ์ฉํ๋ ๊ฒ์ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ๋ถ์ผ์ ์ฑ์ฅ์ ํฐ ๋์์ด ๋ ๊ฒ์ด๋ค.
๋๋ ์์ง Airflow๋ฅผ ๋ฐฐ์ฐ๋ ๊ณผ์ ์ ์์ง๋ง, ์์ผ๋ก๋ Airflow๋ฅผ ํ์ฉํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ ๊ฒฝํ์ ๊ณต์ ํ๋ฉฐ ๊ณ์ ์ฑ์ฅํด ๋๊ฐ์ผ๊ฒ ๋ค! ๐
๋ง๋ฌด๋ฆฌ:
์ด ํฌ์คํ ์ด Airflow์ ๋ํ ์ฌ๋ฌ๋ถ์ ์ดํด๋ฅผ ๋์ด๋ ๋ฐ ๋์์ด ๋์๊ธฐ๋ฅผ ๋ฐ๋๋๋ค. ๊ถ๊ธํ ์ ์ด ์๋ค๋ฉด ์ธ์ ๋ ์ง ๋๊ธ๋ก ์ง๋ฌธํด์ฃผ์ธ์. ๋ฐ์ดํฐ ์์ง๋์ด๋ง์ ์ธ๊ณ์์ ํจ๊ป ์ฑ์ฅํด ๋๊ฐ๋ ๋ฉ์ง ์ฌ์ ์ ๊ธฐ๋ํฉ๋๋ค! ๐ช