이벤트가 [서버 -> 클라이언트] 방향으로만 흐르는 단방향 통신 채널
버스 탑승자가 예약할 경우 버스 기사에게 알림이 발생하는 기능을 개발 중입니다.
- 알림 대상: 버스 기사 <- 버스 탑승자
1. sse 구독
@Service
@AllArgsConstructor
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
public SseEmitter subscribe(String dId, String lastEventId) {
// 1
String id = dId + "_" + System.currentTimeMillis();
// 2
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(id));
emitter.onTimeout(() -> emitterRepository.deleteById(id));
emitter.onError((e) -> emitterRepository.deleteById(id));
// 3
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventName = "503 에러 방지용 이벤트";
EventDTO eventDTO = new EventDTO("EventStream Created.", dId);
sendToClient(emitter, id, eventDTO, eventName, 1);
// 4
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheWithId(String.valueOf(dId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue(), "미수신한 이벤트 목록", 1));
}
return emitter;
}
}
2. 클라이언트에게 데이터 요청 준비
private void sendToClient(SseEmitter emitter, String key, Object data, String name, int option) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
try {
emitter.send(SseEmitter.event()
.id(key)
.name(name)
.data(data, MediaType.APPLICATION_JSON)
.reconnectTime(0));
} catch (Exception e) {
if (option == 1)
emitterRepository.deleteById(key);
else sseRatingRepository.deleteById(key);
System.out.println(e.getMessage());
}
});
}
3. 실제 클라이언트에게 알림 전송
public void send(String dId, Long rId, String startPoint, String endPoint, LocalDate rTime ) {
Notification notification = createNotification(rId, startPoint, endPoint, rTime);
//관련된 SseEmitter 모두 가져오기
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllStartWithById(dId);
sseEmitters.forEach(
(key, emitter) -> {
// 데이터 캐시 저장(유실된 데이터 처리하기 위함)
emitterRepository.saveEventCache(key, notification);
// 데이터 전송
sendToClient(emitter, key, notification, "사용자에 의한 예약 발생", 1);
log.info("[driverId: " + dId + "]: " + notification);
}
);
}
잘 안 보일 수도 있지만...
버스 기사 id : 1 (url 안에 있음)
로그인 후 계속 대기(?) 중
직접 포스트맨으로 결과를 얻어보고 싶었지만 연결을 끊은 후에만 모든 이벤트 정보들이 출력됨 그래서 로그로 찍은 거!!
내가 원했던 건 예약 요청이 되면 바로바로 그 알림 메시지? 보여주는 거였는데... 당연히 return을 해야 값이 넘어오는데 sse 연결이 끊어지지 않는 한 값이 안 넘어오는 게 맞는건가...!?
- 참고
데이터 보낼 때
data:{"startSeq":1,"endSeq":2,"rTime":"2022-11-24"}
이렇게 data만 보내는 것도 가능함
로그인 후 원래 response값은 버스 기사 정보와 버스 시간표인데 현재 sse 테스트를 위해 이 부분을 생략함 추후 수정 예정임