매칭 알고리즘 개선-트랜잭션 분리, 낙관적락, Bulk Update, Kafka Event 기반으로 변경

taehee kim·2023년 3월 28일
0

1.리팩토링 주요 내용

1) 매칭을 맺어주는 로직을 하나의 트랜잭션에서 모두 포함시킨 방식에서 트랜잭션 구간을 분리 하고 최소화하는 방식으로 변경.

2) 매칭 취소등의 이유로 데이터 일관성 문제가 발생하지 않도록 Write Lock을 활용하던 방식에서 Version을 활용하는 낙관적 락 방식으로 변경

3) 여러 Entity의 상태를 동시 변경해야하는 경우 JPA의 변경 감지가 아닌 Bulk Update로 개선

4) Polling 방식으로 매칭 알고리즘을 동작시키던 방식에서 매칭 신청 시 매칭 알고리즘을 동작시키는 Kafka Event를 produce하는 방식으로 변경

2. 매칭 알고리즘

  • 매칭 알고리즘은 다음과 같이 동작한다. 여기서 1, 2, 3, 4는 같은 트랜잭션에서 동작한다.
  1. 전체 매칭 신청 내역 조회
  2. 매칭 조건으로 정렬(인덱스를 활용하기 부적합하기 때문에 Application에서 정렬해야함.
  3. 매칭 최소인원을 만족하면 매칭 그룹으로 묶어줌.
  4. 3에서 맺어준 매칭 그룹을 db에 반영 하고 매칭 신청 기록 무효화.
  5. 매칭된 유저에게 알림 보냄.(SSE, Kafka 활용)

3.리팩토링 주요 내용 상세

3-1. 매칭을 맺어주는 로직을 하나의 트랜잭션에서 모두 포함시킨 방식에서 트랜잭션 구간을 분리 하고 최소화하는 방식으로 변경

1, 2, 3, 4를 같은 하나의 트랜잭션 내에 묶어서 처리한다면 다음과 같은 문제가 생길 수 있다.

  1. 2, 3은 메모리 내에서 정렬과 반복문등의 cpu 작업으로 DB와 상관없이 시간복잡도가 높은 작업이다. 또한 DB와 상관없는 작업이기 때문에 트랜잭션 내에서 이루어질 필요는 없다. 2, 3과정을 트랜잭션에 포함시키면 DB Connection과 Lock을 의미없이 오랜시간 유지해야한다.
  2. 기존 코드의 경우 1에서 조회 시 비관적 락을 획득하고 1, 2, 3, 4의 긴 하나의 트랜잭션에서 write lock을 가지고 있게 된다. 이는 성능적으로 매우 큰 문제가 될 수 있다. 이를 낙관적락으로 변경하더라도 어느 한 매칭 신청내역만 변경되더라도 매칭이 맺어진 모든 트랜잭션이 commit하려는 시점에 롤백되어야한다.

어떻게 변경해야할까

  • 1에서는 조회만 하기 때문에 @Transactional(readOnly=true)로 하나의 트랜잭션을 구성하여 조회하면된다.
  • 2, 3은 1에서 트랜잭션을 걸지않고 조회한 데이터를 기반으로 실행한다.
  • 4에서는 여러 매칭 그룹을 결정했을 때 DB로 이를 반영하게 된다. 중요한 점은 각 매칭 그룹마다 트랜잭션을 모두 분리하여 진행해야한다.
  • 만약 매칭된 그룹이 5그룹이라면 총 트랜잭션은 6번 걸리게된다.

기존 코드

  • 1, 2, 3, 4가 하나의 트랜잭션으로 처리된다.
  • 매칭이 가능한 모든 목록을 만들고 한번에 반영하기 때문에 트랜잭션이 길고 트랜잭션 진행중에 다른 트랜잭션에 의해서 DB상태가 변경되어 데이터 일관성 문제가 생길 수 있다.
  • 이를 방지하기 위해 Lock을 활용하면 성능문제가 발생한다.
public class MatchMakingTasklet implements Tasklet {

    private final SlackBotService slackBotService;

    private final MatchMakingTaskletService matchMakingTaskletService;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
        throws Exception {

        List<List<String>> matchedMembersEmailList = matchMakingTaskletService.matchMaking();

        /**
         *  slack 알림 보내기. 비동기.
         *  트랜잭션안에 포함시키면 예외 발생 가능하기때문에 분리.
         */
        for (List<String> emails : matchedMembersEmailList) {
            slackBotService.createSlackMIIM(emails);
        }
        return RepeatStatus.FINISHED;
    }
}
@Slf4j
@RequiredArgsConstructor
@Service
public class MatchMakingTaskletService {


    private final RandomMatchRepository randomMatchRepository;
    private final MatchMemberRepository matchMemberRepository;
    private final MemberRepository memberRepository;
    private final MatchRepository matchRepository;

    private final MatchConditionRepository matchConditionRepository;
    private final MatchConditionMatchRepository matchConditionMatchRepository;


    /**
     * 테스트 케이스
     * 1. 여러 조건이 들어오는 경우 매칭이 하나의 조건으로 채결되면 다른 신청 무효화 - 안됨
     * 2. Meal 만 매칭이 가능.
     * 3. 만료시간 제대로 되는지 확인.
     * 4. 매칭 조건 다른 것들 섞여서 생성해도 잘 되는지 체크
     * 5. 같은 조건 먼저 신청한사람이 먼저 매칭되도록
     * @return
     */
    @Transactional
    public List<List<String>> matchMaking() {
        //배치 시작 시간 기준
        LocalDateTime now = CustomTimeUtils.nowWithoutNano();

        //1. 랜덤 매칭 테이블에서 매칭 대기중인 데이터를 가져온다.
        List<MealRandomMatch> mealRandomMatches = randomMatchRepository.findMealPessimisticWriteByCreatedAtBeforeAndIsExpired(
            now.minusMinutes(
                RandomMatch.MAX_WAITING_TIME), false);

        List<StudyRandomMatch> studyRandomMatches = randomMatchRepository.findStudyPessimisticWriteByCreatedAtBeforeAndIsExpired(
            now.minusMinutes(
                RandomMatch.MAX_WAITING_TIME), false);
        //2. 랜덤 매칭 테이블에서 매칭을 조건에 따라 분류한다.
        //2-1. 매칭 조건에 따라 정렬한다.
        //매칭 조건 그 다음 생성 순서를 기준으로
        mealRandomMatches.sort(new MatchConditionComparator());
        studyRandomMatches.sort(new StudyRandomMatch.MatchConditionComparator());
        log.info("mealRandomMatches : {}", mealRandomMatches.toString());
        List<List<String>> matchedMembersEmailList = new ArrayList<>();
        //2-2. 매칭 조건에 따라 매칭을 진행한다.
        List<RandomMatch> matchedRandomMatches = new ArrayList<>();
        for (int i = 0; i < mealRandomMatches.size(); i++) {
            MealRandomMatch mealRandomMatch = mealRandomMatches.get(i);
            //만료된 매치은 제외
            if (mealRandomMatch.getIsExpired()) {
                continue;
            }
            if (matchedRandomMatches.isEmpty()) {
                matchedRandomMatches.add(mealRandomMatch);
            } else if ( matchedRandomMatches.size() > 0) {
                RandomMatch randomMatchForCompare = matchedRandomMatches.get(0);
                if (!mealRandomMatch.isMatchConditionEquals(randomMatchForCompare)) {
                    matchedRandomMatches.clear();
                    matchedRandomMatches.add(mealRandomMatch);
                } else {
                    matchedRandomMatches.add(mealRandomMatch);
                    //매칭 조건 맞춰지면 매칭 진행
                    if (matchedRandomMatches.size() == RandomMatch.MATCH_COUNT) {
                        Match match = makeMatchInRDB(matchedRandomMatches, now);

                        //RandomMatch 테이블 isExpired = true로 변경
                        //매칭 된 유저의 모든 신청 조건 무효화
                        matchedRandomMatches.stream()
                            .map(RandomMatch::getMember)
                            .forEach(member -> {
                                mealRandomMatches.stream()
                                    .filter(mrm -> mrm.getMember().equals(member))
                                    .forEach(RandomMatch::expire);
                            });
                        //
                        matchedRandomMatches.clear();
                        //매칭 맺어진 멤버의 email추가.
                        matchedMembersEmailList.add(getMatchedParticipantsEmails(
                            match));
                    }
                }
            }
        }

       
        return matchedMembersEmailList;
    }

    private void expireMatched(List<MealRandomMatch> mealRandomMatches,
        List<RandomMatch> matchedRandomMatches) {

    }

    private Match makeMatchInRDB(List<RandomMatch> matchedRandomMatches, LocalDateTime now) {
        //RDB에 Match에 저장, MatchMember저장
        RandomMatch randomMatch = matchedRandomMatches.get(0);
        Match match = Match.of(MatchStatus.MATCHED, randomMatch.getContentCategory(),
            MethodCategory.RANDOM, null, RandomMatch.MATCH_COUNT);
        matchRepository.save(match);
        for (RandomMatch matchedRandomMatch : matchedRandomMatches) {
            matchedRandomMatch.updateMatch(match);
        }
        //member
        List<Member> members = matchedRandomMatches.stream()
            .map(RandomMatch::getMember)
            .collect(Collectors.toList());
        createAndSaveMatchMembers(match, members);
        //matchCondition
        List<MatchCondition> matchConditions = createAndSaveMatchCondition(match);



        return match;
    }

    private void createAndSaveMatchMembers(Match match, List<Member> members) {
        members
            .forEach(member -> {
                matchMemberRepository.save(MatchMember.of(match, member, false));
            });
    }

    private List<String> getMatchedParticipantsEmails(Match match) {
        return match.getMatchMembers().stream()
            .map(matchMember -> matchMember.getMember().getUser().getEmail())
            .collect(Collectors.toList());
    }

    private List<MatchCondition> createAndSaveMatchCondition(Match match) {
        RandomMatch randomMatch = match.getRandomMatches().get(0);
        List<MatchCondition> matchConditions = new ArrayList<>();
        matchConditions.add(matchConditionRepository.findByValue(randomMatch.getPlace().toString())
            .orElseThrow(() ->
                new IllegalStateException(
                    "MatchCondition이 존재하지 않습니다. value : " + randomMatch.getPlace().toString())));

        if (randomMatch instanceof MealRandomMatch) {
            matchConditions.add(matchConditionRepository.findByValue(
                    ((MealRandomMatch) randomMatch).getWayOfEating().toString())
                .orElseThrow(() ->
                    new IllegalStateException("MatchCondition이 존재하지 않습니다. value : "
                        + ((MealRandomMatch) randomMatch).getWayOfEating().toString())));
        } else if (randomMatch instanceof StudyRandomMatch) {
            matchConditions.add(matchConditionRepository.findByValue(
                    ((StudyRandomMatch) randomMatch).getTypeOfStudy().toString())
                .orElseThrow(() ->
                    new IllegalStateException("MatchCondition이 존재하지 않습니다. value : "
                        + ((StudyRandomMatch) randomMatch).getTypeOfStudy().toString())));
        }

        matchConditionMatchRepository.saveAll(matchConditions.stream()
            .map((matchCondition) ->
                MatchConditionMatch.of(match, matchCondition)
            )
            .collect(Collectors.toList()));
        return matchConditions;

    }

}

변경 후 코드

  • getValidRandomMatchesSortedByMatchCondition에서 1의 작업을 수행하고
  • getMatchedGroupList에서 트랜잭션 외부에서 2, 3작업을 수행하여 매칭될 것들을 묶어준다.
  • makeMatchInRDB에서 4의 작업을 수행하는데 각각의 매칭 그룹마다 별개의 트랜잭션을 활용한다. 누군가 배치 작업 도중 취소를 하더라도 그 매칭만 롤백되고 나머지 매칭은 그대로 진행된다.
@Slf4j
@RequiredArgsConstructor
@Component
//동시성 문제가 없고, jobParameter 사용이 불필요 하기 때문에 singletonScope가 더 적합하다고 생각.
//@StepScope
public class MatchMakingTasklet implements Tasklet {

    private final SlackBotService slackBotService;

    private final MatchMakingTaskletService matchMakingTaskletService;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
        throws Exception {
        //배치 시작 시간 기준
        LocalDateTime now = CustomTimeUtils.nowWithoutNano();

        getMatchedGroupList(now).forEach((matchedRandomMatches) -> {
            //3. 매칭이 완료된 데이터를 트랜잭션을 걸고 DB에 저장한다, 반영 중 취소등의 변경이 발생했을 경우 예외가 발생한다..
            try {
                Match match = matchMakingTaskletService.makeMatchInRDB(matchedRandomMatches, now);
                slackBotService.createSlackMIIM(match.getMatchMembers().stream()
                    .map(MatchMember::getMember)
                    .map(Member::getUser)
                    .map(User::getEmail)
                    .collect(Collectors.toList()));
            } catch (RuntimeException e) {
                log.error("해당 매칭이 실패했습니다.");
                throw e;
            }
            /**
             *  slack 알림 보내기. 비동기.
             *  트랜잭션안에 포함시키면 안됨.
             */
        });
        return RepeatStatus.FINISHED;
    }


    /**
     * 정렬된 매칭 리스트를 매칭 조건에 따라 그룹화한다.
     *
     * @param now
     * @return
     */
    private List<List<RandomMatch>> getMatchedGroupList(
        LocalDateTime now) {
        //1. 랜덤 매칭 테이블에서 매칭 대기중인 데이터를 가져온다.

        List<RandomMatch> validAndSortedByRandomMatchConditionRandomMatches = matchMakingTaskletService.getValidRandomMatchesSortedByMatchCondition(
            now);

        List<List<RandomMatch>> matchedRandomMatchesList = new ArrayList<>();
        List<RandomMatch> matchedRandomMatches = new ArrayList<>();
        //2-2. 매칭 조건에 따라 매칭을 진행한다.

        validAndSortedByRandomMatchConditionRandomMatches.forEach((randomMatch) -> {
            if (isMatchNotExpected(matchedRandomMatches, randomMatch)) {
                return;
            }
            //매칭 조건이 같은 요소가 이미 있는 경우
            matchedRandomMatches.add(randomMatch);
            //매칭인원이 모두 모인 경우
            if (matchedRandomMatches.size() == RandomMatch.MATCH_COUNT) {
                matchedRandomMatchesList.add(new ArrayList<>(matchedRandomMatches));

                Set<Long> matchedMemberIdSet = matchedRandomMatches.stream()
                    .map(RandomMatch::getMember)
                    .map(Member::getId)
                    .collect(Collectors.toSet());
                expireMatchedOnlyInMemory(validAndSortedByRandomMatchConditionRandomMatches,
                    matchedRandomMatches,
                    matchedMemberIdSet);

                matchedRandomMatches.clear();
            }
        });
        return matchedRandomMatchesList;
    }

    private boolean isMatchNotExpected(List<RandomMatch> matchedRandomMatches,
        RandomMatch randomMatch) {
        //만료된 매칭은 제외
        if (randomMatch.getIsExpired()) {
            return true;
        }
        //아직 같은 조건의 매칭이 하나도 없는 경우 하나 넣고 다음 반복으로넘어감
        if (matchedRandomMatches.isEmpty()) {
            matchedRandomMatches.add(randomMatch);
            return true;
        }
        if (!randomMatch.isMatchConditionEquals(matchedRandomMatches.get(0))) {
            matchedRandomMatches.clear();
            matchedRandomMatches.add(randomMatch);
            return true;
        }
        return false;
    }

    /**
     * 매칭 된 유저의 다른 모든 신청 조건 무효화 memberIdSet에 포함되고, 같은 contentCategory 트랜잭션이 걸려있지 않아서 DB에서는 변경 없음
     * Collection에서만 무효화
     *
     * @param validAndSortedByRandomMatchConditionRandomMatches
     * @param matchedRandomMatches
     * @param matchedMemberIdSet
     */
    private void expireMatchedOnlyInMemory(
        List<RandomMatch> validAndSortedByRandomMatchConditionRandomMatches,
        List<RandomMatch> matchedRandomMatches, Set<Long> matchedMemberIdSet) {
        validAndSortedByRandomMatchConditionRandomMatches.stream()
            .filter(mrm -> matchedMemberIdSet.contains(mrm.getMember().getId())
                && mrm.getRandomMatchCondition().getContentCategory().equals(
                matchedRandomMatches.get(0).getRandomMatchCondition()
                    .getContentCategory()))
            .forEach(RandomMatch::expire);
    }
@Slf4j
@RequiredArgsConstructor
@Service
public class MatchMakingTaskletService {


    private final RandomMatchRepository randomMatchRepository;
    private final MatchMemberRepository matchMemberRepository;
    private final MemberRepository memberRepository;
    private final MatchRepository matchRepository;

    private final MatchConditionRepository matchConditionRepository;
    private final MatchConditionMatchRepository matchConditionMatchRepository;


    /**
     * 테스트 케이스 1. 여러 조건이 들어오는 경우 매칭이 하나의 조건으로 채결되면 다른 신청 무효화 - 안됨 2. Meal 만 매칭이 가능. 3. 만료시간 제대로 되는지
     * 확인. 4. 매칭 조건 다른 것들 섞여서 생성해도 잘 되는지 체크 5. 같은 조건 먼저 신청한사람이 먼저 매칭되도록
     *
     * @return
     */


    @Transactional(readOnly = true)
    public List<RandomMatch> getValidRandomMatchesSortedByMatchCondition(LocalDateTime now) {
        List<RandomMatch> randomMatches = randomMatchRepository.findByCreatedAtAfterAndIsExpiredAndMemberIdAndContentCategory(
            RandomMatchSearch.builder()
                .createdAt(now.minusMinutes(RandomMatch.MAX_WAITING_TIME))
                .isExpired(false)
                .build());
        randomMatches.sort(new RandomMatch.MatchConditionComparator());
        return randomMatches;
    }

    @Transactional
    public Match makeMatchInRDB(List<RandomMatch> matchedRandomMatches, LocalDateTime now) {
        //RDB에 Match에 저장, MatchMember저장
        Match match = createAndSaveMatch(matchedRandomMatches);
        //RandomMatch에 isExpired = true로 업데이트
        randomMatchRepository.bulkUpdateOptimisticLockIsExpiredToTrueByIds(
            matchedRandomMatches.stream()
                .map(randomMatch -> RandomMatchBulkUpdateDto.builder()
                    .id(randomMatch.getId())
                    .version(randomMatch.getVersion())
                    .build())
                .collect(Collectors.toSet()));
        //matchMember
        createAndSaveMatchMembers(match, matchedRandomMatches);
        //matchCondition
        createAndSaveMatchCondition(match, matchedRandomMatches);
        return match;
    }

    private Match createAndSaveMatch(List<RandomMatch> matchedRandomMatches) {
        Match match = Match.of(MatchStatus.MATCHED,
            matchedRandomMatches.get(0).getRandomMatchCondition().getContentCategory(),
            MethodCategory.RANDOM, null, RandomMatch.MATCH_COUNT);
        matchRepository.save(match);
        return match;
    }

    private void createAndSaveMatchMembers(Match match, List<RandomMatch> matchedRandomMatches) {
        matchedRandomMatches.stream()
            .map(RandomMatch::getMember)
            .forEach(member -> {
                matchMemberRepository.save(MatchMember.of(match, member, false));
            });
    }

    private List<MatchCondition> createAndSaveMatchCondition(Match match, List<RandomMatch> matchedRandomMatches) {
        RandomMatch randomMatch = matchedRandomMatches.get(0);
        List<MatchCondition> matchConditions = new ArrayList<>();
        String errorMessage = "MatchCondition이 존재하지 않습니다. value : ";
        matchConditions.add(matchConditionRepository.findByValue(randomMatch.getRandomMatchCondition().getPlace().toString())
            .orElseThrow(() ->
                new IllegalStateException(
                    errorMessage + randomMatch.getRandomMatchCondition().getPlace().toString())));
        matchConditions.add(matchConditionRepository.findByValue(
                (randomMatch).getRandomMatchCondition().getWayOfEating().toString())
            .orElseThrow(() ->
                new IllegalStateException(errorMessage
                    + (randomMatch).getRandomMatchCondition().getWayOfEating().toString())));
        matchConditions.add(matchConditionRepository.findByValue(
                (randomMatch).getRandomMatchCondition().getTypeOfStudy().toString())
            .orElseThrow(() ->
                new IllegalStateException(errorMessage
                    + (randomMatch).getRandomMatchCondition().getTypeOfStudy().toString())));
        matchConditionMatchRepository.saveAll(matchConditions.stream()
            .map((matchCondition) ->
                MatchConditionMatch.of(match, matchCondition)
            )
            .collect(Collectors.toList()));
        return matchConditions;

    }


3-2.매칭 취소등의 이유로 데이터 일관성 문제가 발생하지 않도록 Write Lock을 활용하던 방식에서 Version을 활용하는 낙관적 락 방식으로 변경

3-1에서 트랜잭션을 분리하면서 생긴 데이터 일관성 문제

  • 1에서 데이터를 조회하고 2, 3에서 이를 처리하게 되는 과정에서 매칭 신청자가 매칭을 취소하더라도 1에서 조회한 데이터에는 취소가 반영되어 있지 않기 때문에 4에서 매칭이 된것으로 DB에 반영된다.
  • 이를 막기 위해서는 1에서 조회한 데이터가 4에서 반영하는 시점까지 변경 된적이 없다는 것을 보장받아야할 것이다.
  • 만약 그렇지 않다면 4에서 해당 매칭을 맺지 않고 Rollback한다.(이 때문에 각각의 매칭 그룹마다 트랜잭션을 분리했다.) 이 로직을 낙관적 락이라고 한다.

낙관적락 구현

  • 원래 낙관적락은 @Version 어노테이션을 활용하면 Jpa에서 손쉽게 구현할 수 있지만 이는 변경 감지로 update될 때만 적용되기 때문에 update쿼리를 작성하는 경우에는 직접 구현해야한다.
  • bulk update쿼리에서 변경할 Entity의 id와 조회 시점의 version값을 인자로 받은 후 update쿼리 실행 이전에 해당 entity들을 먼저 조회하여 version값이 변하였는지 확인한다. 이때 마지막 조회 쿼리와 update쿼리 사이에서 데이터가 변경되지 않도록 짧은 구간의 비관적락을 적용한다.
  • 만약 version값이 변했다면 2, 3구간에서 entity가 변경된 것이기 때문에 이 경우 해당 매칭을 반영하지 않고 예외를 발생시킨다.
	public class RandomMatchBulkUpdateDto {

    private Long id;
    private Long version;
	}

/**
     * 1. OptimisticLock을 통해 Lost Update가 발생하지 않도록함.
     * 2. 영속성 컨텍스트를 비워 준 후(최신 DB상태를 조회 하기 위해) Write_Lock과 함께 해당 Id의  Update이전 마지막 조회 시점의 version을 가져와서 Write_Lock을 건 후 트랜잭션 내에서 조회 했을 때version이 바뀌었거나 엔티티가 삭제되었는지 확인하고
     *   문제가 있는 경우 OptimisticLockException발생.
     * 3. 정상 Update가 가능한 경우 version을 1 증가 시키고 isExpired를 true로 변경.
     * 4. 벌크성 수정 쿼리는 영속성 컨텍스트를 무시하고 실행되므로, 영속성 컨텍스트를 초기화함.
     * @param randomMatchBulkUpdateDtos
     */
    @Override
    public void bulkUpdateOptimisticLockIsExpiredToTrueByIds(
        Set<RandomMatchBulkUpdateDto> randomMatchBulkUpdateDtos){
        // 영속성 컨텍스트를 초기화 하여 새로 조회한다.
        em.flush();
        em.clear();
        List<Long> idList = randomMatchBulkUpdateDtos.stream().map(RandomMatchBulkUpdateDto::getId)
            .collect(
                Collectors.toList());
        //verifyVersion하기 이전에 randomMatches가 변경 될 경우를 막기 위해
        // findWithPessimisticLockByIds를 통해 Write_Lock을 건다.
        List<RandomMatch> randomMatches = findWithPessimisticLockByIds(
            idList);

        verifyVersion(randomMatches, randomMatchBulkUpdateDtos);

        queryFactory.update(randomMatch)
            .set(randomMatch.isExpired, true)
            .set(randomMatch.version, randomMatch.version.add(1))
            .where(isRandomMatchIdsIn(idList))
            .execute();

        em.flush();
        em.clear();
    }

    private void verifyVersion(List<RandomMatch> randomMatches,
        Set<RandomMatchBulkUpdateDto> randomMatchBulkUpdateDtos) {
        if (randomMatches.size() != randomMatchBulkUpdateDtos.size()) {
            throw new OptimisticLockException("Optimistic Lock Exception");
        }
        Map<Long, Long> idVersionMap = randomMatchBulkUpdateDtos.stream()
            .collect(Collectors.toMap(RandomMatchBulkUpdateDto::getId,
                RandomMatchBulkUpdateDto::getVersion));
        randomMatches.forEach(rm -> {
            if (!idVersionMap.get(rm.getId()).equals(rm.getVersion())) {
                throw new OptimisticLockException("Optimistic Lock Exception");
            }
        });

    }

위에서의 구현의 문제점

  • 만약 DB격리 수준이 Repetable read 로 되어있는 상태에서 다음과 같이 한 트랜잭션에 조회를 먼저하고 이를 바탕으로 update를 하는 경우 낙관적락이 전혀 적용되지 않는다.
@Transactional
public void method{
findBy...();
bulkUpdateOptimisticLockIsExpiredToTrueByIds()

}
  • 왜냐하면 Repetable read에서는 초기에 조회한 내용이 실제 DB에서 변경되더라도 이 내용이 반영되지 않고 초기 조회한 내용만 읽기 때문에 bulkupdate메서드 이전에 조회한 내용과 bulkupdate메서드 내에서 조회한 내용이 다를 수가 없기 때문이다.
  • 하지만 여기서는 조회와 수정쿼리가 하나의 트랜잭션에 있지 않다는 특수성과 단일 엔티티에 대한 update가 아닌 bulk update이기 때문에 이렇게 구현할 수 밖에 없었다.

3-3. 변경 감지로 인한 update를 bulkUpdate쿼리로 변경한다.

  • 변경 감지는 entity 개수만큼 update쿼리가 생성된다.
  • 이는 조회에서의 N+1문제와 비슷한 성능 문제를 일으키기 때문에 bulkUpdate쿼리를 따로 작성하여만 한다.

bulkUpdate쿼리의 문제점

  • bulkUpdate쿼리는 영속성 컨텍스트와 무관하게 동작한다. 이 때문에 영속성 컨텍스트를 clear해주는 방식으로 작성하였다.

리팩토링 이전

//RandomMatch 테이블 isExpired = true로 변경
                        //매칭 된 유저의 모든 신청 조건 무효화
                        matchedRandomMatches.stream()
                            .map(RandomMatch::getMember)
                            .forEach(member -> {
                                mealRandomMatches.stream()
                                    .filter(mrm -> mrm.getMember().equals(member))
                                    .forEach(RandomMatch::expire);
                            });

리팩토링 이후

@Override
    public void bulkUpdateOptimisticLockIsExpiredToTrueByIds(
        Set<RandomMatchBulkUpdateDto> randomMatchBulkUpdateDtos){
        // 영속성 컨텍스트를 초기화 하여 새로 조회한다.
        em.flush();
        em.clear();
        List<Long> idList = randomMatchBulkUpdateDtos.stream().map(RandomMatchBulkUpdateDto::getId)
            .collect(
                Collectors.toList());
        List<RandomMatch> randomMatches = findWithPessimisticLockByIds(
            idList);

        verifyVersion(randomMatches, randomMatchBulkUpdateDtos);

        queryFactory.update(randomMatch)
            .set(randomMatch.isExpired, true)
            .set(randomMatch.version, randomMatch.version.add(1))
            .where(isRandomMatchIdsIn(idList))
            .execute();

        em.flush();
        em.clear();
    }

    private void verifyVersion(List<RandomMatch> randomMatches,
        Set<RandomMatchBulkUpdateDto> randomMatchBulkUpdateDtos) {
        if (randomMatches.size() != randomMatchBulkUpdateDtos.size()) {
            throw new OptimisticLockException("Optimistic Lock Exception");
        }
        Map<Long, Long> idVersionMap = randomMatchBulkUpdateDtos.stream()
            .collect(Collectors.toMap(RandomMatchBulkUpdateDto::getId,
                RandomMatchBulkUpdateDto::getVersion));
        randomMatches.forEach(rm -> {
            if (!idVersionMap.get(rm.getId()).equals(rm.getVersion())) {
                throw new OptimisticLockException("Optimistic Lock Exception");
            }
        });

    }

    private List<RandomMatch> findWithPessimisticLockByIds(List<Long> ids){
        return queryFactory.select(randomMatch)
            .from(randomMatch)
            .join(randomMatch.member, member)
            .where(isRandomMatchIdsIn(ids))
            .setLockMode(LockModeType.PESSIMISTIC_WRITE)
            .fetch();
    }

    private BooleanExpression isRandomMatchIdsIn(Collection<Long> randomMatchIds) {
        return randomMatchIds == null ? null : randomMatch.id.in(randomMatchIds);
    }


    private BooleanExpression isMemberId(Long memberId) {
        return memberId == null ? null : member.id.eq(memberId);
    }

    private BooleanExpression isExpired(Boolean isExpired) {
        return isExpired == null ? null: randomMatch.isExpired.eq(isExpired);
    }

    private BooleanExpression isCreatedAtAfter(LocalDateTime createdAt) {
        return createdAt == null ? null: randomMatch.createdAt.after(createdAt);
    }

    private BooleanExpression isContentCategory(ContentCategory contentCategory) {
        return contentCategory == null ? null : randomMatch.randomMatchCondition.contentCategory.eq(contentCategory);
    }

3-4. Polling 방식으로 매칭 알고리즘을 동작시키던 방식에서 매칭 신청 시 매칭 알고리즘을 동작시키는 Kafka Event를 produce하는 방식으로 변경

  • 밑에 있는 방법들의 문제점을 해결할 수 있기 때문이다.

Polling방식의 문제점

  1. polling자체가 비효율적이다. 상태변화가 없더라도 DB를 조회해야한다.
  2. Scheduler나 Batch는 테스트를 하기가 상대적으로 힘들다.
  3. WAS를 여러 대 두는 방식으로 아키텍처를 구성하고 있다. 만약 api내에 Scheduling이 존재하면 배포 될 때 특정 WAS한대에서만 Scheduling하는 것으로 처리해야하는데 가능한지 여부를 잘 모르겠으며 가능하다해도 추가적인 조취가 필요하다.
  4. 3의 문제 때문에 Polling작업을 처리하기 위한 서버를 따로 둔다. 문제는 이 서버에 장애가 발생했을 때 감지하거나 자동적으로 새로 시작하게 만들기 힘들다.

매칭 신청API 호출시 동기적으로 매칭 알고리즘을 실행시키는 것의 문제점

  1. 매칭 신청 API호출 시 동기적으로 매칭 알고리즘을 실행시키는 경우 사용자가 매칭 알고리즘 수행이 완료도리 때까지 매번 기다려야한다.

매칭 신청 API 호출시 비동기 스레드를 생성하여 매칭 알고리즘을 실행시키는 것의 문제점

  1. @Async 를 통해 비동기 스레드를 생성하면 API요청이 동기적으로 동작하는 문제를 해결할 수 있다.
  2. 하지만 매칭 알고리즘을 수행하는 과정에서 어떤 문제가 발생하여 정상동작이 실패하더라도 이를 재실행하는 것이 보장되지 않는다.

매칭 신청 API 호출시 Kafka Topic에 Event를 produce한다.

  1. 예외가 발생하면 Ack를 보내지 않는 방식으로 매칭 알고리즘이 정상 동작할 때까지 적어도 한번은 동작하도로 할 수 있다.
  2. 위에서의 문제점들을 대부분 해결해준다.
@PreAuthorize("hasAuthority('random-match.create')")
    @Operation(summary = "랜덤 매칭 신청", description = "랜덤 매칭 신청")
    @PostMapping("/random-matches")
    public ResponseEntity<Void> applyRandomMatch(
        @ApiParam(hidden = true) @AuthenticationPrincipal UserDetails user,
        @Validated @Parameter @RequestBody RandomMatchDto randomMatchDto) {
        LocalDateTime now = CustomTimeUtils.nowWithoutNano();
        List<RandomMatch> randomMatch = randomMatchService.createRandomMatch(user.getUsername(),
            randomMatchDto, now);
            //랜덤매칭 이벤트 produce
        randomMatchProducer.send(randomMatchDto.createMatchMakingEvent(now));
        return ResponseEntity.status(HttpStatus.CREATED).build();
    }
@KafkaListener(topics = "${kafka.topic.match-making.name}", groupId = "${kafka.consumer.match-making.rdb-group-id}",
        containerFactory = "kafkaListenerContainerFactoryMatchMakingEvent")
    public void matchMakingConsumerGroup(@Payload MatchMakingEvent matchMakingEvent, Acknowledgment ack) {
        log.info("matchMakingConsumerGroup");
        MatchMakingDto matchMakingDto = matchMakingService.matchMaking(matchMakingEvent.getNow());
        // 알림 생성.
        for (List<AlarmEvent> alarmEvents: matchMakingDto.getAlarmEvents()){
            alarmEvents.forEach(alarmProducer::send);
        }
        //slack 알림이 보내지지 않아도 ack를 보내야 함.
        matchMakingDto.getEmails().forEach( emails -> {
            try{
                slackBotService.createSlackMIIM(emails);
            } catch(Exception e){
                log.error("slackBotService.createSlackMIIM(matchMakingDto) error");
            }
        });
        ack.acknowledge();
    }

Producer Consumer설정 기준

단 한번만 실행되야하는가

  • kafka는 설정에 따라 produce시 이벤트가 한번보다 더 생성될 수 있고 consume시에도 단한번 consume하는 것은 보장 되지 않는다.
  • 여러번 실행되어도 상관없다.

event produce 순서를 보장받을 필요가 없다.

  • key값을 지정할 경우 event를 하나의 parition에만 생성하도록 하여 순서를 보장할 수 있다.
  • 하지만 여기서 event는 순서를 보장받을 필요가 없다.

동시에 event가 consume되면 안된다.

  • 매칭 실행 event가 동시에 여러 consumer에서 실행될 필요가 없으며 문제가 발생할 수 있다.
  • 이 경우 key값을 지정하여 하나의 parition에 event가 생성되도록 하는 방법으로 해결하였다.

Kafka설정 내용

/**
     * Partition의 경우 어처피 순서 보장을 위해 key 값을 지정하여 특정 파티션에서만 생성되기 때문에 1로 설정
     * @return
     */
    @Bean
    public NewTopic matchMakingNewTopic() {
        return new NewTopic(matchMakingTopicName, 1, Short.parseShort(replicationFactor));
    }
@Bean
    public ProducerFactory<String, MatchMakingEvent> matchMakingEventProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.ACKS_CONFIG, acksConfig);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retry);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);

        return new DefaultKafkaProducerFactory<>(configProps);
    }
@Bean
    public ConsumerFactory<String, MatchMakingEvent> matchMakingEventRedisConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, redisGroupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new JsonDeserializer<>(MatchMakingEvent.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MatchMakingEvent> kafkaListenerContainerFactoryMatchMakingEvent() {
        ConcurrentKafkaListenerContainerFactory<String, MatchMakingEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(matchMakingEventRedisConsumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }
profile
Fail Fast

0개의 댓글