Event Queue, Slack Alert

이경환·2024년 1월 8일

프로젝트_RSS Reader

목록 보기
2/3
post-thumbnail

이 글을 쓰는 이유

저희 파리지옥 RSS Reader에서 상당히 중요한 서비스 중 하나인 Post를 크롤링 해오는 작업을 하면서 겪었던 고민과 개선 경험을 공유하고자 합니다. 복잡한 로직이다보니

  • 알림 서비스
  • Post 크롤링 서비스 개선

2파트로 나눠서 진행하겠습니다. 이번 시간은 알림 서비스에 대해 소개해 보겠습니다.

저희 프로젝트는 밑에 블로그에 소개되어있습니다.

스터디 파리지옥과 그 첫 번째 프로젝트를 소개합니다

요약하면 RSS_Reader는 블로그 새 글을 알림으로 받는 서비스입니다.

사용자가 구독한 블로그 글이 수정되거나 추가 됐을 시 저희 서버 DB에 Parser를 통해 글을 저장하게 됩니다.

아무래도 중요한 작업이다 보니 성능 측면이나 의존성 고려해야 할 부분이 많은 부분이고 팀원들과 여러 방향성을 고려하여 지속적인 업데이트하는 서비스입니다.

RSS 페이지 파싱 후 Post를 저장하는 스케쥴

먼저 초기 RSS 페이지 파싱 후 Post를 저장하는 스케줄 입니다. 이제부터 이 서비스 계층을 PostCollectService라 부르겠습니다.

PR 링크는 다음과 같습니다. https://github.com/FlytrapHub/RSS-Reader/pull/42 시퀀스 다이어 그램과 같이 진행됩니다.

동작 방식

이 부분은 같은 팀원인 APE가 진행해 주셨는데요. 요약하자면 다음과 같습니다.

  • @Schedule 어노테이션을 사용해 10분마다 블로그의 RSS XML 문서를 파싱하여 DB에 저장하도록 구현 (이미 DB에 저장된 Post는 Subscribe id와 guid로 구별)

  • 모든 구독 정보를 조회해 TaskExecutor 클래스를 사용해 비동기 처리되어 있으며 블로그 하나당 하나의 스레드에서 동작하도록 합니다.

  • RssPostParser 클래스의 parseRssDocuments()는 구독 URL을 넘겨받아 (XML 문서 파싱은 org.w3c.dom.Document 사용합니다.) 필요한 dto를List형식으로 반환하도록 합니다.

    다시 말해 RssPostParser 가 블로그 URL을 받아 해당 블로그의 글들을 List형식으로 반환합니다.

		private final RssPostParser postParser;
    private final SubscribeEntityJpaRepository subscribeEntityJpaRepository;
    private final PostEntityJpaRepository postEntityJpaRepository;
    private final TaskExecutor taskExecutor;

@Scheduled(fixedDelay = TEN_MINUTE)
    public void collectPosts() {
        List<SubscribeEntity> subscribes = subscribeEntityJpaRepository.findAll();

        for (SubscribeEntity subscribe : subscribes) {
            processPostCollectionAsync(subscribe);
        }
        //TODO 스케줄 작업이 끝나면 알림 envet작업이 발생한다. 큐에 담긴 이벤트를 발행한다.
        log.info("alert 실행");
        alertFacadeService.alert();
    }

/**
     * 구독한 블로그의 RSS에서 게시글들을 읽어서 DB에 저장한다.<br> 비동기 처리되어 있으며 블로그 하나당 하나의 스레드에서 동작한다.
     *
     * @param subscribe 구독한 블로그
     */
    private void processPostCollectionAsync(SubscribeEntity subscribe) {
        taskExecutor.execute(() -> postParser.parseRssDocuments(subscribe.getUrl())
                .ifPresent(resource -> {
                    updateSubscribeTitle(resource, subscribe);
                    savePosts(resource, subscribe);
                }));
    }

고민

