SSE(Server-Sent Event)

정현준·2023년 6월 19일
post-thumbnail

이번에 웹 소설 플랫폼 프로젝트를 진행할 때 sse를 사용했는데
공부한 내용 복습도 하고 나중에 다시보면 좋을 것 같아 정리했습니다
(정리한 내용이라 잘못된 내용이 있을 수 있습니다!!)


SSE - Server Sent Event

서버에서 클라이언트로 단방향 실시간 이벤트를 전송하기 위한 기술

http의 비연결성 특성 때문에 서버에서 실시간으로 업데이트 되는 상태를 클라이언트가 받을 수 없는데 sse를 사용하면 http연결을 열어둔 상태에서 클라이언트는 비동기적으로 이벤트를 스트림 형태로 수신할 수 있습니다

스트림 형태라는 말은 지속적으로 전송되는 데이터의 흐름을 말합니다


SSE를 왜 사용했는지?

각 유저가 소설의 각 문장에 자신의 감정을 이모지로 표현할 수 있는데 서버에서 변화하는 상태를 지속적으로 보내기 위해 사용했습니다.


SSE, 웹 소켓 차이

  1. sse는 텍스트 형식으로 데이터를 전송(주로 json 형태로 인코딩) 이벤트 이름, 데이터 필드로 구성
    웹 소켓은 이진 데이터, 텍스트 둘 다 가능
  2. sse 단방향 통신, 웹 소켓은 양방향 통신


코드

1. repository

@Repository
@RequiredArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String id, SseEmitter sseEmitter) {
        emitters.put(id, sseEmitter);
        return sseEmitter;
    }

    @Override
    public Map<String, SseEmitter> findAllStartById(String id) {
        return emitters.entrySet().stream()
            .filter(entry -> entry.getKey().startsWith(id))
            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
    }

    @Override
    public void deleteAllStartByWithId(String id) {
        emitters.forEach((key, emitter) -> {
            if (key.startsWith(id)) emitters.remove(key);
        });
    }
}
  • emitter 저장, 탐색, 삭제

2. connection

@Component
public class GlobalCounter {

    private int counter;

    public synchronized int incrementCounter() {
        counter++;
        return counter;
    }
}

@Override
public SseEmitter connection(Long episodeId, String id) {

    String uuid = episodeId + "_" + id;

    SseEmitter emitter = new SseEmitter((long) (10 * 60 * 1000));

    emitterRepository.save(uuid, emitter);

    emitter.onCompletion(() -> {
        emitterRepository.deleteAllStartByWithId(uuid);
    });

        
    sendToClient(emitter, uuid, "[connect] id : " + id + ", episodeId : " + episodeId);

    return emitter;
}
  • emitter를 에피소드_고유번호 형태로 저장하고
  • 연결이 끊어지면 repository에서 해당 emitter를 삭제했습니다
  • 연결시간을 10분으로 설정했습니다

3. send event

@Override
public Boolean sendToClient(SseEmitter emitter, String uuid, Object data) {

    try {
        String formattedData = formatData(data);
        emitter.send(SseEmitter.event()
            .id(uuid)
            .name("sse")
            .data(formattedData));

        return true;

    } catch (IOException e) {
       return false;
    }
}

private String formatData(Object data) {
    String rawData = data.toString();
    return "data: " + rawData.toString() + " \n\n";
}
  • 데이터 형태를 변경
{
  event: 이벤트명
  data: 데이터
}

4. service

@Slf4j
@Service
@RequiredArgsConstructor
public class GetEmojiStatusConsumer {

    private final EmitterRepository emitterRepository;
    private final NotificationService notificationService;

    @KafkaListener(id = "emojiStatus", topics = "emojiStatus")
    public void listen(String kafkaMessage) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        EmojiStatusDto emojiStatusDto = mapper.readValue(kafkaMessage, EmojiStatusDto.class);

        Map<String, SseEmitter> result = emitterRepository.findAllStartById(
            String.valueOf(emojiStatusDto.getEpisodeId()) + "_");

        log.info("--------------------------");
        List<Long> success = new ArrayList<>();
        List<Long> fail = new ArrayList<>();

        for (Map.Entry<String, SseEmitter> entry : result.entrySet()) {

            if (notificationService.sendToClient(entry.getValue(), entry.getKey(),
                emojiStatusDto)) {
                success.add(Long.valueOf(entry.getKey().split("_")[1]));
            } else {
                fail.add(Long.valueOf(entry.getKey().split("_")[1]));
            }
        }

        log.info("success : {}", success);
        log.info("fail : {}", fail);
    }
}



이번 프로젝트에 처음으로 msa로 서비스를 구성해서 kafka를 사용해서 좀 힘들었습니다. 일단 결과만 나오면 된다고 생각해서 위의 그림처럼 구현했는데 나중에 더 좋은 방법이 없는지 알아봐야 될 것 같습니다

각 소설의 에피소드를 관리하는 novel서비스와, 이모지를 관리하는 util서비스가 있는데
1. 처음에 유저가 에피소드를 조회하면 novel에서 emitter를 받아옵니다.
2. 에피소드의 각 문장에 이모지를 표시할 수 있는데 이를 관리하는 서비스는 util이기 때문에 util로 데이터를 전송합니다.
3. util에서는 변경된 이모지 상태를 유저들에게 보내야 하는데 변경된 내용에 해당하는 에피소드를 보고 있는 유저들의 정보는 novel에 있기 때문에 kafka로 {episodeId, episodeRow, emoji} 데이터를 보내고
4. 그 데이터를 novel서비스에서 받고 해당 에피소드를 보고 있는 모든 유저들에게 변경된 상태를 전송합니다.


tmi

처음 sse 구현은 생각보다 간단했는데 프론트와 연결하는 과정이 정말 힘들었습니다!!
https://velog.io/@mgk04222/SSE-%EC%8B%A4%EC%8B%9C%EA%B0%84-%EC%9D%B4%EB%AA%A8%EC%A7%80-%EA%B5%AC%ED%98%84-%ED%94%84%EB%A1%A0%ED%8A%B8%EC%97%94%EB%93%9C%EB%8A%94-%EC%96%B4%EB%96%BB%EA%B2%8C-%EB%AC%B8%EC%A0%9C%EB%A5%BC-%ED%95%B4%EA%B2%B0%ED%95%98%EC%98%80%EB%8A%94%EA%B0%80
여기서 프론트에 관한 내용을 보실 수 있습니다!!

profile
안녕하세요!

1개의 댓글

comment-user-thumbnail
2023년 6월 19일

흥미롭네요!

답글 달기