효율적인 프로젝트를 위한 노션 슬랙 봇(1) - 설계와 원리

imasimdi·2024년 1월 14일
2
post-thumbnail

🤖 나의 슬랙 개척기

"노션 슬랙 봇? 그게 뭔데 🤔"

때는 9월, 새로운 프로젝트를 시작하기 전에 결심을 했었다. 이번에는 팀원들에게 도움을 주는 무언가를 해보자고. 어떤 것이 좋을까? 생각해보다가 프로젝트를 할 때 공통적으로 발생하는 문제점들이 생각났다. 바로 '메신저' 이다.

매번 카카오톡으로 소통을 하다보니깐 톡방이 많아지는 문제, 공유할 때 코드가 산재하는 문제, 중요한 알림을 확인하지 못하는 문제, 그리고 제일 중요한 것이 일상과 분리되지 않는다는 문제이다. 일상 톡과 개발 톡이 혼잡하게 섞여있는 카카오톡에서 탈피해서 slack 으로 프로젝트 메신저를 옮기기로 의견을 냈다. 팀원들은 흔쾌히 승낙하였다.

그리고나서 슬랙의 기능들을 보고, github 연동과 더불어 polly를 통한 투표기능을 추가하였다. 채널분리도 파트별로 나누고 나니 이정도면 쓸만하겠다! 라고 생각하였다.
하지만, 무언가 부족하였다. 메신저를 비롯한 알림기능이 slack으로 일원화되어있으니, 이를 활용하여 무언가를 만들면 더욱 효과가 잘나겠다라는 생각을 하였다.

그렇게 생각한 것이 "노션 회의 봇"이다. 노션으로 회의 페이지를 만들고 회의록을 관리하는 일이 많다보니, 언제가 회의였는지 잊는일이 다반사였다. 특히, 파트별로 회의 시간도 나누어져있다면 더더욱 잊기 쉬웠다. 따라서 나는 생각했다.

"노션의 회의를 슬랙에 알림으로 보내자"

그렇게 탄생한 것이 아래의 회의봇이다.

이 귀여운 회의봇은 준비된 템플릿으로 만들어진 노션 데이터베이스를 읽어와 회의 일정을 스케줄링 해주고, 참여자를 멘션하여 1일전, 5시간 전, 그리고 10분 전에 슬랙에 알림을 보내준다. 지금부터 어떻게 만들었는지, 나의 고민 점이 무엇이 있었는지 말하겠다.

🤖 설계

✅ 요구사항

우선 만들기에 앞서서 필요한 요구사항은 다음과 같았다.

  1. 노션 데이터베이스의 회의 시간을 저장하고, 1일전, 10분 전 알림을 슬랙에 보내주어야 한다.
  2. 회의록에는 참여자, 종류, 참여파트, 날짜, 이름이 있어야 한다.
  3. 회의에 참석하는 참여자를 노션에서 읽어, 해당 사람을 슬랙에서 멘션해야 한다.
  4. Celery의 상태를 모니터링 할 수 있어야 한다.

회의 시간에 맞춰 1일 전에 리마인드 시켜주고, 시작 10분 전에 알려주는 것이 좋다고 생각했다. 하지만 후에 피드백을 받아 5시간 전의 알림도 추가하였다.

그리고 내가 생각하는 두번째 핵심사항은 참여자였다. 누구나 그렇듯, 나와 상관없는 메시지가 핸드폰 푸시 알람으로 뜬다면 화가날 것이다. 따라서 규칙에 슬랙 채널마다 알림 정도를 다르게 하게 만들었다. 공지사항 정도는 모든 메시지에 푸시알람을 허용하고, 나머지는 '멘션'만 푸시알림이 가도록 하였다. 따라서, 회의 알림도 본인이 참석하는 회의만 슬랙에 알림이 다도록 멘션 기능을 추가하고 싶었다.

🤯 하지만 .. 나의 고민사항

