ZSet 대신 Set을 도입해 역직렬화 문제를 해결했지만, 그 결과 요청 큐와 CompletableFuture 저장공간 사이의 동기화 문제가 발생했다.
따라서 두 공간에 대한 데이터 저장 작업의 원자성을 보장하기 위해 아래와 같이 synchronized를 적용하였다.
/**
 * Registers a request's {@link CompletableFuture} and enqueues the request as one guarded step.
 *
 * <p>Both writes run while holding a single private monitor, so callers going through this
 * instance cannot interleave their register/enqueue pairs.
 */
@Service
public class RequestProcessor {

    private final RequestFutureStore futureStore;
    private final RequestQueue requestQueue;

    // Shared monitor that serializes the register + enqueue pair.
    private final Object lock = new Object();

    public RequestProcessor(
            RequestFutureStore futureStore,
            @Qualifier("redisQueue") RequestQueue requestQueue
    ) {
        this.futureStore = futureStore;
        this.requestQueue = requestQueue;
    }

    /**
     * Stores {@code future} under {@code requestId} and then enqueues {@code request}.
     *
     * <p>If the request does not end up in the queue, whether because {@code enqueue} returned
     * {@code false} or because it threw, the future registered a moment earlier is removed
     * again so the store does not keep an entry nobody will ever complete.
     *
     * @param requestId key under which the future is stored
     * @param request   the request to enqueue
     * @param future    the future that will receive the request's result
     * @throws RuntimeException if the queue reports that the request was not enqueued; any
     *                          exception thrown by the store or the queue propagates unchanged
     */
    public void registerAndEnqueue(
            String requestId, ReservationReqDto request, CompletableFuture<List<ReservationGetResDto>> future
    ) {
        synchronized (lock) {
            futureStore.registerFuture(requestId, future);
            boolean enqueued = false;
            try {
                enqueued = requestQueue.enqueue(request);
            } finally {
                // Runs on the exception path too: undo the registration whenever the
                // request did not make it into the queue.
                if (!enqueued) {
                    futureStore.removeFuture(requestId);
                }
            }
            if (!enqueued) {
                throw new RuntimeException("Failed to enqueue request");
            }
        }
    }
}
이후 Redis, ConcurrentHashMap 각각의 용량을 확인하는 로직을 추가하였다.
/**
 * Adds {@code request} to the Redis set at {@code QUEUE_KEY} unless the set already holds
 * {@code capacity} members. The size check (SCARD) and the insert (SADD) run inside a single
 * Lua script.
 *
 * @param request the request to enqueue
 * @return {@code true} if the script replied 1 (member written), {@code false} if it replied 0
 *         (set full) or no reply value was returned
 */
@Override
public boolean enqueue(ReservationReqDto request) {
    // Script arguments are serialized by the template before they reach Redis. With a JSON
    // value serializer the String capacity arrives wrapped in double quotes (e.g. "\"25\""),
    // and tonumber() of that text is nil, which makes the comparison below raise an error.
    // Extracting the digit run accepts both the quoted and the bare form.
    String luaScript = "local limit = tonumber(string.match(ARGV[1], '%d+')) " +
            "if not limit then " +
            "  return redis.error_reply('capacity argument is not numeric: ' .. ARGV[1]) " +
            "end " +
            "local current = redis.call('SCARD', KEYS[1]) " +
            "if current < limit then " +
            "  redis.call('SADD', KEYS[1], ARGV[2]) " +
            "  return 1 " +
            "else " +
            "  return 0 " +
            "end";

    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
    redisScript.setScriptText(luaScript);
    redisScript.setResultType(Long.class);

    // KEYS[1] = queue key, ARGV[1] = capacity, ARGV[2] = serialized request.
    Long result = redisTemplate.execute(
            redisScript,
            List.of(QUEUE_KEY),
            String.valueOf(capacity),
            request
    );

    // Null-safe: comparing a null Long with == 1 would throw on unboxing.
    return result != null && result == 1L;
}
/**
 * Stores {@code future} under {@code requestId}, refusing the entry when the map already
 * holds {@code capacity} or more entries.
 *
 * @param requestId key for the future
 * @param future    the future to store
 * @throws RuntimeException if the map is already at capacity
 */
public void registerFuture(String requestId, CompletableFuture<List<ReservationGetResDto>> future) {
    boolean full = futureMap.size() >= capacity;
    if (full) {
        throw new RuntimeException("큐 용량 초과");
    }
    futureMap.put(requestId, future);
}
데이터 조회 & 추가 작업의 원자성을 위해 도입한 luaScript 실행 시 에러가 발생함
redis-cli 로그를 확인해보니 SCARD, EVALSHA가 실행되고 있었음
"EVALSHA" "ac1cb0ae6b44cd8c9a3e377227751189ea7aac10" "1" "reservation:request:queue" "\"25\"" "{\"@class\":\"org.example.easytable.reservation.dto.request.ReservationGetByRestaurantReqDtoImpl\",\"restaurantId\":1,\"requestId\":\"36eb4a30-a7fc-42d9-89e0-ff0219cfde9a\"}"
"SCARD" "reservation:request:queue"
두 명령어 모두 의도한 대로 동작하고 있는 것 같았음
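하지만 계속 Error in executing이 발생했고, 확인해보니 스크립트 파라미터로 전달되는 값들에 직렬화 문제가 발생하고 있었음 (위 로그처럼 capacity가 따옴표가 포함된 문자열 "\"25\""로 전달되어 Lua의 tonumber(ARGV[1])가 nil을 반환하고, 그 결과 current < nil 비교에서 스크립트 에러가 발생함)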
스크립트 및 스크립트 매개변수들을 수정해서 직렬화 문제를 해결할 수도 있겠지만, 파생될 수 있는 역직렬화 문제 등을 생각하면 공부를 더 한 후에 접근하는 것이 맞다고 판단함
또한 비동기를 위해 별도의 저장공간을 도입해 발생하게 된 동기화 문제도 해결하기 어려운 상황
따라서 먼저 지금까지 구현한 코드의 실행 과정을 정리하고 각 과정에서 발생할 수 있거나 발생한 동기화 문제들을 정리해 볼 예정
+-----------------------+
| 메인 스레드 시작 |
+-----------+-----------+
|
v
+-----------------------+
| ExecutorService 생성 |
| (스레드 풀: 250개) |
+-----------+-----------+
|
v
+-----------------------+
| 250개의 스레드 생성 |
+-----------+-----------+
|
v
+-------------------------------+
| 각 스레드의 작업 |
+-----------+-------------------+
|
v
+--------------------------------------+
| 1. requestId 및 future 생성 |
+--------------------------------------+
| 2. registerAndEnqueue 호출 |
| - synchronized(lock) 진입 |
| - futureStore 용량 체크 및 등록 |
| - requestQueue 용량 체크 및 큐잉 |
| - 동기화 블록 종료 |
+--------------------------------------+
| 3. processQueue 호출 |
| - synchronized 메서드 진입 |
| - 큐에서 요청 pop |
| - 요청의 process 메서드 호출 |
| - future 완료 및 제거 |
+--------------------------------------+
| 4. future.get()으로 결과 대기 |
| 5. 결과 처리 및 카운트 증가 |
| 6. latch.countDown() 호출 |
+-----------+-------------------+
|
v
+------------------------------+
| 메인 스레드 종료 |
| - latch.await() 대기 |
| - 결과 검증 및 스레드 풀 종료 |
+------------------------------+
서비스 코드들
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.example.easytable.reservation.dto.response.ReservationGetResDto;
import org.example.easytable.reservation.service.queueing.RequestFutureStore;
import org.example.easytable.reservation.service.ReservationService;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 * Queued request that looks up the reservations of one restaurant and delivers the outcome
 * through the {@link CompletableFuture} stored under {@code requestId}.
 */
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class ReservationGetByRestaurantReqDtoImpl implements ReservationReqDto {

    private Long restaurantId;
    private String requestId;

    /**
     * Runs the lookup and completes the future registered for this request.
     *
     * <p>The lookup runs inside the {@code try} so that a failing service call completes the
     * future exceptionally instead of leaving it pending; in that case the failure is reported
     * through the future rather than rethrown. The store entry is removed in every case.
     *
     * @param service     service used to load the reservations
     * @param futureStore store holding the future registered under {@code requestId}
     * @throws RuntimeException if no future is registered under {@code requestId}
     */
    @Override
    public void process(ReservationService service, RequestFutureStore futureStore) {
        CompletableFuture<List<ReservationGetResDto>> future = futureStore.getFuture(requestId);
        if (future == null) {
            throw new RuntimeException("CompletableFuture 조회 실패");
        }
        try {
            List<ReservationGetResDto> result = service.getReservationByRestaurant(restaurantId);
            future.complete(result);
        } catch (Exception e) {
            // Hand the failure to whoever is waiting on the future.
            future.completeExceptionally(e);
        } finally {
            futureStore.removeFuture(requestId);
        }
    }

    @Override
    public String toString() {
        return "ReservationGetByRestaurantReqDtoImpl{" +
                "restaurantId=" + restaurantId +
                ", requestId='" + requestId + '\'' +
                '}';
    }
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.easytable.reservation.dto.request.ReservationReqDto;
import org.example.easytable.reservation.service.ReservationService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
 * {@link RequestQueue} backed by a Redis set stored at {@code QUEUE_KEY}.
 */
@Component
@Qualifier("redisQueue")
@RequiredArgsConstructor
@Slf4j
public class RequestRedisQueueImpl implements RequestQueue {

    private static final String QUEUE_KEY = "reservation:request:queue";

    private final RedisTemplate<String, ReservationReqDto> redisTemplate;
    private final RequestFutureStore requestFutureStore;
    private final ReservationService service;

    @Value("${queue-capacity:25}")
    private int capacity;

    /**
     * Adds {@code request} to the set after checking the current size against {@code capacity}.
     *
     * <p>The size lookup and the add are two separate Redis calls.
     *
     * @param request the request to enqueue
     * @return {@code true} only if the add reported at least one newly added member;
     *         {@code false} if nothing was added or no count was returned
     * @throws RuntimeException if the size cannot be read or the set is already at capacity
     */
    @Override
    public boolean enqueue(ReservationReqDto request) {
        Long size = redisTemplate.opsForSet().size(QUEUE_KEY);
        if (size == null) {
            throw new RuntimeException("큐 조회 실패");
        }
        if (size >= capacity) {
            throw new RuntimeException("Redis 큐 용량 초과");
        }

        Long added = redisTemplate.opsForSet().add(QUEUE_KEY, request);
        // A non-null reply of 0 means nothing was added; only a positive count is a success.
        boolean enqueued = added != null && added > 0;
        if (enqueued) {
            log.debug("queued Request: {}", request);
        }
        return enqueued;
    }

    /**
     * Pops one request from the set and processes it. The method is {@code synchronized}, so
     * only one thread at a time runs it on this instance.
     *
     * @throws RuntimeException if the pop returns no request
     */
    @Override
    // @Scheduled(fixedDelay = 10000)
    public synchronized void processQueue() {
        ReservationReqDto request = redisTemplate.opsForSet().pop(QUEUE_KEY);
        if (request == null) {
            throw new RuntimeException("ReservationReqDto 조회 실패");
        }
        request.process(service, requestFutureStore);
    }
}
import org.example.easytable.reservation.dto.response.ReservationGetResDto;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory store that maps a request id to the {@link CompletableFuture} awaiting that
 * request's result.
 */
@Component
public class RequestFutureStore {

    // If this store is used outside of Reservation, change the generic type.
    private final ConcurrentHashMap<
            String, CompletableFuture<List<ReservationGetResDto>>> futureMap = new ConcurrentHashMap<>();

    // @Value("${queue-capacity:25}")
    private final int capacity = 25;

    /**
     * Stores {@code future} under {@code requestId}.
     *
     * @throws RuntimeException if the map already holds {@code capacity} or more entries
     */
    public void registerFuture(String requestId, CompletableFuture<List<ReservationGetResDto>> future) {
        boolean full = futureMap.size() >= capacity;
        if (full) {
            throw new RuntimeException("RequestFutureStore 용량 초과");
        }
        futureMap.put(requestId, future);
    }

    /**
     * Looks up the future stored under {@code requestId}.
     *
     * @return the stored future, or {@code null} if there is none
     */
    public CompletableFuture<List<ReservationGetResDto>> getFuture(String requestId) {
        CompletableFuture<List<ReservationGetResDto>> stored = futureMap.get(requestId);
        return stored;
    }

    /**
     * Removes the entry stored under {@code requestId}.
     *
     * @return the removed future, or {@code null} if there was none
     */
    public CompletableFuture<List<ReservationGetResDto>> removeFuture(String requestId) {
        CompletableFuture<List<ReservationGetResDto>> removed = futureMap.remove(requestId);
        return removed;
    }
}
/**
 * Registers a request's {@link CompletableFuture} and enqueues the request into the queue
 * qualified as {@code "redisQueue"} as one guarded step.
 *
 * <p>Both writes run while holding a single private monitor, so callers going through this
 * instance cannot interleave their register/enqueue pairs.
 */
@Component
public class RequestRedisProcessor {

    private final RequestFutureStore futureStore;
    private final RequestQueue requestQueue;

    // Shared monitor that serializes the register + enqueue pair.
    private final Object lock = new Object();

    public RequestRedisProcessor(
            RequestFutureStore futureStore,
            @Qualifier("redisQueue") RequestQueue requestQueue
    ) {
        this.futureStore = futureStore;
        this.requestQueue = requestQueue;
    }

    /**
     * Stores {@code future} under {@code requestId} and then enqueues {@code request}.
     *
     * <p>If the request does not end up in the queue, whether because {@code enqueue} returned
     * {@code false} or because it threw, the future registered a moment earlier is removed
     * again so the store does not keep an entry nobody will ever complete.
     *
     * @param requestId key under which the future is stored
     * @param request   the request to enqueue
     * @param future    the future that will receive the request's result
     * @throws RuntimeException if the queue reports that the request was not enqueued; any
     *                          exception thrown by the store or the queue propagates unchanged
     */
    public void registerAndEnqueue(
            String requestId, ReservationReqDto request, CompletableFuture<List<ReservationGetResDto>> future
    ) {
        synchronized (lock) {
            futureStore.registerFuture(requestId, future);
            boolean enqueued = false;
            try {
                enqueued = requestQueue.enqueue(request);
            } finally {
                // Runs on the exception path too: undo the registration whenever the
                // request did not make it into the queue.
                if (!enqueued) {
                    futureStore.removeFuture(requestId);
                }
            }
            if (!enqueued) {
                throw new RuntimeException("Failed to enqueue request");
            }
        }
    }
}
테스트 코드
import lombok.extern.slf4j.Slf4j;
import org.example.easytable.config.MockRequestQueue;
import org.example.easytable.config.QueueingTestConfig;
import org.example.easytable.exception.CustomException;
import org.example.easytable.exception.ErrorCode;
import org.example.easytable.reservation.dto.request.ReservationCreateReqDtoImpl;
import org.example.easytable.reservation.dto.request.ReservationGetByRestaurantReqDtoImpl;
import org.example.easytable.reservation.dto.request.ReservationPostReqDto;
import org.example.easytable.reservation.dto.response.ReservationGetResDto;
import org.example.easytable.reservation.service.queueing.RequestCollectionProcessor;
import org.example.easytable.reservation.service.queueing.RequestFutureStore;
import org.example.easytable.reservation.service.queueing.RequestRedisProcessor;
import org.example.easytable.reservation.service.queueing.RequestQueue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Concurrency tests for the request queues and their register-and-enqueue processors.
 *
 * <p>Each multi-threaded test submits {@code threadCount} tasks to a fixed thread pool, waits
 * for all of them via a {@link CountDownLatch}, and then compares the number of successful
 * tasks with the configured capacity.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
@ContextConfiguration(classes = {QueueingTestConfig.class})
@Slf4j
public class RequestQueueingTest {

    private final RequestQueue collectionQueue;
    private final RequestQueue redisQueue;
    private final MockRequestQueue mockRequestQueue;
    private final RequestRedisProcessor redisProcessor;
    private final RequestCollectionProcessor collectionProcessor;

    private final Long restaurantId = 1L;
    private final Long memberId = 1L;
    private final ReservationPostReqDto postReqDto = new ReservationPostReqDto(LocalDateTime.now());

    @Value("${collectionQueue-capacity:25}")
    private int capacity;
    private int threadCount;

    // Created fresh in init(); this is a test-local instance, not the one injected into the
    // processors.
    private RequestFutureStore requestFutureStore;

    @Autowired
    public RequestQueueingTest(
            @Qualifier("mockQueue") MockRequestQueue mockRequestQueue,
            @Qualifier("collectionQueue") RequestQueue collectionQueue,
            @Qualifier("redisQueue") RequestQueue redisQueue,
            RequestRedisProcessor redisProcessor, RequestCollectionProcessor collectionProcessor
    ) {
        this.mockRequestQueue = mockRequestQueue;
        this.collectionQueue = collectionQueue;
        this.redisQueue = redisQueue;
        this.redisProcessor = redisProcessor;
        this.collectionProcessor = collectionProcessor;
    }

    @BeforeEach
    public void init() {
        requestFutureStore = new RequestFutureStore();
    }

    /** Enqueues from 100 threads and expects exactly {@code capacity} successful enqueues. */
    @Test
    public void enqueueTest() throws InterruptedException {
        // given
        threadCount = 100;

        // when
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCnt = new AtomicInteger();
        try {
            for (int i = 0; i < threadCount; i++) {
                executorService.submit(() -> {
                    try {
                        mockRequestQueue.enqueue(new ReservationCreateReqDtoImpl(
                                restaurantId, memberId, postReqDto));
                        successCnt.incrementAndGet();
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            executorService.shutdown();
        }

        // then
        // Two exact checks. Passing the queue size as a third numeric argument would select
        // the assertEquals overload that treats it as an allowed delta.
        assertEquals(capacity, successCnt.intValue());
        assertEquals(capacity, mockRequestQueue.getQueueSize());
    }

    /** A future registered in the store is returned by a lookup with the same id. */
    @Test
    public void getRequestTest() {
        // given
        String uuid = UUID.randomUUID().toString();
        CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();

        // when
        requestFutureStore.registerFuture(uuid, future);

        // then
        assertEquals(future, requestFutureStore.getFuture(uuid));
    }

    /** Registers, enqueues and processes through the collection-backed queue from 250 threads. */
    @Test
    public void processCollectionQueueTest() throws InterruptedException {
        // given
        threadCount = 250;
        Long restaurantId = 1L;

        // when
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCnt = new AtomicInteger();
        AtomicInteger foundReservationCount = new AtomicInteger();
        try {
            for (int i = 0; i < threadCount; i++) {
                executorService.submit(() -> {
                    String requestId = UUID.randomUUID().toString();
                    CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();
                    log.info("generated requestId: " + requestId);
                    try {
                        collectionProcessor.registerAndEnqueue(
                                requestId, new ReservationGetByRestaurantReqDtoImpl(restaurantId, requestId), future);
                        collectionQueue.processQueue();
                        List<ReservationGetResDto> foundReservations = future.get(10, TimeUnit.SECONDS);
                        // Compares against the test-local store created in init().
                        log.info("current future:" + future +
                                " successfully got future: " + (future == requestFutureStore.getFuture(requestId)));
                        successCnt.incrementAndGet();
                        foundReservationCount.addAndGet(foundReservations.size());
                    } catch (TimeoutException e) {
                        log.error("시간 초과: {}", e.getMessage());
                        future.cancel(true);
                    } catch (Exception e) {
                        log.error(e.getMessage());
                        future.completeExceptionally(e);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            executorService.shutdown();
        }

        // then
        // If the queue is changed to make callers wait, compare against threadCount instead.
        assertEquals(capacity, successCnt.intValue());
        assertEquals(0, foundReservationCount.intValue());
    }

    /** Registers, enqueues and processes through the Redis-backed queue from 250 threads. */
    @Test
    public void processRedisQueueTest() throws InterruptedException {
        // given
        threadCount = 250;
        Long restaurantId = 1L;

        // when
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCnt = new AtomicInteger();
        AtomicInteger foundReservationCount = new AtomicInteger();
        try {
            for (int i = 0; i < threadCount; i++) {
                executorService.submit(() -> {
                    String requestId = UUID.randomUUID().toString();
                    CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();
                    log.info("generated requestId: " + requestId);
                    try {
                        redisProcessor.registerAndEnqueue(
                                requestId, new ReservationGetByRestaurantReqDtoImpl(restaurantId, requestId), future);
                        redisQueue.processQueue();
                        List<ReservationGetResDto> foundReservations = future.get(10, TimeUnit.SECONDS);
                        // Compares against the test-local store created in init().
                        log.info("current future:" + future +
                                " successfully got future: " + (future == requestFutureStore.getFuture(requestId)));
                        successCnt.incrementAndGet();
                        foundReservationCount.addAndGet(foundReservations.size());
                    } catch (TimeoutException e) {
                        log.error("시간 초과: {}", e.getMessage());
                        future.cancel(true);
                    } catch (Exception e) {
                        log.error(e.getMessage());
                        future.completeExceptionally(e);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            executorService.shutdown();
        }

        // then
        // If the queue is changed to make callers wait, compare against threadCount instead.
        assertEquals(capacity, successCnt.intValue());
        assertEquals(0, foundReservationCount.intValue());
    }
}
opsForSet().pop(QUEUE_KEY)는 원자성을 보장하지 않아 동일한 요청이 중복으로 처리되거나 일부 요청이 누락될 수 있음