동시성 문제를 해결해 보자 - 1

PGD·3일 전
0

문제 상황

도메인

현재 Healthfriend라는 프로젝트에서 백엔드 개발로 참여하고 있다. Healthfriend는 같이 운동하고 싶어하는 사람들끼리 매칭해 주는 서비스로 요약할 수 있는데, 이 글에서 다루고자 하는 문제는 "매칭"의 동시성 문제다.

우선 매칭이 어떤 방식으로 이루어지는지에 대해 먼저 이해하는 편이 좋겠다. 매칭 서비스는 다음과 같은 플로우를 거친다.

  1. 한 유저가 다른 유저에게 매칭을 신청한다. 매칭을 신청한 유저를 requester, 매칭 신청을 받은 유저를 requestee라고 하겠다.
  2. requestee는 자신에게 온 매칭 신청을 조회할 수 있다. 매칭 신청 내역을 조회한 후, 이를 수락할 수도 있고 거절할 수도 있다. 거절한다면 플로우는 여기서 종료되고, 수락한다면 다음 플로우로 나아간다.
  3. 매칭 신청에는 만남 장소와 만남 시간이 있다. 만나서 같이 운동할 수 있도록 매칭해 주는 서비스를 제공하는 게 Healthfriend 서비스의 기본 골자다. 만남 시간은 requester가 정한다. 매칭 수락 이후 requester와 requestee는 만남 시간에 만남 장소에서 같이 운동한다.
  4. 시스템에서는 만남 시간이 지난 후 requester와 requestee 각각에게 후기 요청 알림을 보낸다. requester와 requestee 각각 서로에게 후기를 남기면 매칭은 종료된다.

문제

여기서 해결해야 할 문제는 1번이다. 매칭이 종료되지도 않았는데 requester가 동일한 requestee에게 동일한 매칭 신청을 보내면 안 된다.

그렇기 때문에 중복된 매칭 신청이 있는지 확인하는 로직을 다음과 같이 구현했다.

public Long requestMatching(MatchingRequestDto requestDto) {
    if (this.matchingRepository.existsDuplicateMatchingRequest(requestDto.getRequesterId(), LocalDate.now())) {
        throw new OutOfLimitMatchingRequestException("매칭 중복");
    }
    
    try {
        Matching savedMatching = this.matchingRepository.save(
                new Matching(
                        new Member(requestDto.getRequesterId()),
                        new Member(requestDto.getTargetId()),
                        requestDto.getMeetingPlace(),
                        requestDto.getMeetingPlaceAddress(),
                        requestDto.getMeetingTime()
                )
        );
        return savedMatching.getMatchingId();
    } catch (DataIntegrityViolationException e) {
        throw new MemberNotFoundException(e);
    }
}

로직은 간단하다. if (this.matchingRepository.existsDuplicateMatchingRequest(requestDto.getRequesterId(), LocalDate.now())) 이 구문에서 MatchingRepository에 중복된 매칭이 있는지 확인하는 것이다.

(지금 생각해 보면 비즈니스 로직이 Repository 레이어에 노출되고 있는 느낌이다. 이 부분에 대해서도 고민해 봐야겠다. 우선 이 문제는 글과 무관하므로 지금은 차치한다)

Tomcat Servlet Container에서는, Tomcat뿐만 아니라 수많은 Servlet Container에서는 각각의 요청을 서로 다른 thread에서 처리한다. 즉, requestMatching 메소드에 여러 thread가 동시에 접근할 수 있다는 것이다.

동시성 문제가 발생하지 않을 수 없다. 만약 requester가 실수로, 혹은 고의로 매칭 신청 버튼을 광클해서 거의 동시에 복수의 매칭 신청 요청을 전송한다면?

requestMatching 메소드 첫 부분 if문 안의 조건식이 true가 나와서 throw new OutOfLimitMatchingRequestException 부분까지 도달하여 예외가 발생하려면 MatchingRepository 안에 이미 중복된 Matching이 저장된 상태여야 한다.

그러기 위해서는 이전 요청을 처리하는 thread가 try 안의 this.matchingService.save 메소드를 호출하는 부분까지 도달해서 매칭을 저장한 상태여야 한다. 그래야 이후 thread가 requestMatching 로직을 처리할 때 중복된 매칭이 MatchingRepository에 존재한다는 것을 확인하고 OutOfLimitMatchingRequestException을 발생시킬 테니까.

