[final project] airflow를 활용한 wordcloud 생성(3): airflow dag 실행

싱숭생숭어·2023년 7월 27일
0

팀프로젝트

목록 보기
3/6

airflow dag 실행

dag 파일 작성

이전에 완성된 python 파일을 dag 형식으로 수정해주었다.

dag 파일 작성 시 유의사항

  1. 필요한 모듈 import

  2. Default Argument 작성하기

  3. 시간은 Asia/Seoul로 바꿔주기(기본 시간은 UTC)

airflow 시간 바꾸는 방법 세가지

  • 환경변수 AIRFLOW__CORE__DEFAULT_TIMEZONEAsia/Seoul로 설정
  • airflow.cfg 파일에서 default_timezone = utc 부분을 default_timezone = Asia/Seoul으로 바꿔줌
  • DAG파일에서 pendulum 패키지를 이용해 시간을 변경
    -> 내가 선택한 방법 ! 아래와 같은 코드를 py파일에 추가해주면 됨
import pendulum
# 기본 UTC 시간에서 Asia/Seoul로 변경
local_tz = pendulum.timezone("Asia/Seoul")
  1. DAG를 선언하기
  • 나의 경우 db를 연결하는 방법을 PythonOperator를 사용해 con, cur로 연결해주었지만, 필요한 경우 webserver의 connection과 airflow의 mysqlOperator를 사용해 db를 연결해줄 수 있음
  1. task 작성

  2. Dependency 설정(task 순서 설정)

아래는 내가 작성한 dag 파일의 전문

import requests
from bs4 import BeautifulSoup
import pendulum
from datetime import datetime, timedelta
import pandas as pd
import pymysql
import re
from wordcloud import WordCloud
from PIL import Image
import matplotlib.pyplot as plt
from collections import Counter
import numpy as np
from konlpy.tag import Okt

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# 기본 UTC 시간에서 Asia/Seoul로 변경
local_tz = pendulum.timezone("Asia/Seoul")

# default_args 정의
default_args = {
    "owner" : "admin",
    "start_date" : datetime(2023, 7, 27, tzinfo=local_tz),
    # dag 실행 중 오류 발생하면 발송하는 메일 비활성화
    "email_on_failure" : False
}

dag = DAG(
    dag_id="airflow_wordcloud",
    max_active_runs = 1, # dag 동시 실행 방지
    default_args = default_args, # 위에서 선언한 기본 args 
    # 분 시 일 월 요일 순으로 실행 주기 설정
    schedule_interval="0 8 * * *", #매일 오전 8시 실행
    # 이전 dag가 오래걸려서 다음 dag 시작 시간을 초과한 경우,
    # True면 이전 dag 종료 이후 다음 dag 실행, False면 이전 dag 실행 중 다음 dag 실행
    catchup = False
)

def _connect_db():
    con = pymysql.connect(
        host = 'db서버 ip',
        port = 3306,
        user = 'root',
        password = '비밀번호',
        database = 'project'
    )
    cur = con.cursor()
    return cur

def _cafe_crawling(start=1, article_num=0, **context):

    # 기존 DB에 적재된 데이터 중 가장 최신 id값 가져오기
    cur.execute('select max(id) from wordcloud;')
    recent_article_id = cur.fetchall()[0][0]
    
    article_id = recent_article_id + 1
    while article_id <= recent_article_id + 100: # 한번 크롤링 할때마다 100개씩 추출
        url = f"https://apis.naver.com/cafe-web/cafe-articleapi/v2.1/cafes/28866679/articles/{article_id}?query=&menuId=1&boardType=L&useCafeId=true&requestFrom=A"
        response = requests.get(url)
        data = response.json()

        # 게시글 정보 추출
        article = data.get('result', {}).get('article', {})
        article_timestamp = article.get('writeDate', 0) / 1000  # 작성일자를 밀리초에서 초로 변환

        # 작성일자를 datetime 형식으로 변환
        article_time = datetime.fromtimestamp(article_timestamp)

        # 게시글 작성일자가 현재 이후 또는 삭제된 게시글인 경우 None 반환
        if (article_time >= datetime.now()) or (not article.get('menu')):
            article = None
            comments = None
            article_time = None

        # 댓글 정보 추출
        comments = data.get('result', {}).get('comments', {}).get('items', [])

        if article is not None: #게시글 정보가 있는 경우
            article_menu = article.get('menu', {}).get('name', '')
            article_subject = article.get('subject', '')

            # 게시글 내용 추출
            article_content_html = article.get('contentHtml', '')
            article_content_soup = BeautifulSoup(article_content_html, 'html.parser')
            article_content = article_content_soup.find('div', class_='se-component se-text se-l-default')
            if article_content:
                article_content = article_content.text.strip()
            else:
                article_content = ""

            # 댓글 내용 추출
            comment_contents = []
            for comment in comments:
                comment_contents.append(comment.get('content', ''))
            article_comment = ' '.join(comment_contents)
                
            # 정규표현식 전처리
            article_menu = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_menu)
            article_subject = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_subject)
            article_content = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_content)
            article_comment = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_comment)

            # DB에 적재
            try:
                cur.execute(f'insert into wordcloud values("{article_id}", "{article_menu}", "{article_subject}", "{article_content}", "{article_comment}", "{article_time}")')
                con.commit()
            except Exception as e:
                print(f"에러 발생: {str(e)}")

            article_num += 1 # 방금 크롤링으로 DB에 적재된 데이터 개수

        # article_id 증가
        article_id += 1
        start += 1

        
    return article_num

