ํธ๐คํญ๐คํญ
์ ๋ฒ ๊ฒ์๊ธ์์ ์์
ํ ํฌ๋กค๋ง์ ์ํํ๋ ํ์ด์ฌ ์ฝ๋์ ๋ช ๊ฐ์ง ํฌ๋กค๋ง ์ฝ๋๋ฅผ ์ข ๋ ์ถ๊ฐํ์ฌ
dag์ ๊ตฌ์ฑํ๋ค. dag ๊ตฌ์ฑ์ ๋ค์๊ณผ ๊ฐ์๋ค(์ด๋ฏธ S3 ์
๋ก๋๊น์ง ์์ฑ๋ dagํ์ผ์ด๋ค..). (์ด ๋งํฌ์ ๋ฐฉ๋ฌธํ๋ฉด dag ํ์ผ์ ํ์ธํ ์ ์๋ค.)
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.providers.amazon.aws.hooks.s3 import S3Hook # ์ถ๊ฐ
import sys, os
sys.path.append(os.getcwd())
from crawling_event import *
from crawling_velog import *
from crawling_contest import *
def upload_to_s3() :
date = datetime.now().strftime("%Y%m%d")
hook = S3Hook('de-project') # connection ID ์
๋ ฅ
event_filename = f'/home/ubuntu/airflow/airflow/data/event_{date}.csv'
velog_filename = f'/home/ubuntu/airflow/airflow/data/velog_{date}.csv'
contest_filename = f'/home/ubuntu/airflow/airflow/data/contest_{date}.csv'
event_key = f'data/event_{date}.csv'
velog_key = f'data/velog_{date}.csv'
contest_key = f'data/contest_{date}.csv'
bucket_name = 'de-project-airflow'
hook.load_file(filename=event_filename, key=event_key, bucket_name=bucket_name, replace=True)
hook.load_file(filename=velog_filename, key=velog_key, bucket_name=bucket_name, replace=True)
hook.load_file(filename=contest_filename, key=contest_key, bucket_name=bucket_name, replace=True)
default_args = {
'owner': 'owner-name',
'depends_on_past': False,
'email': ['youremail@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=30),
}
dag_args = dict(
dag_id="crawling-upload",
default_args=default_args,
description='description',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 11, 29),
tags=['de-project'],
)
with DAG( **dag_args ) as dag:
start = BashOperator(
task_id='start',
bash_command='echo "start"',
)
upload = PythonOperator(
task_id = 'upload',
python_callable = upload_to_s3
)
# -------- velog -------- #
velog_get_url_task = PythonOperator(
task_id='velog_get_url',
python_callable= velog_get_url
)
velog_get_info_task= PythonOperator(
task_id='velog_get_info',
python_callable= velog_get_info
)
# -------- event -------- #
event_get_data_task = PythonOperator(
task_id="event_get_data",
python_callable= event_get_data
)
# -------- contest -------- #
contest_task = PythonOperator(
task_id='contest_crawling',
python_callable=contest_crawling,
)
complete = BashOperator(
task_id='complete_bash',
bash_command='echo "complete"',
)
start >> event_get_data_task >> upload >> complete
start >> velog_get_url_task >> velog_get_info_task >> upload >> complete
start >> contest_task >> upload >> complete
์ ์ฝ๋๋ ๋ค์๊ณผ ๊ฐ์ ๊ณผ์ ์ ์ํํ๋ค.
๋๋จธ์ง ๊ณผ์ ์ ์ด์ ๊ฒ์๊ธ์์ ์ค๋ช ํ๊ธฐ ๋๋ฌธ์, ์ด๋ฒ ๊ฒ์๋ฌผ์์๋ S3 ๊ด๋ จํ ๋ด์ฉ๋ง ์ค๋ช ํ๊ฒ ๋ค.
AWS Console์ ์ ์ํ์ฌ S3 โ bucket ์ผ๋ก ๋ค์ด๊ฐ๋ค.
๋ฒํท ์ด๋ฆ
์ง์ ๊ฐ์ฒด ์์ ๊ถ
: ACL ๋นํ์ฑํ๋จ(๊ถ์ฅ) ์ ํํผ๋ธ๋ฆญ ์ก์ธ์ค ์ฐจ๋จ ์ค์
: ๋ชจ๋ ํผ๋ธ๋ฆญ ์ก์ธ์ค ์ฐจ๋จ ํด์ ๋ค์๊ณผ ๊ฐ์ด ์์ฑ๋๋ฉด ์๋ฃ๋ค.
AWS Console์ ์ ์ํ์ฌ IAM ์ผ๋ก ๋ค์ด๊ฐ๋ค.
์ฌ์ฉ์
์ ํ์ฌ์ฉ์ ์์ฑ
์ฌ์ฉ์ ์ด๋ฆ
์ง์ ๋ณด์ ์๊ฒฉ ์ฆ๋ช
์ผ๋ก ๋ค์ด๊ฐ๋ค. ์๋ ์ชฝ์, ์ก์ธ์ค ํค
ํํธ์์ ์ก์ธ์ค ํค ๋ง๋ค๊ธฐ
๋ฅผ ์ ํํด์ค๋ค.AWS ์ปดํจํ
์๋น์ค์์ ์คํ๋๋ ์ ํ๋ฆฌ์ผ์ด์
์ ์ ํํด์ค๋ค.์ก์ธ์ค ํค
์ ๋น๋ฐ ์ก์ธ์ค ํค
๊ฐ ์๋๋ฐ, ๋น๋ฐ ์ก์ธ์ค ํค๋ ์ฌํ์ธํ๊ฑฐ๋ ๋ณต๊ตฌํ ์ ์์ผ๋ฏ๋ก ์ ์ ์ฅํด๋๊ธธ ๋ฐ๋๋ค.Airflow web ui์์ Admin โ Connections ์ผ๋ก ๋ค์ด๊ฐ๋ค.
์ข์ธก ์๋จ์ +
๋ฒํผ์ ๋๋ฌ์ค๋ค.
{
"region_name": "ap-northeast-2"
}
Connection Type์ Amazon Web Services๊ฐ ๋ํ๋์ง ์์ ๋,
pip install apache-airflow-providers-amazon
์ ์ ๋ ฅํ๋ค. ์ดํ webserver๋ฅผ ์ข ๋ฃ ํ ๋ค์ ์คํ์์ผ์ค์ผ Connection Type์ Amazon Web Services๊ฐ ๋ํ๋๋ค.
test ๋ฒํผ์ด ํ์ฑํ๋์ด์์ง ์์ ์ ์๋ค. ์ด ๋, airflow.cfg
ํ์ผ์์ test_connection
๋ถ๋ถ์ด Disabled๋ก ๋์ด์์ํ
๋ฐ, Enabled๋ก ๋ฐ๊ฟ์ค๋ค.
๊ทธ๋ฌ๋ฉด test ๋ฒํผ์ด ํ์ฑํ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค. test ๋ฒํผ์ ๋๋ฅด๊ณ , ์๋จ์ ์ด๋ก์ฐฝ์ด ๋จ๋ฉด ์ ์ฐ๊ฒฐ๋ ๊ฒ์ด๋ค.
crawling_contest, crawling_event, crawling_velog ์ธ ๊ฐ์ง ํฌ๋กค๋ง ํ์ผ์์ ์์งํ ๋ฐ์ดํฐ๋ค์ csv ํ์ผ๋ก ๋ง๋ค์ด์ฃผ์.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
S3์ ์์งํ ๋ฐ์ดํฐ๋ฅผ ๋ด์ csv ํ์ผ์ ์ ๋ก๋ํ๋ task๋ฅผ ์ํด ํจ์๋ฅผ ์ถ๊ฐํด์ค๋ค.
def upload_to_s3() :
date = datetime.now().strftime("%Y%m%d")
hook = S3Hook('de-project') # connection ID ์
๋ ฅ
event_filename = f'/home/ubuntu/airflow/airflow/data/event_{date}.csv'
velog_filename = f'/home/ubuntu/airflow/airflow/data/velog_{date}.csv'
contest_filename = f'/home/ubuntu/airflow/airflow/data/contest_{date}.csv'
event_key = f'data/event_{date}.csv'
velog_key = f'data/velog_{date}.csv'
contest_key = f'data/contest_{date}.csv'
bucket_name = 'de-project-airflow'
hook.load_file(filename=event_filename, key=event_key, bucket_name=bucket_name, replace=True)
hook.load_file(filename=velog_filename, key=velog_key, bucket_name=bucket_name, replace=True)
hook.load_file(filename=contest_filename, key=contest_key, bucket_name=bucket_name, replace=True)
connection ID
๋ฅผ ์
๋ ฅํด์ค๋ค.filename
: S3์ ์ฌ๋ฆด csv ํ์ผ์ด ์ ์ฅ๋ ๊ฒฝ๋กkey
: S3์ ์ ์ฅํ ๊ฒฝ๋กbucket_name
: S3์ ๋ฒํท ์ด๋ฆwith DAG( **dag_args ) as dag:
...
upload = PythonOperator(
task_id = 'upload',
python_callable = upload_to_s3
)
...
start >> event_get_data_task >> upload >> complete
start >> velog_get_url_task >> velog_get_info_task >> upload >> complete
start >> contest_task >> upload >> complete
๊ฐ๊ฐ์ ํฌ๋กค๋ง ์ฝ๋๋ฅผ ํตํด ๋ฐ์ดํฐ๋ฅผ ์์งํ ๋ค์, csv ํ์ผ์ ์ ์ฅํ ๋ค์ S3์ ์ ์ฅ๋ csv ํ์ผ์ ์ ๋ก๋ํ ์ ์๋๋ก task ์์กด์ฑ์ ์์ ํด์ฃผ์๋ค.
๋ค์์๋ airflow์ slack์ ์ฐ๊ฒฐํด ํฌ๋กค๋ง์ ํตํด ์์งํ ๋ฐ์ดํฐ๋ฅผ ํน์ ์ํฌ์คํ์ด์ค์ ์๋์ผ๋ก ๋ณด๋ด์ฃผ๋ ์ฌ๋๋ด์ ๋ง๋ค์ด๋ณด๊ฒ ๋ค.