그런데 예를 들어, thread1과 thread2가 동시에 requestMatching 메소드에 도달한다고 하자. 이때 thread1과 thread2 이전의 thread는 없다고 가정한다. 다시 말해, requester가 매칭 요청 버튼을 광클한다고 했을 때 전송되는 첫 번째 요청과 두 번째 요청에 대한 thread인 것이다.

thread1과 thread2는 동시에 if문을 통과한다. 이 시점에 thread1과 thread2 둘 다 아직 this.matchingRepository.save 메소드 호출 시점까지 도달하지 못한 상태다. 매칭 신청 레코드가 아직 MatchingRepository에 없으므로 thread1, thread2 둘 다에 대해 if문 조건식은 false를 반환한다. 즉, thread1, thread2 모두 this.matchingRepository.save 메소드 호출 코드까지 도달하게 되는 것이다. 결국 두 개의 매칭 신청이 MatchingRepository에 저장된다. 이 때문에 데이터 무결성이 훼손된다.

기본적으로 데이터베이스 락은 단일 레코드에 대해 작용한다. 서로 다른 레코드를 추가하는 연산에 대해 기본 데이터베이스 락으로는 방지할 수 없다.

해결

이 문제를 해결하기 위해서는 requestMatching 메소드를 동기화하는 것이다. 이에 대해 몇 가지 방법을 생각할 수 있는데, 내가 생각한 방법은 다음 세 가지다.

  • Java synchronized 키워드를 통해 메소드 동기화
  • Transaction의 Isolation 레벨을 조정한다.
  • Redis를 활용해 분산 락을 구현한다.

테스트 코드

문제를 해결했는지 확인하기 위한 테스트 코드는 다음과 같다.

@ActiveProfiles({
        "secret",
        "no-auth",
        "constants",
        "local-dev",
        "priv"
})
@SpringBootTest
// TODO: 추후 SpringBootTest를 제거하고 더 빠른 유닛 테스트로 리팩토링
@Slf4j
class MatchingServiceConcurrencyTest {

    @Autowired
    MatchingService matchingService;

    @Autowired
    MemberRepository memberRepository;

    @Autowired
    MatchingRepository matchingRepository;

    @Autowired
    EntityManager em;

    @Autowired
    PlatformTransactionManager txManager;

    @DisplayName("requestMatching - 동시성 테스트")
    @Test
    void requestMatching_concurrency() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            log.info("TEST COUNT: {}", i + 1);
            try {
                runSingleTest_requestMatching_concurrency();
            } finally {
                TransactionStatus status = this.txManager.getTransaction(new DefaultTransactionAttribute());
                this.em.createNativeQuery("DELETE FROM matching").executeUpdate();
                this.em.createNativeQuery("DELETE FROM members").executeUpdate();
                this.txManager.commit(status);
                System.out.println("-------------------");
            }
        }
    }

    void runSingleTest_requestMatching_concurrency() throws InterruptedException {
        // Given
        Member sampleRequester =
                SampleEntityGenerator.generateSampleMember("requester@gmail.com", "requester");
        Member sampleTarget = SampleEntityGenerator.generateSampleMember("target@gmail.com", "target");

        this.memberRepository.save(sampleRequester);
        this.memberRepository.save(sampleTarget);

        ExecutorService executorService = Executors.newCachedThreadPool();

        MatchingRequestDto sameDto = MatchingRequestDto.builder()
                .requesterId(sampleRequester.getId())
                .targetId(sampleTarget.getId())
                .meetingPlace("pp")
                .meetingPlaceAddress("add")
                .meetingTime(LocalDateTime.now().plusDays(1))
                .build();

        // When
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                try {
                    this.matchingService.requestMatching(sameDto);
                } catch (OutOfLimitMatchingRequestException e) {
                    log.info("OutOfLimit");
                }
            });
        }

        // Then
        executorService.shutdown();
        boolean terminatedCorrectly = executorService.awaitTermination(5, TimeUnit.SECONDS);
        log.info("정상 종료?: {}", terminatedCorrectly);

        Page<MatchingListResponseDto> result = this.matchingRepository.findByMemberIdWithConditions(
                sameDto.getRequesterId(),
                MatchingFetchType.ALL,
                MatchingStatusCondition.ALL,
                PageRequest.of(0, 20)
        );

        assertThat(result.getTotalElements()).isEqualTo(1);

        log.info("result size={}", result.getTotalElements());
        log.info("content size={}", result.getContent().size());
        for (MatchingListResponseDto l : result.getContent()) {
            log.info("matchingId={}", l.matchingId());
            log.info("opponent={}\n----------", l.opponentInfo());
        }
    }
}