# 해당 기간동안의 크롤링 데이터 DB에서 불러오기
def _get_content(**context):

    # 불러오고 싶은 기간 선택(인자 days, 여기선 일주일로 함)
    today = datetime.now()
    select_day = datetime.strftime(today - timedelta(days=7),"%Y-%m-%d %H:%M:%S") 

    # db에 적재된 게시글들의 정보 불러오기
    cur.execute(f"SELECT subject,content,comment FROM wordcloud WHERE time > '{select_day}';")
    result = cur.fetchall()

    # 데이터 한 문자열로 합치기
    join_result = [' '.join(sub_tuple) for sub_tuple in result]
    final_result = ' '.join(join_result)


    return final_result

def _make_wordcloud(**context):
    word_data = context['task_instance'].xcom_pull(task_ids='get_content')  # task3로부터 word_data를 가져옴

    # 자연어 처리
    okt = Okt()
    nouns = okt.nouns(word_data) # 명사인 것들만 추출
    words = [n for n in nouns if len(n) > 1] # 단어가 1글자 이상인 것만 추출

    word_counts = Counter(words) # 빈도수 count 
    
    # 워드클라우드 만들기
    wc = WordCloud(
    font_path="./NanumGothic.ttf", # 폰트 지정 << 경로 문제 발생 가능
    width=400, # 너비 지정
    height=400, # 높이 지정
    max_font_size=100, # 가장 빈도수 높은 단어의 폰트 사이즈 지정
    background_color="white" # 배경색 지정
    )
    word_cloud = wc.generate_from_frequencies(word_counts) # 워드클라우드 빈도수 지정

    # 워드클라우드 그래프 출력
    plt.figure()
    plt.imshow(word_cloud)
    plt.axis("off")
	
    # 워드클라우드 PNG 파일로 저장
    today = datetime.now().date()
    a = word_cloud.to_image().save(f"wc_{today}.png", format='PNG')
    
    return a

task1 = PythonOperator(
    task_id='connect_db',
    python_callable=_connect_db,
    dag=dag
)

task2 = PythonOperator(
    task_id='cafe_crawling',
    python_callable=_cafe_crawling,
    provide_context=True,
    dag=dag
)

task3 = PythonOperator(
    task_id='get_content',
    python_callable=_get_content,
    provide_context=True,
    dag=dag
)

task4 = PythonOperator(
    task_id='make_wordcloud',
    python_callable=_make_wordcloud,
    op_args=task3.output,  # task3의 반환 값을 task4의 파라미터로 전달합니다.
    provide_context=True,
    dag=dag
)

task1 >> task2 >> task3 >> task4

airflow에서 dag 실행

airflow 실행

나는 docker에 airflow를 컨테이너로 띄워두었으므로 docker 부터 실행 !

  • sudo service docker start

  • sudo service docker status

airflow 컨테이너 실행

  • docker start airflow

airflow 컨테이너 진입

  • docker exec -it airflow /bin/bash

airflow bask 창에 입력, airflow webserver와 scheduler 실행

  • airflow webserver && airflow scheduler

dags 폴더에 dag 파일 넣기

이전에 docker에 airflow 컨테이너를 띄울때, docker volume을 지정해두었다. 따라서 지정한 /home/gyu/workspace/dags 폴더에 dag 파일을 넣어주면, 도커 볼륨에 의해서 docker에 띄워둔 airflow의 dags 폴더에도 dag 파일이 복사가 된다.
-> dags 폴더에 권한문제가 발생할 경우, (1)에 적어두었다.


