Redis Queue 동시성 문제 체크포인트

김형준·2025년 2월 20일
0

ZSet 대신 Set을 도입해 역직렬화 문제를 해결했지만, 그 결과 요청 큐와 Completable 저장공간의 동기화 문제가 발생했다.

따라서 아래와 같이 두 공간에 대한 데이터 저장 작업의 원자성을 보장하기 위해 아래와 같이 synchroized를 적용하였다.

@Service
public class RequestProcessor {
    private final RequestFutureStore futureStore;
    private final RequestQueue requestQueue;
    // 동기화를 위한 공유 락 객체
    private final Object lock = new Object();

    public RequestProcessor(
            RequestFutureStore futureStore,
            @Qualifier("redisQueue") RequestQueue requestQueue
    ) {
        this.futureStore = futureStore;
        this.requestQueue = requestQueue;
    }

    public void registerAndEnqueue(
            String requestId, ReservationReqDto request, CompletableFuture<List<ReservationGetResDto>> future
    ) {
        synchronized (lock) {
            futureStore.registerFuture(requestId, future);
            boolean enqueued = requestQueue.enqueue(request);
            if (!enqueued) {
                futureStore.removeFuture(requestId);
                throw new RuntimeException("Failed to enqueue request");
            }
        }
    }
}

이후 Redis, ConcurrentHashMap 각각의 용량을 확인하는 로직을 추가하였다.

@Override
    public boolean enqueue(ReservationReqDto request) {
        String luaScript = "local current = redis.call('SCARD', KEYS[1]) " +
                "if current < tonumber(ARGV[1]) then " +
                "  redis.call('SADD', KEYS[1], ARGV[2]) " +
                "  return 1 " +
                "else " +
                "  return 0 " +
                "end";

        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(luaScript);
        redisScript.setResultType(Long.class);

        // capacity와 request 값을 파라미터로 전달
        Long result = redisTemplate.execute(
                redisScript,
                List.of(QUEUE_KEY),
                String.valueOf(capacity),
                request
        );

        return result == 1;
    }

public void registerFuture(String requestId, CompletableFuture<List<ReservationGetResDto>> future) {
        if (capacity <= futureMap.size()) { throw new RuntimeException("큐 용량 초과"); }
        futureMap.put(requestId, future);
    }

데이터 조회 & 추가 작업의 원자성을 위해 도입한 luaScript 실행 시 에러가 발생함

redis-cli 로그를 확인해보니 SCARD, EVALSHA가 실행되고 있었음

"EVALSHA" "ac1cb0ae6b44cd8c9a3e377227751189ea7aac10" "1" "reservation:request:queue" "\"25\"" "{\"@class\":\"org.example.easytable.reservation.dto.request.ReservationGetByRestaurantReqDtoImpl\",\"restaurantId\":1,\"requestId\":\"36eb4a30-a7fc-42d9-89e0-ff0219cfde9a\"}"
"SCARD" "reservation:request:queue"

두 명령어 모두 의도한 대로 동작하고 있는 것 같았음

하지만 계속 Error in executing이 발생했고, 확인해보니 스크립트 파라미터로 전달되는 값들에 직렬화 문제가 발생하고 있었음

스크립트 및 스크립트 매개변수들을 수정해서 직렬화 문제를 해결할 수도 있겠지만, 파생될 수 있는 역직렬화 문제 등을 생각하면 공부를 더 한 후에 접근하는 것이 맞다고 판단함

또한 비동기를 위해 별도의 저장공간을 도입해 발생하게 된 동기화 문제도 해결하기 어려운 상황

따라서 먼저 지금까지 구현한 코드의 실행 과정을 정리하고 각 과정에서 발생할 수 있거나 발생한 동기화 문제들을 정리해 볼 예정

작성한 코드의 실행 과정(25/02/20 기준)

+-----------------------+
|    메인 스레드 시작    |
+-----------+-----------+
            |
            v
+-----------------------+
|  ExecutorService 생성  |
|  (스레드 풀: 250)   |
+-----------+-----------+
            |
            v
+-----------------------+
|  250개의 스레드 생성   |
+-----------+-----------+
            |
            v
+-------------------------------+
|       각 스레드의 작업        |
+-----------+-------------------+
            |
            v