처음에는 "노션 API와 슬랙 SDK 만 쓴다면 문제 없겠지!" 라는 패기로운 생각으로 바로 개발에 착수하였다. 그러나 큰 문제점이 존재했다. 바로 노션 API 에서는 데이터베이스 시그널이 없다는 것이었다. 즉, 데이터베이스에 새로운 페이지가 생겨도 나의 서버에 생겼음을 알려주려면 지속적인 배치 작업으로 데이터베이스를 읽어야 했다. 그래서 내가 필수로 필요했던 기술들은 다음과 같았다.

  • Fastapi: 큰 규모의 프로젝트는 아니기에 django 보다는 비동기 작업을 보다 쉽게할 수 있는 fastapi를 채택하였다.
  • Celery Worker, Celerty Beat: celery beat를 통해서 주기적으로 노션 데이터베이스를 스캔하고, 슬랙에 보낼 메시지들을 저장시킨다. 그러고 나서 메시지 별 eta에 따라서 worker가 슬랙에 메시지를 보내준다.
  • Celery Flower: 셀러리의 상태를 보여준다.
  • redis: celery의 메시지 브로커로 사용하였고, 후에 서술할 유저 리스트를 담아 두는 캐시로도 사용하였다.
  • AWS EC2, AWS RDS: 봇을 실해할 ec2 인스턴스와, 정보를 저장할 RDS를 위해 필요했다.
  • AWS CodeDeploy, Github Actions: 더 효율적인 배포시스템을 위해 Github actions에 CI를, Codedeploy로 CD를 구축했다.
  • Docker-compose: 띄워야 할 컨테이너들이 많았기에(fastapi, celery beat, worker, flower, redis) docker-compose를 사용했다.

⚙️ ERD

Slacks에는 워크스페이스의 토큰을 담아두고, SlackChannels에는 채널 명을 담아둔다. 그리고 Notions에는 노션 api 토큰을, NotionDatabases에는 회의록이 담겨있는 노션 데이터베이스를 저장시킨다. 데이터베이스와 채널을 이어주는 것이 바로 NotionSlacks 테이블이다.
처음에는 이렇게 테이블이 세분화가 되어있지 않고 코드로 하드코딩 되어있으니, 채널이 바뀔 때나 연결이 바뀔 때마다 재배포를 해야됐다. 이렇게 테이블을 만들어서 디비 조작으로만으로도 관리가 가능하였다. 그리고 스캔한 회의 페이지는 NotionPages에 저장된다. 이제 코드로 핵심 기능을 어떻게 구현했는지 소개하겠다.

🤖 핵심 구현 사항

모든 코드는 여기에서 확인할 수 있다!
https://github.com/leehjhjhj/nothon-calander-slackbot-project

노션 데이터베이스 스캔 및 메시지 스케줄링

가장 핵심 부분이자, 많이 공들이고 최적화를 많이 시도했던 파트이다. oreilly 식으로 한번 설명해보겠다.

api/service/save_meeting_facade.py

def save_meeting(self):
        try:
            notion_database_ids = self._notion_slack_mapping_repo.get_all_database_ids() #1
            for notion_database_id in notion_database_ids:
                notion_api_key = self._notion_repo.get_api_token_by_notion_database_id(notion_database_id) #2
                data = self._read_notion_database(notion_database_id, notion_api_key)
                results = data.get('results') #3
                
                list_meeting_ids = self._meeting_repo.get_all_page_ids(notion_database_id)
                set_meeting_ids = set(list_meeting_ids) #4

                for result in results:
                    try:
                        meeting = self._farthing_calender_data(result) #5
                        if not self._check_meeting_status(meeting.status):
                            continue #6
                        if self._check_meeting_time(meeting.time) and not self._check_meeting_id(meeting.page_id, set_meeting_ids): #7
                            logging.info(f"{meeting.name}이 if에 들어왔다.")
                            meeting = self._participants_process.add_participants(meeting, result) #8
                            worker_facade(meeting) #9
                            try:
                                self._meeting_repo.add_meeting(meeting) #10
                            except Exception as e:
                                logging.error(f"저장 오류: {e}")
                    except Exception as e:
                        logging.error("DB ID: {}, 파싱 에러 발생: {}".format(notion_database_id, e))
                        continue
        except Exception as e:
            logging.error(f"save meeting 자체 오류 발생!: {e}")
  • #1 : 스캔 대상의 노션 데이터베이스(매핑 된 데이터베이스)의 id를 모두 불러온다.
  • #2 : 가져온 노션 데이터베이스의 id로 노션 api 토큰을 가져온다.
  • #3 : 해당 api 키로 노션api를 통해서 회의 페이지들의 정보를 가져오고, 이중 results만 가져온다.
  • #4 : NotionPage에 있는 page들의 id를 가져온 후, 집합으로 만든다. 후에 중복된 페이지인지 확인할 때 쓰인다.
  • #5 : #3 에서 가져온 결과들을 하나씩 돌면서 필요한 필드만 뽑아 MeetingPage 객체를 만든다.
  • #6 : 만약 회의의 상태가 미정 이거나 취소 및 변경이면 즉시 건너 뛴다.
  • #7 : 만약 회의의 시간이 지금보다 작거나, 2일 이상 크다면 False, 그리고 #4에서 만든 NotionPage 집합에 있다면 True를 뱉어(not이 붙어서 반대로 적용) 한번 더 거른다.
  • #8 : 이제 if문까지 들어왔다면 새로운 회의 페이지이다. 후술할 참여자 추가 로직을 진행시킨다.
  • #9 : 이제 완성된 회의 객체를 worker_facade 라는 슬랙에 보낼 메시지들을 스케줄러에 등록한다.
  • #10 : 그리고 NotionPage를 저장한다.

