Spring Kafka에서 Concurrency는 한 개의 컨슈머 그룹 내에서 특정 토픽의 파티션을 동시에 처리할 수 있는 스레드의 수를 의미합니다. 즉, 하나의 컨슈머 그룹 내에서 여러 스레드가 동시에 메시지를 처리하여 처리량을 높일 수 있도록 하는 설정입니다.
Spring Kafka에서는 @KafkaListener 애노테이션의 concurrency 속성을 통해 Concurrency를 설정할 수 있습니다.
@KafkaListener(topics = "my-topic", groupId = "my-group", concurrency = "3")
public void listen(ConsumerRecord<?, ?> record) {
// 메시지 처리 로직
}
Concurrency 값은 일반적으로 토픽의 파티션 수와 동일하게 설정해야 효과가 있는데 Concurrency를 아무리 늘린다고 해도 파티션의 수가 1개라면 병렬처리는 일어나지 않는다.
또한 하나 하나의 메시지 처리 시간이 길 경우, Concurrency 값을 너무 크게 설정하면 메모리 부족이나 CPU 과부하가 발생할 수 있습니다.
마지막으로 Concurrency를 사용할 때는 오프셋 관리에 유의해야 합니다. 각 스레드가 독립적으로 오프셋을 관리하기 때문에, 장애상황으로 인해 중복 처리나 데이터 손실이 발생하지 않도록 주의해야 합니다.
Spring-kafka만을 사용해서는 직접적으로 파티션의 수를 늘려줄 수가 없어서 Kafka CLI 사용해서 파티션수를 늘려줄 수 있다.
kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 5 --topic my-topic
kafka-topics.sh --zookeeper <주키퍼호스트>:<주키퍼포트> --describe --topic <토픽이름>
변경 전 토픽의 파티션 수 1개

변경 후 토픽의 파티션 수 3개

만약 파티션의 수를 줄여야 하는 상황이 온다면 해당 토픽을 다른 이름과 원하는 파티션의 수로 새롭게 생성 한다음 필요한 경우 데이터 마이그레이션을 진행하고 기존 토픽을 삭제하는 방식을 사용해야한다.