🚨해당 글은 Spring Kafka에 대한 사용방법과 실제 업무의 경험을 곁들인 적용법을 다루고 있습니다.
Apache Kafka에 대한 내용은 다루지 않고 있음을 참고부탁드립니다.
이전글에서는 spring-kafka
에서의 Producer
에 관하여 정리하였었다. 이번글에서는 spring-kafka
를 통한 Consumer
방식에 대해 정리하겠다.
우선은 기본적인 사용방식에 대해서 먼저 정리를 하겠다. spring-kafka
문서에서는 카프카 메세지를 수신하기 위해서 MessageListenerContainer
와 message listener
또는 @KafkaListener
를 사용하라고 한다.
현재 MessageListenerContainer
에 지원되는 MessageListener
는 8개의 Interface
를 지원한다.
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
간단하게 설명하면 단건
, 다건
, Manual Commit
, Consumer
에 해당되는 조합의 Interface
가 제공된다. 자세한 내용은 다음 문서에 자세히 설명되어 있다.
Message Listener Containers
MessageListenerContainer
의 구현체로는 2가지가 제공된다.
KafkaMessageListenerContainer
인스턴스를 제공하여 멀티 스레드를 제공한다.spring-kafka
에서 메세지 수신을 위해 제공하는 방식중 하나인 @KafkaListener
는 기본 설정으로는 kafkaListenerContainerFactory
으로 등록된 bean
으로 ConcurrentMessageListenerContainer
를 생성하여 메세지를 수신한다.
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
@Component
@Slf4j
public class MyKafkaEventListener {
@KafkaListener(groupId = "myGroup", topics = "TEST")
public void test(ConsumerRecord<String, String> data, Acknowledgment acknowledgment, Consumer<String, String> consumer){
System.out.println();
}
@KafkaListener(groupId = "myGroup", topics = {"TEST1", "TEST2", "TEST3","TEST4","TEST5"})
public void test1(ConsumerRecord<String, String> data, Acknowledgment acknowledgment, Consumer<String, String> consumer){
log.info(data.topic());
acknowledgment.acknowledge();
}
}
위의 설정은 기본적인 설정으로 @KafkaListener topics
에 정의된 topic
에 message
가 수신되면 해당 메소드가 실행되게 된다.
ContainerProperties
에 setAckMode
를 설정하게 된다면 ConsumerConfig
의 enable.auto.commit
값이 false
로 설정되며 직접 commit
를 설정할 수 있게된다.
ContainerProperties.setAckMode를 참고하면 각 AckMode
에 따라 Commit
이 어떤 방식으로 수행되는지 확인 할 수 있다.
지금까지는 기본적인 사용방법에 대해서 정리하였다. 이렇게 설정된 서버의 로그를 확인해보면 내 생각과 살짝 다르게 동작하는것이 아닐까하는 로그를 확인할 수 있었다.
INFO 29333 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: []
INFO 29333 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-1, groupId=myGroup] Adding newly assigned partitions: TEST-0
INFO 29333 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-3, groupId=myGroup] Adding newly assigned partitions: TEST1-0, TEST2-0, TEST3-0, TEST4-0, TEST5-0
INFO 29333 --- [ntainer#1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-4, groupId=myGroup] Adding newly assigned partitions: TEST3-1
INFO 29333 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [TEST-0]
INFO 29333 --- [ntainer#1-1-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [TEST3-1]
INFO 29333 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [TEST1-0, TEST2-0, TEST3-0, TEST4-0, TEST5-0]
@KafkaListener
를 2개를 선언하였으며 ConcurrentMessageListenerContainer.setConcurrency
를 2를 설정하였다. 설정한 예상대로 0-0-C-1
, 0-1-C-1
, 1-0-C-1
, 1-1-C-1
4개의 쓰레드가 생성되었다. 그러나 해당 쓰레드에서 수신하고 있는 Topic-Partition
이 예상과 다르게 분배되어있는 것이였다. 나는 TEST1-5
토픽의 파티션 6개가 2개의 쓰레드에 분배되어 수신을 할것으로 생각하였다.
Partition Assignment Strategy
지금까지 순차보장을 위해 Topic
에 하나의 Partition
을 생성하여 사용한 경우 concurrency
를 2이상 설정한 경우 수행되지 않는 쓰레드를 생성하였던 것이였다. 확인 결과 Consumer
설정 중 partition.assignment.strategy
이 존재하였다.
Range Partition Assignment Strategy
기본 설정은 RangeAssignor
방식으로 Topic
의 Partition
이 순서대로 Consumer
에 할당되는 전략이였다. 즉 Topic
이 수많더라도 상관없이 해당 Topic
의 Partition
의 순서에 따라 Consumer
가 결정되는 전략이다.
Round Robin & Sticky Partition Assignment Strategies
Round Robin
방식과 Sticky
방식은 Consumer
에 Partition
를 균등하게 분배하는 방식이며 두 방식의 차이는 Rebalancing
이 발생할시 기존 Consumer
에 할당된 Partition
를 전체를 다시 재할당 하는 방식은 Round Robin
방식이며 영향을 받지 않는 Consumer
와 Partition
의 할당을 가능한 유지하는 방식이 Sticky
방식이다. 즉 초기 할당된 결과는 같으나 Rebalancing
이 발생 할 경우 서로의 분배 결과가 다를수 있다.
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList(StickyAssignor.class));
return props;
}
INFO 30569 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-2, groupId=myGroup] Adding newly assigned partitions:
INFO 30569 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: []
INFO 30569 --- [ntainer#1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-4, groupId=myGroup] Adding newly assigned partitions: TEST2-0, TEST3-1, TEST5-0
INFO 30569 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-1, groupId=myGroup] Adding newly assigned partitions: TEST-0
INFO 30569 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-myGroup-3, groupId=myGroup] Adding newly assigned partitions: TEST1-0, TEST3-0, TEST4-0
INFO 30569 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [TEST-0]
INFO 30569 --- [ntainer#1-1-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [TEST2-0, TEST3-1, TEST5-0]
INFO 30569 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [TEST1-0, TEST3-0, TEST4-0]
위와 같이 Round Robin
방식은 전체를 다시 재할당 하는 방식이기 때문에 Message
수신에 중단이 반드시 발생하게 된다. Sticky
방식은 기본적으로는 재할당 되는Partition
은 중단이된다. 이러한 중단없이 다시 할당하는 방식은 CooperativeSticky
이다. 간단하게 설명하면 두번의 Rebalancing
를 통해서 첫번째 Rebalancing
에서는 영향을 받는 Consumer
의 Partition
을 종료시키며 두번째 Rebalancing
에서 할당될 Consumer
정보를 넘겨 해당 Consumer
에 할당하는 방식으로 거의 중단없이 Message
수신이 가능한 방식이다. 자세한 설명은 다음 강의를 참고하면된다.
Concurrency
라는 단어는 개발자 입장에서 매우 자극적인 단어이다. 이러한 단어만 보고 단순히 지금까지 ConcurrentMessageListenerContainer
를 사용하며 Concurrency
를 2이상 설정하여 사용하였었는데 잘못된 사용법이였던것이다. 이번 Spring Kafka
정리를 통하여 그 동안 놓치고 있었던 설정들에 대해 알 수 있었던 시간이 되었다.