두 개의 별도 프로젝트(kafka-basic, sss-basic)를 생성한 다음, 각각 필요 부분을 수정했다.
처음에 전체 로직을 정확히 이해 못해서 돌고 돌은 것이 아쉽다. 이해가 선행되면 코딩이 줄어든다.
프로젝트 설명하기 전에 이전 포스팅들과 참고 블로그들을 따라해서 kafka와 sse 프로젝트가 준비되어 있어야 한다.
이전 포스팅
https://velog.io/@jinvicky/카프카-Sse-예제-1-카프카-설치
https://velog.io/@jinvicky/카프카-Sse-예제-2-Sse-알림-기능
코드 출처
kafka
https://velog.io/@hanblueblue/Kafka-설치실행-및-테스트
sse
https://dkswnkk.tistory.com/702
application.yml 설정을 잘 보자.
KafkaController
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
@Autowired
CustomProducer producer;
@PostMapping("/send-alert")
public void producer(@RequestBody Map<String, String> message) {
String msg = message.get("msg");
producer.send(msg);
}
}
CustomProducer, KafkaConfig
카프카 코드 참고 블로그와 동일하다.
Kafka에서 Consumer는 sse가 가져갔기 때문에 Consumer 부분을 삭제했다.
application.yml
producer에 대한 설정만 한다.
server:
port: 8082
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
template:
default-topic: dev-topic
bootstrap-servers: localhost:9092
zipkin:
sender:
type: kafka
마찬가지로 기존 코드 참고 블로그에서 변경사항만 추가하겠다.
NotificationController
... 기존 코드 ...
//카프카의 컨슈머로서 토픽에 대한 카프카 리스너를 등록한다.
@KafkaListener(topics = "dev-topic")
public void consume(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Payload String payload ) {
//토픽을 받아서
log.info("컨슈머가 받은 메세지 : " + payload);
notificationService.notifyToAll(payload);
}
EmitterRepostiroy
예제에서는 id를 파람으로 받아 그 사용자에게만 메세지를 보냈지만,
조금 변형해서 /subscribe로 커넥션을 맺은 사용자들에게 일괄 전송하기 위해서 Map 반환 메서드를 추가했다.
//모든 Emitter를 저장하는 ConcurrentHashMap
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
/**
* emitters 반환
* @return Map<Long, SseEmitter> - 이벤트 Emitter
* */
public Map<Long, SseEmitter> getEmitters () {
return emitters;
}
NotificationService
Map을 foreach로 해서 sendToClient(key, msg)를 반복한다.
... 기존 코드 ...
/**
* 접속한 사용자들 모두에게 alert 메세지 전송
* */
public void notifyToAll(Object msg) {
Map<Long, SseEmitter> emitters = emitterRepository.getEmitters();
emitters.forEach((key, value) -> {
sendToClient(key, msg);
});
}
index.html
id 파라미터 값을 추가하기 위해서 new URL().searchParams를 사용한다.
자세한 내용은 아래 참고
https://developer.mozilla.org/en-US/docs/Web/API/URL/searchParams
eventSource 객체를 사용해서 이벤트를 등록한다.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
test
</body>
<script>
const paramObj = new URL(window.location.href).searchParams;
const userId = paramObj.get("id");
const eventSource = new EventSource(`http://localhost:8080/notifications/subscribe/${userId}`);
eventSource.addEventListener('sse', event => {
console.log("???", event);
})
</script>
</html>
인텔리제이에서 index.html을 브라우저에서 열기로 실행하고
&id=로 각각 고유한 id를 부여했다.
먼저 포스트맨으로 kafka-basic으로 request를 보낸다.
이후 고유 id를 가진 index.html에서 메세지와 id를 확인할 수 있다.
https://velog.io/@max9106/Spring-SSE-Server-Sent-Events를-이용한-실시간-알림
https://devbksheen.tistory.com/entry/Kafka-Spring-Boot에-Kafka를-연동