ShowPot 프로젝트에서는 Redis Pub/Sub을 사용하였는데, Spring Data Redis와 Lettuce 의존성을 추가하여 구현하였습니다. 이때 Spring Boot 애플리케이션이 Redis로 어떻게 PUBLISH 명령을 보내는지 확인을 해보겠습니다.
즉, Pub/Sub 시스템이 Redis의 데이터 저장소와는 전혀 다른 영역에서 동작하며, Redis의 데이터베이스 구분(번호)과 관계없이 어디에서나 메시지를 주고받을 수 있다는 것을 의미합니다.
Spring Data Redis에서는 네이티브 Redis와 어떻게 상호작용하는지 알아보겠습니다.
RedisCallback
을 사용하여 실제로 수행할 작업을 지정하고, exposeConnection
을 통해 네이티브 Redis 연결을 노출할지 여부를 제어할 수 있습니다. true이면 연결 객체를 직접 노출하게 되고, false일 경우 프록시 객체를 사용하여 Spring이 연결을 관리하게 됩니다.pipeline
변수는 redis 서버에 명령을 전송할 때 true일 경우 명령이 한 번에 묵어서 실행되고 일괄적으로 명령을 처리한 후 결과를 반환합니다. (주로 쓰기 전용 작업에서 사용) false일 경우 순차적으로 서버에 전송되고, 명령 실행 즉시 결과를 반환받는 설정입니다. (읽기 작업이나 결과 검증이 필요한 작업에서 사용)T result = action.doInRedis(connToExpose)
에서 실제로 Redis 작업이 수행됩니다. 이때 콜백(RedisCallback
)은 작업을 수행하고, 결과를 반환합니다. 즉 이 부분에서 우리는 publish 메서드가 실행이 되는 것입니다.RedisConnectionUtils.releaseConnection(conn, factory)
를 통해 Redis 연결을 반환하거나 닫으면서 메서드가 종료됩니다.Lettuce는 Netty (비동기 이벤트 기반 고성능 네트워크 프레임워크) 기반의 Redis 클라이언트입니다. 오직 동기식 연결만 사용한다면 Jedis를 사용하는 것이 나을 수 있습니다.
따라서 connection은 LettuceConnection이 됩니다.
Jedis보단 Lettuce를 사용하자
Lettuce Github
Netty Github
Netty Wiki
이제 publish 메서드에 대해 알아보겠습니다.
statefulConnection.async()
를 호출하여 비동기 명령 실행 인터페이스를 반환합니다.synchronizer
는 비동기 호출을 조정하고 동기화하는 역할을 하고, BaseRedusAsyncCommands의 publish 메서드를 함수형 인터페이스로 호출하게 됩니다.AbstractRedisAsyncCommands
구현체에서 commandBuilder.publish()
는 channel
과 message
를 인수로 받아 RedisCommand
객체를 반환합니다.
preProcessCommand
메서드는 주어진 명령을 전처리합니다. 이를 통해 특정 명령(SELECT, READONLY 등)에 맞는 후처리 작업을 준비합니다. 하지만 PUBLISH 명령어는 해당 메서드에 조건분기가 없으므로 후처리 작업 없이 RedisCommand 객체를 반환합니다.potentiallyEnableMulti
메서드는 트랜잭션 명령(MULTI)이 감지되면 트랜잭션 모드를 활성화합니다. 그러나 Redis Config, PUBLISH 메서드는 트랜잭션 명령 설정을 하지 않았기에 넘어갑니다.
validateWrite
메서드를 통해 쓰기 작업이 가능한지 검사합니다. 기존 큐 사이즈에 현재 명령의 수를 더한 값이 요청한 큐의 사이즈보다 클 때 예외가 발생하고, 연결이 안된 상태에서 버커 크기, 연결된 상태에서 버퍼 크기 등을 검사해서 작업이 가능한지 확인합니다.incrementWriters()
메서드에서는 현재 스레드가 베타적 락을 소유하고 있는지 확인하고 없으면 락을 획득합니다. 이때 JAVA의 ReentrantLock을 가지고 lock을 겁니다. 무한 루프(for (;;)
)는 쓰기 카운터가 음수일 경우(다른 스레드가 베타적 락을 소유하고 있는 경우), 계속해서 공유 락을 얻기 위해 기다리게 만듭니다. 공유 락을 얻은 후 ReentrantLock을 해제합니다. 이렇게 여러 쓰레드가 동시에 공유 락을 획득하는 경우에도 동시성 문제 없이 얻을 수 있습니다.if (autoFlushCommands) {
if (isConnected()) {
writeToChannelAndFlush(command);
} else {
writeToDisconnectedBuffer(command);
}
} else {
writeToBuffer(command);
}
java.nio.channels.SocketChannel
객체를 반환합니다. (Java의 NIO (Non-blocking I/O)에서 사용하는 기본 네트워크 통신 채널)QUEUE_SIZE.incrementAndGet(this)
에서 전송 대기열의 크기를 증가시킵니다. 이 명령이 전송되기 위해 큐에 들어갔다는 것을 기록합니다. channelWriteAndFlush(command)
: Redis 명령을 네트워크 채널로 전송하고 즉시 플러시합니다. ChannelFuture는 아직 발생하지 않은 I/O 작업을 나타냅니다. 이는 모든 작업이 Netty에서 비동기식이므로 요청된 작업이 아직 수행되지 않았을 수 있음을 의미합니다.AbstractChannel(io.lettuce.core.protocol
) → DeafaultChannelPipeLine(io.netty.channel
) → AbstractChannelHandlerContext(io.netty.channel
) 순으로 writeFlush 메서드가 실행이 됩니다. newPromise()
메서드는 이 메시지 전송 작업의 성공 또는 실패 여부를 추적할 수 있는 DefaultChannelPromise
객체를 반환하는데, 이는 Netty의 기본 Promise
구현체입니다.
next
컨텍스트의 이벤트 루프를 가져옵니다. EventExecutor
는 Netty에서 비동기 작업을 실행하는 객체입니다. 현재 스레드가 이벤트 루프에 포함되어 있지 않아 safeExecute(executor, task, promise, m, !flush)
작업을 이벤트 루프에 제출합니다. 실패할 경우 task.cancel()
을 호출하여 작업을 취소하고 대기 중인 바이트를 감소시킵니다.
Spring Boot에서 Redis를 사용할 때 Spring Data Redis와 Lettuce 의존성을 추가하여 Redis Pub/Sub에서 Publish 메서드가 어떻게 Redis로 요청이 가는지 확인할 수 있었습니다. Lettuce는 Netty 기반의 Redis 클라이언트로 비동기적으로 명령을 보낼 수 있었습니다. 명령을 보낼 때 내부적으로 Lock도 걸고 현재 스레드가 이벤트 루프 스레드인지 확인하고 작업 큐에 넣는 등 생각보다 여러 작업이 들어간 걸 알 수 있었습니다.