Spring Boot에서 Redis에서는 요청을 어떻게 보낼까

박준수·2024년 9월 19일
0
post-thumbnail
post-custom-banner

ShowPot 프로젝트에서는 Redis Pub/Sub을 사용하였는데, Spring Data Redis와 Lettuce 의존성을 추가하여 구현하였습니다. 이때 Spring Boot 애플리케이션이 Redis로 어떻게 PUBLISH 명령을 보내는지 확인을 해보겠습니다.

Lettuce guide
Redis Pub/Sub

PUB/SUB 특징

  • Redis의 Pub/Sub는 최대 한 번만 메시지를 전송하는 방식을 사용합니다. 즉, Redis 서버에서 메시지를 전송한 후에는 다시 전송할 기회는 없습니다. 따라서 구독자가 메시지를 처리할 수 없는 경우(예: 오류 또는 네트워크 연결 끊김) 메시지는 영원히 손실되는 문제가 발생할 수 있습니다.
  • Pub/Sub는 키 공간과 관련이 없습니다. 데이터베이스 번호를 포함하여 어떤 수준에서도 간섭하지 않도록 만들어졌습니다.

즉, Pub/Sub 시스템이 Redis의 데이터 저장소와는 전혀 다른 영역에서 동작하며, Redis의 데이터베이스 구분(번호)과 관계없이 어디에서나 메시지를 주고받을 수 있다는 것을 의미합니다.

Pub

  • redis pub을 실행하면 config에서 설정한 RedisTemplate의 SingleThreadEventExecutor 클래스의 converAndSend 메서드를 통해 작동이 됩니다.

Spring Data Redis에서는 네이티브 Redis와 어떻게 상호작용하는지 알아보겠습니다.

  • excute 메서드는 Redis와의 작업을 간편하게 처리하기 위해, 템플릿 패턴을 사용하여 연결 관리, 콜백 실행 등의 복잡한 작업을 대신 처리해줍니다. RedisCallback을 사용하여 실제로 수행할 작업을 지정하고, exposeConnection을 통해 네이티브 Redis 연결을 노출할지 여부를 제어할 수 있습니다. true이면 연결 객체를 직접 노출하게 되고, false일 경우 프록시 객체를 사용하여 Spring이 연결을 관리하게 됩니다.

  • pipeline 변수는 redis 서버에 명령을 전송할 때 true일 경우 명령이 한 번에 묵어서 실행되고 일괄적으로 명령을 처리한 후 결과를 반환합니다. (주로 쓰기 전용 작업에서 사용) false일 경우 순차적으로 서버에 전송되고, 명령 실행 즉시 결과를 반환받는 설정입니다. (읽기 작업이나 결과 검증이 필요한 작업에서 사용)
  • T result = action.doInRedis(connToExpose)에서 실제로 Redis 작업이 수행됩니다. 이때 콜백(RedisCallback)은 작업을 수행하고, 결과를 반환합니다. 즉 이 부분에서 우리는 publish 메서드가 실행이 되는 것입니다.
  • 이후 RedisConnectionUtils.releaseConnection(conn, factory)를 통해 Redis 연결을 반환하거나 닫으면서 메서드가 종료됩니다.

  • Lettuce는 Netty (비동기 이벤트 기반 고성능 네트워크 프레임워크) 기반의 Redis 클라이언트입니다. 오직 동기식 연결만 사용한다면 Jedis를 사용하는 것이 나을 수 있습니다.

  • 따라서 connection은 LettuceConnection이 됩니다.

  • Jedis보단 Lettuce를 사용하자
    Lettuce Github
    Netty Github
    Netty Wiki

