알림 구현방식에는 웹소켓 방식을 고려하기도하는데, 사실 서버에서 발생하는 이벤트를 일방적으로 전송만하면되서, SSE 구현방식이 더 적합하다고 보았다.
AlarmController
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/api")
public class AlarmController {
private final AlarmService alarmService;
@GetMapping(value = "/alarm/subscribe", produces = "text/event-stream")
public SseEmitter alarmSubscribe(
@RequestParam("username") String username,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
HttpServletResponse response){
response.setHeader("X-Accel-Buffering", "no");
LocalDateTime now = LocalDateTime.now();
return alarmService.subscribe(username, lastEventId, now);
}
}
우선적으로 DB에 있는 회원을 상대로 알람을 구독하고, 자신이 받는 로직으로 테스트를 해볼것이다. 시큐리티는 적용하지않는다.
AlarmService
package com.example.practice.service.alarm;
import com.example.practice.domain.sse.SseEventName;
import com.example.practice.domain.member.Member;
import com.example.practice.exception.ErrorCode;
import com.example.practice.exception.NoEntityException;
import com.example.practice.exception.SseException;
import com.example.practice.repository.alarm.AlarmRepository;
import com.example.practice.repository.alarm.SseInMemoryRepository;
import com.example.practice.repository.alarm.SseRepositoryKeyRule;
import com.example.practice.repository.member.MemberRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Transactional(readOnly = true)
@Service
public class AlarmService {
private Long sseTimeout = 60L * 60 * 1000;
private static final String UNDER_SCORE = "_";
private static final String CONNECTED = "CONNECTED";
private final AlarmRepository alarmRepository;
private final MemberRepository memberRepository;
private final SseInMemoryRepository sseRepository;
//private final RedisTemplate<String, String> redisTemplate;
public SseEmitter subscribe(String username, String lastEventId, LocalDateTime now) {
//Long userId = getMemberByUsernameOrException(username).getId();
Long userId = memberRepository.findByUsername(username).get().getId();
System.out.println(userId);
SseEmitter sse = new SseEmitter(sseTimeout);
String key = new SseRepositoryKeyRule(userId, SseEventName.ALARM_LIST,
now).toCompleteKeyWhichSpecifyOnlyOneValue();
sse.onCompletion(()->{
log.info("onCompletion callback");
sseRepository.remove(key);
});
sse.onTimeout(()->{
log.info("onTimeout callback");
sse.complete();
});
sseRepository.put(key, sse);
try {
sse.send(SseEmitter.event()
.name(CONNECTED)
.id(getEventId(userId, now, SseEventName.ALARM_LIST))
.data("subscribe"));
} catch (IOException exception) {
sseRepository.remove(key);
log.info("SSE Excetpion: {}", exception.getMessage());
throw new SseException(ErrorCode.SSE_SEND_ERROR);
}
return sse;
}
public void send(String username) {
Long userId = memberRepository.findByUsername(username).get().getId();
System.out.println(userId);
LocalDateTime now = LocalDateTime.now();
String keyPrefix = new SseRepositoryKeyRule(userId, SseEventName.ALARM_LIST,
null).toCompleteKeyWhichSpecifyOnlyOneValue();
sseRepository.getKeyListByKeyPrefix(keyPrefix).forEach(key ->{
SseEmitter emitter = sseRepository.get(key).get();
try{
emitter.send(SseEmitter.event()
.id(getEventId(userId,now,SseEventName.ALARM_LIST))
.name(CONNECTED)
.data("알림성공"));
}
catch (IOException e){
sseRepository.remove(key);
log.error("SSE send error", e);
throw new SseException(ErrorCode.SSE_SEND_ERROR);
}
});
}
private String getEventId(Long userId, LocalDateTime now, SseEventName eventName) {
return userId + UNDER_SCORE + eventName.getValue() + UNDER_SCORE + now;
}
/*private String getRedisPubMessage(Long userId, SseEventName sseEventName) {
return userId + UNDER_SCORE + sseEventName.getValue();
}
private Member getMemberByUsernameOrException(String username) {
return memberRepository.findByUsername(username)
.orElseThrow(() -> new NoEntityException(
ErrorCode.ENTITY_NOT_FOUND
));
}*/
}
향후 Redis pub/sub -> kafka를 사용하려 고도화를 하려고한다. redis관련 코드는 주석처리하였다.
SSE Timeout 시간은 1시간이다. (60분 x 60초 x 1000ms)
SSE Eventname은 "CONNECT"이다. 이에 대해서 통신을 주고받는다.
subscribe()
- 우선 username을 바탕으로 기본키를 받아온다.
기본키+ENUM+시간 을 묶어 key로 만들어준다. 이것은 SseEmitter의 key에 해당한다.
- SseEmitter는 Map형태로 메모리에 저장되는데, <key,sseEmitter> 형태를갖는다.
- subscribe 함수를 통해 sseemitter를 생성해, 메모리에 저장한다.
send()
send함수에서 메모리에 저장되어있는 key+Enum으로 시작하는 모든 emitter을 찾아 이벤트 발생시, 알림을 보낸다.
알림을 보내고싶은 api에 넣는 방식으로 이렇게 사용하면된다.
@GetMapping("/test") // react 연동
public String hello(){
alarmService.send("hasung");
return "테스트입니다";
}
package com.example.practice.repository.alarm;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
@Component
public class SseInMemoryRepository implements SseRepository{
private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
@Override
public void put(String key, SseEmitter sseEmitter) {
sseEmitterMap.put(key, sseEmitter);
}
@Override
public Optional<SseEmitter> get(String key) {
return Optional.ofNullable(sseEmitterMap.get(key));
}
@Override
public List<SseEmitter> getListByKeyPrefix(String keyPrefix) {
return sseEmitterMap.keySet().stream()
.filter(key -> key.startsWith(keyPrefix))
.map(sseEmitterMap::get)
.collect(Collectors.toList());
}
@Override
public List<String> getKeyListByKeyPrefix(String keyPrefix) {
return sseEmitterMap.keySet().stream()
.filter(key -> key.startsWith(keyPrefix))
.collect(Collectors.toList());
}
@Override
public void remove(String key) {
sseEmitterMap.remove(key);
}
}
Map<String,SseEmitter> sseEmitters = emitterRepository.findAllEmittersStartWithId(subscriberUUID);
sseEmitters.forEach(
(key, emitter) -> {
// 데이터 캐시 저장 (유실된 데이터 처리 위함)
emitterRepository.saveEventCache(key, notificationResponseDto);
sendToClient(emitter, key, "alertComment", notificationResponseDto);
// 알림 레디스에 저장
alarmRedisService.save(AlarmRedisRequestDto.builder()
// .pubUUID(publisherUUID)
.pubName(commentNickname)
.subUUID(subscriberUUID)
.directoryName(directoryName)
.build());
}
);
sseRepository.getKeyListByKeyPrefix(keyPrefix).forEach(key ->{
SseEmitter emitter = sseRepository.get(key).get();
try{
emitter.send(SseEmitter.event()
.id(getEventId(userId,now,SseEventName.ALARM_LIST))
.name(CONNECTED)
.data("알림성공"));
}
catch (IOException e){
sseRepository.remove(key);
log.error("SSE send error", e);
throw new SseException(ErrorCode.SSE_SEND_ERROR);
}
});