기존에 SSE방식으로 전달하는 알림방식은, username에 해당하는 사람에게만 전송하는 방식으로 처리를 한다.
댓글을 작성하면 해당 구독자에게만 알림을 보낸다.
다만 둘이 양방향 소통은 아니고, publisher(Editor) -> subscriber(Writer), 혹은 그 반대로 삼아 전달한다.
뭔가 진행방식은 양방향인데, 양방향으로 계속 소켓을 열어두기에는 리소스 소모가 조금 크다고 보았다.
처음엔 리소스 절감과 일방향 소통이라고 생각해 sse 하나만으로 상관이없겠다고 생각했다.
하지만 Editor가 댓글을쓰면 Writer에게 알림이 가야하고, Writer가 답글을 쓰면 Editor에게 알림이 가야한다.
양방향 소통 방식의 그림은 띄지만, 실시간 양방향은 아니다.
따라서 둘이 소통할수있는 하나의 channel을 둬서, 아키텍처를 생성하려고한다.
Redis 설정은 이전장에서 설명을 했으므로, 이번엔 spring 쪽을 다루려고한다.
RedisConfig
package com.example.practice.redis.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Slf4j
@Configuration
@EnableRedisRepositories
public class RedisConfig { // spring <-> redis 연결설정
@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private int redisPort;
@Value("${spring.data.redis.password}")
private String redisPassword;
//lettuce
@Bean
public RedisConnectionFactory redisConnectionFactory(){
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(redisHost,redisPort);
config.setPassword(redisPassword);
return new LettuceConnectionFactory(config);
}
// redisTemplate: default Value Type = JSON
/* If you want to use String Type, you can change ValueSerializer
/ to StringRedisSerializer or Use StringRedisTemplate*/
@Bean
public RedisTemplate<?, ?> redisTemplate(){
RedisTemplate<?, ?> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory()); // connection
redisTemplate.setKeySerializer(new StringRedisSerializer()); // key
// Java Object <-> JSON <-> String value
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
/*
* Redis pub/sub 메시지 처리 Listener
* */
@Bean
public RedisMessageListenerContainer redisMessageListener(){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory());
return container;
}
}
RedisConfig에는 연결관련 설정을 해준다.
RedisTemplate가 거의 핵심역할이라고 생각한다. 여기서 Redis의 <key, value> 형태를 java Object로 변환할수있게하고, 반대도 가능하게 해주는 설정을 가진다.
RedisController
package com.example.practice.redis.controller;
import com.example.practice.redis.dto.MessageDto;
import com.example.practice.redis.service.RedisPubService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
@RestController
@RequiredArgsConstructor
@Slf4j
@RequestMapping("/api/v1/redis/pubsub")
public class RedisPubSubController {
private final RedisPubService redisSubscribeService;
@PostMapping("/send")
public void sendMessage(@RequestParam(required = true) String channel,
@RequestBody MessageDto message) {
log.info("Redis Pub MSG Channel = {}", channel);
redisSubscribeService.pubMsgChannel(channel, message);
}
@PostMapping("/cancel")
public void cancelSubChannel(@RequestParam String channel) {
redisSubscribeService.cancelSubChannel(channel);
}
}
실험할 api다. /send api를 사용하면 해당 채널에 메시지를 전송할수있다.
MessageDto
package com.example.practice.redis.dto;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Getter
@NoArgsConstructor
public class MessageDto implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
private String sender;
private String roomId; // 타겟 channel
@Builder
public MessageDto(String message, String sender, String roomId) {
this.message = message;
this.sender = sender;
this.roomId = roomId;
}
}
package com.example.practice.redis.service;
import com.example.practice.redis.dto.MessageDto;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;
@Service
public class RedisPublisher {
private final RedisTemplate<String, Object> template;
public RedisPublisher(RedisTemplate<String, Object> template) {
this.template = template;
}
/*
* 특정 채널에 메시지를 전송한다.
* */
/*
* Object Publish
* */
public void publish(ChannelTopic topic, MessageDto messageDto) {
template.convertAndSend(topic.getTopic(), messageDto);
}
/*
* String Publish
* */
public void publish(ChannelTopic topic, String data) {
template.convertAndSend(topic.getTopic(), data);
}
}
RedisTemplate를 사용해 메시지를 전송한다. 파라미터는 채널과 메세지이다.
RedisPubService
package com.example.practice.redis.service;
import com.example.practice.redis.dto.MessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class RedisPubService {
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisPublisher redisPublisher;
// 각 Channel 별 Listener
private final RedisSubscribeListener redisSubscribeListener;
/*
* Channel 별 Message 전송
* @Param
* */
public void pubMsgChannel(String channel, MessageDto messageDto) {
// 1. 요청한 Channel을 구독
redisMessageListenerContainer.addMessageListener(redisSubscribeListener,
new ChannelTopic(channel));
// 2. Message 전송
redisPublisher.publish(new ChannelTopic(channel), messageDto);
}
/*
* Channel 구독 취소
* @Param channel
* */
public void cancelSubChannel(String channel) {
redisMessageListenerContainer.removeMessageListener(redisSubscribeListener);
}
}
package com.example.practice.redis.service;
import com.example.practice.redis.dto.MessageDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class RedisSubscribeListener implements MessageListener {
private final RedisTemplate<String, Object> template;
private final ObjectMapper objectMapper;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String publishMessage = template
.getStringSerializer().deserialize(message.getBody());
MessageDto messageDto = objectMapper.readValue(publishMessage, MessageDto.class);
log.info("Redis Subscribe Channel : ", messageDto.getRoomId());
log.info("Redis SUB Message : {}", publishMessage);
// Return || Another Method Call (etc.sae to DB)
// TODO
/*
* 여기 알림이 들어갈 예정이다.
*
* */
} catch (JsonProcessingException e) {
log.error(e.getMessage());
}
}
}
redis를 일단 하나만 사용했는데, message가 많이 발생하는걸 대비해 clustering을 먼저 할 생각이다.
redis pub/sub는 데이터를 저장하지않는다. 하지만, 전체 알림확인을 위해 db에 데이터를 저장하긴해야하는데,
kafka를 사용해서 두개의 consumer를 만들생각이다. 하나는 db에 연결하고, 하나는 위와같은 알림 consumer.
좋은 코드 알려주시는 분들 항상 감사합니다.