ExecutorService로 다중 thread를 발생시키고 각 thread에서 requestMatching 메소드를 호출한다. 이때 병렬 처리의 랜덤성을 고려하여 다중 thread가 requestMatching을 호출하는 테스트를 10번 반복하여 테스트의 신뢰성을 높인다.

또한 runSingleTest_requestMatching_concurrency가 한 번 종료될 때마다 데이터베이스에 저장된 값을 초기화한다. 초기화는 DELETE를 통해 레코드를 삭제함으로써 수행된다. DELETE 연산은 벌크 연산이고 EntityManager를 통한 벌크 연산은 transaction 내에서 수행되어야 하기 때문에 PlatformTransactionManager를 통해 transaction을 실행하고 커밋한다.

runSingleTest_requestMatching_concurrency 호출에 대해서 실제로 저장되는 Matching의 레코드는 단 1개여야 한다. 이를 검증하기 위해 this.matchingRepository.findByMemberIdWithConditions 메소드를 통해 MatchingRepository에서 Matching 데이터를 가져오고, 가져온 레코드의 개수가 1개인지 확인한다. 확인은 assertThat(result.getTotalElements()).isEqualTo(1); 구문을 통해 이루어진다.

Matching 데이터를 가져오는 코드 간략한 설명

Page<MatchingListResponseDto> result = this.matchingRepository.findByMemberIdWithConditions(
        sameDto.getRequesterId(),
        MatchingFetchType.ALL,
        MatchingStatusCondition.ALL,
        PageRequest.of(0, 20)
);

테스트 코드에서 위 코드가 의미하는 바는 다음과 같다.

  • requesterId는 유저 (Member) 엔티티의 ID다. 해당 유저가 신청한, 혹은 신청을 받은 Matching을 가져온다.
  • MatchingFetchType내가 신청한 매칭을 가져올지, 아니면 신청을 받은 매칭을 가져올지, 아니면 둘 다 가져올지 결정하는 조건이다.
  • MatchingStatusCondition은 어떤 상태의 매칭을 가져올지 결정하는 조건이다. 매칭 상태는 수락 대기, 수락한 상태, 거절한 상태, 진행 중인 상태, 종료된 상태 등이 있다. ALL을 조건으로 설정했으니 매칭 상태에 관계 없이 모두 가져온다.
  • PageRequest.of(0, 20)은 페이징 정보다.

일단 실행

동시성 문제를 해결하지 않고 그냥 실행해 보자. 다음 결과가 나타난다.

org.opentest4j.AssertionFailedError: 
expected: 1L
 but was: 10L
Expected :1L
Actual   :10L

1개만 저장되어야 테스트 성공이지만, 예상한 바와 같이 1개보다 더 많은 레코드가 저장되었다.

Separation of concern

나는 동시성 처리를 requestMatching 메소드에서 수행하고 싶지 않다. requestMatching은 서비스 객체의 메소드이고, 여기서는 비즈니스 로직에만 집중하고 싶다.

Separation of concern을 위해, AOP를 활용해 동시성 처리를 Service 로직으로부터 분리시켰다.

이를 위해 다음 두 가지를 구현하였는데, 하나는 Weaving 대상 메소드를 마킹해 줄 어노테이션이고, 다른 하나는 Advice를 제공하는 Aspect 객체다.

  • SynchronizedOperation
@Target({
        ElementType.TYPE,
        ElementType.METHOD
})
@Retention(RetentionPolicy.RUNTIME)
public @interface SynchronizedOperation {
}
  • SynchronizedOperationAspect
@Aspect
@Component
@RequiredArgsConstructor
public class SynchronizedOperationAspect {
    private final SynchronizedExecutor executor;

