앞서 진행했던 kafka 연동을 이어서 작업한다.
producer에 대한 설정이 끝났기 때문에 남은 Cosumer 및 WebSocket 설정 진행.
Consumer 의 설정까지 완료 했음으로 kafka 토픽을 구독할 Listener를 구현
@KafkaListene
어노테이션을 통해 메시지를 구독하고 소비할 수 있다.
SimpMessagingTemplate 의 convertAndSend
메소드를 통해 해당 토픽 ( "/topic/group" ) 을 구독중인 클라이언트에 메시지를 전달
@Component
class MessageConsumer(
private var template: SimpMessagingTemplate
) {
val log: Logger = LoggerFactory.getLogger(MessageConsumer::class.java)
@KafkaListener(
topics = [KAFKA_TOPIC], // 토픽 이름
groupId = "chatting" // 그룹 id
)
fun consume(message: Message) {
log.info("Kafka Consume Message: ${message.text}, Author: ${message.author}")
template.convertAndSend("/topic/group", message);
}
}
클라이언트와 서버간의 채팅 구현을 위한 WebSocket 의 설정을 진행
addEndpoint
: websocket 이 연결되는 경로 지정.
setApplicationDestinationPrefixes
: 클라이언트로 부터 메시지를 받는 api의 prefix
enableSimpleBroker
: 등록된 경로를 바탕으로 메시지 브로커가 클라이언트를 구분지어 메시지를 전달한다.
@Configuration
@EnableWebSocketMessageBroker // WebSocket 서버 활성화
class WebSocketConfig: WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws-chat").withSockJS()
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.setApplicationDestinationPrefixes("/app")
registry.enableSimpleBroker("/topic")
}
}
설정이 다 끝났다면 이제 클라이언트로 부터 메시지를 전받을 api를 Controller 를 작성한다
클라이언트가 Post 요청으로 부터 /chat/send
경로로 메시지를 보낸다. 전달받은 메시지는 kafka producer가 해당 토픽으로 메시지를 발행한다.
@RestController
class ChatController(
private var kafkaTemplate: KafkaTemplate<String, Message>
) {
val log: Logger = LoggerFactory.getLogger(ChatController::class.java)
@PostMapping("/chat/send")
fun sendMessage(@RequestBody message: Message) {
message.createTimestamp()
log.info("message : ${message.text}, auth : ${message.author}")
kafkaTemplate.send(KAFKA_TOPIC, message)
// post 요청으로 전달받은 메시지를 해당 카프카 토픽에 생산
}
}
참고자료
https://dev.to/subhransu/realtime-chat-app-using-kafka-springboot-reactjs-and-websockets-lc