뭔가 많이 느려보이지만, 그럼에도 불구하고 최소한의 회의만 다루려고 많은 조건들을 추가했다. 그랬더니 평균 2~3초였던 속도가 0초대로 최적화 되었다.

메시지 스케줄링

api/service/worker_facade.py

def worker_facade(meeting: NotionPage):
    logging.info('in worker facade')
    try:
        extra = {
            "page_id" : meeting.page_id,
            "time": meeting.time,
            "meeting_url" : meeting.meeting_url,
            "name" : meeting.name,
            "meeting_type": meeting.meeting_type,
            "notion_database_id": meeting.notion_database_id,
            "participants": meeting.participants,
        }
        schedule_periods = [
            {"period": "one_day", "delta": timedelta(days=1), "task": schedule_one_day_before},
            {"period": "five_hours", "delta": timedelta(hours=5), "task": schedule_five_hours_before},
            {"period": "ten_minutes", "delta": timedelta(minutes=10), "task": schedule_ten_minutes_before},
        ]

        for schedule in schedule_periods:
            reminder_time = meeting.time - schedule["delta"]
            uuid = make_uuid(meeting.page_id, schedule["period"])
            res = AsyncResult(id=uuid, app=celery_task)

            if not res.ready():
                schedule["task"].apply_async(kwargs=extra, eta=reminder_time, task_id=uuid)

    except Exception as e:
        logging.error('Error in worker_facade', exc_info=e)

앞서 보았던 worker_facade 이다. 이곳에서는 하루, 다섯시간, 열시간 전에 알림 메시지가 가도록 스케줄링한다. UUID를 통해서 식별자를 만들어주고, 회의 시간 - 기간을 통해서 나온 reminder_timeeta를 통해서 알림 도착 예정 시간을 지정해준다.

슬랙에 스케줄된 메시지 전송

api/celery_config/celery_worker.py

@celery_task.task
def schedule_one_day_before(**kwargs):
    notion_slack_mapping_repo = NotionSlackMappingRepository()
    slack_repo = SlackRepository()
    try:
        page_id = kwargs.get('page_id')
        time = kwargs.get('time')
        name = kwargs.get('name')
        meeting_url = kwargs.get('meeting_url')
        meeting_type = kwargs.get('meeting_type')
        participants = kwargs.get('participants')
        
        notion_database_id = kwargs.get('notion_database_id')
        slack_channel_ids = notion_slack_mapping_repo.get_slack_channel_id_by_notion_database_id(notion_database_id)

        transformed_date = transform_date(time)
        message = f">:bell: {transformed_date}에 \"{name}\"이/가 예정되어있어요! 잊지 마세요. \n" \
            f"> 회의 타입: `{meeting_type}`\n" \
            f"> 회의 노션페이지: <{meeting_url}|바로가기>\n"\
            f"> 참여자: {participants}"
        
        if check_status(page_id):
            for slack_channel_id in slack_channel_ids:
                slack_token = slack_repo.get_api_token_by_slack_channel_id(slack_channel_id)
                slack = SendToSlackAPI(slack_token, slack_channel_id) 
                slack.send_message(message)
                print(f"{message} 전송 완료")

    except Exception as e:
        print(f"전송실패: {e}")

