실제 토픽 1개를 구독해서 웹소켓과 레디스로 채팅이 주고받는 것까지 테스트를 완료했다. 하지만 redis 컨테이너와 MessageListenerAdapter 등 어떻게 메시지를 주고받는지 궁금해서 직접 알아봤다. 그리고 동적으로 채팅방을 생성해서 한개의 채팅방에는 여러 사람이, 한개의 사람은 여러 채팅방에 접속이 가능하므로 어떤 것을 활용할지 함께 고민하고 있다.

위 그림은 내가 직접 정리하면서 그려봤다.
실제 메시지가 레디스로 발행되면, 이 시점에 서버로 전달되고 지정된 토픽으로 게시된다.
이 때 서버는 RedisConnectionFactory를 통해 레디스와 연결을 생성한다.
서버 애플리케이션 내부에는 RedisMessageListenerContainer가 동작하며 특정 채널을 구독하고있다. 그래서 해당 채널에 메시지가 발행되면, 자동으로 수신하게 되어있다.
RedisMessageListenerContainer가 메시지를 수신하면 이를 MessageListenerAdapter로 전달해서 내부에 등록된 리스너를 호출한다. (이 때 리스너가 이해할 수 있는 형태로 변환된다.)
MessageListenerAdapter는 실제 메시지 처리를 담당하는 redisSubscriber를 호출해서 브로드 캐스트를 준비한다.
실제 채팅방에서 웹소켓 세션들을 찾아서 각 사용자에게 메시지를 전달한다.
실제 작성한 코드이다.
/**
* Redis pub/sub 채널 이름
* @return channelTopic ChannelTopic
*/
@Bean
public ChannelTopic channelTopic(){
return new ChannelTopic("SpinnerChatRoom");
}
/**
* redis 컨테이너
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(messageListenerAdapter(), channelTopic());
return container;
}
/**
* MessageListenerAdapter:
* RedisSubscriber(implements MessageListener)를 래핑
*/
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(redisSubscriber);
}
일반적으로 RedisMessageListenerContainer 빈으로 등록해서 애플리케이션 전반에서 채널에 대한 구독과 메시지 처리를 담당한다. 내부적으로 스레드 풀을 사용해 여러 메시지를 동시에 처리할 수 있어서 하나의 컨테이너로 관리가 가능하다.
RedisConnectionFactory는 하나의 싱글톤 빈으로 관리되어 여러사용자, 혹은 컴포넌트가 레디스와 통신할 때 공유되서 사용된다. 내부적으로 커넥션 풀링이나 여러 연결을 관리하여 동시요청 처리가 가능하다.
메시지를 보낼 때는 토픽(채널)에만 발행하기 때문에 보낸 사람은 식별리 안된다. 즉 해당 채널을 구독한 모두에게 메시지가 발행된다. 만약 식별하고 싶다면 로직을 추가하면 된다.
현재 진행중인 사이드프로젝트는 채팅방 생성, 실시간 읽음처리, 알림기능 등 여러가지 기능이 있어서 채팅방에 접속 상태도 실시간으로 체크가 가능해야한다. 또 웹소켓으로 채팅을 주고받다보니 동시에 읽기 기능도 있어야했다. 그래서 고민한게 ConcurrentHashMap과 CopyOnWriteArrayList이다.
ConcurrentHashMap은 스레드 세이프 맵 구현체 중에 하나로 여러 스레드가 동시에 접근해서 데이터를 읽거나 수정할 수 있는 상황에서 높은 동시성과 성능을 보장한다. 이 말은 내부적으로 락을 사용해 제어가 가능한데, 같은 버킷(인덱스) 내에서는 락이 걸려서 동시에 작업이 안되고 다른 버킷에서는 작업이 가능하다는 뜻이다. 또 가장 중요한건 대부분의 읽기 작업에서는 락을 걸지 않기 때문에 읽기 성능이 굉장히 빠르다.
CopyOnWriteArrayList는 주로 읽기가 빈번하고 쓰기가 적은 상황에서 사용되는데 리스트에 요소를 추가할 때는 내부를 복사해서 새로운 배열에 변경 내용을 적용하기 때문에 읽기 작업을 할 때에는 내부 요소가 변경되더라도 문제없이 수행이 가능하다. 또 쓰기 작업을 할 때마다 내부 배열이 복사로 수정하기 때문에 동기화 문제도 적다. 그래서 나는 사용자의 읽음처리와 알림 기능에 활용해보려고 한다...
예를 들어 채팅방에 접속해있다면 추가하고 아니면 제거하는 로직으로 할 계획이다.
물론 더 좋은 방법이 있다면 바꿀 예정이다. 끝.