
dependencies {
implementation ("org.apache.kafka:kafka-streams:3.4.0")
}
@EnableKafkaStreams
@EnableKafka
@Configuration
class KafkaConfig(
@Value("\${spring.kafka.bootstrap-servers}")
var bootStrapServers : String
) {
@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
fun kafkaStreamConfig() : KafkaStreamsConfiguration{
val kStreamConfig = hashMapOf<String,Any>()
kStreamConfig[StreamsConfig.APPLICATION_ID_CONFIG] = "stream-test"
kStreamConfig[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers
kStreamConfig[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
kStreamConfig[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
kStreamConfig[StreamsConfig.NUM_STREAM_THREADS_CONFIG] =1
return KafkaStreamsConfiguration(kStreamConfig)
}
}
여기에서
@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
으로 설정해야 다음에 나올 Service 코드가 정상동작한다.🤣
설정하지 않으면 parameter 0 of constructor in required a bean of type 이런 오류가 나온다.
@Service
class KafkaStreamService {
val stringSerde: Serde<String> = Serdes.String()
@Autowired
fun buildPipeline(sb : StreamsBuilder) {
val kStream = sb.stream("testTopic", Consumed.with(stringSerde, stringSerde))
kStream.filter { key, value ->
value.contains("test")
}.to("testStream")
}
}
이 코드는 testTopic으로 들어오는 값들에서 test라는 값이 들어있는 메시지를 testStream이라는 Topic으로 보내는 코드다. 😊

