Kafka - 4

유호준·2024년 1월 23일

Kafka

목록 보기
3/6

컨슈머 옵션

용도
bootstrap.servers시작할 때 연결할 하나 이상의 카프카 브로커
value.deserializer값 역직렬화에 필요
key.deserializer키 역직렬화에 필요
group.id컨슈머 그룹에 조인하기 위해 사용되는 ID
client.id유저를 식별하기 위한 ID
heartbeat.interval.ms컨슈머가 그룹 코디네이터에게 핑 신호를 보낼 간격

컨슈머 구현

class PromotionConsumer{
    @Volatile
    private var keepConsuming: Boolean = false
    private val kp: Properties = Properties()
    init {
        kp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093, localhost:9094")
        kp.put(ConsumerConfig.GROUP_ID_CONFIG, "kinaction_webconsumer")
        kp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        kp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
        kp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class)
        kp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class)
    }

    fun consume(){
        val consumer = KafkaConsumer<String,String>(kp)
        consumer.subscribe(listOf("kinaction_promos"))

        while (keepConsuming){
            val records = consumer.poll(Duration.ofMillis(250))

            for (record in records){
                println("kinaction_info offset = ${record.offset()}, key = ${record.key()}")
                println("kinaction_info value = ${record.value()}")
            }
        }

        Runtime.getRuntime().addShutdownHook(Thread(this::shutdown))
    }

    private fun shutdown(){
        keepConsuming = false
    }

}
  • 이미 처리한 데이터를 리플레이할 수 있기 때문에 원본 데이터를 재생성하려는 시도에 대해 걱정할 필요 없다.

올바르게 닫기

class KinactionStopConsumer: Runnable {
    private val consumer: KafkaConsumer<String, String>
    private var stopping: AtomicBoolean = AtomicBoolean(false)
   
    override fun run() {
        try {
            consumer.subscribe(listOf("kinaction_promos"))
            while (!stopping.get()){
                val records = consumer.poll(Duration.ofMillis(250))
            }
        }catch (e: WakeupException){
            if (!stopping.get())
                throw e
        }
        finally {
            consumer.close()
        }
    }

    fun shutdown(){
        stopping.set(true)
        consumer.wakeup()
    }
}
  • wakeup 메서드 호출 시 WakeupException 이 발생하고 올바르게 종료하게 된다.

코디네이트 이해

오프셋

  • 로그의 인덱스 위치
  • 로그는 소비하려는 메시지 위치를 알 수 있다
  • --from-beginning은 컨슈머의 auto.offset.reset 구성 매개변수를 내부적으로 earliest로 설정
    - 기본은 latest
  • 변경사항은 로그 끝에 추가해야한다
  • 오프셋은 항상 각 파티션에 대해 증가한다.
  • 메시지가 제거되더라도 오프셋 번호는 다시 사용되지 않는다.
    - 각 파티션에는 고유한 오프셋 시퀀스가 있다
  • 토픽에 작성된 메시지는 파티션을 찾은 다음 인덱스 기반 오프셋을 찾는다.
    - 컨슈머는 일반적으로 컨슈머의 파티션 리더 레플리카에서 읽는다.
    • 네트워크 대기 시간 문제로 인해 레플리카에서 가져오는 기능을 2.4.0 버전에서 도입했다.
  • 컨슈머 클라이언트는 그룹 코디네이터 역할을 수행하는 특정 브로커와 대화하며 파티션과 리더를 알아낸다.
  • 메시지 소비에는 파티션 수도 영향을 미친다.
    - 파티션보다 컨슈머가 많으면 일부 컨슈머는 작업을 수행하지 않는다.
  • 그룹 코디네이터는 그룹 시작 초기에 어떤 컨슈머가 어떤 파티션을 읽을 지 지정하거나 컨슈머가 추가되거나 실패해 그룹을 종료할떄도 컨슈머를 할당한다.

파티션이 항상 많은 것이 좋을까?

  • 처리량은 무료가 아니다.
  • 많은 파티션이 종단 간 대기 시간을 증가시킬 수도 있다.
  • 대기시간을 중요시한다면 브로커간에 파티션이 복제될 때까지 기다리는 것이 가능하지 않을 수 있다.

컨슈머가 상호작용하는 방식

  • 그룹에 컨슈머를 추가하거나 제거함으로써 처리 규모에 영향을 주기 때문에 컨슈머 그룹 개념이 중요하다
  • 동일한 그룹의 일부가 아닌 컨슈머는 동일한 코디네이션을 공유하지 않는다.
  • 기존 그룹에 조인하면 다른 컨슈머와 작업을 공유하거나 이전 실행에서 읽기를 중단한 부분부터 다시 시작할 수 있다.
  • 새 그룹 ID가 필요한지 여부를 결정하는 데 있어 중요한 사항은 컨슈머가 하나의 애플리케이션의 일부로 작업하는지 별도의 논리 흐름으로 작업하는 지이다.

추적

  • 일부 시스템에서는 컨슈머가 읽은 내용을 기록하지 않는다.
    - 메시지를 가져온 다음 수신확인한 후에는 그 메시지는 대기열에서 사라진다.
    • 이는 정확히 하나의 애플리케이션이 처리해야하는 단일 메시지에 적합하다.
  • 일부 시스템에서는 구독자인 모든 사람에게 메시지를 게시하기 위해 토픽을 사용한다.

