토픽명은 한정 정하면 바꾸기 어렵기 때문에 동료들과 컨벤션을 정하여 패턴을 정하는 것이 중요합니다.
토픽의 파티션 개수 계산
Retention 시간 (메모리 저장 기간)
spring boot에서 kafka로 생성할 때 사용할 수 있는 Template는 3가지 입니다.
kafka의 template 중에서 가장 기본적인 template입니다. ProducerFactory 클래스를 사용하여 생성됩니다.
만약 Transactions을 사용하지 않는다면, DefaultKafkaProducerFactory가 producer를 싱글톤으로 생성되는데
이런 경우에 flush() 메서드가 호출된다면 같으 producer를 사용하는 다른 쓰레드에서는 지연이 발생하게 됩니다.
이를 방지하고자 producerPerThread(default: false)라는 속성값이 생겼습니다. 이 속성 값은 각 쓰레드에서 별도의 생성자를 만들고 캐싱 처리를 합니다. 다 처리하였으면 closeThreadBoundProducer() 메소드를 호출하여 만든 생성자를 닫아줘야 합니다.
가장 기본적인 template인 만큼 스프링이 기본적으로 제공합니다.
기본적으로 비동기처리 입니다. (빠른 스트림 처리를 위해)
동기로 처리 할 수도 있지만 카프카의 사용 목적과 맞지 않기 때문에 사용하지 않는 것이 좋습니다.
사용자가 직접 ProducerFactory를 정의 할 수 있습니다.
Message 객체를 이용하여 보내고 header에 아래와 같은 값들을 포함하여 전송 할 수 있습니다.
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP
전송하는 토픽별로 옵션을 다르게 설정할 수 있는 template입니다. 하지만 Transactions, Execute, flush 등의 커멘드는 지원하지 않습니다.
Consumer가 특정 메시지를 전달 받았는지 확인이 가능한 Template입니다.
Header에 아래와 같은 값을 담아서 보내주고 잘 받았는지 응답값을 받습니다.
KafkaHeaders.CORREALATION_ID // 요청과 응답을 연결
KafkaHeaders.REPLY_TOPIC // 응답 토픽
KafkaHeaders.REPLY_PARTITION // 응답 토픽의 파티션
kafka consumer의 타입에는 record 타입과 batch 타입이 존재한다.
record 타입은 consumer의 기본 타입으로 1개의 레코드를 처리할 때 사용된다
batch 타입은 record와 비슷하지만 한번에 여러개의 레코드를 처리 할 수 있다는 차이점이 있다.
Message listener Container는 Message listener를 관리하는 컨테이너다.
이를 사용하면 start, stop, pause, resume 등의 메소드를 사용할 수 있다.
KafkaMessageListenerContainer를 1개 이상 사용하는 멀티 쓰레드다.
stop, start 등 처리시 foreach로 순차적으로 실행된다.
1번의 경우 싱글 쓰레드로 한번에 한개의 데이터를 읽을 수 있다. 2번의 경우는 멀티 쓰레드로 한번에 여러개의 데이터를 읽을 수 있다. kafka를 사용하는
이전에는 복잡하게 다 생성시켜 매핑을 시켜줘야 했지만 지금은 spring boot에서 autoConfiguration을 지원해줘 ConcurrentKafkaListenerContainerFactoryConfiguer 쉽게 설정 할 수 있고 다양한 설정을 property로 손쉽게 설정이 가능하다
@KafkaListener와 마찬가지로 이전에는 복잡하게 유효성 조건을 등록했으나 이제는 KafkaListenerEndpointRegistrar에서 손쉽게 등록 할 수 있다.
Spring boot에서는 KafkaAdmin이 Bean으로 autoConfiguration 되어 모든 설정이 default 값으로 자동으로 생성된다.
spring boot에서 kafka 내부의 brokers, topics, partition 등을 관리해주는 역할을 하는 것이 admin Clinet이고 이는 KafkaAdmin에 설정된 셋팅 값을 통해서 생성된다.
KafkaAdmin 객체가 자동으로 생성되는 것과 달리 admin client는 직접 생성해줘야 한다.
Spring boot에서는 KafkaAdmin Bean이 자동으로 생성된다.
kafka에 데이터를 생성하는 template 양식이다.
+) Spring에서 kafka를 사용할 때 생성한 데이터가 브로커에 잘 전달 됐는지 확인 하는 설정은 acks 이다.
acks는 브로커로 부터 데이터가 왔다고 응답받는 수 이다.
3개의 브로커에서 모두 응답을 받는다면 acks = 3
1개의 leader 브로커에서만 응답을 받는다면 acks = 1
데이터의 저장 여부는 상관없이 없다면 akcs = 0
acks는 브로커로부터 응답을 받기 때문에 acks가 높을 수록 신뢰성이 높아지지만 속도는 내려간다. spring의 acks defulat 값은 -1 (all) 이다.
@Bean
fun kafkaCustomTemplate(): KafkaTemplate<String, ChatDto>{ // kafka에 데이터를 주고 받기 위한 template을 생성해준다. kafka는 key , value 형식의 저장소임으로 key는 String, value는 ChatDto 를 사용한다.
return KafkaTemplate(kafkaCustomProducerFactory()) // template을 실질적으로 생성해주는 producerFactory를 넣어준다.
}
private fun kafkaCustomProducerFactory(): ProducerFactory<String, ChatDto> { // template을 생성하는데 디테일한 설정값을 넣어주는 producerFactory이다.
val config : HashMap<String, Any> = HashMap() // 설정값을 셋팅해준다.
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092" // 해당 template은 kafka의 어떤 서버와 연결할 것인지
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java // key의 직열화는 어떻게 할 것인지
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java // value의 직열화는 어떻게 할 것인지
return DefaultKafkaProducerFactory(config) // 설정을 셋팅했으면 실질적으로 template을 생성해주는 Factory에 설정값을 넣어서 호출한다. 여기서는 default factory로 생성하였지만 이외에도 다양한 생성자가 존재한다.
}
데이터 보내기
@PostMapping(
value = ["/api/v1/kafka/chat"]
)
fun testChatDto(@RequestBody dto:ChatDto){
kafkaCustomTemplate.send("viva2", dto) // (토픽, 생성할 값)
}
보낸 값
{
"type" : "ENTER",
"sender" : "viva",
"message" : "test case 1"
}
kafka에서 실제로 받은 값
{"type":"ENTER","sender":"viva","message":"test case 1","createdAt":"2022.03.02 16:07"}
kafka에서 데이터를 받아와 사용하는 cunsumer를 spring에서는 listener라고 지칭한다.
kafka listener를 생성하는 방법에는 2가지가 있다.
이전에도 언급하였지만 listener의 종류에는 2가지가 있다.
listener은 thread-safe 하지 않기 때문에 Bean으로 등록한 listener을 여러곳에서 주입받아서 사용하면 안된다. listener은 한 곳에서 호출되고 처리되어야 한다.
MessageListenerContainer은 2개로 구분된다.
KafkaMessageListenerContainer 구현
@Bean
fun kafkaMessageListenerContainer(): KafkaMessageListenerContainer<String, String>{ // kafkaMessageListenerContainer는 single thread 이다.
val containerProperties = ContainerProperties("viva") // 소비하려는 토픽을 설정해준다.
containerProperties.setGroupId("viva-container") // 소비 그룹을 생성한다. * 반드시 설정해줘야함 소비 그룹이 있어야 id도 생성되고 오류가 안남
containerProperties.ackMode = ContainerProperties.AckMode.BATCH // 추가적인 설정 모드를 설정 할 수 있음
containerProperties.messageListener = DefaultMessageListener()
return KafkaMessageListenerContainer(containerFactory(),containerProperties) // 생성한 cunsumerFactory를 등록해주고 앞서 설정한 container 설정값을 등록해준다.
}
private fun containerFactory(): ConsumerFactory<String, String> { // 소비자 cunsumer의 설정 값을 설정하여 cunsumerFactory를 생성한다.
val props: HashMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(props)
}
ConcurrentMessageListenerContainer 구현
@Bean
fun concurrentKafkaListenerContainer(): ConcurrentKafkaListenerContainerFactory<String, ChatDto>{ // 1개 이상의 consumerFacotry를 사용하는 multi thread 이다.
val factory = ConcurrentKafkaListenerContainerFactory<String,ChatDto>() // container 생성
factory.setConcurrency(1) // 병렬 처리를 위한 복제품 생성
factory.consumerFactory = cumsumerFactory() // container에 등록할 cunsumerFactory 설정
return factory
}
private fun cumsumerFactory() : ConsumerFactory<String, ChatDto>{ // 소비자 cunsumer의 설정 값을 설정하여 cunsumerFactory를 생성한다. containerFactory() 메소드와 동일하다.
val config : HashMap<String, Any> = HashMap()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java // kafka의 데이터를 역직열화 할 key 타입
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java // kafka의 데이터를 역직열화 할 value 타입
return DefaultKafkaConsumerFactory(config,StringDeserializer(),JsonDeserializer(ChatDto::class.java)) // 역직열화 할 value 값이 object 이기 때문에 직접 주입해줘야 오류가 발생하지 않는다.
}
@KafkaListener을 사용하여 Consumer을 구현 할 수있다.
초기에는 MessageListenerContainer을 직접 설정해서 구현했지만 최근에는 KafkaListener 어노테이션을 통해서 쉽게 구현이 가능하다. 이를 사용하기 위해선 kafka config 파일에 Bean으로 ConcurrentKafkaListenerContainerFactory 거나 KafkaMessageListenerContainer를 꼭 kafkaListenerContainerFactory이름으로 등록해줘야한다.
다른 이름으로 등록하면 KafkaListener 어노테이션을 단 메소드가 데이터를 받아오지 못한다. ConcurrentMessageListenerContainer 구현에서 이름을 concurrentKafkaListenerContainer이렇게 설정해줘서 매핑이 안되는 이슈가 발생하여 찾아보니 반드시 kafkaListenerContainerFactory이름으로 bean을 등록해야 한다고하니 꼭 하라는 대로 하자
@KafkaListener(id = "viva_listener", topics = ["viva"])
fun listen(message : String,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic : String,
@Header(KafkaHeaders.OFFSET) offset : Long,
) {
println("=======receive==========")
print(message)
println("=======TimeStamp==========")
println(timestamp)
println("=======TOPIC==========")
println(topic)
println("=======offset=========")
println(offset)
}
@KafkaListener(id = "viva_listener_chatDto", topics = ["viva2"], containerFactory = "concurrentKafkaListenerContainer")
fun listenChatDto(message : ChatDto,
) {
println("=======receive==========")
print(message.toString())
}