카프카 컨슈머 상세개념

600g (Kim Dong Geun)·2021년 7월 19일
1

카프카 컨슈머

멀티 스레드 컨슈머

  • 카프카는 처리량을 늘리기 위해 파티션 개수컨슈머 개수 를 늘려서 운영할 수 있음.

  • 파티션과 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 카프카 운영법

  • 파티션의 데이터를 처리하는 방법은 다수의 프로세스 를 운영하거나, 다수의 쓰레드 를 운영하는 방법이 존재

  • 자바는 Thread를 지원하는 언어이기 때문에 둘 중 어느 방법을 사용하더라도 상관은 없음.

  • 다만, 쓰레드의 동시성을 지원하면서 컨슈머를 운영해야 데이터가 원하는 결과로 처리된다.,

    • 또한 OOM을 일으키지 않도록 주의할 것.
  • 컨슈머는 쓰레드를 이용해서 데이터를 처리하는 2가지 전략이 존재

    1. 컨슈머 쓰레드는 1개만 실행하고, 데이터 처리를 담당하는 워커 쓰레드를 여러개 실행하는 전략
    2. 컨슈머 인스턴스에서 poll() 메소드를 호출하는 스레드를 여러개 띄워서 사용하는 컨슈머 멀티 스레드 전략.

멀티 워커 쓰레드 전략

  • 브로커로부터 전달받은 레코드들을 1개의 컨슈머 스레드로 받은 데이터들을 병렬로 처리한다면, 더욱 향상된 속도로 처리할 수 있음.
  • Java의 ExcutorService 라이브러리를 사용하면 레코드를 병렬처리하는 스레드를 효율적으로 생성하고 관리할 수 있음.
  • Executors는 데이터 처리 환경에 맞는 다양한 스레드를 제공하는데, 예를들어 작업이후 스레드가 종료되어야 한다면 CachedThreadPool 을 사용할 수 있다.
  • 간단한 예제
public class ConsumerWorker implements Runnable{
    private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private String recordValue;

    public ConsumerWorker(String recordValue) {
        this.recordValue = recordValue;
    }

    @Override
    public void run() {
        logger.info("thread:{}\t record:{}",Thread.currentThread().getName(), recordValue);
    }
}
public class Main {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(List.of("test"));
        ExecutorService executorService = Executors.newCachedThreadPool();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                ConsumerWorker worker = new ConsumerWorker(record.value());
                executorService.execute(worker);
            }
        }

    }
}
  • 결과

주의할 점

  • 스레드를 사용함으로써 데이터 처리가 끝나지 않았음에도 불구하고 커밋을 하기 때문에 리밸런싱, 컨슈머 장애 시에 데이터 유실이 발생할 수도 있음.
  • 오토 커밋의 경우, 데이터 처리가 스레드에서 진행중임에도 불구하고 다음 poll() 메소드를 호출
  • 스레드의 처리시간이 다 다르기 때문에 레코드 처리에 있어 중복이 발생하거나 데이터의 역전현상이 발생할 수 있음
  • 따라서 데이터의 역전현상이 발생해도 되며 매우 빠른 처리 속도가 필요한 데이터 처리에 적합하다.

카프카 컨슈머 멀티 쓰레드 전략

  • 하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당된다.

  • 하나의 컨슈머는 여러 파티션에 할당될 수 있다.

  • 토픽의 파티션 개수만큼 컨슈머 스레드를 운영하면, 하나의 프로세스에서 모든 파티션을 커버할 수 있음

  • 여기서 주의할점은 구독하고자 하는 토픽의 파티션 개수만큼만 컨슈머 스레드를 운영해야 함.

    컨슈머 스레드가 파티션 개수보다 많아지면 할당할 파티션 개수가 더는 없으므로, 파티션에 할당되지 못한 컨슈머 스레드는 데이터 처리를 하지 않게 된다.

컨슈머 랙 (Consumer LAG)

컨슈머 랙은 토픽의 최신 오프셋과 컨슈머 오프셋간의 차이다.

  • 프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다.

  • 컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 어플리케이션을 운영한다면 필수적으로 모니터링 해야 하는지표

  • 프로듀서의 데이터 양이 일정함에도 불구하고 컨슈머의 장애로인해 랙이 증가할 수도 있는데, 컨슈머는 파티션 개수만큼 늘려서 병렬처리하며 파티션마다 컨슈머가 할당되어 데이터를 처리한다.

컨슈머 랙을 확인하는 방법

컨슈머 랙을 확인하는 방법은 총 3가지가 있다.

  1. 카프카 명령어를 사용하는 방법
  2. 컨슈머 어플리케이션에서 metrics() 메소드를 확인하는 방법
  3. 외부 모니터링 툴을 사용하는 방법

