[DataEngineering] Airflow task 작업 Slack으로 메시지 받기

유혜지·2023년 12월 2일
1

DataEngineering

목록 보기
5/6


이제 곧


해당 게시글은 아래 블로그들을 바탕으로 작성했습니다.


이제까지의 Airflow 관련 게시물에서 작성된 dag의 scheduling interval은 24시간이다. 이렇게 24시간 간격으로 지정한 사이트를 크롤링해오고, 수집한 데이터를 csv 파일로 생성한 뒤, S3에 업로드된다. 개발 행사, 대회, 트렌딩 글을 csv 파일로 만들고, 이를 편리하게 한 눈에 파악할 수 있도록 slack API를 이용해 메시지봇을 만들 예정이다. 이 메시지봇은 dag의 scheduling interval과 동일하게, 24시간 간격으로 메시지를 전송한다.


bot token 부여받기

slack workspace를 생성한다. 이메일 인증을 하고, 간단히 몇 단계를 거쳐 생성할 수 있다.

이 링크를 따라 들어가 Create New app 버튼을 눌러 새로운 app을 생성해준다.

  • From Scratch 선택

  • `App Name 지정

  • 방금 생성한 workspace 선택

위 좌측 리스트에 OAuth & Permission 메뉴가 있다. 이곳으로 들어가자. 그리고 아래로 내리다보면 Scopes 파트가 있다. 여기에 chat:writeim:write 을 추가해준다.

두 가지 OAuth가 추가되었다면 맨 위로 올라가서 Install to Workspace 버튼을 눌러준다. 다음에 뜨는 팝업에서 허용까지 눌러주면 토큰 발급이 완료된다.

위와 같이 토큰 발급까지 마쳤다면, Python 환경에서 이 토큰을 이용해 Slack SDK와 연동하여 slack에 메시지를 보낼 수 있다.


Python에서 Slack SDK를 이용한 메시지 보내기 test

먼저 아래와 같이 slack sdk 라이브러리를 설치해줘야 한다.

pip install slack_sdk

python 코드를 이용해 slack에 message가 가는지 테스트해보겠다.
그 이전에, 발급받은 token의 봇을 내가 사용하려는 채널에 포함시켜줘야 한다(작성자는 이미 추가한 뒤이기 때문에 빨간 메시지가 뜨는 것이고, 처음 등록하는 것이라면 저런 메시지가 나오지 않을 것이다).

이제 진짜 python 코드를 이용해 slack에 message가 가는지 테스트 해보겠다.

from slack_sdk import WebClient
client = WebClient(token='your_token')
client.chat_postMessage(channel='#your-channel', text='test msg!')

오류가 발생하지 않으면 아래와 같이 slack에서 생성한 봇에게 메시지를 받을 수 있을 것이다.


Airflow와 Slack SDK를 활용한 크롤링 결과 메시지 받기

직전에 작성된 게시글의 크롤링 코드를 그대로 사용해 slack 봇을 통해 메시지를 보내겠다.
slack_alarm.py 파일은 다음과 같은 형태다. event_alarm과 velog_alarm은 event, velog 정보 크롤링을 모두 마치면, 각 리스트에 들어있는 값들을 하나씩 가져와 slack에 메시지를 보내는 형태이다. contest 정보 크롤링 코드는 정보를 하나씩 가져올 때마다 csv에 담는 형태이기 때문에, slack에 메시지를 보낼 때도, 정보를 하나 가져올 때마다 메시지를 보내는 방식으로 작성했다.

slack_alarm.py

from slack_sdk import WebClient
from datetime import datetime

class SlackAlarm:
    def __init__(self, channel, token):
        self.channel = channel
        self.client = WebClient(token=token)

    def event_alarm(self, titles, hosts, hashtags, startd, endd, links):
        for t, h, hasht, sd, ed, l in zip(titles, hosts, hashtags, startd, endd, links):
            text = '\n\n     🔔 *개발 행사 속보입니다!!!*'
            text += "\n✔️ 제목: " + t
            text += "\n✔️ 해시태그: " + ", ".join(hasht)
            text += '\n✔️ 주최: ' + h
            text += '\n✔️ 기간: ' + sd + " ~ " + ed
            text += "\n\n✔️ 더 자세한 정보를 알고 싶다면 하단 링크에 방문하세요!"
            text += '\n' + l
            self.client.chat_postMessage(channel=self.channel, text=text)            

    def contest_alarm(self, title, host, category, target, startd, endd, estartd, eendd, ad, dday, status, link):
        text = "\n\n     📣 *개발 대회 속보입니다!!!*"
        text += "\n✔️ 제목: " + title
        text += '\n✔️ 카테고리: ' + ", ".join(category)
        text += '\n✔️ 주최: ' + host
        text += '\n✔️ 대상: ' + ", ".join(target)
        text += '\n✔️ 접수 기간: ' + startd + " ~ " + endd
        text += '\n✔️ 심사 기간: ' + estartd + " ~ " + eendd
        text += '\n✔️ 발표일: ' + ad
        text += '\n✔️ D-Day: ' + dday
        text += "\n✔️ 상태: "+ status
        text += "\n\n✔️ 더 자세한 정보를 알고 싶다면 하단 링크에 방문하세요!"
        text += '\n' + link
        self.client.chat_postMessage(channel=self.channel, text=text)

    def velog_alarm(self, titles, writers, texts, links):
        for t, w, texts, l in zip(titles, writers, texts, links):
            text = '\n\n     📢 *개발 트렌드 속보입니다!!!*'
            text += "\n✔️ 제목: " + t
            text += '\n✔️ 작성자: ' + w
            text += "\n\n✔️ 더 자세한 정보를 알고 싶다면 하단 링크에 방문하세요!"
            text += '\n' + l
            self.client.chat_postMessage(channel=self.channel, text=text)

crawling_contest.py
개발 대회 정보를 가져오는 크롤링 코드에 다음과 같은 코드를 추가해주었다.

...
from slack_alarm import SlackAlarm
slack = SlackAlarm('#your_channel', 'your_token')

...

slack.contest_alarm(title, sponsor, category, target, reception_period[0], reception_period[1],
            evaluation_period[0], evaluation_period[1], announcement_date, d_day, condition, link)

crawling_event.py
개발 행사 정보를 가져오는 크롤링 코드에 다음과 같은 코드를 추가해주었다.

...
from slack_alarm import SlackAlarm
slack = SlackAlarm('#your_channel', 'your_token')

...

slack.event_alarm(titles, hosts, hashtags, start_dates, end_dates, links)

crawling_velog.py
개발 트렌딩 게시글 정보를 가져오는 크롤링 코드에 다음과 같은 코드를 추가해주었다.

...
from slack_alarm import SlackAlarm
slack = SlackAlarm('#your_channel', 'your_token')

...

slack.velog_alarm(title, writer, text, link)

DAG 구성

최종 dag 파이썬 파일은 변함 없다. 왜냐하면 크롤링 함수 안에 slack 메시지 전송 함수를 삽입했기 때문이다. 사실 본래 흐름이라면, S3에 적재된 csv 파일을 읽어 slack 봇을 통해 메시지를 전송해야 하는데 EC2 인스턴스 유형이 못버텨줘서🥲 크롤링과 함께 메시지를 보내는 형태로 구현했다..
최종적인 DAG 구성은 다음과 같다.

초록색으로 네모 쳐진 task에서 각각의 크롤링 정보를 얻는 작업과 slack 봇을 통해 메시지를 보내는 작업을 동시에 수행한다.

결과


DAG을 실행하면 위와 같이 slack 메시지가 오는 것을 확인할 수 있다.


다음에는 지금까지 airflow를 이용해 작업한 것들을 prometheus와 grafana를 이용해 모니터링 및 시각화 해보는 글을 작성하겠다.

0개의 댓글