[DataEngineering] Connecting Airflow with Amazon S3

์œ ํ˜œ์ง€ · December 2, 2023


์ €๋ฒˆ ๊ฒŒ์‹œ๊ธ€์—์„œ ์ž‘์—…ํ•œ ํฌ๋กค๋ง์„ ์ˆ˜ํ–‰ํ•˜๋Š” ํŒŒ์ด์ฌ ์ฝ”๋“œ์— ๋ช‡ ๊ฐ€์ง€ ํฌ๋กค๋ง ์ฝ”๋“œ๋ฅผ ์ข€ ๋” ์ถ”๊ฐ€ํ•˜์—ฌ
dag์„ ๊ตฌ์„ฑํ–ˆ๋‹ค. dag ๊ตฌ์„ฑ์€ ๋‹ค์Œ๊ณผ ๊ฐ™์•˜๋‹ค(์ด๋ฏธ S3 ์—…๋กœ๋“œ๊นŒ์ง€ ์™„์„ฑ๋œ dagํŒŒ์ผ์ด๋‹ค..). (์ด ๋งํฌ์— ๋ฐฉ๋ฌธํ•˜๋ฉด dag ํŒŒ์ผ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.)

  • start: start marker
  • velog_get_url: fetch the list of URLs collected from velog
  • velog_get_info: fetch post information from the collected velog URLs
  • event_get_data: fetch developer event information
  • contest_crawling: fetch developer contest information
  • upload: upload the collected data to S3
  • complete_bash: finish marker

DAG definition

from datetime import datetime, timedelta # datetime is needed for datetime.now() and start_date
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.providers.amazon.aws.hooks.s3 import S3Hook # added for the S3 upload

import sys, os
sys.path.append(os.getcwd())

from crawling_event import *
from crawling_velog import *
from crawling_contest import *


def upload_to_s3():
    date = datetime.now().strftime("%Y%m%d")
    hook = S3Hook('de-project') # Airflow connection ID
    
    event_filename = f'/home/ubuntu/airflow/airflow/data/event_{date}.csv'
    velog_filename = f'/home/ubuntu/airflow/airflow/data/velog_{date}.csv'
    contest_filename = f'/home/ubuntu/airflow/airflow/data/contest_{date}.csv'

    event_key = f'data/event_{date}.csv'
    velog_key = f'data/velog_{date}.csv'
    contest_key = f'data/contest_{date}.csv'

    bucket_name = 'de-project-airflow' 
    hook.load_file(filename=event_filename, key=event_key, bucket_name=bucket_name, replace=True)
    hook.load_file(filename=velog_filename, key=velog_key, bucket_name=bucket_name, replace=True)
    hook.load_file(filename=contest_filename, key=contest_key, bucket_name=bucket_name, replace=True)

default_args = {
    'owner': 'owner-name',
    'depends_on_past': False,
    'email': ['youremail@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

dag_args = dict(
    dag_id="crawling-upload",
    default_args=default_args,
    description='description',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 11, 29),
    tags=['de-project'],
)

with DAG( **dag_args ) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start"',
    )

    upload = PythonOperator(
        task_id = 'upload',
        python_callable = upload_to_s3
    )
    
    # -------- velog -------- #
    velog_get_url_task = PythonOperator(
        task_id='velog_get_url',
        python_callable= velog_get_url
    )

    velog_get_info_task= PythonOperator(
        task_id='velog_get_info',
        python_callable= velog_get_info
    )

    # -------- event -------- #
    event_get_data_task = PythonOperator(
        task_id="event_get_data", 
        python_callable= event_get_data
    )

    # -------- contest -------- #
    contest_task = PythonOperator(
        task_id='contest_crawling',
        python_callable=contest_crawling,
    )

    complete = BashOperator(
        task_id='complete_bash',
        bash_command='echo "complete"',
    )

    start >> event_get_data_task >> upload >> complete
    start >> velog_get_url_task >> velog_get_info_task >> upload >> complete
    start >> contest_task >> upload >> complete

The code above goes through the following steps:

  • Import the required libraries
  • Save the extracted data as csv files and upload them to S3
  • Define the DAG
  • Set the task dependencies

๋‚˜๋จธ์ง€ ๊ณผ์ •์€ ์ด์ „ ๊ฒŒ์‹œ๊ธ€์—์„œ ์„ค๋ช…ํ–ˆ๊ธฐ ๋•Œ๋ฌธ์—, ์ด๋ฒˆ ๊ฒŒ์‹œ๋ฌผ์—์„œ๋Š” S3 ๊ด€๋ จํ•œ ๋‚ด์šฉ๋งŒ ์„ค๋ช…ํ•˜๊ฒ ๋‹ค.


Connecting to S3

S3 ๋ฒ„ํ‚ท ์ƒ์„ฑ

Open the AWS Console and go to S3 → Buckets.

  • ๋ฒ„ํ‚ท ์ƒ์„ฑ
  • ๋ฒ„ํ‚ท ์ด๋ฆ„ ์ง€์ •
  • ๊ฐ์ฒด ์†Œ์œ ๊ถŒ: ACL ๋น„ํ™œ์„ฑํ™”๋จ(๊ถŒ์žฅ) ์„ ํƒ
  • ํผ๋ธ”๋ฆญ ์•ก์„ธ์Šค ์ฐจ๋‹จ ์„ค์ •: ๋ชจ๋“  ํผ๋ธ”๋ฆญ ์•ก์„ธ์Šค ์ฐจ๋‹จ ํ•ด์ œ
  • ๋‚˜๋จธ์ง€ ์„ค์ • ๊ธฐ๋ณธ๊ฐ’

๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ƒ์„ฑ๋˜๋ฉด ์™„๋ฃŒ๋‹ค.

IAM ์ƒ์„ฑ

Open the AWS Console and go to IAM.

  • Select Users in the left-hand menu
  • Create user
    • Set the user name
    • Leave the remaining settings at their defaults
      After creating the user, open the user's details page and go to Security credentials. In the Access keys section near the bottom, choose Create access key.

      (I already have one because I created it earlier; if you have just created the user, there will be none yet.)
  • Since we will be running this on an AWS compute service, select the third option, "Application running on an AWS compute service".
  • After setting a tag value, the access key is created.
    There are two values, the access key and the secret access key; the secret access key cannot be viewed again or recovered later, so be sure to save it somewhere safe. (A quick way to check that the key works is sketched below.)
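
Before entering the key into Airflow, it can help to sanity-check it locally. A minimal boto3 sketch, assuming an S3 permission policy (e.g. AmazonS3FullAccess) is attached to the user; the key values below are placeholders:

# Quick check that the new access key can reach S3 (placeholder key values).
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="YOUR_ACCESS_KEY_ID",
    aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
    region_name="ap-northeast-2",
)
print([b["Name"] for b in s3.list_buckets()["Buckets"]])  # should include de-project-airflow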

Connecting S3 to Airflow

In the Airflow web UI, go to Admin → Connections.

์ขŒ์ธก ์ƒ๋‹จ์— + ๋ฒ„ํŠผ์„ ๋ˆŒ๋Ÿฌ์ค€๋‹ค.

  • Connection ID: choose a connection ID
  • Connection Type: Amazon Web Services
  • AWS Access Key ID: enter the access key saved when creating the IAM user
  • AWS Secret Access Key: enter the secret access key saved when creating the IAM user
  • Extra: set the default region as shown below.
{
  "region_name": "ap-northeast-2"
}

If Amazon Web Services does not appear under Connection Type, run pip install apache-airflow-providers-amazon. You then need to stop and restart the webserver before Amazon Web Services shows up in Connection Type.

Test Connection

test ๋ฒ„ํŠผ์ด ํ™œ์„ฑํ™”๋˜์–ด์žˆ์ง€ ์•Š์„ ์ˆ˜ ์žˆ๋‹ค. ์ด ๋•, airflow.cfg ํŒŒ์ผ์—์„œ test_connection ๋ถ€๋ถ„์ด Disabled๋กœ ๋˜์–ด์žˆ์„ํ…๋ฐ, Enabled๋กœ ๋ฐ”๊ฟ”์ค€๋‹ค.

The Test button should now be enabled. Click it, and if a green banner appears at the top, the connection is working.
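
If you also want to verify from code, here is a minimal sketch that uses the connection created above through S3Hook (run it inside the Airflow environment):

# Minimal sketch: verify the 'de-project' connection can see the bucket.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id='de-project')
print(hook.check_for_bucket('de-project-airflow'))  # True if the connection and bucket are reachable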


DAG ํŒŒ์ผ ์ˆ˜์ •

Have the three crawling modules (crawling_contest, crawling_event, crawling_velog) save the data they collect as csv files, as sketched below.
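
The crawling code itself isn't shown in this post, but each crawling function should end by writing its results to the path that upload_to_s3 expects. A minimal sketch for the event data, assuming the scraped rows are already collected as a list of dicts and that pandas is available:

# Minimal sketch of the csv-writing step at the end of a crawling function (e.g. event_get_data).
# The path pattern matches what upload_to_s3 reads; rows is assumed to be a list of dicts.
from datetime import datetime
import pandas as pd

def save_event_csv(rows):
    date = datetime.now().strftime("%Y%m%d")
    path = f'/home/ubuntu/airflow/airflow/data/event_{date}.csv'
    pd.DataFrame(rows).to_csv(path, index=False, encoding='utf-8-sig')
    return path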

Importing the S3 library

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3์— ํŒŒ์ผ ์—…๋กœ๋“œํ•ด์ฃผ๋Š” task ์ถ”๊ฐ€

S3์— ์ˆ˜์ง‘ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ด์€ csv ํŒŒ์ผ์„ ์—…๋กœ๋“œํ•˜๋Š” task๋ฅผ ์œ„ํ•ด ํ•จ์ˆ˜๋ฅผ ์ถ”๊ฐ€ํ•ด์ค€๋‹ค.

def upload_to_s3():
    date = datetime.now().strftime("%Y%m%d")
    hook = S3Hook('de-project') # Airflow connection ID
    
    event_filename = f'/home/ubuntu/airflow/airflow/data/event_{date}.csv'
    velog_filename = f'/home/ubuntu/airflow/airflow/data/velog_{date}.csv'
    contest_filename = f'/home/ubuntu/airflow/airflow/data/contest_{date}.csv'

    event_key = f'data/event_{date}.csv'
    velog_key = f'data/velog_{date}.csv'
    contest_key = f'data/contest_{date}.csv'

    bucket_name = 'de-project-airflow' 
    hook.load_file(filename=event_filename, key=event_key, bucket_name=bucket_name, replace=True)
    hook.load_file(filename=velog_filename, key=velog_key, bucket_name=bucket_name, replace=True)
    hook.load_file(filename=contest_filename, key=contest_key, bucket_name=bucket_name, replace=True)
  • S3Hook์˜ ์ธ์ž๋กœ connection ID๋ฅผ ์ž…๋ ฅํ•ด์ค€๋‹ค.
  • hook.load_file์˜ ์ธ์ž๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์„ค์ •ํ•ด์ค€๋‹ค.
    • filename: S3์— ์˜ฌ๋ฆด csv ํŒŒ์ผ์ด ์ €์žฅ๋œ ๊ฒฝ๋กœ
    • key: S3์— ์ €์žฅํ•  ๊ฒฝ๋กœ
    • bucket_name: S3์˜ ๋ฒ„ํ‚ท ์ด๋ฆ„
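
As an optional check after the three load_file calls, the same hook can confirm that the objects actually landed in the bucket; a minimal sketch using the keys and bucket from this post:

# Optional check: confirm the uploaded objects exist in the bucket.
from datetime import datetime
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

date = datetime.now().strftime("%Y%m%d")
hook = S3Hook('de-project')
for key in (f'data/event_{date}.csv', f'data/velog_{date}.csv', f'data/contest_{date}.csv'):
    print(key, hook.check_for_key(key, bucket_name='de-project-airflow'))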

DAG ๊ฐ์ฒด ์ˆ˜์ •

with DAG( **dag_args ) as dag:
    ...
    
    upload = PythonOperator(
        task_id = 'upload',
        python_callable = upload_to_s3
    )
    
    ...

task ์˜์กด์„ฑ ์ˆ˜์ •

    start >> event_get_data_task >> upload >> complete
    start >> velog_get_url_task >> velog_get_info_task >> upload >> complete
    start >> contest_task >> upload >> complete

๊ฐ๊ฐ์˜ ํฌ๋กค๋ง ์ฝ”๋“œ๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•œ ๋‹ค์Œ, csv ํŒŒ์ผ์„ ์ €์žฅํ•œ ๋’ค์— S3์— ์ €์žฅ๋œ csv ํŒŒ์ผ์„ ์—…๋กœ๋“œํ•  ์ˆ˜ ์žˆ๋„๋ก task ์˜์กด์„ฑ์„ ์ˆ˜์ •ํ•ด์ฃผ์—ˆ๋‹ค.


๋‹ค์Œ์—๋Š” airflow์™€ slack์„ ์—ฐ๊ฒฐํ•ด ํฌ๋กค๋ง์„ ํ†ตํ•ด ์ˆ˜์ง‘ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ํŠน์ • ์›Œํฌ์ŠคํŽ˜์ด์Šค์— ์ž๋™์œผ๋กœ ๋ณด๋‚ด์ฃผ๋Š” ์Šฌ๋ž™๋ด‡์„ ๋งŒ๋“ค์–ด๋ณด๊ฒ ๋‹ค.

0๊ฐœ์˜ ๋Œ“๊ธ€