airflow webserver에서 확인

  • airflow webserver에 뜨지 않으면 터미널에서 ctrl+c를 눌러 airflow webserver와 scheduler를 멈추고 다시 접속했다.

  • (내 dag 오류는 아니지만)
    위와 같은 오류가 뜨면서 webserver dag 목록에 내가 작성한 dag가 뜨지 않는다.
    이 경우, 계속해서 gpt에게 내 .py파일을 보여주면서 수정해야할 부분들을 수정했다.

  • 그럼 이제 아래 사진과 같이 dags 목록에 뜬 내 airflow_wordcloud dag를 확인할 수 있다.


dag 실행, task1 error(log 확인)

  • 근데 계속해서 dag를 trigger하면 첫 task(db_connect)부터 에러가 발생해서 catchup = False, start_date = 오늘날짜 로 수정해주었다.

  • 근데도 안댄다. . . 포기하고 싶다 . . .

  • 하지만 실패는 성공의 어쩌구 . . . 따라서 이번엔 해당 오류 나는 task의 log를 확인해보기로 했다.

    • 로그는 위 상단 페이지에서 graph -> 해당 task 선택 -> 뜨는 창에서 log 선택 -> log 확인


      task1에서 이게 도대체 뭔 에러일지 감도 안잡히네요 . . pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '127.0.0.1' ([Errno 111] Connection refused)")
  • 그냥 local db가 아니라 gcp에 띄운 db에서 작업하기로 결정 ! connect_db task부분을 수정해준다. 그러고 다시 실행하면,

    이제 드디어 task1이 정상적으로 실행되었다 !!!


dag 실행, task2 error(log 확인)

  • 곧 바로 발생한 task2의 error는요, NameError: name 'cur' is not defined
    즉, 코드에서 전역변수로 설정한 con, cur을 인식하지 못해 발생하는 오류였다. 이는 gpt한테 물어봤더니,

    Airflow에서는 여러 개의 워커(worker)가 동시에 실행되므로 PythonOperator에서 전역 변수를 사용하는 것이 의도대로 동작하지 않을 수 있습니다. 각 워커는 독립적으로 작동하며, 서로 간에 전역 변수를 공유하지 않습니다. 따라서 PythonOperator 내에서 정의된 전역 변수는 다른 PythonOperator에서는 접근할 수 없습니다.
    대신, Airflow에서는 XCom이라는 메커니즘을 제공하여 작업(Task) 간 데이터를 공유할 수 있습니다. XCom은 작업의 출력을 다른 작업이 사용할 수 있도록 허용합니다.
    문제를 해결하기 위해 _connect_db 함수의 반환 값을 XCom으로 저장하고, _cafe_crawling 함수에서 이 값을 사용하도록 수정해보겠습니다. 또한 _make_wordcloud 함수에서도 word_data를 XCom으로부터 가져와 사용할 수 있도록 수정합니다.

  • 위에 알려준대로 connect_db task에는 return con, cur을 선언하고
    cafe_crawling에는 con, cur = context['task_instance'].xcom_pull(task_ids='connect_db') 구문을 추가해주었다.

  • 그랬더니 이번엔 아예 task1에서 TypeError: Object of type Cursor is not JSON serializable이 발생했는데, 이는 pythonOperator에서 함수를 실행할때, 반환 값을 XCom을 이용해 다른 곳에 전달하는 과정에서 JSON이 직렬화 돼 있어야 하는데, 내가 반환하려 했던 pymysqlcur 객체는 직렬화가 안되는 객체였음. 따라서 task1을 삭제하고 각각의 con, cur을 필요로 하는 task2, task3에 pymysql 구문을 추가해주었다.

...(생략)...

