채팅 아키텍처 설계하기 v3과 동시성 문제

hyng·2023년 2월 17일
1

smilegate-winter-dev-camp

목록 보기
15/15

이전 버전에서 변경된 부분을 기록한다.

신고 횟수와 신고 이력 저장 하나의 DB 테이블로 통일

기존 방식

      @Override
      public void report(ReportRequest request) {
        long count = reportCountRepository.increaseReportCount(request.getTopicId(),
          request.getReportedUser());
        sendMessage(count, request.getTopicId(), request.getReportedUser());
        Report report = Report.of(request.getReportedUser(), request.getReporter(), request.getMessage(), request.getReason());
        reportRepository.save(report);
      }
  • long count = reportCountRepository.increaseReportCount(request.getTopicId(), request.getReportedUser());
    • redis를 사용하여 사용자의 신고 횟수를 관리
  • sendMessage(count, request.getTopicId(), request.getReportedUser());
    • 10번 이상 신고된 사용자의 경우 kafka로 메시지 발행, 이후 채팅 서버가 메시지를 가져가 처리
  • Report report = Report.of(request.getReportedUser(), request.getReporter(), request.getMessage(), request.getReason());
    reportRepository.save(report);
    • report를 생성해 mysql에 저장

변경 이유

  • 기존에는 count와 report를 별도의 DB에서 관리하였던 것을 report에 count컬럼을 추가하여 하나의 DB만 사용하는 것으로 변경하였다.

    1. 차이가 크지 않지만 관리해야 할 컴포넌트는 증가함
      처음엔 ‘신고하기’ 기능에서 kafka에 메시지 발행까지 하는게 단일 책임 원칙에서 벗어난다는 생각이 들었고 응답 시간에 영향을 줄 것이라 생각하여 따로 redis에 신고 횟수를 저장하고 스프링 스케줄러나 배치에서 레디스에서 모든 사용자 정보를 불러와 신고횟수가 10번 이상일 경우 메시지를 발행하는 로직을 하도록 하려고 했었다. 그런데 redis에서 불필요하게 모든 사용자 정보를 불러온다는 게 리소스 낭비가 심할 것이라고 생각이 되어서 신고하기 기능에서 kafka 메시지 발행까지 처리하도록 변경했다. 메시지를 발행하는 것이 성능에 영향을 미친다고 판단되면 비동기로 메시지를 보내는 것을 고려해 볼 수 있다.

      → inmemory로 속도가 빠른 레디스를 사용한 주된 이유가 조회 성능을 높이기 위해서였기 때문에 신고하기 메서드에서 메시지 발행까지 처리한다면 굳이 레디스를 사용할 필요가 없음.
      -> redis를 사용함으로써 관리해야 할 컴포넌트가 증가하고 성능 측면에서도 테이블에서 컬럼 하나를 줄이는 것은 의미가 없다. 그리고 count를 레디스에서 별도로 관리할 때와 mysql의 report 테이블에서 하나로 관리할 때 조회 쿼리 1번 정도의 차이만 있다

    • count를 레디스로 관리
     1. count update
      2. insert
    • report 테이블 하나로 관리
    1. topicId ,reportedUser로 select
      1-1. 있다면, count update
      1-2. 없다면, insert
    1. 트랜잭션
      하나의 report 테이블만 있는 것이기 때문에 count update 가 성공하면 report insert도 저장되도록 하고 count update 가 실패하면 report insert 실패하도록 하기 위한 분산 트랜잭션이 필요 없음

동시성 문제

report 테이블에 count 컬럼을 추가하여 mysql만 사용하는 것으로 변경했다.

@Transactional
public void report(ReportRequest request) {
    reportRepository.findByTopicIdAndReportedUser(request.getTopicId(), request.getReportedUser())
                    .ifPresentOrElse(
                      report -> {
                        report.increaseCount();
                        sendMessage(report.getCount(), request.getTopicId(),
                          request.getReportedUser());
                      },
                      () -> reportRepository.save(
                        Report.of(request.getTopicId(), request.getReportedUser(),
                          request.getReporter(), request.getMessage(), request.getReason())));
  }

레코드가 존재하지 않으면 insert, 존재하면 count update를 수행하도록 했는데 jmeter로 테스트해 보니 여러 스레드에서 접근할 때 문제가 발생했다.
스레드 a가 findByTopicIdAndReportedUser()을 수행하고 존재하지 않으니 insert를 하려고 할때 스레드 b가 끼어들어 스레드 a가 insert commit을 하기 전에 findByTopicIdAndReportedUser()을 수행하게 되면 insert가 두 번 발생하게 되고 이후 findByTopicIdAndReportedUser()에서 2개 이상의 레코드가 존재해 exception이 발생하는 것이다.

하나의 스레드에서 1. 조회 -> 2. 생성 / 수정 을 수행할 때 다른 스레드가 끼어들지 못하도록 하기 위해 synchronized 키워드를 메서드에 붙여줬다.

@Transactional
public synchronized void report(ReportRequest request) {
    reportRepository.findByTopicIdAndReportedUser(request.getTopicId(), request.getReportedUser())
                    .ifPresentOrElse(
                      report -> {
                        report.increaseCount();
                        sendMessage(report.getCount(), request.getTopicId(),
                          request.getReportedUser());
                      },
                      () -> reportRepository.save(
                        Report.of(request.getTopicId(), request.getReportedUser(),
                          request.getReporter(), request.getMessage(), request.getReason())));
  }

그런데 jmeter로 테스트할 때 여전히 동일한 문제가 발생했다.
찾아보니 @transactional과 synchronized를 함께 사용할 때 문제가 발생할 수 있다는 것을 보게 되었고 아래처럼 수정해서 문제를 해결했다.

@Override
  public synchronized void report(ReportRequest request) {
    transactionalReport(request);
  }

  // @Transactional과 synchrozied를 함께 적용하기 위해 사용하는 메서드입니다.
  @Transactional   // dirty checking
  protected void transactionalReport(ReportRequest request) { // proxy로 감싸서 실행되기 때문에 private 사용 불가
    reportRepository.findByTopicIdAndReportedUser(request.getTopicId(), request.getReportedUser())
                    .ifPresentOrElse(
                      report -> {
                        report.increaseCount();
                        sendMessage(report.getCount(), request.getTopicId(),
                          request.getReportedUser());
                      },
                      () -> reportRepository.save(
                        Report.of(request.getTopicId(), request.getReportedUser(),
                          request.getReporter(), request.getMessage(), request.getReason())));
  }
profile
공부하고 알게 된 내용을 기록하는 블로그

0개의 댓글