프로듀서 설명
- 프로듀서 작업에는 클러스터에 대한 메타데이터 가져오기가 포함된다.
- 프로듀서는 할당된 파티션의 리더 레플리카에만 쓸 수 있다.
- 사용자는 다른 세부 정보 없이 토픽 이름만 알고 있으므로 이 메타 데이터는 프로듀서가 쓸 브로커를 결정하는 데 도움이 된다.
- 재시도를 위한 로직이 이미 내장되어 있다.
- 메시지 순서가 필수적인 경우 max.in.flight.requests.per.connection 1로 설정 후 acks를 all로 설정해야한다
프로듀서 옵션
- 일부 구성값을 변경하는 것 만으로도 주요 동작을 변경할 수 있다
- ProducerConfig에서 제공된 상수를 사용하고 컨플루언트 웹사이트에서 high 레이블을 찾아보자
| 키 | 용도 |
|---|
| acks | 메시지 전달이 성공하기 위한 프로듀서가 요구하는 복제 ackd의 수 |
| bootstrap.servers | 시작 시 연결할 하나 이상의 카프카 브로커 |
| value.seriallizer | 값의 직렬화에 사용되는 클래스 |
| key.seriallizer | 키의 직렬화에 사용되는 클래스 |
브로커 목록 구성
- 메시지를 보낼 토픽을 프로듀서에게 알려야한다
- 토픽은 파티션으로 구성되어 있는데 카프카는 토픽과 파티션이 있는 위치를 어떻게 알 수 있을까?
- bootstrap.servers
- 프로듀서가 클러스터에 연결되면 이전에 제공하지 않은 세부정보를 엊는데 필요한 메타데이터를 얻을 수 있다.
더 빨리(또는 안전하게) 처리하기
- 비동기 메시지 패턴은 많은 사람이 대기열 유형의 시스템을 사용하는 한가지 이유이다.
- 프로듀서 전송 요청의 결과를 코드에서 기다리거나 콜백 Future 객체를 사용해 비동기적으로 처리할 수 있다.
acks
all 옵션
- 파티션리더의 레플리카가 ISR의 전체 목록에 대해 복제 완료를 기다린다는 의미
- 파티션에 대한 모든 레플리카가 성공하기 전까지는 성공 확인을 받지 못한다
- 이 옵션은 다른 브로커에 대한 의존성으로 인해 가장 느리다
타임스탬프
- 레코드에는 전달하는 이벤트에 대한 타임스탬프가 포함되어 있다.
- 프로듀서에게 전달할 때 long 유형으로 사용자가 직접 전달하거나 시스템 시간으로 전달할 수 있다
- message.timestamp.type 구성을 CreateTime으로 설정하면 클라이언트에서 설정한 시간이 사용되는 반면 LogAppendTime으로 설정하면 브로커 시간이 사용된다.
- 트랜잭션이 브로커에 전달된 시간이 아니라 발생하는 시간을 얻기 위해 생선된 시간을 사용할 수 있다.
- 생성된 시간이 메시지 자체 내에서 처리되거나 실제 이벤트 시간이 비즈니스 또는 주문과 관련 없는 경우에 브로커 시간이 유용할 수 있다.
- 이전의 레코드보다 더 빠른 레코드를 얻을 수도 있다.
- 첫 번째 레코드의 재시도가 완료되기 전에 더 늦은 타임스탬프가 있던 다른 메시지가 커밋된 경우
- 데이터는 로그에서 오프셋을 기준으로 정렬된다.
요구사항에 대한 코드 생성
- 운영자가 센서에 대한 명령을 완료할 때 감사 메시지가 손실되지 않도록 하고 싶다.
- 이벤트를 상호 연관시킬 필요가 없다는 것이다.
get 메서드는 동기적으로 응답이 완료될 때 까지 기다린다.
val metadata = producer.send(it).get()
- 카프카는 특정 파티션에 메시지를 보내는 기본 방법을 제공한다
- 2.4 버전 이전에는 키가 없는 메시지의 기본값은 라운드 로빈이었다 이후 버전에는 고정파티션 전략을 사용한다.
- partitioner.class에 커스텀 파티셔너를 사용할 수 있다.
- 메시지 실패에 대해 100% 걱정할 필요는 없다고 말했지만 실패를 방지하고 싶다.
public class AlertCallback implements Callback{
public void onCompletion(RecordMetadata metadata, Exception exception){
...
}
}
만들고자 하는 특정 형식이 있다면?
- Serializer, Deserializer를 구현한다
- 프로듀서가 값을 직렬화한 방법과 관하여 컨슈머가 값을 역직렬화하는 방법에도 영향을 미친다.
- 클라이언트의 데이터 형식에 대해 일종의 동의 또는 코디네이터가 필요하다.
다른 목표 : 시간 경과에 따른 경보 추적
- 얼럿 추세 상태 캡쳐
- 각 스테이지에 대한 정보에 관심 갖기 때문에 이러한 이벤트를 그룹화하는 방법을 생각해야한다
- 이 경우 각 스테이지 ID가 고유하므로 이를 사용한다
- 일반적으로 동일한 키는 동일한 파티션에 할당하므로 그룹화에 다른 할일은 없다
KafkaProducer<Alert,String>(kp).use {producer ->
val alert = Alert(0, "Stage 0", "CRITICAL", "Stage 0 stopped")
val record = ProducerRecord<Alert, String>("kinaction_alerttrend", alert, alert.alertMessage)
val result = producer.send(record).get()
println("kinaction_info offset = ${result.offset()}, topic = ${result.topic()}, timestamp = ${result.timestamp()}")
}