[KoPring] 서비스에 Kafka 적용기 4 - Kafka Stream 설정

Sihwan Kim·2024년 2월 27일

KoPring

목록 보기
6/10

Kafka Stream 설정

1. 의존성 추가

dependencies {
	implementation ("org.apache.kafka:kafka-streams:3.4.0")
}

2. Config 클래스 작성

@EnableKafkaStreams
@EnableKafka
@Configuration
class KafkaConfig(
    @Value("\${spring.kafka.bootstrap-servers}")
    var bootStrapServers : String
) {
    @Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
    fun kafkaStreamConfig() : KafkaStreamsConfiguration{
        val kStreamConfig = hashMapOf<String,Any>()
        kStreamConfig[StreamsConfig.APPLICATION_ID_CONFIG] = "stream-test"
        kStreamConfig[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers
        kStreamConfig[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        kStreamConfig[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        kStreamConfig[StreamsConfig.NUM_STREAM_THREADS_CONFIG] =1
        return KafkaStreamsConfiguration(kStreamConfig)
    }
}

여기에서
@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
으로 설정해야 다음에 나올 Service 코드가 정상동작한다.🤣
설정하지 않으면 parameter 0 of constructor in required a bean of type 이런 오류가 나온다.


KafkaStreamService 구현

@Service
class KafkaStreamService {

    val stringSerde: Serde<String> = Serdes.String()

    @Autowired
    fun buildPipeline(sb : StreamsBuilder) {
        val kStream = sb.stream("testTopic", Consumed.with(stringSerde, stringSerde))
        kStream.filter { key, value ->
            value.contains("test")
        }.to("testStream")
    }
}

이 코드는 testTopic으로 들어오는 값들에서 test라는 값이 들어있는 메시지를 testStream이라는 Topic으로 보내는 코드다. 😊

테스트

item1234를 메시지로 보낼 때 로그

test item1234를 메시지로 보낼 때 로그

😊 테스트 성공 ㅎㅎ

0개의 댓글