Kafka 를 사용한 채팅프로그램 만들기 #2

kimView·2022년 1월 12일
0
post-thumbnail

앞서 진행했던 kafka 연동을 이어서 작업한다.
producer에 대한 설정이 끝났기 때문에 남은 Cosumer 및 WebSocket 설정 진행.

2. kafka 연동

Cosumer

Consumer 의 설정까지 완료 했음으로 kafka 토픽을 구독할 Listener를 구현

MessageConsumer.kt

@KafkaListene 어노테이션을 통해 메시지를 구독하고 소비할 수 있다.
SimpMessagingTemplate 의 convertAndSend 메소드를 통해 해당 토픽 ( "/topic/group" ) 을 구독중인 클라이언트에 메시지를 전달

@Component
class MessageConsumer(
    private var template: SimpMessagingTemplate
) {

    val log: Logger = LoggerFactory.getLogger(MessageConsumer::class.java)

    @KafkaListener(
        topics = [KAFKA_TOPIC], // 토픽 이름
        groupId = "chatting"    // 그룹 id
    )
    fun consume(message: Message) {
        log.info("Kafka Consume Message: ${message.text}, Author: ${message.author}")
        template.convertAndSend("/topic/group", message);
    }

}

WebSocket 설정

클라이언트와 서버간의 채팅 구현을 위한 WebSocket 의 설정을 진행

WebSocketConfig.kt

addEndpoint : websocket 이 연결되는 경로 지정.
setApplicationDestinationPrefixes : 클라이언트로 부터 메시지를 받는 api의 prefix
enableSimpleBroker : 등록된 경로를 바탕으로 메시지 브로커가 클라이언트를 구분지어 메시지를 전달한다.

@Configuration
@EnableWebSocketMessageBroker // WebSocket 서버 활성화
class WebSocketConfig: WebSocketMessageBrokerConfigurer {

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws-chat").withSockJS()
    }

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        registry.setApplicationDestinationPrefixes("/app")
        registry.enableSimpleBroker("/topic")
    }
}

ChatController

설정이 다 끝났다면 이제 클라이언트로 부터 메시지를 전받을 api를 Controller 를 작성한다
클라이언트가 Post 요청으로 부터 /chat/send 경로로 메시지를 보낸다. 전달받은 메시지는 kafka producer가 해당 토픽으로 메시지를 발행한다.

@RestController
class ChatController(
    private var kafkaTemplate: KafkaTemplate<String, Message>
) {
    val log: Logger = LoggerFactory.getLogger(ChatController::class.java)

    @PostMapping("/chat/send")
    fun sendMessage(@RequestBody message: Message) {
        message.createTimestamp()
        log.info("message : ${message.text}, auth : ${message.author}")
        kafkaTemplate.send(KAFKA_TOPIC, message)
        // post 요청으로 전달받은 메시지를 해당 카프카 토픽에 생산
    }

}

참고자료

https://dev.to/subhransu/realtime-chat-app-using-kafka-springboot-reactjs-and-websockets-lc

profile
개인 공부용

0개의 댓글