그리고eta가 도달하면 각각 스케줄된 task 함수가 celert worker에서 실행된다. 위는 예시로 하루 전 task 함수를 가져왔다. 같은 노션 데이터베이스에 매핑되어있는 슬랙 채널이 많을 수 있으니 매핑된 slack_channel_ids을 불러와주고, 메시지를 만든다.
그리고 만약에 보내려고 할 때 해당 회의 페이지의 항태가 취소 및 변경 이라면 메시지를 보내지 않는다.

참여자 스캔 및 처리

'노션 데이터베이스 스캔 및 메시지 스케줄링' 에서 #8번의 self._participants_process.add_participants(meeting, result)의 로직이다. 노션 데이터베이스에는 참여자 필드가 있는데, 이를 슬랙 사용자에 멘션을 해주려면 슬랙 사용자의 유저 아이디을 알아야한다. 여기에서 많은 고민을 하였는데, 바로 슬랙 사용자를 불러오는 로직이었다.

우선 노션의 참여자를 슬랙의 실제 유저네임으로 지정한다. 슬랙에서 이현제 라는 유저네임이라면 노션 참여자 멀티 셀렉 필드에서도 이현제로 지정한다. 만일 다른 팀원이 슬랙 네임을 영어로 지정했다면 노션 필드에도 영어로 지정한다. 예를들자면 Hyunje Lee 처럼 말이다.

그 이후, 원래라면 add_participants에서 Slack SDK를 사용해 해당 워크스페이스의 모든 사용자의 유저 네임 리스트와 유저아이디 불러왔다. 하지만 문제가 발생한게 스케줄링 할 페이지가 많아지면서 슬랙 SDK를 통한 슬랙 API 호출에 장애가 생겼다. 너무 많이 API를 불러와서 호출 횟수 제한을 넘긴 것이다. 전의 신년 운세 맞이 OpenAi api때와 비슷한 상황이었다. 또한 add_participants 메서드를 불러올 때 마다 페이지 갯수 만큼 Slack api를 호출하는데 이는 네트워크 통신이기 때문에 당연히 지연도 있고 속도도 느려졌다.

따라서 생각해낸 것이 미리 유저네임과 유저아이디의 매핑을 채널아이디:dict[유저네임, 슬랙유저아이디] 으로 미리 레디스에 넣어두는 것이다! 이 또한 beat task로 스케줄링 해두고, 수동으로 Post API도 생성해두면 만약 슬랙의 워크스페이스 구성원이 달라져도 쉽게 대응할 수 있다고 생각한다.

api/service/participants_process.py

def add_participants(self, meeting: NotionPage, result) -> NotionPage:
        participants = self._make_participants(result, meeting.notion_database_id)
        meeting.participants = participants if participants else None
        return meeting

아까 두번 언급한 add_participants 메서드이다. 여기서 _make_participants 메서드를 통해서 save_meeting_facade에서 받아온 result에 참여자 멘션 문자열을 붙여서 반환한다. 이것이 그대로 celery_worker의 인자로 등록되어 전송이 되면 실제 멘션이 되어서 푸시 알림을 받을 수 있다.

def _make_participants(self, result, notion_database_id):
        try:
            mention = ''
            participants_infos = result.get('properties', {}).get("참여자", {}).get("multi_select", [{}])
            participants_names = [participants_info['name'] for participants_info in participants_infos]
            if not participants_names:
                return False

            slack_channel_id = self._slack_repo.get_slack_channel_id_by_notion_database_id(notion_database_id)
            name_user_id_map_byte = self._redis_conn.get(f"{slack_channel_id}")
            name_user_id_map = json.loads(name_user_id_map_byte)

            for participants_name in participants_names:
                user_id = name_user_id_map.get(f'{participants_name}', False)
                if user_id:
                    mention += f"<@{user_id}> "
            return mention
        
        except Exception as e:
            logging.error(f"참여자 파싱 에러 발생: {e}")

