Kafka Consumer With Java

김파란·2024년 12월 11일

Kafka

목록 보기
7/8

오프셋 재설정 동작

  • 오프셋 리셋하는 방식에는 3가지가 있다
  • 컨슈머가 특정 파티션에 대해 유효한 커밋된 오프셋을 찾을 수 없을 때 사용된다
    • 새로운 컨슈머 그룹이 생성되었거나, 기존 컨슈머 그룹의 오프셋이 만료되거나 삭제된 경우
  • none: 설정된 컨슈머 그룹이 없을 때 동작하지 않는다
    • 즉 애플리케이션 시작 전에 컨슈머그룹을 설정해야 한다
    • 컨슈머가 유효한 커밋된 오프셋을 찾지 못하면, 데이터를 읽는 대신 예외를 발생시킨다
  • earliest: 토픽을 맨 처음부터 보겠다는 뜻이다
    • 커밋된 오프셋이 없는 경우에만 처음부터 읽는다
  • latest: 방금 보낸 새 메시지만 읽는다는 의미이다
    • 기존 메시지는 무시하고 애플리케이션 시작된 이후 들어오는 메시지를 처리한다
    • 기본값이다
  • properties.setProperty("auto.offset.reset", "earliest");

오프셋 재설정 동작

  • 카프카는 기본적으로는 7일간 데이터를 저장한다
  • 즉 컨슈머가 7일간 중지되면 읽으려 하는 오프셋이 무효화된다
  • 이럴 때 알아야하는 것이 컨슈머 오프셋 재설정 동작이다
  • offset.retention.munutes로 기간을 제어할 수 있다

Polling

  • 컨슈머를 실행할 때 Polling을 한다고 데이터를 바로 가져오지 않는다
  • 컨슈머 그룹에 join을 하고 해당 토픽의 파티션을 할당한다
  • 파티션에서 커밋된 오프셋을 찾고 메시지를 받는다
public class ConsumerDemo {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String groupId = "my-java-application";
        String topic = "demo_java";

        // 프로듀서 환경설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        // 컨슈머 그룹
        properties.setProperty("group.id", groupId);

        // offset reset
        properties.setProperty("auto.offset.reset", "earliest");

        // 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 토픽 구독
        // 컬렉션이나 패턴을 넘겨주면 된다
        consumer.subscribe(Arrays.asList(topic));

        // 데이터 꺼내오기
        while (true) {
            log.info("Polling");

            // timeout
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> record : records) {
                log.info("Key: " + record.key() + ", Value: " + record.value());
                log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
            }
        }

    }
}

우아한 종료

public class ConsumerDemoWithShutdown {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWithShutdown.class.getSimpleName());

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String groupId = "my-java-application";
        String topic = "demo_java";

        // 프로듀서 환경설정
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        // 컨슈머 그룹
        properties.setProperty("group.id", groupId);

        // offset reset
        properties.setProperty("auto.offset.reset", "earliest");

        // 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 메인 스레드 참조
        final Thread mainThread = Thread.currentThread();

        // Shutdown Hook 추가
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                // 자바 interrupt와 같이 poll 상태일때 wakeup을 하면 Wakeup 예외를 던진다
                consumer.wakeup();

                // 셧다운 훅이 메인 스레드가 끝날 때까지 대기한다
                // 우아한 종료
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        try {
            // 토픽 구독
            // 컬렉션이나 패턴을 넘겨주면 된다
            consumer.subscribe(Arrays.asList(topic));

            // 데이터 꺼내오기
            while (true) {
                log.info("Polling");

                // timeout
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }

        } catch (WakeupException e) {
            log.info("Consumer is starting to shut down");
        } catch (Exception e) {
            log.error("Unexpected exception in the consumer", e);
        }finally {
            consumer.close(); // 컨슈머 종료 및 오프셋 커밋
            log.info("The consumer is now gracefully shut down");
        }



    }
}

컨슈머 그룹

  • 같은 컨슈머 그룹으로 컨슈머를 실행할 때마다 파티션들이 리밸런싱 된다
    • 컨슈머1이 기존 파티션 1,2,3을 가지고 있다고 했을 때 컨슈머가 조인하게 되면 컨슈머1이 파티션 1,2 컨슈머2가 파티션 3을 폴링하도록 재배치하게 된다
  • 컨슈머가 그룹에 합류하거나 나갈 때마다 파티션이 이동하게 된다
  • 파티션이 이동하는 현상을 리밸런싱이라고 한다

