Kafka를 이용한 사내 프로젝트 진행 전 간단한 공부를 위해 채팅프로그램을 만들어볼 예정이다. java 대신 kotlin으로 진행하여 kotlin 문법도 같이 연습해 본다.
Spring boot : 채팅서버
kafka : 메시지 큐
websocket(stomp) : 클라이언트 - 서버간 체팅 구현
Spring Initializr 를 사용하여 springboot 프로젝트 생성하였다.
intellj 에서 제공되는 Spring Initializr를 사용하였으나 직접 https://start.spring.io 에서 생성하여도 된다.
빌드 관리 도구는 Gradle, 언어는 kotlin 으로 설정.
추가한 Dependencies 는 다음과 같다.
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.6.2"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
kotlin("jvm") version "1.6.10"
kotlin("plugin.spring") version "1.6.10"
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
configurations {
compileOnly {
extendsFrom(configurations.annotationProcessor.get())
}
}
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
Springboot 서버와 kafka 서버를 연동 해야한다.
연동을 위하여 springboot에 설정을 세팅을 진행.
springboot 에서 kafka를 연동할 때 AutoConfiguration을 이용한 방법과 @Configuration 클래스를 이용한 bean등록 방법이 있다.
그 중 직접 @Configuration 클래스를 작성하는 방법으로 진행한다.
consumer, producer의 주소와 consumer에 사용될 그룹id 작성
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092 // 컨슈머 서버 주소
group-id: chatting
producer:
bootstrap-servers: localhost:9092 // 프로듀서 서버 주소
연결에 앞서 메시지 내용을 저장할 data class 를 생성.
data class Message(
@JsonProperty("author") val author: String?, // 작성자
@JsonProperty("text") val text: String?, // 내용
@JsonProperty("timestamp") var timestamp: String? // 작성 시간
) {
fun createTimestamp() {
this.timestamp = LocalDateTime.now().toString()
}
}
Kafka Producer 설정 클래스
@EnableKafka
@Configuration // @Configuration 어노테이션을 사용하여 설정파일임을 명시하고 bean 등록이 가능하게 한다
class KafkaProducerConfig {
@Value("\${spring.kafka.producer.bootstrap-servers}")
lateinit var bootstrapServer: String
@Bean
fun kafkaTemplate(): KafkaTemplate<String, Message> {
val factory = DefaultKafkaProducerFactory<String, Message>(producerConfigs())
return KafkaTemplate(factory)
}
@Bean
fun producerConfigs(): Map<String, Serializable> =
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, // 메시지 key에 대한 직렬화 설정
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java // 메시지 value에 대한 직렬화 설정
)
}
채팅 메시지 발송을 위한 컨트롤러 작성
Post 요청으로 메시지를 전송 받아 Producer로 메시지를 전달하는 Controller
const val KAFKA_TOPIC: String = "new-kafka-chat"
@RestController
class ChatController(
private var kafkaTemplate: KafkaTemplate<String, Message>
) {
val log: Logger = LoggerFactory.getLogger(ChatController::class.java)
@PostMapping("/chat/send")
fun sendMessage(@RequestBody message: Message) {
message.createTimestamp()
log.info("message : ${message.text}, auth : ${message.author}")
kafkaTemplate.send(KAFKA_TOPIC, message)
// post 요청으로 전달받은 메시지를 해당 카프카 토픽에 생산
}
}
▶︎ 테스트
Postman
을 이용하여 api에 메시지를 전달.
kafka consumer 를 실행하여 메시지가 전달되는지 확인
$ bin/kafka-console-consumer.sh --topic new-kafka-chat --bootstrap-server localhost:9092
{"author":"kimview","text":"this is test message","timestamp":"2022-01-07T15:08:03.786904"}
Kafka Cosumer 설정 클래스
@EnableKafka
@Configuration // @Configuration 어노테이션을 사용하여 설정파일임을 명시하고 bean 등록이 가능하게 한다
class KafkaConsumerConfig {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
lateinit var bootstrapServer: String
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Message> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Message>()
factory.consumerFactory = consumerFactory()
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<String, Message> {
return DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Message::class.java))
// Message::class.java 직렬화 및 역직렬화에 사용될 모델
}
@Bean
fun getConfig(): Map<String, Any> =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, // 메시지 key에 대한 역직렬화 설정
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, // 메시지 value에 대한 역직렬화 설정
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // offest이 없거나 오류가 발생했을때 처리할 작업에 대한 설정
)
}
참고자료
https://dev.to/subhransu/realtime-chat-app-using-kafka-springboot-reactjs-and-websockets-lc