📖 작성한 크롤링 코드를 Airflow에 DAG로 옮겨보는 작업을 해보자!
✅ 코드가 동작되는 순서를 정리해보고, 이에 따라 task를 구성해보자.
✅ 실제 task는 다음과 같이 구성했다.
웹서버를 실행하거나 DAG를 실행하면 자동으로 생성되는 파일들은 다음과 같다
저번에 정리를 못 해서 이제 하는 건 안 비밀😉
🔎 실습 환경 : AWS EC2에서 가상환경(virtualenv)로 작업
lsof -i tcp:8080
으로도 PID를 확인할 수 있다.) 트리 구조로 전체 파일들을 확인하면 아래와 같다.
이제 각각 task와 직접적으로 관련된 파일들을 어떻게 작성했는지 정리해보자.
crawling_velog_dag.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
import sys, os
sys.path.append(os.getcwd())
from crawling.crawling_velog import *
def print_result(**kwargs):
r = kwargs["task_instance"].xcom_pull(key='result_msg')
print("message : ", r)
default_args = {
'owner': 'owner-name',
'depends_on_past': False,
'email': ['your-email@g.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=30),
}
dag_args = dict(
dag_id="crawling-velog",
default_args=default_args,
description='tutorial DAG ml',
schedule_interval=timedelta(minutes=50),
start_date=datetime.datetime(2023, 11, 24),
tags=['example-sj'],
)
with DAG( **dag_args ) as dag:
start = BashOperator(
task_id='start',
bash_command='echo "start!"',
)
get_url_task = PythonOperator(
task_id='selenium_get_url',
python_callable=get_url_list,
)
get_info_task = PythonOperator(
task_id='bs_get_info',
python_callable=crawling,
op_kwargs={'url_list':"url_list"}
)
msg = PythonOperator(
task_id='msg',
python_callable=print_result
)
complete = BashOperator(
task_id='complete_bash',
bash_command='echo "complete!"',
)
start >> get_url_task >> get_info_task >> msg >> complete
DAG, 필요한 종류의 Operator들, trigger 규칙 등과 기타 라이브러리를 불러온다.
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
import sys, os
sys.path.append(os.getcwd())
from crawling.crawling_velog import *
dag의 ID
, 스케줄 간격
, 시작 시간
, 과거 의존성
, 재시도
등과 관련된 설정 지정한다
# 결과 출력용 함수 정의
def print_result(**kwargs):
r = kwargs["task_instance"].xcom_pull(key='result_msg')
print("message : ", r)
# dag의 이름, 의존성, 이메일, 실패 시 이메일, 재시도 횟수, 재시도 시간 간격 등을 지정
default_args = {
'owner': 'owner-name',
'depends_on_past': False,
'email': ['your-email@g.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=30),
}
# dag의 id, arguments, 설명, 실행 간격, 시작 일자, 태그 등을 지정
dag_args = dict(
dag_id="crawling-velog",
default_args=default_args,
description='tutorial DAG ml',
schedule_interval=timedelta(minutes=50),
start_date=datetime.datetime(2023, 11, 24),
tags=['example-sj'],
)
위에서 지정한 argument로 설정된 DAG 객체를 정의하고, 그 안에 실행될 task
들과 그 실행 순서 그래프
를 명시한다.
with DAG( **dag_args ) as dag:
start = BashOperator(
task_id='start',
bash_command='echo "start!"',
)
get_url_task = PythonOperator(
task_id='selenium_get_url',
python_callable=get_url_list,
)
get_info_task = PythonOperator(
task_id='bs_get_info',
python_callable=crawling,
op_kwargs={'url_list':"url_list"}
)
msg = PythonOperator(
task_id='msg',
python_callable=print_result
)
complete = BashOperator(
task_id='complete_bash',
bash_command='echo "complete!"',
)
# Task를 연결하여 dag 그래프 그리기
start >> get_url_task >> get_info_task >> msg >> complete
start
,end
: Bash Operator로 echo 명령어로 메세지를 출력get_url_task
: Python Operator로 get_url_list 함수를 실행해서 selenium으로 트렌딩 블로그 url을 크롤링하는 taskget_info_task
: Python Operator로 crawling 함수를 실행해서 bs4로get_url_task
에서 받아온 글(url)의 정보를 크롤링하는 taskmsg
: 최종 실행 결과에 대한 출력을 하는 task
- 각 task마다
task_id
지정- Bash operator
bash_command
로 실행할 명령어 지정- Python operator
python_callable
로 실행할 함수(혹은 객체) 지정op_kwargs
로 파이선 개체에 전달할 키워드 인자(kwargs) 지정
이제 기존 크롤링 코드를 DAG에 맞게 수정하거나, Xcome을 활용하는 등 일부 수정하자
DAG를 위해서 수정한 부분만 아래에서 자세히 다뤄보도록 하자~!
crawling/crawling_velog.py
import pandas as pd
import time,os,requests
import warnings
import datetime
from selenium import webdriver
from bs4 import BeautifulSoup as bs
from selenium.webdriver.common.by import By
warnings.filterwarnings(action="ignore")
# selenium으로 velog 메인 페이지에서 트렌딩 글들 링크만 가져오기 (beautilfulSoup으로 하니까 못 찾아서)
def get_url_list(**kwargs) :
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
browser = webdriver.Chrome(options = chrome_options)
browser.get("https://velog.io/")
time.sleep(5)
url_list = browser.find_elements(By.CLASS_NAME, 'VLink_block__Uwj4P.PostCard_styleLink__DYahQ')
result = []
for url in url_list :
link = url.get_attribute('href')
print(f"######{link}######")
result.append(link)
result = list(set(result))
kwargs['task_instance'].xcom_push(key='url_list', value = result)
return "end get url list"
# beautifulSoup으로 각 블로그 글 접속
def open_page(url) :
req=requests.get(url).text
page=bs(req,"html.parser")
return page
# beautifulSoup으로 접속한 블로그 정보 가져오기
def get_title(page) :
title = page.find('div', {'class':'head-wrapper'})
title = title.find('h1').text
return title
def get_writer(page) :
writer = page.find('a',{'class' : 'user-logo'}).text
return writer
def get_thumnail(page) :
img_all = page.find_all('img')
img_src = img_all[0].get('src')
return img_src
def get_text(page) :
text_all = page.find_all(['p','h1','h2','h3','li'])
text=""
for t in text_all :
text += t.text
return text
def crawling(**kwargs) :
url_list = kwargs['task_instance'].xcom_pull(key = 'url_list')
title, writer, img, text, link = [], [], [], [], []
for l in url_list :
link.append(l)
page = open_page(l)
time.sleep(1)
title.append( get_title(page) )
writer.append( get_writer(page) )
img.append( get_thumnail(page) )
text.append( get_text(page) )
print(title)
data = pd.DataFrame({'title' : title, 'writer' : writer, 'img' : img, 'text' : text, 'link' : link})
date = datetime.datetime.now().strftime("%Y%m%d")
data.to_csv(f"/home/ubuntu/airflow/airflow/data/velog_{date}.csv", index=False)
kwargs['task_instance'].xcom_push(key = f'velog_csv', value= f"/home/ubuntu/airflow/airflow/data/velog_{date}.csv")
kwargs['task_instance'].xcom_push(key='result_msg', value= f"total number of blogs this week : {len(data)}")
def get_url_list(**kwargs) :
############ chrome 브라우저 설정 ################
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
browser = webdriver.Chrome(options = chrome_options)
######## 메인 페이지 접속하여 클래스 이름으로 url찾기 #############
browser.get("https://velog.io/")
time.sleep(5) # 연결에 시간이 걸릴 수 있으니 대기 시간 설정
url_list = browser.find_elements(By.CLASS_NAME, 'VLink_block__Uwj4P.PostCard_styleLink__DYahQ')
result = []
for url in url_list :
link = url.get_attribute('href')
print(f"blog url : {link}")
result.append(link)
result = list(set(result)) # 중복 제거
######## XCom에 url이 모여진 리스트를 저장 #######
kwargs['task_instance'].xcom_push(key='url_list', value = result)
return "end get url list"
def crawling(**kwargs) :
##### XCom에서 url 리스트 받아오기 #####
url_list = kwargs['task_instance'].xcom_pull(key = 'url_list')
# 빈 리스트로 초기화하고 크롤링하면서 리스트에 담기
title, writer, img, text, link = [], [], [], [], []
for l in url_list :
link.append(l)
page = open_page(l)
time.sleep(1) # 페이지 연결 대기 시간
title.append( get_title(page) )
writer.append( get_writer(page) )
img.append( get_thumnail(page) )
text.append( get_text(page) )
print(title)
# csv로 저장하기 위해 리스트를 dataframe으로 변환
data = pd.DataFrame({'title' : title, 'writer' : writer, 'img' : img, 'text' : text, 'link' : link})
# 파일 이름 지정을 위해 현재 날짜 계산(예; 20231125)
date = datetime.datetime.now().strftime("%Y%m%d")
# data 폴더에 csv 파일로 저장
data.to_csv(f"/home/ubuntu/airflow/airflow/data/velog_{date}.csv", index=False)
# csv 파일을 저장한 경로를 XCom에 저장
kwargs['task_instance'].xcom_push(key = f'velog_csv', value= f"/home/ubuntu/airflow/airflow/data/velog_{date}.csv")
# msg 태스크 수행을 위해서 총 크롤링한 블로그 개수를 세서 XCom에 저장
kwargs['task_instance'].xcom_push(key='result_msg', value= f"total number of blogs this week : {len(data)}")
werbserver GUI를 켜두고 scheduler를 실행시켜서 DAG가 잘 실행되었는지 확인해보자.
$ airflow webserver --port 8080
$ airflow scheduler
시행착오를 굉장히 많이 겪었다... 사실 EC2에 Chrome 설치해서 Selenium 실행시키는 것과, 대기 시간을 설정하지 않아서 결과가 나오지 않았던 쓸데없는 오류를 고치는 데 더 많은 시간이 걸렸지만... 그래도 성공했다...!! 🥲🥲
로컬로 파일 옮겨서 csv가 잘 저장되었는지까지 확인 완료!! 이론 부분은 다음주에 마저 추가로 공부해서 정리해야겠다..!
깃허브에 업로드까지 끝!-! 👉🏻 코드가 정리된 깃허브 바로가기