컨슈머 옵션
| 키 | 용도 |
|---|
| bootstrap.servers | 시작할 때 연결할 하나 이상의 카프카 브로커 |
| value.deserializer | 값 역직렬화에 필요 |
| key.deserializer | 키 역직렬화에 필요 |
| group.id | 컨슈머 그룹에 조인하기 위해 사용되는 ID |
| client.id | 유저를 식별하기 위한 ID |
| heartbeat.interval.ms | 컨슈머가 그룹 코디네이터에게 핑 신호를 보낼 간격 |
컨슈머 구현
class PromotionConsumer{
@Volatile
private var keepConsuming: Boolean = false
private val kp: Properties = Properties()
init {
kp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093, localhost:9094")
kp.put(ConsumerConfig.GROUP_ID_CONFIG, "kinaction_webconsumer")
kp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
kp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
kp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class)
kp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class)
}
fun consume(){
val consumer = KafkaConsumer<String,String>(kp)
consumer.subscribe(listOf("kinaction_promos"))
while (keepConsuming){
val records = consumer.poll(Duration.ofMillis(250))
for (record in records){
println("kinaction_info offset = ${record.offset()}, key = ${record.key()}")
println("kinaction_info value = ${record.value()}")
}
}
Runtime.getRuntime().addShutdownHook(Thread(this::shutdown))
}
private fun shutdown(){
keepConsuming = false
}
}
- 이미 처리한 데이터를 리플레이할 수 있기 때문에 원본 데이터를 재생성하려는 시도에 대해 걱정할 필요 없다.
올바르게 닫기
class KinactionStopConsumer: Runnable {
private val consumer: KafkaConsumer<String, String>
private var stopping: AtomicBoolean = AtomicBoolean(false)
override fun run() {
try {
consumer.subscribe(listOf("kinaction_promos"))
while (!stopping.get()){
val records = consumer.poll(Duration.ofMillis(250))
}
}catch (e: WakeupException){
if (!stopping.get())
throw e
}
finally {
consumer.close()
}
}
fun shutdown(){
stopping.set(true)
consumer.wakeup()
}
}
- wakeup 메서드 호출 시 WakeupException 이 발생하고 올바르게 종료하게 된다.
코디네이트 이해
오프셋
- 로그의 인덱스 위치
- 로그는 소비하려는 메시지 위치를 알 수 있다
--from-beginning은 컨슈머의 auto.offset.reset 구성 매개변수를 내부적으로 earliest로 설정
- 기본은 latest
- 변경사항은 로그 끝에 추가해야한다
- 오프셋은 항상 각 파티션에 대해 증가한다.
- 메시지가 제거되더라도 오프셋 번호는 다시 사용되지 않는다.
- 각 파티션에는 고유한 오프셋 시퀀스가 있다
- 토픽에 작성된 메시지는 파티션을 찾은 다음 인덱스 기반 오프셋을 찾는다.
- 컨슈머는 일반적으로 컨슈머의 파티션 리더 레플리카에서 읽는다.
- 네트워크 대기 시간 문제로 인해 레플리카에서 가져오는 기능을 2.4.0 버전에서 도입했다.
- 컨슈머 클라이언트는 그룹 코디네이터 역할을 수행하는 특정 브로커와 대화하며 파티션과 리더를 알아낸다.
- 메시지 소비에는 파티션 수도 영향을 미친다.
- 파티션보다 컨슈머가 많으면 일부 컨슈머는 작업을 수행하지 않는다.
- 그룹 코디네이터는 그룹 시작 초기에 어떤 컨슈머가 어떤 파티션을 읽을 지 지정하거나 컨슈머가 추가되거나 실패해 그룹을 종료할떄도 컨슈머를 할당한다.
파티션이 항상 많은 것이 좋을까?
- 처리량은 무료가 아니다.
- 많은 파티션이 종단 간 대기 시간을 증가시킬 수도 있다.
- 대기시간을 중요시한다면 브로커간에 파티션이 복제될 때까지 기다리는 것이 가능하지 않을 수 있다.
컨슈머가 상호작용하는 방식
- 그룹에 컨슈머를 추가하거나 제거함으로써 처리 규모에 영향을 주기 때문에 컨슈머 그룹 개념이 중요하다
- 동일한 그룹의 일부가 아닌 컨슈머는 동일한 코디네이션을 공유하지 않는다.
- 기존 그룹에 조인하면 다른 컨슈머와 작업을 공유하거나 이전 실행에서 읽기를 중단한 부분부터 다시 시작할 수 있다.
- 새 그룹 ID가 필요한지 여부를 결정하는 데 있어 중요한 사항은 컨슈머가 하나의 애플리케이션의 일부로 작업하는지 별도의 논리 흐름으로 작업하는 지이다.
추적
- 일부 시스템에서는 컨슈머가 읽은 내용을 기록하지 않는다.
- 메시지를 가져온 다음 수신확인한 후에는 그 메시지는 대기열에서 사라진다.
- 이는 정확히 하나의 애플리케이션이 처리해야하는 단일 메시지에 적합하다.
- 일부 시스템에서는 구독자인 모든 사람에게 메시지를 게시하기 위해 토픽을 사용한다.
그룹 코디네이터
- 컨슈머 클라이언트와 협력하여 특정 그룹이 읽은 토픽 내부의 기록을 유지
- 토픽에 대한 파티션 좌표와 그룹 ID는 오프셋 값에 특정 값을 할당한다
- 같은 그룹이 아니라면 함께 작동하지 않는다
- 일반적으로 컨슈머 그룹당 하나의 컨슈머만 하나의 파티션을 읽을 수 있다.
- 컨슈머가 실패할 때 읽고 있던 파티션이 재할당된다
- 그룹 코디네이터에 대한 핑의 양을 결정하는 heartbeat.interval.ms가 있다.
- 일정시간 핑을 보내지 못한다는 것은 컨슈머 클라이언트가 중지되거나 치명적인 예외로 인한 실패로 발생할 수 있다.
파티션 할당 전략
- partition.assignment.strategy 속성은 개별 컨슈머에 어떤 파티션이 할당되는지를 결정한다
RangeAssigner
- 단일 토픽을 사용해 파티션 수를 찾은 다음 컨슈머 수로 분할한다
- 분할이 짝수가 아닌 경우 첫 번째 컨슈머는 남은 파티션을 가져온다
- 일부 컨슈머 클라이언트가 리소스 대부분을 사용한다면 다른 할당 전략으로 전환하는 것을 고려하자
Round-Robin Strategy
Sticky Strategy
작업 위치 표시
- 애플리케이션이 토픽의 모든 메시지를 읽도록 해야 한다는 것은 고려해야할 중요한 사항 중 하나다
- 몇 개를 놓쳐도 괜찮은가?
- 각 메시지를 읽었음을 확인해야 하는가?
- enable.auto.commit는 기본적으로 true
- 컨슈머 클라이언트가 대신 커밋해준다.
- 좋은 점은 소비되는 오프셋을 커밋하기 위해 다른 호출을 할 필요가 없다는 것이다.
- 카프카 브러커는 컨슈머 클라이언트 오류로 인해 메시지가 수신확인되지 않은 경우 자동으로 메시지를 다시 보낸다
- 별도의 스레드에서 최신 폴링으로 얻은 메시지를 처리하는 경우, 모든 작업이 실제로 완료되지 않더라도 자동 커밋될 수 있다.
- 커밋의 타이밍이 완벽하지 않을 수 있다.
- 직접 커밋 메서드를 호출하지 않으면 폴링이나 만료된 타이머 또는 자체 스레드 로직에 따라 일부 정의되지 않은 동작이 있을 수 있다.
- commitSync 메소드로 commit을 기다릴 수 있다
consumer.commitSync()
consumer.commitAsync(kaOffsetMap){map, e ->
if (e != null){
for (key in map.keys)
println("kinaction_error offset = ${map.get(key)!!.offset()}")
}
else{
for (key in map.keys)
println("kinaction_info offset = ${map.get(key)!!.offset()}")
}
}
요구사항에 대한 코드 검색
읽기 옵션
- 키에 의한 메시지 조회 옵션은 없지만 특정 오프셋을 찾는 것은 가능하다.
- 우리가 직면할 수 있는 문제는 읽었더라도 토픽의 시작 부분부터 읽고 싶다는 것이다.
- 로직 오류, 전체 로그의 리플레이 등등
- auto.offset.reset을 earliest로 설정하는 것이 중요하다
- 사용가능한 다른 기술은 다른 컨슈머 그룹 ID를 사용하는 것이다
- 과거 메시지를 제외하고 시작하고 싶다면 latest를 사용하면 된다.
- 까다로운 오프셋 검색 방법 중 하나는 OffsetsForTimes이다.
- 토픽과 파티션에 대한 오프셋과 타임스탬프 맵을 다시 가져올 수 있다.
- 기록된 이벤트와 관련된 예외가 발생한 경우
- 반환된 오프셋은 기준을 충족하는 타임스탬프가 있는 첫번째 메시지이다.
요구사항
- 메시지를 잃지 않기 위해 직접 커밋할 필요가 있다.
- 브로커에게 전송되는 오프셋이 미래 인덱스가 되어야 한다
컨슈머 로직
p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- 위 설정을 해 자동 커밋을 방지하고 직접 커밋을 하는 것이 중요하다
중요한 이슈를 알리기 위해 신속히 얼럿을 처리하는 기능
- 파티션 0번으로 CRITICAL한 이슈가 전송된다고 가정할 때
consumer.assign(listOf(TopicPartition("kinaction_alert",0)))
- 위 코드를 통해 파티션을 직접 할당 받는다