이번에 웹 소설 플랫폼 프로젝트를 진행할 때 sse를 사용했는데
공부한 내용 복습도 하고 나중에 다시보면 좋을 것 같아 정리했습니다
(정리한 내용이라 잘못된 내용이 있을 수 있습니다!!)
서버에서 클라이언트로 단방향 실시간 이벤트를 전송하기 위한 기술
http의 비연결성 특성 때문에 서버에서 실시간으로 업데이트 되는 상태를 클라이언트가 받을 수 없는데 sse를 사용하면 http연결을 열어둔 상태에서 클라이언트는 비동기적으로 이벤트를 스트림 형태로 수신할 수 있습니다
스트림 형태라는 말은 지속적으로 전송되는 데이터의 흐름을 말합니다

각 유저가 소설의 각 문장에 자신의 감정을 이모지로 표현할 수 있는데 서버에서 변화하는 상태를 지속적으로 보내기 위해 사용했습니다.
@Repository
@RequiredArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String id, SseEmitter sseEmitter) {
emitters.put(id, sseEmitter);
return sseEmitter;
}
@Override
public Map<String, SseEmitter> findAllStartById(String id) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
@Override
public void deleteAllStartByWithId(String id) {
emitters.forEach((key, emitter) -> {
if (key.startsWith(id)) emitters.remove(key);
});
}
}
@Component
public class GlobalCounter {
private int counter;
public synchronized int incrementCounter() {
counter++;
return counter;
}
}
@Override
public SseEmitter connection(Long episodeId, String id) {
String uuid = episodeId + "_" + id;
SseEmitter emitter = new SseEmitter((long) (10 * 60 * 1000));
emitterRepository.save(uuid, emitter);
emitter.onCompletion(() -> {
emitterRepository.deleteAllStartByWithId(uuid);
});
sendToClient(emitter, uuid, "[connect] id : " + id + ", episodeId : " + episodeId);
return emitter;
}
@Override
public Boolean sendToClient(SseEmitter emitter, String uuid, Object data) {
try {
String formattedData = formatData(data);
emitter.send(SseEmitter.event()
.id(uuid)
.name("sse")
.data(formattedData));
return true;
} catch (IOException e) {
return false;
}
}
private String formatData(Object data) {
String rawData = data.toString();
return "data: " + rawData.toString() + " \n\n";
}
{
event: 이벤트명
data: 데이터
}
@Slf4j
@Service
@RequiredArgsConstructor
public class GetEmojiStatusConsumer {
private final EmitterRepository emitterRepository;
private final NotificationService notificationService;
@KafkaListener(id = "emojiStatus", topics = "emojiStatus")
public void listen(String kafkaMessage) throws IOException {
ObjectMapper mapper = new ObjectMapper();
EmojiStatusDto emojiStatusDto = mapper.readValue(kafkaMessage, EmojiStatusDto.class);
Map<String, SseEmitter> result = emitterRepository.findAllStartById(
String.valueOf(emojiStatusDto.getEpisodeId()) + "_");
log.info("--------------------------");
List<Long> success = new ArrayList<>();
List<Long> fail = new ArrayList<>();
for (Map.Entry<String, SseEmitter> entry : result.entrySet()) {
if (notificationService.sendToClient(entry.getValue(), entry.getKey(),
emojiStatusDto)) {
success.add(Long.valueOf(entry.getKey().split("_")[1]));
} else {
fail.add(Long.valueOf(entry.getKey().split("_")[1]));
}
}
log.info("success : {}", success);
log.info("fail : {}", fail);
}
}

이번 프로젝트에 처음으로 msa로 서비스를 구성해서 kafka를 사용해서 좀 힘들었습니다. 일단 결과만 나오면 된다고 생각해서 위의 그림처럼 구현했는데 나중에 더 좋은 방법이 없는지 알아봐야 될 것 같습니다
각 소설의 에피소드를 관리하는 novel서비스와, 이모지를 관리하는 util서비스가 있는데
1. 처음에 유저가 에피소드를 조회하면 novel에서 emitter를 받아옵니다.
2. 에피소드의 각 문장에 이모지를 표시할 수 있는데 이를 관리하는 서비스는 util이기 때문에 util로 데이터를 전송합니다.
3. util에서는 변경된 이모지 상태를 유저들에게 보내야 하는데 변경된 내용에 해당하는 에피소드를 보고 있는 유저들의 정보는 novel에 있기 때문에 kafka로 {episodeId, episodeRow, emoji} 데이터를 보내고
4. 그 데이터를 novel서비스에서 받고 해당 에피소드를 보고 있는 모든 유저들에게 변경된 상태를 전송합니다.
처음 sse 구현은 생각보다 간단했는데 프론트와 연결하는 과정이 정말 힘들었습니다!!
https://velog.io/@mgk04222/SSE-%EC%8B%A4%EC%8B%9C%EA%B0%84-%EC%9D%B4%EB%AA%A8%EC%A7%80-%EA%B5%AC%ED%98%84-%ED%94%84%EB%A1%A0%ED%8A%B8%EC%97%94%EB%93%9C%EB%8A%94-%EC%96%B4%EB%96%BB%EA%B2%8C-%EB%AC%B8%EC%A0%9C%EB%A5%BC-%ED%95%B4%EA%B2%B0%ED%95%98%EC%98%80%EB%8A%94%EA%B0%80
여기서 프론트에 관한 내용을 보실 수 있습니다!!
흥미롭네요!