카프카 + Sse 예제 (3) - 카프카, Sse 연동

jinvicky·2023년 10월 23일
0

Spring & Java

목록 보기
9/23

두 개의 별도 프로젝트(kafka-basic, sss-basic)를 생성한 다음, 각각 필요 부분을 수정했다.

처음에 전체 로직을 정확히 이해 못해서 돌고 돌은 것이 아쉽다. 이해가 선행되면 코딩이 줄어든다.

Overview


  1. 카프카에서 producer 코드를 작성한다.
  2. sse에서 카프카의 consumer 코드를 작성한다.
  3. sse에서 producer로부터 받은 메세지를 sseEmitter를 통해 접근 사용자들에게만 전송한다.
  4. sse에서 index.html에서 console.log()로 내용을 확인한다.
  5. 두 프로젝트를 같이 실행한 다음에 테스트한다.

프로젝트 설명하기 전에 이전 포스팅들과 참고 블로그들을 따라해서 kafka와 sse 프로젝트가 준비되어 있어야 한다.

이전 포스팅
https://velog.io/@jinvicky/카프카-Sse-예제-1-카프카-설치
https://velog.io/@jinvicky/카프카-Sse-예제-2-Sse-알림-기능

코드 출처
kafka
https://velog.io/@hanblueblue/Kafka-설치실행-및-테스트
sse
https://dkswnkk.tistory.com/702


application.yml 설정을 잘 보자.

Kafka 코드 추가


KafkaController

@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {

    @Autowired
    CustomProducer producer;

    @PostMapping("/send-alert")
    public void producer(@RequestBody Map<String, String> message) {
        String msg = message.get("msg");
        producer.send(msg);
    }
}

CustomProducer, KafkaConfig
카프카 코드 참고 블로그와 동일하다.
Kafka에서 Consumer는 sse가 가져갔기 때문에 Consumer 부분을 삭제했다.

application.yml
producer에 대한 설정만 한다.

server:
  port: 8082

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: dev-topic
    bootstrap-servers: localhost:9092
  zipkin:
    sender:
      type: kafka

Sse 코드 추가


마찬가지로 기존 코드 참고 블로그에서 변경사항만 추가하겠다.

NotificationController

... 기존 코드 ...

  //카프카의 컨슈머로서 토픽에 대한 카프카 리스너를 등록한다.
    @KafkaListener(topics = "dev-topic")
    public void consume(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Payload String payload ) {
        //토픽을 받아서
        log.info("컨슈머가 받은 메세지 : " + payload);
        notificationService.notifyToAll(payload);
    }

EmitterRepostiroy
예제에서는 id를 파람으로 받아 그 사용자에게만 메세지를 보냈지만,
조금 변형해서 /subscribe로 커넥션을 맺은 사용자들에게 일괄 전송하기 위해서 Map 반환 메서드를 추가했다.


 //모든 Emitter를 저장하는 ConcurrentHashMap
    private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
	/**
     * emitters 반환
     * @return Map<Long, SseEmitter> - 이벤트 Emitter
     * */
    public Map<Long, SseEmitter> getEmitters () {
        return emitters;
    }

NotificationService
Map을 foreach로 해서 sendToClient(key, msg)를 반복한다.

... 기존 코드 ...
/**
     * 접속한 사용자들 모두에게 alert 메세지 전송
     * */
    public void notifyToAll(Object msg) {
        Map<Long, SseEmitter> emitters = emitterRepository.getEmitters();
        emitters.forEach((key, value) -> {
           sendToClient(key, msg);
        });
    }

index.html
id 파라미터 값을 추가하기 위해서 new URL().searchParams를 사용한다.
자세한 내용은 아래 참고
https://developer.mozilla.org/en-US/docs/Web/API/URL/searchParams

eventSource 객체를 사용해서 이벤트를 등록한다.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
test
</body>
<script>
    const paramObj = new URL(window.location.href).searchParams;
    const userId = paramObj.get("id");
    const eventSource = new EventSource(`http://localhost:8080/notifications/subscribe/${userId}`);

    eventSource.addEventListener('sse', event => {
        console.log("???", event);
    })

</script>
</html>

Test


인텔리제이에서 index.html을 브라우저에서 열기로 실행하고
&id=로 각각 고유한 id를 부여했다.

먼저 포스트맨으로 kafka-basic으로 request를 보낸다.

이후 고유 id를 가진 index.html에서 메세지와 id를 확인할 수 있다.

Reference


https://velog.io/@max9106/Spring-SSE-Server-Sent-Events를-이용한-실시간-알림
https://devbksheen.tistory.com/entry/Kafka-Spring-Boot에-Kafka를-연동

profile
Front-End와 Back-End 경험, 지식을 공유합니다.

0개의 댓글