쿠버네티스에서 카프카 리밸런싱 방지하기에서 소개한 카프카 리밸런싱과 스태틱 멤버십에 대해 간단한 예제로 실습해 보려고 합니다.
해당 실습은 쿠버네티스 환경이 아닌 어플리케이션 레벨의 카프카 리밸런싱과 스태틱 멤버십 예제입니다. 원 글에서 언급한 쿠버네티스를 활용한 실습은 진행하지 않는다
는 점 참고해 주시면 감사하겠습니다.
3개의 파티션으로 구성된 토픽에 2개의 컨슈머가 존재하는 상황을 만들어 봅시다.
도커 컨테이너로 카프카를 실행한 뒤 아래 명령어로 토픽을 생성했습니다.
bin/kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092 --partitions 3
간단하게 토픽에 메시지를 보낼 수 있는 API를 작성했습니다.
@Configuration
public class Config {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String,Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory(config);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void produceToPartition0() {
kafkaTemplate.send("topic1",0,"","from partition0");
}
public void produceToPartition1() {
kafkaTemplate.send("topic1",1,"","from partition1");
}
public void produceToPartition2() {
kafkaTemplate.send("topic1",2,"","from partition2");
}
}
@Slf4j
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final ProducerService producerService;
@GetMapping
public void sendAll() {
producerService.produceToPartition0();
producerService.produceToPartition1();
producerService.produceToPartition2();
}
@GetMapping("/0")
public void send0() {
producerService.produceToPartition0();
}
@GetMapping("/1")
public void send1() {
producerService.produceToPartition1();
}
@GetMapping("/2")
public void send2() {
producerService.produceToPartition2();
}
}
컨슈머 그룹이 동일한 두 개의 컨슈머를 각각의 어플리케이션에 실행할 예정입니다.
@Configuration
public class Config {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String,Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "CONSUMER-GROUP-1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Component
@Slf4j
public class ConsumerService {
@KafkaListener(topics = "topic1", groupId = "CONSUMER-GROUP-1")
void consumer_1(String data) {
log.info("Consumer 1 received data {}", data);
}
}
컨슈머2는 구분을 위해 Consumer2라고 작성한 로그를 제외하면 모든 부분이 컨슈머1과 동일합니다.
@Configuration
public class Config {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String,Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "CONSUMER-GROUP-1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Component
@Slf4j
public class ConsumerService {
@KafkaListener(topics = "topic1", groupId = "CONSUMER-GROUP-1")
void consumer_2(String data) {
log.info("Consumer 2 received data {}", data);
}
}
스태틱 멤버십이 설정되어있지 않은 상태에서 리밸런싱이 발생하는지 확인해 봅시다.
처음 실행한 컨슈머1에서 3개의 파티션이 모두 할당되어 있었습니다. 하지만 컨슈머2가 실행되어 리밸런싱이 발생했고 컨슈머1은 최종적으로 2번 partition을 처리하게 되었습니다.
producer API를 이용해 메시지를 생성하고 로그를 확인해 봅시다.
Consumer1이 partition2를, Consumer2가 partition0과 partition1을 처리하고 있습니다.
역시 리밸런싱이 발생했습니다. 마찬가지로 producer API를 이용해 메시지를 생성하고 로그를 확인해 봅시다.
Consumer1이 partition0과 partition1을, Consumer2가 partition2를 처리하고 있습니다.
리밸런싱 후 이전과 다른 파티션을 담당하고 있음을 확인할 수 있습니다.
group.instance.id값을 설정하면 스태틱 멤버십으로 인식됩니다.
각각의 Config에 다음의 설정을 추가합니다.
config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "cm");
config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "cm2");
application.yml파일로 설정을 제어하고 있다면 다음과 같이 작성하면 됩니다.
spring:
kafka:
consumer:
bootstrap-servers: "localhost:9092"
group-id: "CONSUMER-GROUP-1"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
group:
instance:
id: "cm2"
Consumer1이 partition0과 partition1을, Consumer2가 partition2를 처리하고 있습니다.
다시 돌아온 컨슈머2가 이전과 동일하게 partition2를 담당하고 있으며 컨슈머1에서 리밸런싱이 일어나지 않았습니다. (여러번 컨슈머2를 재시작해도 동일한 결과를 얻습니다)
스태틱 멤버십은 session.timeout.ms와 max-poll-interval-ms동안 리밸런싱이 발생하지 않습니다. 만약 session.timeout.ms또는 max-poll-interval-ms를 초과해 컨슈머2가 돌아오지 않는다면 이때는 리밸런싱이 발생합니다.