/목차
클라이언트 요청이 없더라도 서버에서 데이터를 줘야하는 경우가 있다
우리는 알람 기능을 구현하는데, 클라이언트가 알람을 달라는 요청을 지속적으로 보낼 수 있지만(polling 방식) 계속적인 request는 서버의 부담이 점점 늘어가고, 매 요청마다 새로운 알람이 있는지 물어보는 것은 polling방식보다 서버의 부담이 덜하겠지만 그래도 DB connection등 서버의 부담이 아예 없는건 아니다
위에 설명한 폴링 방식인데 이벤트가 발생하면 그 이후에 request로 이벤트에 대한 응답을 받는 방식이다 이건 계속 request를 보내는 것이기 때문에 서버의 부담이 생긴다 그리고 이벤트가 발생하고 다음 요청까지 이벤트의 반영이 안되기 때문에 실시간을 보장하지 못한다
계속적으로 요청하는게 아니라 커넥션의 유지 시간을 길게 가진다 그래서 그 커넥션동안 이벤트가 발생하면 지속되고 있는 커넥션으로 response를 보내고 다시 커넥션을 유지할 request를 보내는 방식이다 이렇게 하면 실시간은 보장하지만 이벤트가 자주 발생하는 경우에는 서버에 부담은 폴링 방식과 큰 차이가 없다
HTTP 프로토콜을 사용하며 서버의 이벤트를 클라이언트로 실시간, 지속적으로 보내는 기술이다 또한 서버에서 클라이언트 단방향으로만 보낼 수 있다
클라이언트가 주기적으로 서버에 요청을 보낼 필요가 없어서 폴링, 긴 폴링 방식보다 서버 부하가 적고 실시간성을 유지하지만 지속적으로 연결해야되서 이 또한 네트워크 연결을 소비하는 거고 연결하는 클라이언트가 많다면 이 또한 서버의 부담이 증가하게 된다
SSE와의 차이점은 HTTP프로토콜이 아닌 별도의 프로토콜을 사용하며 양방향 통신이다 웹 소켓 포트에 접속해 있는 모든 클라이언트에게 이벤트 방식으로 응답하면 된다 하지만 서버만 이벤트를 전송하고 채팅도 아닌데 알림에서 양방향 통신은 비효율적인거 같다
구독 (Subscribe): 클라이언트가 알림을 받기 위해 서버에 구독 요청을 합니다. 이때, 사용자의 ID(userId
)와 마지막으로 받은 이벤트의 ID(lastEventId
)를 파라미터로 전달합니다.
1-1) subscribe()
메소드에서는 먼저 고유한 emitterId
를 생성합니다. (userId
와 현재 시간을 조합)
1-2) 새로운 SseEmitter
객체를 생성하고, 이 객체를 emitterRepository
에 저장합니다.
1-3) SseEmitter
연결이 종료되거나 타임아웃될 경우 해당 emitterId
를 제거하는 로직을 설정합니다.
1-4) 초기 연결 상태 확인 및 HTTP 503 에러 방지를 위해 더미 이벤트("EventStream Created")를 전송합니다.
1-5) 만약 클라이언트가 놓친 이벤트(lastEventId
이후의 이벤트들)가 있으면 그것들도 보내줍니다.
알림 발송 (Send Notification): 어떤 사건이 발생하여 특정 사용자에게 알림을 보내야 할 때 호출됩니다.
2-1) 새로운 알림 객체(Notification
)를 생성하고 데이터베이스에 저장합니다.
2-2) 해당 사용자(receiver, 즉 알림 수신자)에게 등록된 모든 SseEmitter들을 찾습니다.
2-3) 각각의 SseEmitter에 대해, 새로운 알림 데이터와 함께 'sse'라는 이름의 이벤트를 전송합니다.
재전송 (Resend Lost Data): 만약 클라이언트가 일부 데이터를 놓쳐서 받지 못했다면 그것들도 재전송하는 로직입니다.
emitterId
: 이는 특정 SseEmitter
객체를 식별하기 위한 ID입니다. 여기서 사용된 방식에 따르면, 각 사용자마다 고유의 emitterId
가 생성되며, 이를 통해 사용자의 SSE 연결을 관리하게 됩니다.+) SseEmitter
는 단지 서버에서 클라이언트로 이벤트를 전송하는 도구일 뿐이며, 알림의 '읽음' 상태를 추적하거나 관리하지 않습니다.
eventId
: 이는 서버에서 클라이언트로 전송되는 개별 이벤트를 식별하기 위한 ID입니다. 각각의 알림 메시지나 더미 메시지 등은 고유의 eventId
를 가집니다.
따라서, 일반적으로 하나의 emitterId
(하나의 SSE 연결)가 여러 개의 eventId
(여러 개의 이벤트)를 가질 수 있습니다.
@RequiredArgsConstructor
@Service
@Transactional(readOnly = true)
public class VoteService {
private final UserRepository userRepository;
private final VoteRepository voteRepository;
private final VoteValidator voteValidator;
private final PageableConverter pageableConverter;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public void doVote(DoVoteInfo info) {
Vote vote = voteRepository.findById(info.getVoteId()).orElseThrow(VoteNotFoundException::new);
User user = userRepository.findById(info.getUserId()).orElseThrow(UserNotFoundException::new);
voteValidator.validateParticipateVote(vote, user);
VoteResult voteResult = new VoteResult(vote.getId(), user.getId(), info.getChoice());
voteResultRepository.save(voteResult);
eventPublisher.publishEvent(new DoVoteEvent(this, vote.getId()));
}
}
투표를 하면 eventPublisher가 publishEvent를 해준다
@Getter
public class DoVoteEvent extends ApplicationEvent {
private final Long voteId;
public DoVoteEvent(Object source, Long voteId) {
super(source);
this.voteId = voteId;
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationSender {
private final VoteRepository voteRepository;
private final UserRepository userRepository;
private final VoteResultRepository voteResultRepository;
private final NotificationService notificationService;
@TransactionalEventListener
@Async
public void handleCommentCreated(CommentCreatedEvent event) {
sendNotificationForNewComments(event.getVoteId());
}
@TransactionalEventListener
@Async
public void handleDoVote(DoVoteEvent event) {
sendNotificationsForVoters(event.getVoteId());
}
public void sendNotificationForNewComments(Long voteId) {
Vote vote = voteRepository.findById(voteId).orElseThrow(VoteNotFoundException::new);
User receiver = userRepository.findById(vote.getPostedUserId()).orElseThrow(UserNotFoundException::new);
String title = vote.getTitle();
String content = "투표에 댓글이 달렸습니다.";
String url = "vote/" + voteId;
notificationService.send(receiver, Notification.NotificationType.COMMENT, title, content, url);
log.info("Thread: {}, Notification sent to user: {}, type: {}, content: {}, url: {}",
Thread.currentThread().getName(), receiver.getId(), Notification.NotificationType.COMMENT, title, content, url);
}
public void sendNotificationsForVoters(Long voteId) {
Vote vote = voteRepository.findById(voteId).orElseThrow(VoteNotFoundException::new);
Long count = voteResultRepository.countByVoteId(voteId);
if (count % 10 == 0 && count != 0) {
String title = vote.getTitle();
String content = "투표에 " + count + "명 이상이 참여했어요!";
String url = "vote/" + voteId;
List<VoteResult> voteResultList = voteResultRepository.findByVoteId(voteId);
sendNotificationsToVoters(title, content, url, voteResultList);
}
}
private void sendNotificationsToVoters(String title, String content, String url, List<VoteResult> voteResultList) {
for (VoteResult result : voteResultList) {
CompletableFuture.runAsync(() -> {
User receiver = userRepository.findById(result.getVotedUserId()).orElseThrow(UserNotFoundException::new);
notificationService.send(receiver, Notification.NotificationType.VOTE, title, content, url);
log.info("Thread: {}, Notification sent to user: {}, type: {}, content: {}, url: {}",
Thread.currentThread().getName(), receiver.getId(), Notification.NotificationType.COMMENT, title, content, url);
});
}
}
}
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //스프링이 제공하는 스레드 풀 구현체
taskExecutor.setCorePoolSize(1); //핵심 스레드 수 1로 설정 스레드 풀의 최소 스레드 수
taskExecutor.setMaxPoolSize(5); //최대 스레드 수 5로 설정 풀이 생성하는 최대 스레드 수
taskExecutor.setQueueCapacity(10); //작업 대기열의 크기 10
taskExecutor.setThreadNamePrefix("async-thread-"); //각 스레드의 이름 설정
taskExecutor.initialize();
return taskExecutor;
}
}
@Repository
@RequiredArgsConstructor
public class EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
public Map<String, SseEmitter> findAllEmitterStartWithByUserId(String userId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(userId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public Map<String, Object> findAllEventCacheStartWithByMemberId(String userId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(userId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public void deleteById(String id) {
emitters.remove(id);
}
}
emitters 에다가 여기에다가 save에서 받은 emitter넣어준다
발송해달라는 요청이들어오면 여기서 꺼내서 발송을 해준다 그러면 userConnection에 넣은 스레드다르고 꺼내는 스레드 다르고 발송하는 스레드가 다 다르다 -> 추후에 나올 onCompletion, onTimeout 콜백은 별도의 스레드에서 호출되서 SseEmitter가 담긴 컬렉션에 연산할거면 스레드 세이프해야 한다 그렇다면 스레드끼리 병합을 시켜줄 수 있는데 이런 스레드 세이프한 자료구조를 concurrent hash map이라고 한다
그럼 eventCache는 뭐냐면 유저가 창을 닫거나 로그아웃을 해서 연결이 해제되었는데 그 사이에 생긴 알림(누락된 알림)을 저장하는 곳이다
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
// SSE 연결 지속 시간 설정
private final EmitterRepository emitterRepository;
private final NotificationRepository notificationRepository;
private final UserRepository userRepository;
@Value("${admin.id}")
private String adminUserId;
// [1] subscribe()
public SseEmitter subscribe(Long userId, String lastEventId) {
String emitterId = makeTimeIncludeId(userId);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
String eventId = makeTimeIncludeId(userId);
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + userId + "]");
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, userId, emitterId, emitter);
}
return emitter;
}
@Transactional
public void send(User receiver, Notification.NotificationType notificationType, String title, String content, String url) {
Notification notification = notificationRepository.save(createNotification(receiver, notificationType, title, content, url));
String receiverId = String.valueOf(receiver.getId());
String eventId = receiverId + "_" + System.currentTimeMillis();
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByUserId(receiverId);
emitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendNotification(emitter, eventId, key, NotificationDto.from(notification));
}
);
}
private String makeTimeIncludeId(Long userId) { // (3)
return userId + "_" + System.currentTimeMillis();
}
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId)
.name("sse")
.data(data)
);
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
}
}
private boolean hasLostData(String lastEventId) { // (5)
return !lastEventId.isEmpty();
}
private void sendLostData(String lastEventId, Long userId, String emitterId, SseEmitter emitter) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(userId));
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
private Notification createNotification(User receiver, Notification.NotificationType notificationType, String title, String content, String url) {
return Notification.builder()
.receiver(receiver)
.notificationType(notificationType)
.title(title)
.content(content)
.url(url)
.isRead(false)
.build();
}
public List<NotificationDto> getNotificationDtos(Long userId) {
return getNotificationsByUserId(userId).stream()
.map(NotificationDto::from)
.collect(Collectors.toList());
}
@Transactional
public void setNotificationsAsRead(Long notificationId) {
Notification notification = notificationRepository.findById(notificationId).orElseThrow(NotificationNotFoundException::new);
notification.setIsReadTrue();
}
private List<Notification> getNotificationsByUserId(Long userId) {
User user = userRepository.findById(userId).orElseThrow(UserNotFoundException::new);
return notificationRepository.findNotificationsByUser(user);
}
private void validateAdminId(Long adminId) {
if (adminId != adminUserId) {
throw new AdminAuthorityException();
}
}
private List<Notification> removeDuplicates(List<Notification> notifications) {
return notifications.stream()
.collect(Collectors.collectingAndThen(
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(n -> n.getNotificationType().toString() + n.getUrl()))), ArrayList::new));
}
}
subscribe()는 클라이언에게 받은 SSE연결 요청(구독)을 받는다 이전에 받지 못한 정보가 있으면 lastEventId로 받아올 수 있다 emmiterId는 userId + 시간으로 해서 emitter에 저장하고(한 회원이 여러개로 로그인했을때를 위해 시간까지 ex)컴터, 폰, 테블릿 또한 같은 브라우저에서 여러개 로그인도 대응 가능 sse는 HTTP/1.1에서 브라우저당 6개, HTTP/2에서 브라우저당 100개) onCompletion은 SseEmitter가 타임아웃이 일어났을 때 해당 emitter를 지워주고 completion은 SseEmitter가 완료되었을때도 해당 emitter를 지워준다 그리고 sendNotification으로 연결됬다는 알림을 하나 보내준다 이유는 처음에 SSE연결을 할 때 아무런 이벤트도 보내지 않으면 재연결 요청 혹은 연결 요청에 오류가 발생한다(503 Service Unavailable) 그래서 첫 SSE를 보낼때 더비 데이터를 넣어야 한다
그리고 hasLostData로 lastEvent가 있는지 확인하고 있다면 이벤트 캐시를 가져와 lastEventId 와 비교하여 누락된 데이터를 전송
send()는 Notification 알림 엔티티 만들어서 저장하고 userId로 SseEmitter가져온다 그리고 eventCache에 혹여나 이 알림이 누락될 수 있으니 저장하고 sendNotification한다 forEach사용하는 이유는 아까 말햇듯이 한 회원이 여러곳에 로그인했을 때 전부 보내기 위해서
Nginx는 기본적으로 Upstream으로 요청을 보낼때 HTTP 1.0 버전을 사용한다 SSE는 1.1이상이여야 하고 HTTP 1.1은 지속 연결이 기본이기 때문에 헤더를 따로 설정해줄 필요가 없다.
Nginx에서 백엔드 WAS로 요청을 보낼 때는 HTTP 1.0을 사용하고 Connection: close 헤더를 사용한다
SSE는 지속 연결이 되어 있어야 동작하는데 Nginx에서 지속 연결을 닫아버려 제대로 동작하지 않는다
proxy_set_header Connection '';
proxy_http_version 1.1;
또 Nginx의 proxy buffering 기능이 있다. SSE 통신에서 서버는 기본적으로 응답에 Transfer-Encoding: chunked 이다 SSE는 서버에서 동적으로 생성된 컨텐츠를 스트리밍하기 때문에 본문의 크기를 미리 알 수 없기 때문이다
Nginx는 서버의 응답을 버퍼에 저장해두었다가 버퍼가 차거나 서버가 응답 데이터를 모두 보내면 클라이언트로 전송한다 그러면 SSE를 사용하는 이유중 하나인 실시간성을 보장받지 못할 수 있다
SSE 응답을 반환하는 API의 헤더에 X-Accel-Buffering: no를 붙여주면 SSE 응답만 버퍼링을 하지 않도록 설정할 수 있다.
참고자료