저희가 처음 의도 했던 특정 시간마다 POST를 갱신하는 서비스는 이로써 완성 했지만, 다음과 같은 고민과 문제가 생겼습니다.

  • 파싱 에러 시 일단 log만 찍어서 파싱되지 않더라도 애플리케이션이 멈추지 않도록 했습니다.
    • 만약 파싱이 안되면 어드민에게 알림이 가게 하는 방법 고민.
    • 파싱이 안 되는 경우는 만약 블로그 사이트가 점검 등으로 서버가 잠시 다운되어 있을 때 파싱하려고 하면 에러 날 수도있습니다.
  • Post 하나 저장할 때마다 기존에 존재하는 Post인지 DB에 질의해 확인하고 있는데 비효율적이라는 생각이듭니다.
    • DB에서 Post 리스트를 먼저 뽑아와서 리스트에서 비교하는 게 좋을지? 고민입니다.
  • 게시글 본문을 모두 저장하니까 게시글 20개 MySQL workbench에서 불러오는데 느립니다. (이 부분은 본문 자체를 DB컬럼에 그대로 저장하다 보니 일정 수가 넘어가면 DB에 장애가 발생하였습니다. 개선의 여지가 필요해 보입니다.)

일단 해결해야 할 과제

차차 과제를 해결해 보기로 하며 일단 이 두 가지를 중점으로 해결하기로 했습니다.

  • Post 하나 저장할 때 마다 기존에 존재하는 Post인지 DB에 질의해 확인하고 있는데 비 효율적인가?
  • 글이 갱신 됐을 때 사용자는 알림을 받아야 한다.

이벤트 큐 도입, Slack 알람

일단 먼저 사용자 알림을 해결해 보고자 합니다.

새로운 글이 갱신됐을 시 구독을 한 사용자에게 슬렉, 디스코드와 같은 플랫폼으로 알림을 보내는 것이 좋겠다고 생각해 기능을 추가하기로 했습니다. Evnet Queue, SpringEvent, @Scheduled, Slcak 알림을 통해 이를 구현해 보기로 했습니다.

Evnet Queue, SpringEvent, @Scheduled, Slcak 키워드를 중점으로 나아가 보겠습니다.

이벤트 큐 도입, Slack 알람 PR :https://github.com/FlytrapHub/RSS-Reader/pull/99

직접 만든 Custom Evnet Queue

외부에서 큐를 사용하는 방법도 있겠지만 초기 단계이기 때문에 직접 이벤트 큐를 구현하기로 했습니다. Evnet queue와 Spring Evnet 관련된 클래스들을 만들어 줍니다.

Github 링크입니다.

이벤트큐 구현, 슬렉 알람 구현 by leegyeongwhan · Pull Request #99 · FlytrapHub/RSS-Reader

위의 @Schedule collectPosts() 에서 이어서 진행됩니다.

신경 써서 볼 부분은 기존 processPostCollectionAsync 메서드를 진행하면서 savePosts 메서드를 진행 할 때 publisher.publish(subscribe.toDomain()); 이 부분을 추가해 줍니다.

직접 만든 커스텀 SubscribeEventPublisher 메서드를 통해 publish 하며 envet를 동작시킵니다.

잠깐 왜 이렇게 했나요?

Post를 수집하는 로직과 거기서 나오는 사용자에게 알림을 보낼 구독 정보는 Evnet를 통해 최대한 분리하는 게 좋다고 생각했고
Post를 비동기로 수집하는 Schedule 과 사용자에게 알림을 보낼 Schedule 은 일단 별개의 Schedule 로 관리하고자 했습니다.
Event Queue를 싱글톤으로 관리해 주며 Alert 알림 보내기를 따로 Event 처리해 주면 괜찮다고 생각했습니다.

동작 방법을 순서대로 보겠습니다.

동작 방식

  1. 폴더를 구독한 기준으로 post가 갱신됐을 때 사용자가 슬렉, 디스코드 등으로 알림을 받아야 하므로 이벤트 큐에 subscribe(구독) 정보를 넣어줍니다.
    private final RssPostParser postParser;
    private final SubscribeEntityJpaRepository subscribeEntityJpaRepository;
    private final PostEntityJpaRepository postEntityJpaRepository;
    private final SubscribeEventPublisher publisher;
    private final AlertFacadeService alertFacadeService;
    private final TaskExecutor taskExecutor;