그룹 코디네이터

  • 컨슈머 클라이언트와 협력하여 특정 그룹이 읽은 토픽 내부의 기록을 유지
  • 토픽에 대한 파티션 좌표와 그룹 ID는 오프셋 값에 특정 값을 할당한다
  • 같은 그룹이 아니라면 함께 작동하지 않는다
  • 일반적으로 컨슈머 그룹당 하나의 컨슈머만 하나의 파티션을 읽을 수 있다.
  • 컨슈머가 실패할 때 읽고 있던 파티션이 재할당된다
  • 그룹 코디네이터에 대한 핑의 양을 결정하는 heartbeat.interval.ms가 있다.
    - 일정시간 핑을 보내지 못한다는 것은 컨슈머 클라이언트가 중지되거나 치명적인 예외로 인한 실패로 발생할 수 있다.

파티션 할당 전략

  • partition.assignment.strategy 속성은 개별 컨슈머에 어떤 파티션이 할당되는지를 결정한다

RangeAssigner

  • 단일 토픽을 사용해 파티션 수를 찾은 다음 컨슈머 수로 분할한다
  • 분할이 짝수가 아닌 경우 첫 번째 컨슈머는 남은 파티션을 가져온다
  • 일부 컨슈머 클라이언트가 리소스 대부분을 사용한다면 다른 할당 전략으로 전환하는 것을 고려하자

Round-Robin Strategy

  • 컨슈머 밑으로 균일하게 분산되는 방식

Sticky Strategy

작업 위치 표시

  • 애플리케이션이 토픽의 모든 메시지를 읽도록 해야 한다는 것은 고려해야할 중요한 사항 중 하나다
    - 몇 개를 놓쳐도 괜찮은가?
    - 각 메시지를 읽었음을 확인해야 하는가?
  • enable.auto.commit는 기본적으로 true
    - 컨슈머 클라이언트가 대신 커밋해준다.
    - 좋은 점은 소비되는 오프셋을 커밋하기 위해 다른 호출을 할 필요가 없다는 것이다.
  • 카프카 브러커는 컨슈머 클라이언트 오류로 인해 메시지가 수신확인되지 않은 경우 자동으로 메시지를 다시 보낸다
    - 별도의 스레드에서 최신 폴링으로 얻은 메시지를 처리하는 경우, 모든 작업이 실제로 완료되지 않더라도 자동 커밋될 수 있다.
    • 메시지를 쉽게 잃을 수 있다.
  • 커밋의 타이밍이 완벽하지 않을 수 있다.
    - 직접 커밋 메서드를 호출하지 않으면 폴링이나 만료된 타이머 또는 자체 스레드 로직에 따라 일부 정의되지 않은 동작이 있을 수 있다.
    - commitSync 메소드로 commit을 기다릴 수 있다
consumer.commitSync()
consumer.commitAsync(kaOffsetMap){map, e ->
	if (e != null){
		for (key in map.keys)
			println("kinaction_error offset = ${map.get(key)!!.offset()}")
	}    
	else{
		for (key in map.keys)
			println("kinaction_info offset = ${map.get(key)!!.offset()}")
	}
}

요구사항에 대한 코드 검색

읽기 옵션

  • 키에 의한 메시지 조회 옵션은 없지만 특정 오프셋을 찾는 것은 가능하다.
  • 우리가 직면할 수 있는 문제는 읽었더라도 토픽의 시작 부분부터 읽고 싶다는 것이다.
    - 로직 오류, 전체 로그의 리플레이 등등
  • auto.offset.reset을 earliest로 설정하는 것이 중요하다
    - 사용가능한 다른 기술은 다른 컨슈머 그룹 ID를 사용하는 것이다
  • 과거 메시지를 제외하고 시작하고 싶다면 latest를 사용하면 된다.
  • 까다로운 오프셋 검색 방법 중 하나는 OffsetsForTimes이다.
    - 토픽과 파티션에 대한 오프셋과 타임스탬프 맵을 다시 가져올 수 있다.
    • 기록된 이벤트와 관련된 예외가 발생한 경우
    • 반환된 오프셋은 기준을 충족하는 타임스탬프가 있는 첫번째 메시지이다.

요구사항

  • 메시지를 잃지 않기 위해 직접 커밋할 필요가 있다.
  • 브로커에게 전송되는 오프셋이 미래 인덱스가 되어야 한다

컨슈머 로직

  • p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  • 위 설정을 해 자동 커밋을 방지하고 직접 커밋을 하는 것이 중요하다

중요한 이슈를 알리기 위해 신속히 얼럿을 처리하는 기능

  • 파티션 0번으로 CRITICAL한 이슈가 전송된다고 가정할 때
  • consumer.assign(listOf(TopicPartition("kinaction_alert",0)))
  • 위 코드를 통해 파티션을 직접 할당 받는다
profile
포트폴리오 - https://drive.google.com/file/d/152OM9p7JQorjUfvR4BaxqGuP5xtQ8-fM/view?usp=sharing

0개의 댓글