
위치기반 랜덤채팅 프로젝트인 Rand_Chat Project를 진행하면서 해결했던 사항이다.
Websocket은 양방향 통신 , Sse는 Server-Sent-Event로 서버에서 - > 클라이언트로 흐르는 단방향 통신이다. 
다중인스턴스로 로드밸런싱되고 있는 형태이다. 사람들을 매칭시켜주는 매칭서버가 여러 인스턴스 , 여러 채팅서버 , 회원관리 서버 등 다중서버 환경 및 어느정도 MSA 방식으로 설계되어있다. (완벽한 MSA는 아니다.)예를 들어
사용자1이매칭을 위해매칭대기열을 요청하고 ,매칭 완료 수신을 받기위한Sse 연결을 하였다고 가정하였을 때Sse 연결은매칭서버1이 되었고 ,매칭알람을 보내는 순간어느 인스턴스에서Sse 커넥션이 일어났는지 모른다 . 마찬가지로웹소켓도양방향 통신을 하기위해특정 인스턴스와연결확립을 하였을 텐데 , 어느 인스턴스와 통신을 해야할지 모르는 상황이다.
Redis의 Pub/Sub을 통해 해결 할 수 있었다. Pub/Sub을 Message Brocker로 이용하고 연결이 확립된 인스턴스를 특정하여 다중서버환경에서도 웹소켓이나 Sse 통신이 가능토록 했다. 
출처(https://seungpnag.tistory.com/9)
Pub/Sub 모델에서 모든 인스턴스가 SubScribe을 하고 , 해당 서버 , 채널 정보 등을 Pub/Sub 모델에 저장한 뒤 , Publish가 일어났을 때 Pub/Sub모델에 저장된 정보를 확인하여 특정 인스턴스에 메시지를 전달하는 방식이다. @Bean
public RedisMessageListenerContainer redisContainer(
RedisConnectionFactory connectionFactory, SubsCriber subscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 매칭 채널
container.addMessageListener(subscriber, new PatternTopic(PubSubChannel.MATCHING_CHANNEL.toString()));
container.addMessageListener(subscriber, new PatternTopic(PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString()));
return container;
}
SubScribe 하고있다.2. 각 인스턴스별로 Sse커넥션을 관리하는 CuncurrentHashMap과 연결에 관한 정보제공, 등록,제거를 할 수 있는 SseConnectionRegistry 클래스를 만들어 주었다.
public class SseConnectionRegistry {
// 클라이언트 ID (userId)와 SSE 연결을 관리하는 맵 CuncerrnetHashMap 동시성 제어 유리
private static final ConcurrentHashMap<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
// 클라이언트의 SSE 연결을 등록
public static void register(String userId,String channel, SseEmitter emitter) {
sseEmitters.put(userId+":"+channel, emitter);
}
// 클라이언트의 SSE 연결을 조회
public static SseEmitter getEmitter(String userId,String channel) {
return sseEmitters.get(userId+":"+channel);
}
// 클라이언트의 SSE 연결을 제거
public static void removeEmitter(String userId,String channel) {
sseEmitters.remove(userId+":"+channel);
}
// 모든 SSE 연결을 가져옴
public static ConcurrentHashMap<String, SseEmitter> getAllEmitters() {
return sseEmitters;
}
}
3. 클라이언트가 Publish했을 때 즉 , Service단에서 Publish 하여 이벤트를 발생 , 구독자에게 메시지를 전파해주는 Publisher 클래스를 만들었다.
@Service
@RequiredArgsConstructor
public class Publisher {
private final StringRedisTemplate redisTemplate;
//매칭결과
public void sendNotification(String userId, String nickname,String profileImg,String sex,String type,String distance,String channel) {
String payload = "";
//매칭성공 시
if(type.equals(SSETYPE.MATCHINGCOMPLETE.toString())){
if(sex.equals(MembersSex.MAN.toString())){
sex = "남자";
}
else{
sex = "여자";
}
}
// 매칭 성공 or 실패 시
if(type.equals(SSETYPE.MATCHINGCOMPLETE.toString()) || type.equals(SSETYPE.MATCHINGTIMEOUT.toString())){
payload = String.format("{\"userId\":\"%s\",\"nickname\":\"%s\",\"profileImg\":\"%s\",\"sex\":\"%s\",\"type\":\"%s\",\"distance\":\"%s\",\"channel\":\"%s\"}", userId, nickname,profileImg,sex,type,distance,channel);
redisTemplate.convertAndSend(PubSubChannel.MATCHING_CHANNEL.toString(), payload);
}
}
redisTemplate.convertAndSend(PubSubChannel.MATCHING_CHANNEL.toString(), payload);
이 메소드는 첫번째 인자를 구독하고 있는 인스턴스에게 payload와 함께 메시지를 전파하게 된다. 따라서 예상결과는 매칭서버1 , 매칭서버2 등 모든 서버에게 전파가 갈 것이다.
4. 실질적으로 Sse 커넥션 객체를 생성하고 , 메시지를 받은 서버에서 Sse커넥션이 있는 위치를 판별하기 위한 Sse 연결정보를 Redis에 저장하는 역할을 할 SseNotificationService를 만들어 주었다.
@Service
@Slf4j
public class SseNotificationService implements NotificationService {
private final StringRedisTemplate redisTemplate;
public SseNotificationService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
// 특정 회원의 SSE 연결을 Redis에 저장
public void registerConnection(String userId,String channel, String serverInstanceId,String key) {
redisTemplate.opsForValue().set(key + userId+":"+channel, serverInstanceId);
}
// 특정 회원의 SSE 연결 정보 조회
public String getServerInstanceForUser(String userId,String channel,String key) {
return redisTemplate.opsForValue().get(key + userId+":"+channel);
}
// 연결 해제 시 삭제
public void removeConnection(String userId,String channel,String key) {
redisTemplate.delete(key + userId+":"+channel);
}
public SseEmitter connect(String userId,String channel,String key) {
SseEmitter emitter = new SseEmitter(180 * 1000L); // 90초 타임아웃
SseConnectionRegistry.register(userId,channel, emitter);
registerConnection(userId, channel,getCurrentServerInstanceId(),key);
emitter.onCompletion(() -> cleanup(userId,channel,key));
emitter.onTimeout(() -> cleanup(userId,channel,key));
emitter.onError(e -> cleanup(userId,channel,key));
return emitter;
}
private void cleanup(String userId,String channel,String key) {
SseConnectionRegistry.removeEmitter(userId,channel);
removeConnection(userId,channel,key);
}
private String getCurrentServerInstanceId() {
return System.getenv("INSTANCE_ID"); // 서버 인스턴스 ID 반환
}
}
getCurrentServerInstanceId를 통해 현재 인스턴스의 서버ID를 반환받고 , 서버ID와 UserId, Channel명을 조합해 레디스에 저장하고 1번에서 만든 클래스의 SseConnectionRegistry.register를 통해 현재 인스턴스 CuncurrentHashMap에 Sse커넥션 객체를 담는다.
5. Publish되었을때 여러 인스턴스 중 요청자의 Sse커넥션이 담긴 서버를 찾는 로직이 필요하며 , 해당 Sse객체를 통해 요청자에게 전송할 로직이 필요하다. 이를 위해 SubScriber 클래스를 만들었다.
@Slf4j
@Component
public class SubsCriber implements MessageListener {
private final NotificationService connectionService;
public SubsCriber(NotificationService connectionService) {
this.connectionService = connectionService;
}
// Redis 메시지 수신 처리
@Override
public void onMessage(Message message, byte[] pattern) {
try{
String payload = new String(message.getBody());
Map<String, String> data = null;
data = parsePayload(payload);
//필수 값
String userId = data.get("userId");
String channel = data.get("channel");
String serverInstanceId = "";
// 현재 서버에 연결된 클라이언트라면 메시지 전송
if(channel.equals(PubSubChannel.MATCHING_CHANNEL.toString())){
//매칭 채널
serverInstanceId = connectionService.getServerInstanceForUser(userId,PubSubChannel.MATCHING_CHANNEL.toString(), RedisKey.SSE_MATCHING_CONNECTION_KEY);
}
else if (channel.equals(PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString())){
serverInstanceId = connectionService.getServerInstanceForUser(userId,PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString(), RedisKey.SSE_MATCHING_ACCEPT_CONNECTION_KEY);
}
log.info("serverTest={}",serverInstanceId);
if (isCurrentInstance(serverInstanceId)) {
//분기
if(channel.equals(PubSubChannel.MATCHING_CHANNEL.toString())){
log.info("t1");
//매칭채널
String nickname = data.get("nickname");
String profileImg = data.get("profileImg");
String sex = data.get("sex");
String type= data.get("type");
String distance = data.get("distance");
matchingResultSendToClient(userId, nickname,profileImg,sex,type,distance);
}else if (channel.equals(PubSubChannel.MATCHING_ACCEPT_CHANNEL.toString())){
log.info("t2");
//매칭 수락 채널
matchingAcceptSendToClient(userId);
}
}
}
catch (Exception e){
throw new RuntimeException(e);
}
}
private Map<String, String> parsePayload(String payload) throws JsonProcessingException {
// JSON 문자열을 Map으로 변환 (간단한 파서 사용)
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(payload, new TypeReference<>() {});
}
private boolean isCurrentInstance(String serverInstanceId) {
// 현재 서버 인스턴스와 비교 (로드밸런싱 환경에서 인스턴스 ID 비교)
return getCurrentInstanceId().equals(serverInstanceId);
}
//매칭 결과 전송
private void matchingResultSendToClient(String userId, String nickname,String profileImg,String sex,String type,String distance) {
// SSE 연결된 클라이언트에게 메시지 전송
SseEmitter emitter = SseConnectionRegistry.getEmitter(userId,PubSubChannel.MATCHING_CHANNEL.toString());
if (emitter != null) {
try {
if(type.equals(SSETYPE.MATCHINGCOMPLETE.toString())){
ResMatchResultDTO resMatchResultDTO = new ResMatchResultDTO();
resMatchResultDTO.setNickname(nickname);
resMatchResultDTO.setProfileImg(profileImg);
resMatchResultDTO.setSex(sex);
resMatchResultDTO.setDistance(distance);
ResponseDTO<ResMatchResultDTO> responseDTO = new ResponseDTO(resMatchResultDTO);
emitter.send(SseEmitter.event().name(PubSubChannel.MATCHING_CHANNEL.toString()).data(responseDTO));
log.info("test result= {}",responseDTO);
log.info("emmiter={}",emitter);
log.info("emmiter str ={}",emitter.toString());
log.info("emmiter id={}",userId);
}
else if(type.equals(SSETYPE.MATCHINGTIMEOUT.toString())){
ResponseErr responseErr = new ResponseErr(ErrorCode.COMMON_SSE_MATCH_1MIN_TIME_OUT);
emitter.send(SseEmitter.event().name(PubSubChannel.MATCHING_CHANNEL.toString()).data(responseErr));
log.info("test result= {}",responseErr);
log.info("emmiter={}",emitter);
log.info("emmiter id={}",userId);
}
} catch (Exception e) {
// 전송 실패 시 처리
SseConnectionRegistry.removeEmitter(userId,PubSubChannel.MATCHING_CHANNEL.toString());
log.info("test result= fail");
}
}
}
private String getCurrentInstanceId() {
// 서버 인스턴스 ID 반환 (필요시 환경변수 또는 설정값 사용)
log.info("serverId={}",System.getenv("INSTANCE_ID"));
return System.getenv("INSTANCE_ID");
}
}
getCurrentInstanceId로 현재 인스턴스의 ID를 반환받고 4번에서 저장한 Redis에서 UserId와 Channel에 해당하는 서버 인스턴스를 반환받는다. 이둘을 비교하여 , 같으면 로직을 수행한다. 예를 들어
서버1,서버2,서버3모두가OnMessage메소드를 실행하게 될 것이고(SubScribe중)userId가 1번,채널명이 매칭채널이라고 하면 ,매칭채널:1에 해당하는서버인스턴스를Redis에서 가져온다.
그 값이서버3이라고 하면 ,서버1,서버2는Sse커넥션을 가져오는 로직은 수행 하지 않을것이고,서버3만이 수행한다.
6. Sse커넥션을 생성 요청하는 컨트롤러 및 Publish할 서비스를 만든다 . (상세로직 생략)
@GetMapping
public SseEmitter matchingConnect(){
String usrId=String.valueOf(SecurityContextGet.getUsrId());
return sseNotificationService.connect(usrId,PubSubChannel.MATCHING_CHANNEL.toString(), RedisKey.SSE_MATCHING_CONNECTION_KEY);
}
publisher.sendNotification(firstUserId,secondeMemberInfo.getNickName(),secondeMemberInfo.getProfileImg(),secondeMemberInfo.getSex().toString(),
SSETYPE.MATCHINGCOMPLETE.toString(),strDistance, PubSubChannel.MATCHING_CHANNEL.toString());
publisher.sendNotification(secondUserId,firstMemberInfo.getNickName(),firstMemberInfo.getProfileImg(),firstMemberInfo.getSex().toString(),
SSETYPE.MATCHINGCOMPLETE.toString(),strDistance, PubSubChannel.MATCHING_CHANNEL.toString());
브로드캐스트 기반 메시징
Redis Pub/Sub은 발행자-구독자(Publish/Subscribe) 패턴을 사용하여, 발행된 메시지를 특정 채널에 구독 중인 모든 클라이언트에게 전달
메시지가 발행되면 해당 채널을 구독 중인 모든 리스너(구독자)가 메시지를 동시에 수신
비동기 통신
메시지 발행과 수신은 비동기로 처리
발행자는 메시지 전송 후 즉시 반환하며, 구독자는 메시지를 수신하는 순간 처리
구독 중심 모델
구독자는 하나 이상의 특정 채널을 구독하거나, 패턴 기반으로 여러 채널을 구독할 수 있음.
채널 패턴 예시:
특정 채널: channel1
패턴: channel* (모든 channel로 시작하는 채널 구독)
중앙 서버 역할
Redis는 Pub/Sub 시스템에서 중앙 서버 역할을 하며, 발행자와 구독자 사이의 메시지 전달을 담당
발행자는 Redis 서버로 메시지를 전송하고, Redis 서버는 이를 구독자에게 중계
실시간 메시징
Pub/Sub은 주로 실시간 메시징 시나리오에 적합
메시지 저장 X
Pub/Sub는 메시지를 저장하지 않음.
메시지가 발행될 때 구독자가 연결되어 있지 않다면, 해당 메시지는 손실.
RabbitMQ 또는 Kafka와 같은 메시지 큐 시스템과 달리 메시지 보존 기능이 없음.
구독자는 지속 연결 필요
구독자는 Redis 서버와 지속적으로 연결되어 있어야 메시지를 수신할 수 있음.
연결이 끊기면 구독이 취소되며, 끊긴 동안 발행된 메시지는 받을 수 없음.