+--------------------------------------+
| 1. requestId 및 future 생성          |
+--------------------------------------+
| 2. registerAndEnqueue 호출           |
|    - synchronized(lock) 진입         |
|    - futureStore 용량 체크 및 등록   |
|    - requestQueue 용량 체크 및 큐잉  |
|    - 동기화 블록 종료                |
+--------------------------------------+
| 3. processQueue 호출                 |
|    - synchronized 메서드 진입        |
|    - 큐에서 요청 pop                 |
|    - 요청의 process 메서드 호출      |
|    - future 완료 및 제거             |
+--------------------------------------+
| 4. future.get()으로 결과 대기        |
| 5. 결과 처리 및 카운트 증가          |
| 6. latch.countDown() 호출            |
+-----------+-------------------+
            |
            v
+------------------------------+
|    메인 스레드 종료           |
|  - latch.await() 대기        |
|  - 결과 검증 및 스레드 풀 종료 |
+------------------------------+

서비스 코드들

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.example.easytable.reservation.dto.response.ReservationGetResDto;
import org.example.easytable.reservation.service.queueing.RequestFutureStore;
import org.example.easytable.reservation.service.ReservationService;

import java.util.List;
import java.util.concurrent.CompletableFuture;

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class ReservationGetByRestaurantReqDtoImpl implements ReservationReqDto {
    private Long restaurantId;
    private String requestId;

    @Override
    public void process(ReservationService service, RequestFutureStore futureStore) {
        CompletableFuture<List<ReservationGetResDto>> future = futureStore.getFuture(requestId);
        if (future == null) {
            throw new RuntimeException("CompletableFuture 조회 실패");
        }

        List<ReservationGetResDto> result = service.getReservationByRestaurant(restaurantId);
        try {
            future.complete(result);
        } catch (Exception e) {
            future.completeExceptionally(e);
        } finally {
            futureStore.removeFuture(requestId);
        }
    }

    @Override
    public String toString() {
        return "ReservationGetByRestaurantReqDtoImpl{" +
            "restaurantId=" + restaurantId +
            ", requestId='" + requestId + '\'' +
            '}';
    }
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.easytable.reservation.dto.request.ReservationReqDto;
import org.example.easytable.reservation.service.ReservationService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
@Qualifier("redisQueue")
@RequiredArgsConstructor
@Slf4j
public class RequestRedisQueueImpl implements RequestQueue {
    private static final String QUEUE_KEY = "reservation:request:queue";

    private final RedisTemplate<String, ReservationReqDto> redisTemplate;
    private final RequestFutureStore requestFutureStore;
    private final ReservationService service;

    @Value("${queue-capacity:25}")
    private int capacity;

    @Override
    public boolean enqueue(ReservationReqDto request) {
        Long size = redisTemplate.opsForSet().size(QUEUE_KEY);
        if(size == null) { throw new RuntimeException("큐 조회 실패"); }
        if(size >= capacity) { throw new RuntimeException("Redis 큐 용량 초과"); }
        Long result = redisTemplate.opsForSet().add(QUEUE_KEY, request);
        log.debug("queued Request: {}", request);
        return result != null;
    }

    @Override
    // @Scheduled(fixedDelay = 10000)
    public synchronized void processQueue() {
        ReservationReqDto request = redisTemplate.opsForSet().pop(QUEUE_KEY);
        if (request == null) {
            throw new RuntimeException("ReservationReqDto 조회 실패");
        }

        request.process(service, requestFutureStore);
    }
}
import org.example.easytable.reservation.dto.response.ReservationGetResDto;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class RequestFutureStore {
    // Reservation 외의 다른 곳에서도 사용할 경우 제네릭 타입 수정할 것
    private final ConcurrentHashMap<
            String, CompletableFuture<List<ReservationGetResDto>>> futureMap = new ConcurrentHashMap<>();

    // @Value("${queue-capacity:25}")
    private final int capacity = 25;

    public void registerFuture(String requestId, CompletableFuture<List<ReservationGetResDto>> future) {
        if (capacity <= futureMap.size()) { throw new RuntimeException("RequestFutureStore 용량 초과"); }
        futureMap.put(requestId, future);
    }

    public CompletableFuture<List<ReservationGetResDto>> getFuture(String requestId) {
        return futureMap.get(requestId);
    }

    public CompletableFuture<List<ReservationGetResDto>> removeFuture(String requestId) {
        return futureMap.remove(requestId);
    }
}
@Component
public class RequestRedisProcessor {
    private final RequestFutureStore futureStore;
    private final RequestQueue requestQueue;
    // 동기화를 위한 공유 락 객체
    private final Object lock = new Object();

    public RequestRedisProcessor(
            RequestFutureStore futureStore,
            @Qualifier("redisQueue") RequestQueue requestQueue
    ) {
        this.futureStore = futureStore;
        this.requestQueue = requestQueue;
    }

    public void registerAndEnqueue(
            String requestId, ReservationReqDto request, CompletableFuture<List<ReservationGetResDto>> future
    ) {
        synchronized (lock) {
            futureStore.registerFuture(requestId, future);
            boolean enqueued = requestQueue.enqueue(request);
            if (!enqueued) {
                futureStore.removeFuture(requestId);
                throw new RuntimeException("Failed to enqueue request");
            }
        }
    }
}

테스트 코드

import lombok.extern.slf4j.Slf4j;
import org.example.easytable.config.MockRequestQueue;
import org.example.easytable.config.QueueingTestConfig;
import org.example.easytable.exception.CustomException;
import org.example.easytable.exception.ErrorCode;
import org.example.easytable.reservation.dto.request.ReservationCreateReqDtoImpl;
import org.example.easytable.reservation.dto.request.ReservationGetByRestaurantReqDtoImpl;
import org.example.easytable.reservation.dto.request.ReservationPostReqDto;
import org.example.easytable.reservation.dto.response.ReservationGetResDto;
import org.example.easytable.reservation.service.queueing.RequestCollectionProcessor;
import org.example.easytable.reservation.service.queueing.RequestFutureStore;
import org.example.easytable.reservation.service.queueing.RequestRedisProcessor;
import org.example.easytable.reservation.service.queueing.RequestQueue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(SpringExtension.class)
@SpringBootTest
@ContextConfiguration(classes = {QueueingTestConfig.class})
@Slf4j
public class RequestQueueingTest {
    private final RequestQueue collectionQueue;
    private final RequestQueue redisQueue;
    private final MockRequestQueue mockRequestQueue;
    private final RequestRedisProcessor redisProcessor;
    private final RequestCollectionProcessor collectionProcessor;

    private final Long restaurantId = 1L;
    private final Long memberId = 1L;
    private final ReservationPostReqDto postReqDto = new ReservationPostReqDto(LocalDateTime.now());

    @Value("${collectionQueue-capacity:25}")
    private int capacity;
    private int threadCount;
    private RequestFutureStore requestFutureStore;

    @Autowired
    public RequestQueueingTest(
        @Qualifier("mockQueue") MockRequestQueue mockRequestQueue,
        @Qualifier("collectionQueue") RequestQueue collectionQueue,
        @Qualifier("redisQueue") RequestQueue redisQueue,
        RequestRedisProcessor redisProcessor, RequestCollectionProcessor collectionProcessor
    ) {
        this.mockRequestQueue = mockRequestQueue;
        this.collectionQueue = collectionQueue;
        this.redisQueue = redisQueue;
        this.redisProcessor = redisProcessor;
        this.collectionProcessor = collectionProcessor;
    }

    @BeforeEach
    public void init() {
        requestFutureStore = new RequestFutureStore();
    }

    @Test
    public void enqueueTest() throws InterruptedException {
        // given
        threadCount = 100;

        // when
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCnt = new AtomicInteger();

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    mockRequestQueue.enqueue(new ReservationCreateReqDtoImpl(
                            restaurantId, memberId, postReqDto));
                    successCnt.incrementAndGet();
                } catch (Exception e) {
                    log.error(e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executorService.shutdown();

        // then
        assertEquals(capacity, successCnt.intValue(), mockRequestQueue.getQueueSize());
    }

    @Test
    public void getRequestTest() {
        // given
        String uuid = UUID.randomUUID().toString();
        CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();

        // when
        requestFutureStore.registerFuture(uuid, future);

        // then
        assertEquals(future, requestFutureStore.getFuture(uuid));
    }

    @Test
    public void processCollectionQueueTest() throws InterruptedException {
        // given
        threadCount = 250;
        Long restaurantId = 1L;

        // when
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCnt = new AtomicInteger();
        AtomicInteger foundReservationCount = new AtomicInteger();

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                String requestId = UUID.randomUUID().toString();
                CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();
                log.info("generated requestId: " + requestId);

                try {
                    collectionProcessor.registerAndEnqueue(
                        requestId, new ReservationGetByRestaurantReqDtoImpl(restaurantId, requestId), future);
                    collectionQueue.processQueue();

                    List<ReservationGetResDto> foundReservations = future.get(10, TimeUnit.SECONDS);
                    log.info("current future:" + future +
                        " successfully got future: " + (future == requestFutureStore.getFuture(requestId)));
                    successCnt.incrementAndGet();
                    foundReservationCount.addAndGet(foundReservations.size());
                } catch (TimeoutException e) {
                    log.error("시간 초과: {}", e.getMessage());
                    future.cancel(true);
                } catch (Exception e) {
                    log.error(e.getMessage());
                    future.completeExceptionally(e);
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executorService.shutdown();

        // then
        // 큐를 대기하도록 구현하는 경우 threadCount와 비교하도록 수정할 것
        assertEquals(capacity, successCnt.intValue());
        assertEquals(0, foundReservationCount.intValue());
    }

    @Test
    public void processRedisQueueTest() throws InterruptedException {
        // given
        threadCount = 250;
        Long restaurantId = 1L;

        // when
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCnt = new AtomicInteger();
        AtomicInteger foundReservationCount = new AtomicInteger();

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                String requestId = UUID.randomUUID().toString();
                CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();
                log.info("generated requestId: " + requestId);

                try {
                    redisProcessor.registerAndEnqueue(
                            requestId, new ReservationGetByRestaurantReqDtoImpl(restaurantId, requestId), future);
                    redisQueue.processQueue();

                    List<ReservationGetResDto> foundReservations = future.get(10, TimeUnit.SECONDS);
                    log.info("current future:" + future +
                        " successfully got future: " + (future == requestFutureStore.getFuture(requestId)));
                    successCnt.incrementAndGet();
                    foundReservationCount.addAndGet(foundReservations.size());
                } catch (TimeoutException e) {
                    log.error("시간 초과: {}", e.getMessage());
                    future.cancel(true);
                } catch (Exception e) {
                    log.error(e.getMessage());
                    future.completeExceptionally(e);
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executorService.shutdown();

        // then
        // 큐를 대기하도록 구현하는 경우 threadCount와 비교하도록 수정할 것
        assertEquals(capacity, successCnt.intValue());
        assertEquals(0, foundReservationCount.intValue());
    }
}

문제 상황

  • registerAndEnqueue()와 processQueue() 사이의 동기화 불일치
    • 현재 코드 상에서 registerAndEnqueue()와 processQueue()는 서로 다른 방식의 lock을 적용 중
    • 이로 인해 lock 범위가 달라지고, 데이터 불일치가 발생할 수 있음
    • Redis 분산 lock을 사용할 수도 있었지만, api/v2/reservations에 대한 모든 요청들이 같은 lock을 사용해 동시성이 지켜지도록 구현해야 했음
    • 따라서 현재 @LockKey를 적용해야 하는 @RedissonLock은 사용 불가능
    • registerAndEnqueue()와 processQueue()를 하나의 lock으로 묶으면 성능 하락의 여지가 큼
  • Redis Set 관련 문제
    • Spring Data Redis의 opsForSet().pop(QUEUE_KEY)는 원자성을 보장하지 않아 동일한 요청이 중복으로 처리되거나 일부 요청이 누락될 수 있음
    • Set은 순서도 보장되지 않아 pop 시 큐에 들어간 순서와 상관없이 요청을 얻게 됨
    • ZSet, List 등 순서와 원자성이 보장되는 자료구조를 사용할 수도 있었으나, 요청 DTO 객체 저장 시 (역)직렬화 문제에 크게 부딪힘
      • record → class, final 키워드 제거, @JsonProperty & @JsonCreator 도입, 커스텀 Serializer 적용, Gson 등 (역)직렬화 문제 해결을 위한 다양한 수단들을 동원해봤지만 소용없었음
  • CompletableFuture 관련 문제
    • 현재 요청에 대한 처리 결과를 비동기적으로 받아오도록 구현하기 위해 CompletableFuture를 도입함
    • Callable, DeferredResult에 비해 비교적 다양한 기능을 제공하며, Reactor/WebFlux처럼 러닝 커브 및 비동기를 위한 복잡한 재설계를 요구하지 않음
    • 하지만 이 객체를 이용해 Spring MVC 구조에서 요청들을 큐잉할 때 문제가 생김
      • Redis에 요청을 queueing할 때, 다시 요청을 꺼낸 후 처리하고 결과를 반환하기 위해서는 CompletableFuture가 처리 시점에 같이 있어야 함
      • 이를 위해 요청과 CompletableFuture를 포함하는 별도의 DTO를 정의하고 이를 Redis에 queueing함
      • 하지만 CompletableFuture 객체에는 getter/setter로 접근할 수 없는 필드가 존재하고, 따라서 Redis에 저장하기 위한 Jackson (역)직렬화 과정에서 항상 실패함
      • 이를 해결하기 위해 CompletableFuture 객체를 따로 queueing 가능한 자료구조 RequestFutureStore를 만듬
      • 그 결과 Redis queue와 RequestFutureStore 간 동기화를 위해 신경써야 할 점이 많아짐
        • 그 중 일부 사항에 대해서는 어떤 식으로 동기화를 관리해야 하는지 감이 잡히지 않음

0개의 댓글

관련 채용 정보