스프링부트 SseEmitter 사용

안민기·2023년 12월 1일

일기 어플 프로젝트 중 실시간 알림을 구현해야 하는 경우가 생겼다.

실시간 알림의 경우 구현하는 방법이 주로 세가지인데

  1. pooling
  2. websocket
  3. Server-sent-event

가 있다. pooling의 경우 클라이언트가 서버에 일정 주기로 계속 데이터를 요청하는 방법으로 이렇게 지속적인 요청을 보내는 경우 리소스 낭비가 심하기에 패스했다. websocket의 경우 서버와 클라이언트를 지속적인 TCP 라인으로 연결하여 양방향 통신을 하는 방식인데 알람기능은 단방향 통신만으로 구현 가능하므로 패스했다.

그래서 선택한 것이 SSE(Server-sent-event)인데, 클라이언트와 서버를 연결한 후 서버에서 필요할 때 데이터를 보내주는 방식이다. HTTP 프로토콜을 사용하므로 구현이 간편하다는 장점이 있다.

스프링부트의 경우 SseEmitter API를 통해 SSE 통신을 지원하고 있다.

SSEController.java

package com.mydiary.my_diary_server.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.mydiary.my_diary_server.service.NotificationService;

import lombok.RequiredArgsConstructor;

@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor
public class SSEController {
	private final NotificationService notificationService;
	
	@GetMapping(value="/sub/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public SseEmitter subscribe(@PathVariable Long id)
	{
		return notificationService.subscribe(id);
	}
	
	@PostMapping("/send-data/{id}")
	public void sendData(@PathVariable Long id)
	{
		notificationService.notify(id, "data");
	}
}

Get 요청을 통해 클라이언트와 서버가 연결하고 데이터를 보낼 땐 send-data/{id}로 post를 보낼 수 있게 설정했다. 테스트를 위해 id는 임의의 값을 이용해 통신하도록 했다.

EmitterRepository

package com.mydiary.my_diary_server.repository;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import lombok.RequiredArgsConstructor;

@Repository
@RequiredArgsConstructor
public class EmitterRepository {
	
	private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
	
	public void save(Long id, SseEmitter emitter)
	{
		emitters.put(id, emitter);
	}
	
	public void deleteById(Long id)
	{
		emitters.remove(id);
	}
	
	public SseEmitter get(Long id)
	{
		return emitters.get(id);
	}
}

SseEmitter를 처리하기 위해 구현한 EmitterRepository 부분이다. ConturrentHashMap 형태로 모든 emitter를 저장하게 했다.

NotificationService

package com.mydiary.my_diary_server.service;

import java.io.IOException;

import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.mydiary.my_diary_server.repository.EmitterRepository;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class NotificationService {
	private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
	private final EmitterRepository emitterRepository;
	
	public SseEmitter subscribe(Long userId)
	{
		SseEmitter emitter = createEmitter(userId);
		
		sendToClient(userId, "EventScream Created. " + userId);
		return emitter;
	}
	
	public void notify(Long userId, Object event)
	{
		sendToClient(userId, event);
	}
	
	public void sendToClient(Long id, Object Data)
	{
		SseEmitter emitter = emitterRepository.get(id);
		if(emitter != null)
		{
			try {
				emitter.send(SseEmitter.event().id(String.valueOf(id)).name("sse").data(Data));
			}catch(IOException exp)
			{
				emitterRepository.deleteById(id);
				emitter.completeWithError(exp);
			}
			
		}
	}

	private SseEmitter createEmitter(Long id)
	{
		SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
		emitterRepository.save(id, emitter);
		
		emitter.onCompletion(() -> emitterRepository.deleteById(id));
		emitter.onTimeout(() -> emitterRepository.deleteById(id));
		return emitter;
	}
}

실질적으로 통신을 처리하는 부분이다. SSE연결은 처음 연결하고 아무 이벤트도 없으면 다시 재연결 요청이 오기때문에 subscribe 부분에 sendToClient를 한번 실행한다.

테스트도 완료. 추후 확장성을 위해 프로젝트에서는 Firebase를 통해 알람을 구현하기로 했으나 실시간 알람을 보낼 수 있는 기반을 얻은 것 같다.

profile
개발 블로그

0개의 댓글