밸런싱 전략

  • 적극적 리밸런싱
    • 기본 전략으로 컨슈머가 들어오면 모든 컨슈머는 작업을 중단하고 무작위로 재할당 받는다
    • Stop the world: 짧은 시간에 모든 걸 멈추는 현상이 발생한다
    • 무작위이기 때문에 같은 파티션에 간다는 보장이 없고 사용을 중지한다는 단점이 있다
  • 협력적 리밸런싱
    • 파티션을 작은 그룹으로 나눠서 한 커슈머에서 다른 컨슈머로 재할당한다
    • 중단 없이 데이터를 처리한다는 장점이 있다
    • 여러번 반복에 거쳐서 하기때문에 안정적으로 할당받을 수 있다

적극적 리밸런싱

  • RangeAssignor: 파티션을 토픽당 기준으로 할당해서 균형을 맞추기 어렵다
  • RoundRobin: 모든 파티션이 모든 토픽에 걸쳐서 할당된다. 균형적이다
  • StickyAssignor: 초반에는 라운드 로빈처럼 하다가 컨슈머가 이동하게 되면 이동을 최소화하기 위해 파티션 이동을 최소화 한다

협력적 리밸런싱

  • CooperativeStickyAssignor: 데이터 이동 횟수를 최소화하기 위해 파티션 이동 횟수를 최소화한다
  • kafka Conenct에서는 협력적 리밸런싱이 기본값이다
  • Kafka Streams에서는 StreamsParitionAssignor이 기본값이다

정적 그룹 멤버십

  • 컨슈머가 그룹에 합류하거나 나갈 때마다 리밸런싱이 발생한다
    • 왜냐하면 카프카는 모든 컨슈머가 모든 파티션을 읽어들이게 하기 때문이다
  • 컨슈머가 나가더라도 할당을 바뀌지 않게 할 수 있다
    • 컨슈머가 나갔다 들어오면 재할당을 받는 이유가 새 멤버 ID를 받기 때문이다
    • 그룹 인스턴스 ID를 컨슈머 구성 값의 일부로 특정하면 컨슈머는 정적 멤버가 된다
  • 세션 시간 내에 다시 합류하면 자동으로 재할당되고, 리밸런싱은 일어나지 않는다
// 환경설정 추가
properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());

전달 의미론

최대 한 번(At Most Once)

  • 메시지 배치를 받자마자 오프셋을 커밋한다
  • 처리가 잘못되면 메시지가 손실된다
  • 메시지를 읽으면 바로 커밋하기 때문에 메시지를 처리하지 못하더라도 재처리가 불가능하다
  • 두 번은 절대 안읽지만 안 볼 때도 있다

최소 한 번(At Least Once)

  • 메시지 처리가 잘못되면 다시 읽을 기회를 준다
  • 메시지를 읽을 기회가 두 번이기 때문에 처리가 멱등이어야 한다
  • 메시지 커밋 전에 오류가 발생하면 커밋이 진행되지 않으므로, 커밋 전 과정에서 다시 처리가 이루어진다. 이를 방지하기 위해 멱등성을 보장해야 한다
  • 애플리케이션에서는 적어도 최소 한 번을 보장해야 한다

정확히 한 번(Exactly once)

  • 카프카에서 데이터를 가져오고 트랜잭션 API를 사용하여 다시 Kafka로 보낼 때 얻을 수 있다
  • Kafka Streams API를 사용하면 쉽게 달성할 수 있다

멱등 컨슈머

  • ID를 이용해서 처리를 하면 멱등성을 보장받을 수 있다
    • 이건 잘 알아보고 해야 한다 ID가 없을 수도 있고 ID로 멱등성을 보장하지 않는 곳도 있다
  • 방법 1: 카프카 데이터상의 있는 내용을 이용해서 ID를 정의한다
  • 방법 2: 데이터 내의 ID를 사용한다
    • 이 방법이 제일 나은 방법이다
	// 방법 1
    // 카프카 레코드 좌표를 사용하여 ID를 정의

    // 자체 아이디가 없더라도 유니크한 아이디를 만들어서 사용
   String id = record.topic() + "_" + record.partition() + "_" + record.offset();
  	 try {
     IndexRequest indexRequest = new IndexRequest("wikimedia")
               .source(record.value(), XContentType.JSON)
               .id(id);
         }
         

