카프카는 처리량을 늘리기 위해 파티션 개수 와 컨슈머 개수 를 늘려서 운영할 수 있음.
파티션과 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 카프카 운영법
파티션의 데이터를 처리하는 방법은 다수의 프로세스 를 운영하거나, 다수의 쓰레드 를 운영하는 방법이 존재
자바는 Thread를 지원하는 언어이기 때문에 둘 중 어느 방법을 사용하더라도 상관은 없음.
다만, 쓰레드의 동시성을 지원하면서 컨슈머를 운영해야 데이터가 원하는 결과로 처리된다.,
컨슈머는 쓰레드를 이용해서 데이터를 처리하는 2가지 전략이 존재
public class ConsumerWorker implements Runnable{
private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
private String recordValue;
public ConsumerWorker(String recordValue) {
this.recordValue = recordValue;
}
@Override
public void run() {
logger.info("thread:{}\t record:{}",Thread.currentThread().getName(), recordValue);
}
}
public class Main {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(List.of("test"));
ExecutorService executorService = Executors.newCachedThreadPool();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
ConsumerWorker worker = new ConsumerWorker(record.value());
executorService.execute(worker);
}
}
}
}
하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당된다.
하나의 컨슈머는 여러 파티션에 할당될 수 있다.
토픽의 파티션 개수만큼 컨슈머 스레드를 운영하면, 하나의 프로세스에서 모든 파티션을 커버할 수 있음
여기서 주의할점은 구독하고자 하는 토픽의 파티션 개수만큼만 컨슈머 스레드를 운영해야 함.
컨슈머 스레드가 파티션 개수보다 많아지면 할당할 파티션 개수가 더는 없으므로, 파티션에 할당되지 못한 컨슈머 스레드는 데이터 처리를 하지 않게 된다.
컨슈머 랙은 토픽의 최신 오프셋과 컨슈머 오프셋간의 차이다.
프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다.
컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 어플리케이션을 운영한다면 필수적으로 모니터링 해야 하는지표
프로듀서의 데이터 양이 일정함에도 불구하고 컨슈머의 장애로인해 랙이 증가할 수도 있는데, 컨슈머는 파티션 개수만큼 늘려서 병렬처리하며 파티션마다 컨슈머가 할당되어 데이터를 처리한다.
컨슈머 랙을 확인하는 방법은 총 3가지가 있다.
Kafka-consumer-groups.sh
명령어를 사용하면 컨슈머 랙을 포함한 특정 컨슈머 그룹의 상태를 확인할 수 있음.
컨슈머 랙을 확인하기 위해 가장 기초적인 방법은 다음과 같은 명령어를 사용하면 됨
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group {group-name} --describe
사용법은 간단하나, 컨슈머 랙을 확인하는 방법이 일회성에 그치고 지표를 지속적으로 기록하고 모니터링하기에는 부족하다.
컨슈머 어플리케이션에서 KafkaConsumer
인스턴스의 metrics() 메소드를 활용하면 컨슈머 랙 지표를 확인할 수 있다. 컨슈머 인스턴스가 제공하는 컨슈머 랙 관련 모니터링 지표는 3가지로 records-lag-max
, records-lag
, records-lag-avg 임.
다만 컨슈머가 정상 동작할 경우에만 확인할 수 있음,
모니터링 코드를 각 카프카 컨슈머마다 중복해서 작성해야함.
외부 모니터링 종류는 다음과 같다.
등등...
카프카 버로우는 링크드인에서 공개한 오픈소스 컨슈머 랙 체크 툴이다. 버로우를 카프카 클러스터와 연동하면 REST API를 통해 컨슈머 그룹별 컨슈머 랙을 조회할 수 있다.
메소드 | 엔드 포인트 | 설명 |
---|---|---|
GET | /burrow/admin | 버로우 헬스 체크 |
GET | /v3/kafka | 버로우와 연동중인 카프카 클러스터 리스트 |
GET | /v3/kafka/{클러스터 이름} | 클러스터 정보 조회 |
GET | /v3/kafka/{클러스터 이름}/consumer | 클러스터에 존재하는 컨슈머 그룹의 리스트 |
GET | /v3/kafka/{클러스터 이름}/topic | 클러스터에 존재하는 토픽 리스트 |
GET | /v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름} | 컨슈머 그룹의 컨슈머 랙, 오프셋 정보 조회 |
GET | /v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹이름}/status | 컨슈머 그룹의 파티션 정보, 상태 조회 |
GET | /v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}/lag | 컨슈머 그룹의 파티션 정보, 상태, 컨슈머 랙 조회 |
DELETE | /v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름} | 버로우에서 모니터링 중인 컨슈머 그룹 삭제 |
GET | /v3/kafka/{클러스터 이름}/topic/{토픽 이름} | 토픽 상세 조회 |
그러나 모니터링을 위해 컨슈머 랙 지표를 수집, 적재, 알람 설정을 하고 싶다면 별도의 저장소와 대시보드를 구축해야함.