이제 publish 메서드에 대해 알아보겠습니다.

  • LettuceConnection에서 publish 메서드 입니다. channel은 메시지를 보낼 Reids 채널의 이름, message는 전송할 메시지의 내용입니다. 반환값은 해당 채널을 구독하고 있는 클라이언트 수를 반환합니다. (물론 구독하고 있는 클라이언트의 수 이지, 구독자가 메시지를 받았는지는 확인할 수 없는 것입니다.)

  • invoke() 메서드는 Spring Data Redis에서 Lettuce 클라이언트를 통해 Redis에 비동기적으로 연결하고 명령을 실행하기 위한 비동기 연결 객체를 반환합니다. 따라서 비동기 방식으로 Redis 명령을 보내고 그 결과를 기다리는 동안 애플리케이션이 다른 작업을 계속 수행할 수 있게 해줍니다.

  • getAsyncConnection() 메서드에서는 현재 단일 Redis 인스턴스에 연결되어 있기 때문에, statefulConnection.async()를 호출하여 비동기 명령 실행 인터페이스를 반환합니다.

  • doInvoke() 메서드는 현재 연결의 상태(예: 트랜잭션 중인지, 파이프라인 중인지)에 따라 적절한 비동기 연결을 제공합니다. 현재는 일반 비동기 처리가 됩니다.

  • just 메서드에서 synchronizer는 비동기 호출을 조정하고 동기화하는 역할을 하고, BaseRedusAsyncCommands의 publish 메서드를 함수형 인터페이스로 호출하게 됩니다.

AbstractRedisAsyncCommands 구현체에서 commandBuilder.publish()channelmessage를 인수로 받아 RedisCommand 객체를 반환합니다.

  • RedisCommand 객체를 비동기적으로 처리할 수 있도록 래핑을 하고 dispatch 명령으로 Redis 서버로 보내게 됩니다.

  • preProcessCommand 메서드는 주어진 명령을 전처리합니다. 이를 통해 특정 명령(SELECT, READONLY 등)에 맞는 후처리 작업을 준비합니다. 하지만 PUBLISH 명령어는 해당 메서드에 조건분기가 없으므로 후처리 작업 없이 RedisCommand 객체를 반환합니다.

  • potentiallyEnableMulti 메서드는 트랜잭션 명령(MULTI)이 감지되면 트랜잭션 모드를 활성화합니다. 그러나 Redis Config, PUBLISH 메서드는 트랜잭션 명령 설정을 하지 않았기에 넘어갑니다.

  • 상위 클래스인 RedisChannelHandler의 dispatch 메서드를 실행합니다.


  • DefaultEndPoint의 write 메서드가 오버라이딩 됩니다. validateWrite 메서드를 통해 쓰기 작업이 가능한지 검사합니다. 기존 큐 사이즈에 현재 명령의 수를 더한 값이 요청한 큐의 사이즈보다 클 때 예외가 발생하고, 연결이 안된 상태에서 버커 크기, 연결된 상태에서 버퍼 크기 등을 검사해서 작업이 가능한지 확인합니다.
  • 이후 try catch 문에서 공유 락의 incrementWriters() 메서드를 통해 쓰기 작업이 시작되기 전에 쓰기 작업에 대한 락을 설정하고 쓰기 카운터를 증가시킵니다. 이를 통해 동시성 문제를 방지하며, 다중 스레드 환경에서 안정적으로 명령을 처리할 수 있게 합니다.

  • incrementWriters() 메서드에서는 현재 스레드가 베타적 락을 소유하고 있는지 확인하고 없으면 락을 획득합니다. 이때 JAVA의 ReentrantLock을 가지고 lock을 겁니다. 무한 루프(for (;;))는 쓰기 카운터가 음수일 경우(다른 스레드가 베타적 락을 소유하고 있는 경우), 계속해서 공유 락을 얻기 위해 기다리게 만듭니다. 공유 락을 얻은 후 ReentrantLock을 해제합니다. 이렇게 여러 쓰레드가 동시에 공유 락을 획득하는 경우에도 동시성 문제 없이 얻을 수 있습니다.
