GET /connect HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
이벤트의 미디어 타입은 text/event-stream이 표준으로 정해져있습니다. 이벤트는 캐싱하지 않으며 지속적 연결을 사용해야합니다(HTTP 1.1에서는 기본적으로 지속 연결을 사용합니다).
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
@Slf4j
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class AlarmController {
private final AlarmService alarmService;
@PreAuthorize("hasAuthority('alarm.read')")
@Operation(summary = "알람 sse 구독", description = "알람 sse 구독, sseEmitter객체 반환.")
@GetMapping(value = "/alarm/subscribe", produces = "text/event-stream")
public SseEmitter alarmSubscribe(
@ApiParam(hidden = true) @AuthenticationPrincipal UserDetails user,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
HttpServletResponse response) {
//nginx리버스 프록시에서 버퍼링 기능으로 인한 오동작 방지
response.setHeader("X-Accel-Buffering", "no");
LocalDateTime now = CustomTimeUtils.nowWithoutNano();
return alarmService.subscribe(user.getUsername(), lastEventId, now);
}
}
@Slf4j
@RequiredArgsConstructor
@Transactional(readOnly = true)
@Service
public class AlarmService {
@Value("${sse.timeout}")
private String sseTimeout;
private static final String UNDER_SCORE = "_";
private static final String CONNECTED = "CONNECTED";
private final AlarmRepository alarmRepository;
private final UserRepository userRepository;
private final SSEInMemoryRepository sseRepository;
private final RedisTemplate<String, String> redisTemplate;
public SseEmitter subscribe(String username, String lastEventId, LocalDateTime now) {
Long userId = getUserByUsernameOrException(username).getId();
SseEmitter sse = new SseEmitter(Long.parseLong(sseTimeout));
String key = new SseRepositoryKeyRule(userId, SseEventName.ALARM_LIST,
now).toCompleteKeyWhichSpecifyOnlyOneValue();
sse.onCompletion(() -> {
log.info("onCompletion callback");
sseRepository.remove(key);
});
sse.onTimeout(() -> {
log.info("onTimeout callback");
//만료 시 Repository에서 삭제 되어야함.
sse.complete();
});
sseRepository.put(key, sse);
try {
sse.send(SseEmitter.event()
.name(CONNECTED)
.id(getEventId(userId, now, SseEventName.ALARM_LIST))
.data("subscribe"));
} catch (IOException exception) {
sseRepository.remove(key);
log.info("SSE Exception: {}", exception.getMessage());
throw new SseException(ErrorCode.SSE_SEND_ERROR);
}
// 중간에 연결이 끊겨서 다시 연결할 때, lastEventId를 통해 기존의 받지못한 이벤트를 받을 수 있도록 할 수 있음.
// 한번의 알림이나 새로고침을 받으면 알림을 slice로 가져오기 때문에
// 수신 못한 응답을 다시 보내는 로직을 구현하지 않음.
return sse;
}
/**
* 특정 유저의 특정 sse 이벤트에 대한 id를 생성한다.
* 위 조건으로 여러개 정의 될 경우 now 로 구분한다.
* @param userId
* @param now
* @param eventName
* @return
*/
private String getEventId(Long userId, LocalDateTime now, SseEventName eventName) {
return userId + UNDER_SCORE + eventName.getValue() + UNDER_SCORE + now;
}
/**
* redis pub시 userId와 sseEventName을 합쳐서 보낸다.
* @param userId
* @param sseEventName
* @return
*/
private String getRedisPubMessage(Long userId, SseEventName sseEventName) {
return userId + UNDER_SCORE + sseEventName.getValue();
}
private User getUserByUsernameOrException(String username) {
return userRepository.findByUsername(username)
.orElseThrow(() -> new NoEntityException(
ErrorCode.ENTITY_NOT_FOUND));
}
}
@Slf4j
@Component
public class SSEInMemoryRepository implements SSERepository{
private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
@Override
public void put(String key, SseEmitter sseEmitter) {
sseEmitterMap.put(key, sseEmitter);
}
@Override
public Optional<SseEmitter> get(String key) {
return Optional.ofNullable(sseEmitterMap.get(key));
}
@Override
public List<SseEmitter> getListByKeyPrefix(String keyPrefix){
return sseEmitterMap.keySet().stream()
.filter(key -> key.startsWith(keyPrefix))
.map(sseEmitterMap::get)
.collect(Collectors.toList());
}
@Override
public List<String> getKeyListByKeyPrefix(String keyPrefix){
return sseEmitterMap.keySet().stream()
.filter(key -> key.startsWith(keyPrefix))
.collect(Collectors.toList());
}
@Override
public void remove(String key) {
sseEmitterMap.remove(key);
}
}
@RequiredArgsConstructor
@EqualsAndHashCode
public class SseRepositoryKeyRule {
private static final String UNDER_SCORE = "_";
private final Long userId;
private final SseEventName sseEventName;
private final LocalDateTime createdAt;
/**
* SSEInMemoryRepository에서 사용될
* 특정 user에 대한 특정 브라우저,특정 SSEEventName에
* 대한 SSEEmitter를 찾기 위한 key를 생성한다.
* @return
*/
public String toCompleteKeyWhichSpecifyOnlyOneValue() {
String createdAtString = createdAt == null ? "" : createdAt.toString();
return userId + UNDER_SCORE + sseEventName.getValue() + UNDER_SCORE + createdAtString;
}
}
SseEmitter emitter = sseRepository.get(key).get();
try {
emitter.send(SseEmitter.event()
.id(getEventId(userId, now, eventName))
.name(eventName.getValue())
.data(eventName.getValue()));
} catch (IOException e) {
sseRepository.remove(key);
log.error("SSE send error", e);
throw new SseException(ErrorCode.SSE_SEND_ERROR);
}
const sse = new EventSource("https://{domain}/connect");
sse.addEventListener('connect', (e) => {
const { data: receivedConnectData } = e;
console.log('connect event data: ',receivedConnectData); // "connected!"
});
위에서 언급했듯이 처음에 SSE 응답을 할 때 아무런 이벤트도 보내지 않으면 재연결 요청을 보낼때나, 아니면 연결 요청 자체에서 오류가 발생합니다.
따라서 첫 SSE 응답을 보낼 시에는 반드시 더미 데이터라도 넣어서 데이터를 전달해야합니다.
SSE 연결 요청을 할 때 헤더에 JWT를 담아서 보내줘야했는데, EventSource 인터페이스는 기본적으로 헤더 전달을 지원하지 않는 문제가 있었습니다. event-source-polyfill 을 사용하면 헤더를 함께 보낼 수 있습니다.
proxy_set_header Connection '';
proxy_http_version 1.1;
클러스터에서 Publish하면 클론을 포함한 모든 노드에게 보냅니다. 따라서 마스터에서 publish한 메시지를 클론(슬레이브)에서 subscribe할 수 있습니다. 마스터 뿐만 아니라 클론에서도 publish할 수 있습니다. 클론에서 publish한 메시지를 다른 클론에서 subscribe할 수 있습니다.
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisMessageSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {}
}
@Slf4j
@Configuration
@RequiredArgsConstructor
public class LettuceConnectionConfig {
@Value("${spring.redis.cluster.nodes}")
private String clusterNodes;
@Value("${spring.redis.cluster.max-redirects}")
private int maxRedirects;
@Value("${spring.redis.password}")
private String password;
private final SSERepository sseRepository;
// private final EntityManagerFactory entityManagerFactory;
// private final DataSource dataSource;
/*
* Class <=> Json간 변환을 담당한다.
*
* json => object 변환시 readValue(File file, T.class) => json File을 읽어 T 클래스로 변환 readValue(Url url,
* T.class) => url로 접속하여 데이터를 읽어와 T 클래스로 변환 readValue(String string, T.class) => string형식의
* json데이터를 T 클래스로 변환
*
* object => json 변환시 writeValue(File file, T object) => object를 json file로 변환하여 저장
* writeValueAsBytes(T object) => byte[] 형태로 object를 저장 writeValueAsString(T object) => string 형태로
* object를 json형태로 저장
*
* json을 포매팅(개행 및 정렬) writerWithDefaultPrettyPrint().writeValueAs... 를 사용하면 json파일이 포맷팅하여 저장된다.
* object mapper로 date값 변환시 timestamp형식이 아니라 JavaTimeModule() 로 변환하여 저장한다.
*/
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
mapper.registerModules(new JavaTimeModule(), new Jdk8Module());
return mapper;
}
/**
* RedisStaticMasterReplicaConfiguration를 사용할 경우 pub/sub사용 불가
* @return
*/
@Bean
public RedisClusterConfiguration redisClusterConfiguration() {
List<String> clusterNodeList = Arrays.stream(StringUtils.split(clusterNodes, ','))
.map(String::trim)
.collect(Collectors.toList());
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(clusterNodeList);
redisClusterConfiguration.setMaxRedirects(maxRedirects);
redisClusterConfiguration.setPassword(password);
return redisClusterConfiguration;
}
/*
* Redis Connection Factory library별 특징
* 1. Jedis - 멀티쓰레드환경에서 쓰레드 안전을 보장하지 않는다.
* - Connection pool을 사용하여 성능, 안정성 개선이 가능하지만 Lettuce보다 상대적으로 하드웨어적인 자원이 많이 필요하다.
* - 비동기 기능을 제공하지 않는다.
*
* 2. Lettuce - Netty 기반 redis client library
* - 비동기로 요청하기 때문에 Jedis에 비해 높은 성능을 가지고 있다.
* - TPS, 자원사용량 모두 Jedis에 비해 우수한 성능을 보인다는 테스트 사례가 있다.
*
* Jedis와 Lettuce의 성능 비교 https://jojoldu.tistory.com/418
*/
@Bean
public RedisConnectionFactory redisConnectionFactory(
final RedisClusterConfiguration redisClusterConfiguration) {
final SocketOptions socketOptions =
SocketOptions.builder().connectTimeout(Duration.of(10, ChronoUnit.MINUTES)).build();
final var clientOptions =
ClientOptions.builder().socketOptions(socketOptions).autoReconnect(true).build();
var clientConfig =
LettuceClientConfiguration.builder()
.clientOptions(clientOptions)
.readFrom(REPLICA_PREFERRED);
// if (useSSL) {
// // aws elasticcache uses in-transit encryption therefore no need for providing certificates
// clientConfig = clientConfig.useSsl().disablePeerVerification().and();
// }
return new LettuceConnectionFactory(
redisClusterConfiguration, clientConfig.build());
}
// @Bean // 만약 PlatformTransactionManager 등록이 안되어 있다면 해야함, 되어있다면 할 필요 없음
// public PlatformTransactionManager transactionManager() throws SQLException {
// // 사용하고 있는 datasource 관련 내용, 아래는 JDBC
//// return new DataSourceTransactionManager(dataSource);
//
// // JPA 사용하고 있다면 아래처럼 사용하고 있음
// return new JpaTransactionManager(entityManagerFactory);
// }
@Bean
public RedisTemplate<String, Object> redisTemplate(ObjectMapper objectMapper) {
GenericJackson2JsonRedisSerializer serializer =
new GenericJackson2JsonRedisSerializer(objectMapper);
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory(redisClusterConfiguration()));
// json 형식으로 데이터를 받을 때
// 값이 깨지지 않도록 직렬화한다.
// 저장할 클래스가 여러개일 경우 범용 JacksonSerializer인 GenericJackson2JsonRedisSerializer를 이용한다
// 참고 https://somoly.tistory.com/134
// setKeySerializer, setValueSerializer 설정해주는 이유는 RedisTemplate를 사용할 때 Spring - Redis 간 데이터 직렬화, 역직렬화 시 사용하는 방식이 Jdk 직렬화 방식이기 때문입니다.
// 동작에는 문제가 없지만 redis-cli을 통해 직접 데이터를 보려고 할 때 알아볼 수 없는 형태로 출력되기 때문에 적용한 설정입니다.
// 참고 https://wildeveloperetrain.tistory.com/32
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(serializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(serializer);
redisTemplate.setEnableTransactionSupport(true); // transaction 허용
return redisTemplate;
}
@Bean
ChannelTopic topic() {
return new ChannelTopic(SseEventName.ALARM_LIST.getValue());
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(redisMessageSubscriber);
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory(redisClusterConfiguration()));
container.addMessageListener(messageListener(), topic());
log.info("PubSubConfig init");
return container;
}
}
@Transactional
public void send(Long alarmReceiverId, AlarmType alarmType, AlarmArgs alarmArgs, SseEventName sseEventName) {
redisTemplate.convertAndSend(sseEventName.getValue(),
getRedisPubMessage(alarmReceiverId, sseEventName));
}
private String getRedisPubMessage(Long userId, SseEventName sseEventName) {
return userId + UNDER_SCORE + sseEventName.getValue();
}
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisMessageSubscriber implements MessageListener {
private static final String UNDER_SCORE = "_";
private final SSERepository sseRepository;
/**
* 여러 서버에서 SSE를 구현하기 위한 Redis Pub/Sub
* subscribe해두었던 topic에 publish가 일어나면 메서드가 호출된다.
*/
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("Redis Pub/Sub message received: {}", message.toString());
String[] strings = message.toString().split(UNDER_SCORE);
Long userId = Long.parseLong(strings[0]);
SseEventName eventName = SseEventName.getEnumFromValue(strings[1]);
String keyPrefix = new SseRepositoryKeyRule(userId, eventName,
null).toCompleteKeyWhichSpecifyOnlyOneValue();
LocalDateTime now = CustomTimeUtils.nowWithoutNano();
sseRepository.getKeyListByKeyPrefix(keyPrefix).forEach(key -> {
SseEmitter emitter = sseRepository.get(key).get();
try {
emitter.send(SseEmitter.event()
.id(getEventId(userId, now, eventName))
.name(eventName.getValue())
.data(eventName.getValue()));
} catch (IOException e) {
sseRepository.remove(key);
log.error("SSE send error", e);
throw new SseException(ErrorCode.SSE_SEND_ERROR);
}
});
}
/**
* 특정 유저의 특정 sse 이벤트에 대한 id를 생성한다.
* 위 조건으로 여러개 정의 될 경우 now 로 구분한다.
* @param userId
* @param now
* @param eventName
* @return
*/
private String getEventId(Long userId, LocalDateTime now, SseEventName eventName) {
return userId + UNDER_SCORE + eventName.getValue() + UNDER_SCORE + now;
}
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory(redisClusterConfiguration()));
container.addMessageListener(messageListener(), topic());
log.info("PubSubConfig init");
return container;
}
@Transactional
public void comment(){
// comment 생성
commentRepository.save(comment);
// 알림 생성
alarmRepository.save(alarm);
// 알림 SSE 응답 발생
redisTemplate.convertAndSend()
}
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {
private final AlarmService alarmService;
/**
* offset을 최신으로 설정.
* https://stackoverflow.com/questions/57163953/kafkalistener-consumerconfig-auto-offset-reset-doc-earliest-for-multiple-listene
* @param alarmEvent
* @param ack
*/
@KafkaListener(topics = "${kafka.topic.alarm.name}", groupId = "${kafka.consumer.alarm.rdb-group-id}",
properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"}, containerFactory = "kafkaListenerContainerFactoryRDB")
public void createAlarmInRDBConsumerGroup(AlarmEvent alarmEvent, Acknowledgment ack) {
log.info("createAlarmInRDBConsumerGroup");
alarmService.createAlarm(alarmEvent.getUserId(), alarmEvent.getType(), alarmEvent.getArgs());
ack.acknowledge();
}
@KafkaListener(topics = "${kafka.topic.alarm.name}", groupId = "${kafka.consumer.alarm.redis-group-id}",
properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"}, containerFactory = "kafkaListenerContainerFactoryRedis")
public void redisPublishConsumerGroup(AlarmEvent alarmEvent, Acknowledgment ack) {
log.info("redisPublishConsumerGroup");
alarmService.send(alarmEvent.getUserId(),
alarmEvent.getEventName());
ack.acknowledge();
}
}