이번 프로젝트에서는 대기열 기능을 구현하기 위해 Redis의 Sorted Set(ZSet)을 사용합니다.
ZSet은 중복을 허용하지 않으며, 각 요소에 부여된 score 값을 기준으로 자동 오름차순 정렬이 이루어지는 구조입니다.
이를 통해 순위 조회, 자동 정렬, 범위 검색 등 대기열 시스템에 필요한 다양한 기능을 사용할 수 있습니다.
Redis 설정 코드
@Configuration
public class RedisConfig {
@Value("${redis.host}")
private String host;
@Value("${redis.port}")
private int port;
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisSerializationContext<String, String> context =
RedisSerializationContext.<String, String>newSerializationContext(new StringRedisSerializer())
.value(new StringRedisSerializer())
.build();
return new ReactiveRedisTemplate<>(lettuceConnectionFactory, context);
}
}
reactiveRedisTemplate.opsForZSet()
Redis의 Sorted Set( ZSet ) 자료구조에 접근하기 위한 메서드
reactiveRedisTemplate.opsForZSet().add(K key, V value, double score)
ZSet에 value를 score 기준으로 저장하는 작업
K key: Redis에서 사용할 Zset의 키 이름 즉, Zset의 이름
V value: Zset에 저장할 값
double score: 값을 정렬할 기준 값, 보통 타임스탬프, 점수, 우선순위 등등 ...
reactiveRedisTemplate.opsForZSet().rank(K key, Object o)
Zset에서 o의 순위를 반환하는 작업 ( 낮은 score 기준 몇 번째인지, 0부터 시작 )
( 높은 score를 상위로 보고 싶다면 reverseRank()를 사용 )
K key: Redis에서 사용할 Zset의 키 이름 즉, Zset의 이름
Object o: 순위를 조회하려는 ZSet 안의 값( value )
reactiveRedisTemplate.opsForZSet().popMin(K key, long count)
Zset에서 score가 가장 낮은 항목을 하나 또는 여러 개 꺼내는 작업
K key: Redis에서 사용할 Zset의 키 이름 즉, Zset의 이름
long count: 꺼낼 항목 개수
reactiveRedisTemplate.opsForZSet().remove(K key, Object o)
Zset에서 o를 삭제하는 작업
K key: Redis에서 사용할 Zset의 키 이름 즉, Zset의 이름
Object o: 삭제하려는 ZSet 안의 값( value )
Mono.just(…)
값 하나를 감싸서 Mono형으로 만들어주는 함수
Mono.just("hello") // Mono<String>
Mono.just(42) // Mono<Integer>
Mono.just(-1L) // Mono<Long>
map()
전달된 값을 다른 값으로 변환할 때 사용
코드 예시
Mono<Boolean> existsInWaitQueue = reactiveRedisTemplate.opsForZSet()
.rank(queueType + WAIT_QUEUE, userId)
.map(rank -> true) // 순위 값이 있다면 true로 반환
.defaultIfEmpty(false);
flatMap()
비동기 작업을 이어서 연결, Mono → Mono로 바꿀 때, 변환 결과도 Mono일 경우 사용
코드 예시
allowed가 true이면 → 바로 Mono.just(-1L) 반환 → Mono 안에서 Mono를 반환해야 하므로 flatMap 사용
public Mono<Long> searchUserRanking(String userId, String queueType) {
return isAllowedUser(queueType, userId)
.flatMap(allowed -> {
if (allowed) {
return Mono.just(-1L);
}
return reactiveRedisTemplate.opsForZSet().rank(queueType + WAIT_QUEUE, userId)
.flatMap(rank -> {
if (rank == null) {
return Mono.error(new ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.USER_NOT_FOUND_IN_THE_QUEUE));
}
return Mono.just(rank + 1);
});
})
filter()
조건에 맞는 값만 필터링
코드 예시
return reactiveRedisTemplate.opsForZSet()
.add(queueType + WAIT_QUEUE, userId, enterTimestamp)
.filter(i -> i)
.switchIfEmpty(Mono.error(new ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.ALREADY_REGISTERED_USER)))
switchIfEmpty()
이전 연산 결과가 Mono.empty()일 경우 대체 동작을 수행
코드 예시
return reactiveRedisTemplate.opsForZSet()
.add(queueType + WAIT_QUEUE, userId, enterTimestamp)
.filter(i -> i)
.switchIfEmpty(Mono.error(new ReserveException(HttpStatus.BAD_REQUEST, ErrorCode.ALREADY_REGISTERED_USER)))
filter(i -> i)에서 Mono.empty()를 반환할 경우 예외를 던지게
defaultIfEmpty()
값이 존재하지 않을 경우 대체 값을 지정
코드 예시
Mono<Boolean> existsInWaitQueue = reactiveRedisTemplate.opsForZSet()
.rank(queueType + WAIT_QUEUE, userId)
.map(rank -> true)
.defaultIfEmpty(false); // value가 없다면 false 반환
then()
앞의 작업 완료 후 다른 작업 실행 ( 값을 무시함 )
코드 예시
return reactiveRedisTemplate.opsForZSet().add(queueType + ALLOW_QUEUE, userId, timestamp)
.then(
reactiveRedisTemplate.opsForValue().set(tokenKey, "allowed", Duration.ofMinutes(10))
)
.thenReturn(userId);
count
Flux의 요소 개수를 Mono로 반환
코드 예시
public Mono<Long> allowUser(String queueType, Long count) {
return reactiveRedisTemplate.opsForZSet()
.popMin(queueType + WAIT_QUEUE, count)
.flatMap(member -> {
String userId = member.getValue();
long timestamp = Instant.now().getEpochSecond();
String tokenKey = "token:" + userId + ":TTL";
return reactiveRedisTemplate.opsForZSet()
.add(queueType + ALLOW_QUEUE, userId, timestamp)
.then(
reactiveRedisTemplate.opsForValue()
.set(tokenKey, "allowed", Duration.ofMinutes(10))
)
.thenReturn(userId);
})
.count()
.doOnSuccess(allowedCount -> log.info("허용큐로 이동된 사용자 수: {}", allowedCount));
}
popMin(…)는 반환형이 Flux<TypedTuple>이므로 count()를 통해 Mono으로 반환
subscribe() ⭐️
리액티브 스트림(Mono , Flux)을 구독 비동기 데이터 흐름을 실행하고, 결과를 받아 처리하는 메서드
리액티브 스트림은 lazy( 지연 실행 )이므로 map(), filter() 같은 연산을 해도 subscribe()를 호출하기 전까진 아무 일도 일어나지 않음
Spring WebFlux에서는 컨트롤러에서 Mono나 Flux를 리턴하면 Spring이 자동으로 subscribe()를 호출하므로 직접 호출하지 않아도 되지만, side-effect가 있는 로직에는 사용해야 함
즉, 반환형이 Mono나 Flux를 리턴하지 않는 곳에서 Mono나 Flux를 사용할 경우 subscribe() 사용해야만 함
doOnSuccess()
성공 시 side effect 처리 ( 로깅 등등 )
Mono.just("성공")
.doOnSuccess(val -> log.info("성공 값: " + val))
.subscribe();
Mono.empty()인 경우에도 doOnSuccess(null)이 호출