Kafka 를 사용한 채팅프로그램 만들기 #1

kimView·2022년 1월 7일
0
post-thumbnail

Kafka를 이용한 사내 프로젝트 진행 전 간단한 공부를 위해 채팅프로그램을 만들어볼 예정이다. java 대신 kotlin으로 진행하여 kotlin 문법도 같이 연습해 본다.

Spring boot : 채팅서버
kafka : 메시지 큐
websocket(stomp) : 클라이언트 - 서버간 체팅 구현

1. springboot 프로젝트 생성

Spring Initializr

Spring Initializr 를 사용하여 springboot 프로젝트 생성하였다.
intellj 에서 제공되는 Spring Initializr를 사용하였으나 직접 https://start.spring.io 에서 생성하여도 된다.

빌드 관리 도구는 Gradle, 언어는 kotlin 으로 설정.

추가한 Dependencies 는 다음과 같다.

  • Lombok
  • Spring Web :
  • WebSocket
  • Spring for Apache Kafka

gradle

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.6.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.6.10"
    kotlin("plugin.spring") version "1.6.10"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

configurations {
    compileOnly {
        extendsFrom(configurations.annotationProcessor.get())
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-websocket")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.kafka:spring-kafka")
    compileOnly("org.projectlombok:lombok")
    annotationProcessor("org.projectlombok:lombok")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.kafka:spring-kafka-test")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

2. kafka 연동

Springboot 서버와 kafka 서버를 연동 해야한다.
연동을 위하여 springboot에 설정을 세팅을 진행.

springboot 에서 kafka를 연동할 때 AutoConfiguration을 이용한 방법과 @Configuration 클래스를 이용한 bean등록 방법이 있다.
그 중 직접 @Configuration 클래스를 작성하는 방법으로 진행한다.

application.yml

consumer, producer의 주소와 consumer에 사용될 그룹id 작성

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092 // 컨슈머 서버 주소
      group-id: chatting
    producer:
      bootstrap-servers: localhost:9092 // 프로듀서 서버 주소

Message Model

연결에 앞서 메시지 내용을 저장할 data class 를 생성.

  • 작성자 : author
  • 메시지 : text
  • 시간 : timestamp
data class Message(
    @JsonProperty("author") val author: String?, // 작성자
    @JsonProperty("text") val text: String?, // 내용
    @JsonProperty("timestamp") var timestamp: String? // 작성 시간
) {
    fun createTimestamp() {
        this.timestamp = LocalDateTime.now().toString()
    }
}

Producer

ProducerConfig.kt

Kafka Producer 설정 클래스

@EnableKafka
@Configuration // @Configuration 어노테이션을 사용하여 설정파일임을 명시하고 bean 등록이 가능하게 한다
class KafkaProducerConfig {

    @Value("\${spring.kafka.producer.bootstrap-servers}")
    lateinit var bootstrapServer: String

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Message> {
        val factory = DefaultKafkaProducerFactory<String, Message>(producerConfigs())
        return KafkaTemplate(factory)
    }

    @Bean
    fun producerConfigs(): Map<String, Serializable> =
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, // 메시지 key에 대한 직렬화 설정
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java // 메시지 value에 대한 직렬화 설정
        )
}

Producer 연결 테스트

채팅 메시지 발송을 위한 컨트롤러 작성

Post 요청으로 메시지를 전송 받아 Producer로 메시지를 전달하는 Controller

const val KAFKA_TOPIC: String = "new-kafka-chat"

@RestController
class ChatController(
    private var kafkaTemplate: KafkaTemplate<String, Message>
) {
    val log: Logger = LoggerFactory.getLogger(ChatController::class.java)

    @PostMapping("/chat/send")
    fun sendMessage(@RequestBody message: Message) {
        message.createTimestamp()
        log.info("message : ${message.text}, auth : ${message.author}")
        kafkaTemplate.send(KAFKA_TOPIC, message)
        // post 요청으로 전달받은 메시지를 해당 카프카 토픽에 생산
    }

}

▶︎ 테스트

Postman 을 이용하여 api에 메시지를 전달.

kafka consumer 를 실행하여 메시지가 전달되는지 확인

$ bin/kafka-console-consumer.sh --topic new-kafka-chat --bootstrap-server localhost:9092
{"author":"kimview","text":"this is test message","timestamp":"2022-01-07T15:08:03.786904"}

Cosumer

CosumerConfig.kt

Kafka Cosumer 설정 클래스

@EnableKafka
@Configuration // @Configuration 어노테이션을 사용하여 설정파일임을 명시하고 bean 등록이 가능하게 한다
class KafkaConsumerConfig {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    lateinit var bootstrapServer: String

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Message> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Message>()
        factory.consumerFactory = consumerFactory()
        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Message> {
        return DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Message::class.java)) 
        // Message::class.java 직렬화 및 역직렬화에 사용될 모델 
    }

    @Bean
    fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, // 메시지 key에 대한 역직렬화 설정 
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, // 메시지 value에 대한 역직렬화 설정
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", // offest이 없거나 오류가 발생했을때 처리할 작업에 대한 설정
        )
}

참고자료

https://dev.to/subhransu/realtime-chat-app-using-kafka-springboot-reactjs-and-websockets-lc

profile
개인 공부용

0개의 댓글