private void savePosts(RssSubscribeResource subscribeResource, SubscribeEntity subscribe) {
        List<PostEntity> posts = postEntityJpaRepository.findAllBySubscribeOrderByPubDateDesc(
                subscribe);

        Map<String, PostEntity> postMap = convertListToHashSet(posts);

        for (RssItemResource itemResource : subscribeResource.itemResources()) {
            PostEntity post;

            if (postMap.containsKey(itemResource.guid())) {
                post = postMap.get(itemResource.guid());
                post.updateBy(itemResource);
            } else {
                post = PostEntity.from(itemResource, subscribe);
            }

            postEntityJpaRepository.save(post);
        }
        //TODO 새로운 POST가 수집 되면  큐에 구독을 담는다 이것을 이벤트로 발생
        publisher.publish(subscribe.toDomain());
    }
  1. Event가 pulish 되면 eventQueue에 offer를 해줍니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class SubscribeEventListener {

    private final SubscribeEventQueue eventQueue;
    private final AlertEntityJpaRepository repository;

    @EventListener
    public void onEvent(Subscribe subscribe) {
        if (eventQueue.isFull()) {
            log.info("eventQueue full ");
        }
        eventQueue.offer(subscribe);
        log.info("eventQueue peek 의 상태 = {}  ", eventQueue.peek());
    }
}
  1. eventQueue 동작이 끝나면 이제 collectPosts()의 alertFacadeService.alert();를 실행합니다.

    AlertService와의 의존성을 낮추기 위해 AlertFacadeService를 따로 구현해 줬습니다.

@Scheduled(fixedDelay = TEN_MINUTE)
    public void collectPosts() {
        List<SubscribeEntity> subscribes = subscribeEntityJpaRepository.findAll();

        for (SubscribeEntity subscribe : subscribes) {
            processPostCollectionAsync(subscribe);
        }
        //TODO 스케줄 작업이 끝나면 알림 envet작업이 발생한다. 큐에 담긴 이벤트를 발행한다.
        log.info("alert 실행");
        alertFacadeService.alert();
    }
  1. 이제 AlertFacadeService에서 큐가 살아있는지 확인하며 Scheduled을 걸어줘 큐에서 poll 해온 정보로 alertService.notifyAlert(alertEntity.getServiceId()); 메서드를 통해 슬렉에 알림을 보내게 됩니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class AlertFacadeService {

    private final SubscribeEventQueue queue;
    private final AlertService alertService;

    @Async("taskScheduler")
    @Scheduled(fixedRate = 1000)
    public void alert() {
        //TODO: 먼저 큐에있는 Susbscirbe를 확인한다.
        //TODO:큐를 확인해 큐의 Subscribe로 구독된 Folder를 찾는다. -> Alert를 찾는다 -> Member에게 알림을 고한다.
        //TODO: ex) 만약 0번 Member가 6번 Folder를 Subscribe 중이라면 이때 Slack(1)으로 알람 받기를 원하는 경우
        if (queue.isRemaining()) {
            Subscribe subscribe = queue.poll();
            log.info("eventQueue poll 의 상태 = {}  ", subscribe.toString());
            List<AlertEntity> alertList = alertService.getAlertList(subscribe.getId());

            for (AlertEntity alertEntity : alertList) {
                log.info("alert 정보 = {} ", alertEntity);
                alertService.notifyAlert(alertEntity.getServiceId());
            }
        }
    }
}

4-1. AlertService입니다. 커스텀 Aspect 를 통해 저희 서비스에 있는 모든 Event들의 publish를 공통으로 처리하고 있습니다.

notifyAlert 메서드가 실행되며 alert evnet가 실행됩니다.

@Slf4j
@Service
@RequiredArgsConstructor
public class AlertService {

    private final AlertEntityJpaRepository alertRepository;
    private final SlackAlarmService slackAlarmService;
---- 생략 -----

    private AlertEntity findByAlert(Long folderId, Long memberId) {
        return alertRepository.findByFolderIdAndMemberId(folderId, memberId).orElseThrow();
    }

    public void notifyPlatform(AlertParam value) {
        //TODO 플랫폼 별 알람
        slackAlarmService.notifyReturn();
        log.info("플랫폼 별 알람 value = {}", value);
    }

    //TODO: 굳이 사실 지금은 필요없어 보이기는합니다.
    @PublishEvent(eventType = AlertEvent.class,
            params = "#{T(com.flytrap.rssreader.service.dto.AlertParam).create(#serviceId)}")
    public void notifyAlert(Integer serviceId) {
        log.info("notifyAlert evnet publish");
    }

---- 생략 -----
}

4-2 Alert evnet가 실행되며 slackAlarmService의 슬렉알림이 실행되며 최종적으로 결과 값을 받습니다.

@Slf4j
@Component
@RequiredArgsConstructor
public class AlertEventHandler {

    private final AlertService alertService;

    @Async
    @EventListener(AlertEvent.class)
    public void onEvent(AlertEvent event) {
        alertService.notifyPlatform(event.getValue());
    }
}

결론

알림은 해결되었지만 추가적인 문제가 생겼습니다.

