컨슈머는 poll() 메서드를 호출하여 Kafka 브로커로부터 메시지를 가져옵니다.
폴링 요청이 브로커에 도달하면, 해당 컨슈머 그룹의 오프셋 커밋 상태와 비교하여 읽을 메시지의 오프셋을 결정합니다.
컨슈머가 자동 커밋 기능을 사용하는 경우, 일정한 간격(주로 auto_commit_interval_ms 설정 값)으로 커밋을 수행하는 타이머가 동작합니다.
타이머가 설정된 간격마다 컨슈머는 최종으로 읽은 메시지의 오프셋을 커밋하여 메시지의 손실을 최소화합니다.
폴링한 메시지는 컨슈머의 내부 버퍼에 수집됩니다. 이 버퍼에는 각 파티션마다 여러 개의 레코드 배치가 포함될 수 있습니다. 컨슈머는 이러한 레코드 배치를 수집하여 메모리에 보관하고 처리 준비를 합니다.
컨슈머는 메모리에 보관된 레코드 배치를 하나씩 가져와서 개별 레코드를 처리합니다.
레코드 처리는 개발자가 구현한 로직에 따라 이루어집니다. 예를 들어, 데이터를 저장하거나 가공하는 작업 등을 수행할 수 있습니다.
위 과정은 poll() 메서드 호출 시마다 반복됩니다. 컨슈머는 주어진 폴링 간격에 따라 메시지를 가져오고, 자동 커밋 타이머가 동작하여 커밋을 수행하며, 수집된 레코드를 처리하는 과정을 반복합니다.
__consumer_offset은 Kafka 내부에서 사용되는 특수한 토픽으로, 컨슈머 그룹 오프셋을 저장하고 관리하는 데 사용됩니다. 컨슈머 그룹이 오프셋을 커밋할 때마다 해당 오프셋 값은 이 특수한 토픽에 저장됩니다. Kafka는 이 정보를 사용하여 각 컨슈머 그룹의 진행 상황을 추적하고, 그룹 내의 각 컨슈머가 파티션의 올바른 오프셋에서 메시지를 읽을 수 있도록 보장합니다.
auto.offset.reset 설정은 컨슈머가 컨슈머 그룹에 처음 참여하거나 특정 파티션에 유효한 커밋된 오프셋을 찾을 수 없는 경우의 동작을 결정합니다. 이 설정을 사용하여 해당 시나리오에서 어떻게 동작할지 지정할 수 있습니다.
(1) earliest: 파티션에 유효한 커밋된 오프셋이 없는 경우, 컨슈머는 해당 파티션에서 가장 이른 오프셋부터 읽기를 시작합니다. 이는 토픽의 시작부터 모든 메시지를 읽는 데 사용됩니다. 모든 메시지를 처음부터 처리하고 싶을 때 유용합니다.
(2) latest: 파티션에 유효한 커밋된 오프셋이 없는 경우, 컨슈머는 해당 파티션에서 가장 최근 오프셋부터 읽기를 시작합니다. 이는 컨슈머가 그룹에 참여한 이후 도착한 새로운 메시지만 소비하도록 합니다. 새로운 메시지만 처리하고 싶을 때 유용합니다.
enable_auto_commit
이 옵션은 컨슈머의 자동 커밋 기능을 활성화 또는 비활성화합니다. 기본적으로 이 옵션은 true로 설정되어 있어 자동 커밋이 활성화됩니다. 자동 커밋이 활성화되면 Kafka 컨슈머는 처리한 메시지의 오프셋을 주기적으로 커밋하여, 컨슈머 그룹의 진행 상태를 기록합니다.
auto_commit_interval_ms
이 옵션은 자동 커밋 간격을 지정합니다. 기본적으로 auto_commit_interval_ms는 5000ms (5초)로 설정되어 있습니다. 컨슈머는 이 간격마다 자동으로 오프셋을 커밋합니다. 따라서 enable_auto_commit이 true로 설정되어 있을 때, auto_commit_interval_ms의 값에 따라 커밋 주기가 결정됩니다.
const { Kafka } = require('kafkajs');
async function consumeMessages() {
const kafka = new Kafka({
clientId: 'my-consumer',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`Received message: ${message.value}`);
// 여기에서 수동으로 오프셋을 커밋합니다.
await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]);
},
});
}
consumeMessages().catch((error) => {
console.error(`Error in consumer: ${error.message}`);
});
await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]);
참고) https://blog.rockthejvm.com/optimizing-kafka-clients-a-hands-on-guide/