if (autoFlushCommands) {
    if (isConnected()) {
        writeToChannelAndFlush(command);
    } else {
        writeToDisconnectedBuffer(command);
    }
} else {
    writeToBuffer(command);
}
  • 이 부분에서 명령이 자동으로 플러시 될 수 있는지 확인합니다. 연결된 상태라면 명령을 채널로 바로 전송하고, 플러시를 수행합니다. 연결이 끊긴 상태라면, 명령을 임시 버퍼에 넣고, 자동 플러시가 비활성화 되는 경우에는 플러시 없이 버퍼에 저장합니다.

  • NioSocketChannel의 isActivate 메서드에서 소켓 채널이 열려 있는지 확인하고, 소켓 채널과 연결 상태인지 확인을 합니다. javaChannel() 메서드는java.nio.channels.SocketChannel 객체를 반환합니다. (Java의 NIO (Non-blocking I/O)에서 사용하는 기본 네트워크 통신 채널)
  • SocketChannel (Java Platform SE 8 )

  • 현재 상황에서는 Redis와 연결이 잘 되어 있기에, writeToChannelAndFlush() 메서드를 실행하게 됩니다. QUEUE_SIZE.incrementAndGet(this) 에서 전송 대기열의 크기를 증가시킵니다. 이 명령이 전송되기 위해 큐에 들어갔다는 것을 기록합니다. channelWriteAndFlush(command): Redis 명령을 네트워크 채널로 전송하고 즉시 플러시합니다. ChannelFuture는 아직 발생하지 않은 I/O 작업을 나타냅니다. 이는 모든 작업이 Netty에서 비동기식이므로 요청된 작업이 아직 수행되지 않았을 수 있음을 의미합니다.
  • 비동기로 Redis 명령을 전송한 후, 그 명령이 완료되면 알림을 받기 위한 용도와 네트워크 신뢰성을 보장하기 위해 리스너를 추가합니다. (Pub/Sub은 자체적으로 메시지 재전송하거나 보장하는 메커니즘이 없기에, 단순히 네트워크 통신에서의 신뢰성 설정으로 보입니다.)
  • Netty.docs: User guide for 4.x

AbstractChannel(io.lettuce.core.protocol) → DeafaultChannelPipeLine(io.netty.channel) → AbstractChannelHandlerContext(io.netty.channel) 순으로 writeFlush 메서드가 실행이 됩니다. newPromise() 메서드는 이 메시지 전송 작업의 성공 또는 실패 여부를 추적할 수 있는 DefaultChannelPromise 객체를 반환하는데, 이는 Netty의 기본 Promise 구현체입니다.

  • promise가 유효한지 확인 후 채널 파이프라인에서 적절한 핸들러 컨텍스트를 찾습니다.

  • 메시지와 컨텍스트를 연결하고, next 컨텍스트의 이벤트 루프를 가져옵니다. EventExecutor는 Netty에서 비동기 작업을 실행하는 객체입니다. 현재 스레드가 이벤트 루프에 포함되어 있지 않아 safeExecute(executor, task, promise, m, !flush)작업을 이벤트 루프에 제출합니다. 실패할 경우 task.cancel()을 호출하여 작업을 취소하고 대기 중인 바이트를 감소시킵니다.


  • 따라서 SingleThreadEventExcutor의 excute 메서드가 실행됩니다. 주어진 작업을 적절한 큐에 추가하고, 현재 스레드가 이벤트 루프 스레드가 아니기 때문에, 새로운 스레드를 시작하고, 이벤트 루프를 시작합니다. 이후 이벤트 루프를 깨워 작업을 실행하도록 합니다.

  • 이렇게 메서드가 실행이 되고 다시 execute 메서드로 돌아오게 됩니다. 다음과 같이 log를 찍히게 하였습니다.

결론

Spring Boot에서 Redis를 사용할 때 Spring Data Redis와 Lettuce 의존성을 추가하여 Redis Pub/Sub에서 Publish 메서드가 어떻게 Redis로 요청이 가는지 확인할 수 있었습니다. Lettuce는 Netty 기반의 Redis 클라이언트로 비동기적으로 명령을 보낼 수 있었습니다. 명령을 보낼 때 내부적으로 Lock도 걸고 현재 스레드가 이벤트 루프 스레드인지 확인하고 작업 큐에 넣는 등 생각보다 여러 작업이 들어간 걸 알 수 있었습니다.

profile
방구석개발자
post-custom-banner

0개의 댓글