일단 처음 계획했던 Post schedule과 Alsert schedule을 분리하고 싶었지만 같은 메서드에서 진행되며 정말 비동기적으로 잘 동작하는게 맞는지 확인하기 어려웠고 추가적으로 insert를 한 번에 하나를 Insert 하는 부분도 비효율적이라는 지적이 나왔습니다.

그래서 리스트를 다시 갱신합니다.

  • Post 하나 저장할 때마다 기존에 존재하는 Post인지 DB에 질의해 확인하고 있는데 비 효율적인가?
  • 텍스트글이 갱신 됐을 때 사용자는 알림을 받아야 한다.
  • Post Insert를 Bulk Insert로 바꾸기
  • Schedule 효율적으로 바꾸기
  • taskExecutor를 사용한 코드에서 때에 따라 값이 누락되는 경우

개선

테스트를 진행해 본 결과 taskExecutor를 사용해 Event를 publish 하는 부분에서 테스트를 통과하지 못하였는데요. 비동기로 작업이 진행되다 보니 값이 누락되는 경우가 발생한 것 같습니다.

private void processPostCollectionAsync(SubscribeEntity subscribe) {
        taskExecutor.execute(() -> postParser.parseRssDocuments(subscribe.getUrl())
                .ifPresent(resource -> {
                    updateSubscribeTitle(resource, subscribe);
                    savePosts(resource, subscribe);
                }));
    }

실행될 때마다 값이 달라졌습니다. 이때 가장 먼저 든 생각은 비동기로 진행되고 있는 Task의 결과 값을 보장할 필요가 있겠다는 생각이 들었습니다.

  • TaskExecutor 하는 부분의 결과를 보장한다. (확실치 않음)
  • Java의 비동기를 지원해 주는 CompletableFuture를 사용한다.

TaskExecutor, CompletableFuture

여기서 잠깐 의문이 생겼는데요 TaskExecutor는 비동기 작업을 만들고, 스레드 풀을 통해 작업을 실행합니다. 그럼 같은 비동기를 다루는 CompletableFuture와 ****다른 점이 있는가 하는 의문이 들었습니다.

  • TaskExecutor는 스레드 실행 및 스레드 풀 관리를 위한 더 높은 수준의 추상화를 제공합니다. 주로 작업을 비동기적으로 실행하고 스레드 동시성을 관리하는 데 사용됩니다.
  • CompletableFuture 비동기 계산을 처리하고 결과에 대한 작업을 구성하도록 설계되었습니다. 비동기 프로그래밍에 대한 보다 기능적이고 구성 가능한 접근 방식을 제공합니다. 계산을 구성하고 연결하는 데 더 중점을 둡니다.

최종적으로 둘은 이러한 차이점을 가진다고 결론 내렸고 지금 작업에서는 비동기로 작업을 결과에 대한 작업을 Event로 처리해야 한다고 판단해 CompletableFuture를 사용해 보장하기로 했습니다.

참고

ExecutorService vs CompletableFuture

Executor framework vs Completable future

  • 개선 후 savePosts 메서드에서 publish하는 부분을 → processPostCollectionAsync로 옮겨 줍니다. 이로서 테스트도 통과하고 비동기 결과 값을 통한 Event 처리도 잘됩니다.
private void processPostCollectionAsync(SubscribeEntity subscribe) {
        CompletableFuture<Map<String, String>> futurePosts = CompletableFuture.supplyAsync(() ->
                postParser.parseRssDocuments(subscribe.getUrl())
                        .map(resource -> {
                            updateSubscribeTitle(resource, subscribe);
                            return savePosts(resource, subscribe);
                        })
                        .orElse(new HashMap<>()));

        if (!futurePosts.join().isEmpty()) {
            SubscribeEvent event = new SubscribeEvent(subscribe.getId(),
                    Collections.unmodifiableMap(futurePosts.join()));
            publisher.publish(event);
        }
    }

마치면서

두 가지 문제는 해결되었지만 아직 다른 문제가 남았습니다. 이번 시간은 알림을 중점으로 다뤘고 다음 시간에는 Post 크롤링 개선을 중심으로 다뤄보겠습니다.

  • Post 하나 저장할 때마다 기존에 존재하는 Post인지 DB에 질의해 확인하고 있는데 비 효율적인가?
  • 글이 갱신 됐을 때 사용자는 알림을 받아야 한다.
  • Post Insert를 Bulk Insert로 바꾸기
  • Schedule 효율적으로 바꾸기
  • taskExecutor를 사용한 코드에서 때에 따라 값이 누락되는 경우
profile
개선하는 개발자, 이경환입니다

0개의 댓글