    @Around("@annotation(com.hf.healthfriend.global.concurrency.SynchronizedOperation)")
    public Object aroundLogic(ProceedingJoinPoint joinPoint) throws Throwable {
        return this.executor.executeWithLock(() -> {
            try {
                return joinPoint.proceed(joinPoint.getArgs());
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        });
    }
}

SynchronizedOperationAspectSynchronizedOperation 어노테이션이 붙은 메소드를 찾아 weaving한다.

requestMatching 메소드에 @SynchronizedOperation 어노테이션을 달아줌으로써 requestMatching 메소드가 weaving 대상으로 등록된다.

@SynchronizedOperation
public Long requestMatching(MatchingRequestDto requestDto) {
    ...
}

완전한 Separation of concern이 이루어진 것은 아니다.

requestMatching 서비스 로직은 "동시성 처리"라는 cross-cutting concern으로부터 완전히 분리된 상태가 아니다. 해당 서비스 객체가 @SyncrhonizedOperation에 대해 의존하고 있기 때문이다. 이는 나름대로의 trade-off를 고려한 것이다. XML 등을 통해 동시성 처리 대상 객체와 메소드를 메타데이터화한다면 서비스 로직으로부터 동시성 처리 의존성을 완전히 제거할 수 있을 것이다. 그러나 그렇게 한다면 다음 문제가 발생할 수 있다.

  1. XML, 프로퍼티 파일 등 메타데이터화하는 데 비용이 발생한다.
  2. 메타데이터가 Java 소스코드로부터 분리된 곳에서 관리되기 때문에 유지보수에 비용이 발생한다.

이는 XML 기반 Spring Bean 관리와 어노테이션 기반 Spring Bean 관리의 차이라는 맥락과 비슷하다. Spring Bean을 어노테이션으로 등록한다면 소스코드와 Spring Framework 사이에 coupling이 발생하지만 어노테이션을 활용하는 편이 보통 생산성이 높다.

다형성을 통한 동시성 처리 로직 확장

앞서 동시성 처리를 해결하는 방법을 몇 가지 제시한 바를 다시 언급하자면,

  1. Java synchronized 키워드를 통해 메소드 동기화
  2. Transaction의 Isolation 레벨을 조정한다.
  3. Redis를 활용해 분산 락을 구현한다.

이렇게 여러 가지가 있다. 그런데 로컬에서는 2번 방법을 쓰고 싶은데 클라우드 배포 환경에서는 3번 방법을 쓰고 싶을 수 있다. 그러면 로컬에서 개발하다가 클라우드에 배포할 때 소스코드를 변경해야 하나?

제대로 된 Spring 개발자라면 이와 같은 짓거리는 하지 않을 것이다. Spring은 Profile이라는 것을 제공해 준다. Profile에 대한 설명은 이 글의 범주를 넘어서는 것이기 때문에 Spring Profile에 대해서 알고 있으리라 믿겠다. Spring Profile을 활용해 객체지향의 다형성을 제대로 활용할 수 있다는 점만 알고 있으면 된다.

위 코드를 보면 알 수 있듯, SynchronizedOperationAspect 객체는 SynchronizedExecutor 타입의 객체에 의존하고 있다. 이제 SynchronizedExecutor가 무엇인지 설명할 차례다.

  • SynchronizedExecutor
/**
 * 동시성 문제를 해결하기 위해 Lock을 사용하여 비즈니스 로직 메소드 처리
 */
public interface SynchronizedExecutor {

    Object executeWithLock(Supplier<Object> targetLogic) throws Throwable;
}

SynchronizedExecutor는 하나의 메소드를 정의한 인터페이스다. 이 인터페이스를 구현한 객체들은 각자 나름의 방법으로 동시성 문제를 처리하는 방법을 구현해야 한다.

Spring Profile을 통해 외부 설정을 변경함으로써 SynchronizedExecutor 구현체를 유연하게 변경할 수 있다. Transaction의 Isolation level을 조정함으로써 동시성 문제를 해결한 SynchronizedExecutor 구현체가 있을 수 있고, Redis 분산 락을 활용해 동시성 문제를 해결한 SyncrhonizedExecutor 구현체가 있을 수 있다.

그리고 각 SynchronizedExecutor 구현체에 Profile을 설정함으로써 소스코드 변경 없이 실행 환경에 따라 각자 다른 Spring Bean이 등록되도록 설정할 수 있다.

어떤 SynchronizedExecutor 구현체가 있는지 설명함으로써 동시성 문제를 어떻게 해결할 수 있는지 설명할 수 있을 것이다. 이것에 대해선 다음 글에서 설명하겠다.

profile
student

0개의 댓글