Kafka는 분산 메세지 스트리밍 플랫폼으로, 대용량 데이터를 안정적으로 처리하기 위해 사용된다.
만약 처리해야하는 메세지가 많아져서 producer가 메세지를 발행하는 속도보다 consumer가 메세지를 처리하는 속도가 느려지는 경우 lag가 쌓이게 된다.
이러한 경우 partition과 consumer를 늘려 컨슈밍 속도를 향상시킬 수 있지만, partition의 경우 한 번 늘리면 다시 줄일 수 없다는 특성이 있기 때문에 신중히 늘리는 것이 좋다.
이러한 경우 Kafka.js의 eachBatch 핸들러를 사용할 수 있다.
eachMessage 핸들러는 기본적으로 핸들러 내에서 메세지를 하나씩 처리할 수 있지만, eachBatch 핸들러에서는 여러개의 메세지를 동시에 처리할 수 있다.
예시는 다음과 같다.
// ... connect to consumer
await consumer.subscribe({ topics: ['topic-name'] })
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
const batchSize = 5
for (let i = 0; i < batch.messages.length; i += batchSize) {
// Remove duplicated messages from chunk
const chunks = batch.messages.slice(i, i + batchSize)
const uniqChunks = _.uniqWith(
chunks,
(c1, c2) => c1.value?.toString() === c2.value?.toString()
)
// The last offset will be committed only after all Promises are resloved
const lastOffset = uniqChunks[uniqChunks.length - 1].offset
await Promise.all(
uniqChunks.map(async (chunk) => {
const value = chunk?.value?.toString()
if (!value) {
// handle error
}
await processMessage(value)
})
)
.then(() => {
// The resolved offset will be committed even in case of errors
resolveOffset(lastOffset)
})
.catch((error) => {
// handle error
})
.finally(async () => {
// Heartbeat will be ignored if it is called sooner than `heartbeatInterval`
await heartbeat()
})
}
},
})
batchSize 변수에 한 번에 처리할 메세지 개수를 저장하고, batch로 fetch해온 메세지를 for문 내에서 해당 개수에 맞게 slice해서 처리한다. 이렇게 처리하는 이유는 전체 메세지를 concurrent하게 처리했을 때 연관 서비스에 가는 부담을 낮추기 위해서이다.
또한 만약 메세지가 중복 처리되면 안되는 경우, processMessage() 함수 내에는 중복 처리 로직이 존재한다는 가정 하에 slice되어 비동기로 처리되는 메세지가 유니크한지 체크 후 Promise.all로 비동기 처리하는 방식을 사용할 수 있다.
이때, 마지막 메세지의 offset은 미리 저장해두고, 비동기 처리가 완료되면 resolveOffset 함수로 resolve 처리한다. 이를 통해 만약 일부 메세지 처리 과정에서 에러가 발생하여 재실행 해야하는 경우 offset이 잘못 commit되는 것을 방지할 수 있다.
이렇게 resolve된 메세지는 batch handler 내에서 에러가 발생하더라도 commit됨을 보장한다.
마지막으로 에러 발생 여부와 상관 없이 finally절에서는 hearbeat()을 호출한다. Kafka.js의 batch 핸들러는 batch 작업이 종료될 때마다 broker에게 heartbeat을 보내는데, 만약 메세지 처리 작업이 오래 걸리는 경우 해당 주기가 정해진 sessionTimeout 보다 길어져 broker가 consumer를 그룹에서 제외하고 rebalance 작업을 진행할 수 있다.
따라서 핸들러 내에서 주기적으로 hearbeat을 보내는 방식을 통해 session을 안정적으로 유지할 수 있다. 만약 heartbeat() 함수가 호출된 타이밍이 정해진 heartBeatInterval 이내인 경우, 이를 무시하므로 자주 호출되어도 문제가 없다.
만약, 비동기 처리 과정에서 좀 더 정교한 offset 처리가 필요한 경우 Promise.all 대신, Promise.allSettled를 사용하는 것을 고려해볼 수 있다.
이는 각 Promise마다 결과를 반환해주므로, 에러가 발생한 Promise에 대해서만 재처리가 가능하다.
consumer 연결 시 maxBytesPerPartition과 maxBytes config를 지정하여 fetch 해오는 메세지의 양을 조절할 수 있다.
만약 기존에 eachMessage 핸들러를 사용하고 있었다면, eachMessage 핸들러 역시 메세지 자체는 batch로 가져오고, 그 후 하나씩 처리하는 방식이므로 기존에 가져오는 메세지량에 문제가 없었다면 해당 config 값을 조절할 필요는 없다.