def _cafe_crawling(start=1, article_num=0, **context):
    # DB 연결
    con = pymysql.connect(
        host='db서버 ip',
        port=3306,
        user='root',
        password='비밀번호',
        database='project'
    )
    cur = con.cursor()

    # 기존 DB에 적재된 데이터 중 가장 최신 id값 가져오기
    cur.execute('select max(id) from wordcloud;')
    recent_article_id = cur.fetchall()[0][0]
    
    article_id = recent_article_id + 1
    while article_id <= recent_article_id + 100: # 한번 크롤링 할때마다 100개씩 추출
        url = f"https://apis.naver.com/cafe-web/cafe-articleapi/v2.1/cafes/28866679/articles/{article_id}?query=&menuId=1&boardType=L&useCafeId=true&requestFrom=A"
        response = requests.get(url)
        data = response.json()

        # 게시글 정보 추출
        article = data.get('result', {}).get('article', {})
        article_timestamp = article.get('writeDate', 0) / 1000  # 작성일자를 밀리초에서 초로 변환

        # 작성일자를 datetime 형식으로 변환
        article_time = datetime.fromtimestamp(article_timestamp)

        # 게시글 작성일자가 현재 이후 또는 삭제된 게시글인 경우 None 반환
        if (article_time >= datetime.now()) or (not article.get('menu')):
            article = None
            comments = None
            article_time = None

        # 댓글 정보 추출
        comments = data.get('result', {}).get('comments', {}).get('items', [])

        if article is not None: #게시글 정보가 있는 경우
            article_menu = article.get('menu', {}).get('name', '')
            article_subject = article.get('subject', '')

            # 게시글 내용 추출
            article_content_html = article.get('contentHtml', '')
            article_content_soup = BeautifulSoup(article_content_html, 'html.parser')
            article_content = article_content_soup.find('div', class_='se-component se-text se-l-default')
            if article_content:
                article_content = article_content.text.strip()
            else:
                article_content = ""

            # 댓글 내용 추출
            comment_contents = []
            for comment in comments:
                comment_contents.append(comment.get('content', ''))
            article_comment = ' '.join(comment_contents)
                
            # 정규표현식 전처리
            article_menu = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_menu)
            article_subject = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_subject)
            article_content = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_content)
            article_comment = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣 ]', '', article_comment)

            # DB에 적재
            try:
                cur.execute(f'insert into wordcloud values("{article_id}", "{article_menu}", "{article_subject}", "{article_content}", "{article_comment}", "{article_time}")')
                con.commit()
            except Exception as e:
                print(f"에러 발생: {str(e)}")

            article_num += 1 # 방금 크롤링으로 DB에 적재된 데이터 개수

        # article_id 증가
        article_id += 1
        start += 1

    # DB 연결 종료
    cur.close()
    con.close()
        
    return article_num

# 해당 기간동안의 크롤링 데이터 DB에서 불러오기
def _get_content(**context):
    # DB 연결
    con = pymysql.connect(
        host='db서버 ip',
        port=3306,
        user='root',
        password='비밀번호',
        database='project'
    )
    cur = con.cursor()

    # 불러오고 싶은 기간 선택(인자 days, 여기선 일주일로 함)
    today = datetime.now()
    select_day = datetime.strftime(today - timedelta(days=7),"%Y-%m-%d %H:%M:%S") 

    # db에 적재된 게시글들의 정보 불러오기
    cur.execute(f"SELECT subject,content,comment FROM wordcloud WHERE time > '{select_day}';")
    result = cur.fetchall()

    # 데이터 한 문자열로 합치기
    join_result = [' '.join(sub_tuple) for sub_tuple in result]
    final_result = ' '.join(join_result)

    # DB 연결 종료
    cur.close()
    con.close()

    return final_result

def _make_wordcloud(**context):
    word_data = context['task_instance'].xcom_pull(task_ids='get_content')  # task3로부터 word_data를 가져옴

    # 자연어 처리
    okt = Okt()
    nouns = okt.nouns(word_data) # 명사인 것들만 추출
    words = [n for n in nouns if len(n) > 1] # 단어가 1글자 이상인 것만 추출

    word_counts = Counter(words) # 빈도수 count 
    
    # 워드클라우드 만들기
    wc = WordCloud(
    font_path="./NanumGothic.ttf", # 폰트 지정 << 경로 문제 발생 가능
    width=400, # 너비 지정
    height=400, # 높이 지정
    max_font_size=100, # 가장 빈도수 높은 단어의 폰트 사이즈 지정
    background_color="white" # 배경색 지정
    )
    word_cloud = wc.generate_from_frequencies(word_counts) # 워드클라우드 빈도수 지정

    # 워드클라우드 그래프 출력
    plt.figure()
    plt.imshow(word_cloud)
    plt.axis("off")
	
    # 워드클라우드 PNG 파일로 저장
    today = datetime.now().date()
    a = word_cloud.to_image().save(f"wc_{today}.png", format='PNG')
    
    return a

task1 = PythonOperator(
    task_id='cafe_crawling',
    python_callable=_cafe_crawling,
    provide_context=True,
    dag=dag
)

task2 = PythonOperator(
    task_id='get_content',
    python_callable=_get_content,
    provide_context=True,
    dag=dag
)

task3 = PythonOperator(
    task_id='make_wordcloud',
    python_callable=_make_wordcloud,
    provide_context=True,
    dag=dag
)

task1 >> task2 >> task3
  • 그러고 다시 airflow를 실행했더니, 이제 task3(make_wordcloud)말고는 제대로 실행이 되는 걸 볼 수 있었다 !!!

이 포스팅에 참고 및 활용한 문서들 !

profile
공부합시당

1개의 댓글

comment-user-thumbnail
2023년 7월 27일

좋은 정보 감사합니다

답글 달기