이곳에서는 result의 참여자 부분만 파싱하여서 participants_names에 담아둔다.
그리고는 slack_channel_id로 레디스에 있는 name_user_id_map을 불러와 participants_name에 대응하는 실제 슬랙 유저아이디(ex: @U063MU53~~)을 찾는다. 그리고 최종 mention 할 문자열을 반환해준다.

def save_user_lists_to_redis(self):
        slack_ids = self._notion_slack_mapping_repo.get_all_slack_channels()
        for slack_id in slack_ids:
            try:
                slack_token = self._slack_repo.get_api_token_by_slack_channel_id(slack_id)
                name_user_id_map = self._get_slack_users(slack_token)
                name_user_id_bytes = json.dumps(name_user_id_map)
                self._redis_conn.set(f"{slack_id}", name_user_id_bytes)
            except Exception as e:
                logging.error(f"레디스 유저리스트 저장 오류 발생: {e}")
                
def _get_slack_users(self, slack_token):
        client = WebClient(slack_token)
        response = client.users_list()
        if response['ok']:
            members = response['members']
            name_id_map = {}
            for member in members:
                id = member.get('id')
                name = member.get('real_name')
                name_id_map[f'{name}'] = id
            return name_id_map
        else:
            return None

이 부분은 1일에 한번씩 celery beat로 돌아가는 로직이다._get_slack_users에서 WebClient 라는 slack sdk를 class를 통해서 user 리스트를 가져오고, 유저의 이름과 유저의 아이디를 매핑한 name_id_map을 반환한다.

save_user_lists_to_redis에서는 모든 슬랙 채널의 name_id_map을 받아 슬랙 채널아이디:dict[유저네임, 슬랙유저아이디] 를 저장해준다.

이렇게 레디스를 활용해서 API 호출 제한도 걸리지 않고, API 호출시 네트워크 IO도 줄일 수 있어 조금 더 효율적인 로직이 될 수 있었다.

🤖 트러블 슈팅 🧨

처음 이 회의봇을 만들었을 때, 정말 고생했던 것이 있었다. 바로 visibility_timeout 설정이다. 해당 설정을 모르고 있었을 때, 왜인지 모르게 1시간마다 계속 중복으로 메시지 스케줄링이 되고 있었다. 정말 왜인지 몰라서 매우 헤매고 헤매다가 마침내 저 설정을 찾아냈었다.

celery_task.conf.broker_transport_options = {'visibility_timeout': 31536000} 

여기서 visibility_timeout 이란 워커에서 task를 시간 내에 완료하지 못 했을 때, 다른 워커에게 task를 전달한다. 이 visibility_timeout의 기본 설정이 1시간이었던 것이고, worker 또한 하나 밖에 없었기 때문에 계속해서 1시간 마다 재할당이 이루어졌던 것이다. 뒤늦게 알아차려서 다행이었지.. 이걸로 추석 연휴 다 날린거 생각하면 지금도 마음이 아프다.

🤖 앞으로..

정말 공들이고 많은 실험을 해서 만든 봇이다. 너무 호기롭고, 쉽게 생각하고 진입했다가 큰코 다친 개인 프로젝트.. 하지만 그만큼 팀원들에게 도움이 되었고, 심지어 한 팀원은 다른 자신의 프로젝트에 도입하고 싶다고해서 밑과 같은 안내 페이지까지 만들었었다.
회의봇 설정 관련 설명 페이지

또한 이렇게 Celery flower 까지 사용해보니 'Celery를 정말 잘 사용하면 대박이겠는 걸?' 이라는 호기심 또한 생기게 해주었다.

정말 재밌는 프로젝트고, 앞으로도 꾸준히 디벨롭 해나갈 나의 소중한 프로젝트이다. 2탄에서는 최근에 어떻게 고도화를 시켰는지 소개하겠다.

profile
터키어 하는 개발자(호소인)이에요

0개의 댓글