이전에 완성된 python 파일을 dag 형식으로 수정해주었다.
필요한 모듈 import
Default Argument 작성하기
시간은 Asia/Seoul로 바꿔주기(기본 시간은 UTC)
airflow 시간 바꾸는 방법 세가지
- 환경변수
AIRFLOW__CORE__DEFAULT_TIMEZONE
를Asia/Seoul
로 설정airflow.cfg
파일에서default_timezone = utc
부분을default_timezone = Asia/Seoul
으로 바꿔줌- DAG파일에서
pendulum
패키지를 이용해 시간을 변경
-> 내가 선택한 방법 ! 아래와 같은 코드를 py파일에 추가해주면 됨import pendulum # 기본 UTC 시간에서 Asia/Seoul로 변경 local_tz = pendulum.timezone("Asia/Seoul")
task 작성
Dependency 설정(task 순서 설정)
아래는 내가 작성한 dag 파일의 전문
import requests
from bs4 import BeautifulSoup
import pendulum
from datetime import datetime, timedelta
import pandas as pd
import pymysql
import re
from wordcloud import WordCloud
from PIL import Image
import matplotlib.pyplot as plt
from collections import Counter
import numpy as np
from konlpy.tag import Okt
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# 기본 UTC 시간에서 Asia/Seoul로 변경
local_tz = pendulum.timezone("Asia/Seoul")
# default_args 정의
default_args = {
"owner" : "admin",
"start_date" : datetime(2023, 7, 27, tzinfo=local_tz),
# dag 실행 중 오류 발생하면 발송하는 메일 비활성화
"email_on_failure" : False
}
dag = DAG(
dag_id="airflow_wordcloud",
max_active_runs = 1, # dag 동시 실행 방지
default_args = default_args, # 위에서 선언한 기본 args
# 분 시 일 월 요일 순으로 실행 주기 설정
schedule_interval="0 8 * * *", #매일 오전 8시 실행
# 이전 dag가 오래걸려서 다음 dag 시작 시간을 초과한 경우,
# True면 이전 dag 종료 이후 다음 dag 실행, False면 이전 dag 실행 중 다음 dag 실행
catchup = False
)
def _connect_db():
con = pymysql.connect(
host = 'db서버 ip',
port = 3306,
user = 'root',
password = '비밀번호',
database = 'project'
)
cur = con.cursor()
return cur
def _cafe_crawling(start=1, article_num=0, **context):
# 기존 DB에 적재된 데이터 중 가장 최신 id값 가져오기
cur.execute('select max(id) from wordcloud;')
recent_article_id = cur.fetchall()[0][0]
article_id = recent_article_id + 1
while article_id <= recent_article_id + 100: # 한번 크롤링 할때마다 100개씩 추출
url = f"https://apis.naver.com/cafe-web/cafe-articleapi/v2.1/cafes/28866679/articles/{article_id}?query=&menuId=1&boardType=L&useCafeId=true&requestFrom=A"
response = requests.get(url)
data = response.json()
# 게시글 정보 추출
article = data.get('result', {}).get('article', {})
article_timestamp = article.get('writeDate', 0) / 1000 # 작성일자를 밀리초에서 초로 변환
# 작성일자를 datetime 형식으로 변환
article_time = datetime.fromtimestamp(article_timestamp)
# 게시글 작성일자가 현재 이후 또는 삭제된 게시글인 경우 None 반환
if (article_time >= datetime.now()) or (not article.get('menu')):
article = None
comments = None
article_time = None
# 댓글 정보 추출
comments = data.get('result', {}).get('comments', {}).get('items', [])
if article is not None: #게시글 정보가 있는 경우
article_menu = article.get('menu', {}).get('name', '')
article_subject = article.get('subject', '')
# 게시글 내용 추출
article_content_html = article.get('contentHtml', '')
article_content_soup = BeautifulSoup(article_content_html, 'html.parser')
article_content = article_content_soup.find('div', class_='se-component se-text se-l-default')
if article_content:
article_content = article_content.text.strip()
else:
article_content = ""
# 댓글 내용 추출
comment_contents = []
for comment in comments:
comment_contents.append(comment.get('content', ''))
article_comment = ' '.join(comment_contents)
# 정규표현식 전처리
article_menu = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_menu)
article_subject = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_subject)
article_content = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_content)
article_comment = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_comment)
# DB에 적재
try:
cur.execute(f'insert into wordcloud values("{article_id}", "{article_menu}", "{article_subject}", "{article_content}", "{article_comment}", "{article_time}")')
con.commit()
except Exception as e:
print(f"에러 발생: {str(e)}")
article_num += 1 # 방금 크롤링으로 DB에 적재된 데이터 개수
# article_id 증가
article_id += 1
start += 1
return article_num
# 해당 기간동안의 크롤링 데이터 DB에서 불러오기
def _get_content(**context):
# 불러오고 싶은 기간 선택(인자 days, 여기선 일주일로 함)
today = datetime.now()
select_day = datetime.strftime(today - timedelta(days=7),"%Y-%m-%d %H:%M:%S")
# db에 적재된 게시글들의 정보 불러오기
cur.execute(f"SELECT subject,content,comment FROM wordcloud WHERE time > '{select_day}';")
result = cur.fetchall()
# 데이터 한 문자열로 합치기
join_result = [' '.join(sub_tuple) for sub_tuple in result]
final_result = ' '.join(join_result)
return final_result
def _make_wordcloud(**context):
word_data = context['task_instance'].xcom_pull(task_ids='get_content') # task3로부터 word_data를 가져옴
# 자연어 처리
okt = Okt()
nouns = okt.nouns(word_data) # 명사인 것들만 추출
words = [n for n in nouns if len(n) > 1] # 단어가 1글자 이상인 것만 추출
word_counts = Counter(words) # 빈도수 count
# 워드클라우드 만들기
wc = WordCloud(
font_path="./NanumGothic.ttf", # 폰트 지정 << 경로 문제 발생 가능
width=400, # 너비 지정
height=400, # 높이 지정
max_font_size=100, # 가장 빈도수 높은 단어의 폰트 사이즈 지정
background_color="white" # 배경색 지정
)
word_cloud = wc.generate_from_frequencies(word_counts) # 워드클라우드 빈도수 지정
# 워드클라우드 그래프 출력
plt.figure()
plt.imshow(word_cloud)
plt.axis("off")
# 워드클라우드 PNG 파일로 저장
today = datetime.now().date()
a = word_cloud.to_image().save(f"wc_{today}.png", format='PNG')
return a
task1 = PythonOperator(
task_id='connect_db',
python_callable=_connect_db,
dag=dag
)
task2 = PythonOperator(
task_id='cafe_crawling',
python_callable=_cafe_crawling,
provide_context=True,
dag=dag
)
task3 = PythonOperator(
task_id='get_content',
python_callable=_get_content,
provide_context=True,
dag=dag
)
task4 = PythonOperator(
task_id='make_wordcloud',
python_callable=_make_wordcloud,
op_args=task3.output, # task3의 반환 값을 task4의 파라미터로 전달합니다.
provide_context=True,
dag=dag
)
task1 >> task2 >> task3 >> task4
나는 docker에 airflow를 컨테이너로 띄워두었으므로 docker 부터 실행 !
sudo service docker start
sudo service docker status
airflow 컨테이너 실행
docker start airflow
airflow 컨테이너 진입
docker exec -it airflow /bin/bash
airflow bask 창에 입력, airflow webserver와 scheduler 실행
airflow webserver && airflow scheduler
이전에 docker에 airflow 컨테이너를 띄울때, docker volume을 지정해두었다. 따라서 지정한 /home/gyu/workspace/dags
폴더에 dag 파일을 넣어주면, 도커 볼륨에 의해서 docker에 띄워둔 airflow의 dags 폴더에도 dag 파일이 복사가 된다.
-> dags 폴더에 권한문제가 발생할 경우, (1)에 적어두었다.
airflow webserver에 뜨지 않으면 터미널에서 ctrl+c를 눌러 airflow webserver와 scheduler를 멈추고 다시 접속했다.
(내 dag 오류는 아니지만)
위와 같은 오류가 뜨면서 webserver dag 목록에 내가 작성한 dag가 뜨지 않는다.
이 경우, 계속해서 gpt에게 내 .py파일을 보여주면서 수정해야할 부분들을 수정했다.
그럼 이제 아래 사진과 같이 dags 목록에 뜬 내 airflow_wordcloud
dag를 확인할 수 있다.
근데 계속해서 dag를 trigger하면 첫 task(db_connect)부터 에러가 발생해서 catchup = False
, start_date = 오늘날짜
로 수정해주었다.
근데도 안댄다. . . 포기하고 싶다 . . .
하지만 실패는 성공의 어쩌구 . . . 따라서 이번엔 해당 오류 나는 task의 log를 확인해보기로 했다.
graph
-> 해당 task 선택
-> 뜨는 창에서 log 선택
-> log 확인
pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '127.0.0.1' ([Errno 111] Connection refused)")
그냥 local db가 아니라 gcp에 띄운 db에서 작업하기로 결정 ! connect_db
task부분을 수정해준다. 그러고 다시 실행하면,
이제 드디어 task1이 정상적으로 실행되었다 !!!
곧 바로 발생한 task2의 error는요, NameError: name 'cur' is not defined
즉, 코드에서 전역변수로 설정한 con, cur을 인식하지 못해 발생하는 오류였다. 이는 gpt한테 물어봤더니,
Airflow에서는 여러 개의 워커(worker)가 동시에 실행되므로 PythonOperator에서 전역 변수를 사용하는 것이 의도대로 동작하지 않을 수 있습니다. 각 워커는 독립적으로 작동하며, 서로 간에 전역 변수를 공유하지 않습니다. 따라서 PythonOperator 내에서 정의된 전역 변수는 다른 PythonOperator에서는 접근할 수 없습니다.
대신, Airflow에서는 XCom이라는 메커니즘을 제공하여 작업(Task) 간 데이터를 공유할 수 있습니다. XCom은 작업의 출력을 다른 작업이 사용할 수 있도록 허용합니다.
문제를 해결하기 위해 _connect_db 함수의 반환 값을 XCom으로 저장하고, _cafe_crawling 함수에서 이 값을 사용하도록 수정해보겠습니다. 또한 _make_wordcloud 함수에서도 word_data를 XCom으로부터 가져와 사용할 수 있도록 수정합니다.
위에 알려준대로 connect_db
task에는 return con, cur
을 선언하고
cafe_crawling
에는 con, cur = context['task_instance'].xcom_pull(task_ids='connect_db')
구문을 추가해주었다.
그랬더니 이번엔 아예 task1
에서 TypeError: Object of type Cursor is not JSON serializable
이 발생했는데, 이는 pythonOperator
에서 함수를 실행할때, 반환 값을 XCom을 이용해 다른 곳에 전달하는 과정에서 JSON이 직렬화 돼 있어야 하는데, 내가 반환하려 했던 pymysql
의 cur
객체는 직렬화가 안되는 객체였음. 따라서 task1을 삭제하고 각각의 con, cur을 필요로 하는 task2, task3에 pymysql 구문을 추가해주었다.
...(생략)...
def _cafe_crawling(start=1, article_num=0, **context):
# DB 연결
con = pymysql.connect(
host='db서버 ip',
port=3306,
user='root',
password='비밀번호',
database='project'
)
cur = con.cursor()
# 기존 DB에 적재된 데이터 중 가장 최신 id값 가져오기
cur.execute('select max(id) from wordcloud;')
recent_article_id = cur.fetchall()[0][0]
article_id = recent_article_id + 1
while article_id <= recent_article_id + 100: # 한번 크롤링 할때마다 100개씩 추출
url = f"https://apis.naver.com/cafe-web/cafe-articleapi/v2.1/cafes/28866679/articles/{article_id}?query=&menuId=1&boardType=L&useCafeId=true&requestFrom=A"
response = requests.get(url)
data = response.json()
# 게시글 정보 추출
article = data.get('result', {}).get('article', {})
article_timestamp = article.get('writeDate', 0) / 1000 # 작성일자를 밀리초에서 초로 변환
# 작성일자를 datetime 형식으로 변환
article_time = datetime.fromtimestamp(article_timestamp)
# 게시글 작성일자가 현재 이후 또는 삭제된 게시글인 경우 None 반환
if (article_time >= datetime.now()) or (not article.get('menu')):
article = None
comments = None
article_time = None
# 댓글 정보 추출
comments = data.get('result', {}).get('comments', {}).get('items', [])
if article is not None: #게시글 정보가 있는 경우
article_menu = article.get('menu', {}).get('name', '')
article_subject = article.get('subject', '')
# 게시글 내용 추출
article_content_html = article.get('contentHtml', '')
article_content_soup = BeautifulSoup(article_content_html, 'html.parser')
article_content = article_content_soup.find('div', class_='se-component se-text se-l-default')
if article_content:
article_content = article_content.text.strip()
else:
article_content = ""
# 댓글 내용 추출
comment_contents = []
for comment in comments:
comment_contents.append(comment.get('content', ''))
article_comment = ' '.join(comment_contents)
# 정규표현식 전처리
article_menu = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_menu)
article_subject = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_subject)
article_content = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_content)
article_comment = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_comment)
# DB에 적재
try:
cur.execute(f'insert into wordcloud values("{article_id}", "{article_menu}", "{article_subject}", "{article_content}", "{article_comment}", "{article_time}")')
con.commit()
except Exception as e:
print(f"에러 발생: {str(e)}")
article_num += 1 # 방금 크롤링으로 DB에 적재된 데이터 개수
# article_id 증가
article_id += 1
start += 1
# DB 연결 종료
cur.close()
con.close()
return article_num
# 해당 기간동안의 크롤링 데이터 DB에서 불러오기
def _get_content(**context):
# DB 연결
con = pymysql.connect(
host='db서버 ip',
port=3306,
user='root',
password='비밀번호',
database='project'
)
cur = con.cursor()
# 불러오고 싶은 기간 선택(인자 days, 여기선 일주일로 함)
today = datetime.now()
select_day = datetime.strftime(today - timedelta(days=7),"%Y-%m-%d %H:%M:%S")
# db에 적재된 게시글들의 정보 불러오기
cur.execute(f"SELECT subject,content,comment FROM wordcloud WHERE time > '{select_day}';")
result = cur.fetchall()
# 데이터 한 문자열로 합치기
join_result = [' '.join(sub_tuple) for sub_tuple in result]
final_result = ' '.join(join_result)
# DB 연결 종료
cur.close()
con.close()
return final_result
def _make_wordcloud(**context):
word_data = context['task_instance'].xcom_pull(task_ids='get_content') # task3로부터 word_data를 가져옴
# 자연어 처리
okt = Okt()
nouns = okt.nouns(word_data) # 명사인 것들만 추출
words = [n for n in nouns if len(n) > 1] # 단어가 1글자 이상인 것만 추출
word_counts = Counter(words) # 빈도수 count
# 워드클라우드 만들기
wc = WordCloud(
font_path="./NanumGothic.ttf", # 폰트 지정 << 경로 문제 발생 가능
width=400, # 너비 지정
height=400, # 높이 지정
max_font_size=100, # 가장 빈도수 높은 단어의 폰트 사이즈 지정
background_color="white" # 배경색 지정
)
word_cloud = wc.generate_from_frequencies(word_counts) # 워드클라우드 빈도수 지정
# 워드클라우드 그래프 출력
plt.figure()
plt.imshow(word_cloud)
plt.axis("off")
# 워드클라우드 PNG 파일로 저장
today = datetime.now().date()
a = word_cloud.to_image().save(f"wc_{today}.png", format='PNG')
return a
task1 = PythonOperator(
task_id='cafe_crawling',
python_callable=_cafe_crawling,
provide_context=True,
dag=dag
)
task2 = PythonOperator(
task_id='get_content',
python_callable=_get_content,
provide_context=True,
dag=dag
)
task3 = PythonOperator(
task_id='make_wordcloud',
python_callable=_make_wordcloud,
provide_context=True,
dag=dag
)
task1 >> task2 >> task3
이 포스팅에 참고 및 활용한 문서들 !
좋은 정보 감사합니다