카프카 명령어를 사용하여 컨슈머 랙 조회

Kafka-consumer-groups.sh 명령어를 사용하면 컨슈머 랙을 포함한 특정 컨슈머 그룹의 상태를 확인할 수 있음.

컨슈머 랙을 확인하기 위해 가장 기초적인 방법은 다음과 같은 명령어를 사용하면 됨

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group {group-name} --describe

사용법은 간단하나, 컨슈머 랙을 확인하는 방법이 일회성에 그치고 지표를 지속적으로 기록하고 모니터링하기에는 부족하다.

컨슈머 metrics() 메소드를 사용하여 컨슈머 랙 조회

컨슈머 어플리케이션에서 KafkaConsumer 인스턴스의 metrics() 메소드를 활용하면 컨슈머 랙 지표를 확인할 수 있다. 컨슈머 인스턴스가 제공하는 컨슈머 랙 관련 모니터링 지표는 3가지로 records-lag-max, records-lag, records-lag-avg 임.

다만 컨슈머가 정상 동작할 경우에만 확인할 수 있음,

모니터링 코드를 각 카프카 컨슈머마다 중복해서 작성해야함.

외부 모니터링 툴을 사용하며 컨슈머 랙 조회

외부 모니터링 종류는 다음과 같다.

  1. 데이터독 (Data Dog)
  2. 컨플루언트 컨트롤 센터(Confluent Control Center)
  3. 카프카 버로우
  4. 그라파나

등등...

카프카 버로우

카프카 버로우는 링크드인에서 공개한 오픈소스 컨슈머 랙 체크 툴이다. 버로우를 카프카 클러스터와 연동하면 REST API를 통해 컨슈머 그룹별 컨슈머 랙을 조회할 수 있다.

메소드엔드 포인트설명
GET/burrow/admin버로우 헬스 체크
GET/v3/kafka버로우와 연동중인 카프카 클러스터 리스트
GET/v3/kafka/{클러스터 이름}클러스터 정보 조회
GET/v3/kafka/{클러스터 이름}/consumer클러스터에 존재하는 컨슈머 그룹의 리스트
GET/v3/kafka/{클러스터 이름}/topic클러스터에 존재하는 토픽 리스트
GET/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}컨슈머 그룹의 컨슈머 랙, 오프셋 정보 조회
GET/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹이름}/status컨슈머 그룹의 파티션 정보, 상태 조회
GET/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}/lag컨슈머 그룹의 파티션 정보, 상태, 컨슈머 랙 조회
DELETE/v3/kafka/{클러스터 이름}/consumer/{컨슈머 그룹 이름}버로우에서 모니터링 중인 컨슈머 그룹 삭제
GET/v3/kafka/{클러스터 이름}/topic/{토픽 이름}토픽 상세 조회
  • 버로우는 다수의 카프카 클러스터를 동시에 연결하여 컨슈머 랙을 확인한다.
  • 기업환경에서는 카프카 클러스터를 2개이상으로 구축하고 운영하는 경우가 많기 때문에 한번의 설정으로 다수의 카프카 클러스터 컨슈머 랙을 확인할 수 있다는 장점도 있음

그러나 모니터링을 위해 컨슈머 랙 지표를 수집, 적재, 알람 설정을 하고 싶다면 별도의 저장소와 대시보드를 구축해야함.

컨슈머 랙 모니터링 아키텍쳐

  • 앞서 언급했듯이 버로우를 통해 컨슈머 랙을 모니터링할 때는 컨슈머 랙을 개별적으로 모니터링 할 수 있는 별개의 저장소와 대시보드를 사용하는 것이 효과적.
  • 컨슈머랙 모니터링을 위해 사용할 수 있는 저장소와 대시보드는 다양하지만 빠르게, 무료로 설치할 수 있는 아키텍쳐를 제안
컨슈머 랙 모니터링 아키텍처 준비물
  1. 버로우 : REST API를 통해 컨슈머 랙을 조회
  2. 텔레그래프 : 데이터 수집 및 전달에 특화된 툴, 버로우를 조회하여 데이터를 엘라스틱 서치에 전달
  3. 엘라스틱 서치 : 컨슈머 랙 정보를 담는 저장소
  4. 그라파나: 엘라스틱서치의 정보를 시각화하고 특정 조건에 따라 슬랙 알람을 보낼 수 있는 웹 대시보드 툴
profile
수동적인 과신과 행운이 아닌, 능동적인 노력과 치열함

0개의 댓글