Consumer Offset Commit Strategies

  • 오토커밋과 수동커밋이 있다

(1). 자동커밋

  • enable.auto.commit=true이면 오토커밋으로 배치가 동시에 처리된다
  • 기본설정으로 오프셋이 정기적으로 커밋된다
  • 최소 한 번 읽기가 기본 설정값이다

동작방식

  • poll()메서드가 코드에 호출될 때 항상 커밋된다
  • 그러면 auto.commit.interval.ms가 경과한다
  • auto.commit.interval.ms가 5초인 경우 poll을 호출할 때마다 5초 간격으로 커밋된다
    • 그전에 메시지를 성공적으로 처리해야 한다. 그러지 않으면 메시지가 소실될 수도 있다
  • 마지막으로 poll 호출 전에 처리된 모든 메시지를 비동기적으로 처리한다
  • 5초로 설정해도 폴을 하는 동안에는 커밋을 하지 않아 시간이 지날 수 있다

(2).수동커밋

  • enable.auto.commit=false으로 수동으로 오프셋을 커밋한다

동작방식

  • commitSync, commitAsync를 호출해 오프셋을 커밋을 한다
   // 수동 커밋
   consumer.commitSync();
   consumer.commitAsync();

컨슈머 작동방식

  • 컨슈머 코디네이터와 통신하는 컨슈머가 있다
  • 컨슈머 코디네이터는 컨슈머가 계속 사용중인지 감지하는데 사용하는 브로커이다
  • hearbeat 스레드와 poll 스레드가 있다
  • 컨슈머 코디네이터는 hearbeat매커니즘을 사용하는 브로커이다
  • 데이터를 빠르게 처리하고 자주 폴 하는 것이 좋다

1). Heartbaet 쓰레드

  • heartbeat.interval.ms 기본값 3초로 Kafka로 데이터를 보낸다
  • 컨슈머가 사용 중임을 알리기 위한 heartbeat 데이터뿐이다
  • 보통 session.timeout.ms의 1/3으로 설정한다
  • heartbeat 타임아웃 시간 중에 전송되지 않으면 컨슈머가 사용 중이 아닌 것으로 간주한다
  • 빠르게 리밸런싱을 하려면 시간을 짧게 하는 것이 좋다

2). Poll 쓰레드

  • max.poll.interval.ms의 기본값은 5분으로 두번의 poll사이에 시간을 얼마나 둘지이다
  • 처리에 5분이 넘게 걸릴 경우 컨슈머가 사용중이 아닌 것으로 간주한다
  • max.poll.records는 기본값이 500으로 요청 다 한번에 가져올 레코드 개수이다
  • fetch.min.bytes는 기본값이 1로 요청당 kafka에서 가져올 최소 데이터 개수이다
    • 이 값을 높이면 레이턴시를 대가로 요청 수를 낮춰 처리량을 높일 수 있다
    • 1MB가 되기전까지는 데이터가 필요없다고 말하는 것과 같다
  • fetch.max.wait.ms는 기본값이 500, 즉 0.5초이다
    • bytes에 다 차기전까지 데이터를 보내지 않을 최대 시간이다
  • max.partition.fetch.bytes는 기본값이 1MB이다
    • 서버가 반환할 파티션 당 데이터의 최대 양이다
    • 파티션이 100개라면 메모리에 100MB를 적재해야 하므로 필요에 따라 조정해야 한다

컨슈머와 파티션 리더

  • 컨슈머는 기본적으로 파티션 리더에서 데이터를 읽어온다
  • 하지만 데이터센터가 여러개라면 레이턴시와 네트워크 비용이 높아져 비효율적이게 될 것이다
  • 그래서 2.4부터 가까운 레플리카에서 읽어오도록 변경됐다
  • 레이턴시와 비용적으로 절감할 수 있게 되었다
  • 이를 컨슈머 랙 인식을 할 수 있게 설정을 해줘야 한다
    • AWS ID를 만들고 class.replicas 해줘야 한다

0개의 댓글