프로퍼티를 이용해서 producer 속성을 정의한다. (설정 정보)
이 프로퍼티를 이용해 producer 객체를 만든다 (producer 객체는 send 메서드를 지원한다)
ProducerRecord가 KAFKA 브로커에 전송할 메세지가 된다.
producer를 다 사용 했다면 close 메소드를 통하여 닫아주면 된다.
send 메서드를 이용해서 레코드를 전송하면 먼저 Serializer을 이용해서 byte 배열로 변환을 하고 Partitioner을 이용해서 메시지를 어느 토픽에 파티션으로 보낼지 결정을 하게 된다.
그리고 변환된 byte 배열 메시지를 버퍼에 저장을 한다, 그때 버퍼에 바로 저장을 하는게 아니라 batch로 묶어서 메시지를 저장한다.
Sender가 그 메시지를 가져오서 카프카 브로커로 전송을 한다.
Seder는 별도 thread로 동작한다.
Sender는 batch를 차례대로 꺼내서 브로커로 전달을 하는데 배치가 찼는지 여부에 상관없이 읽어서 보낸다.
Sender가 브로커에 메시지를 보내는 동안 send 메서드를 통해서 들어온 레코드 들은 배치에 누적된다.
2개가 서로 다른 thread로 동작을 하기 때문에 둘 중 하나가 실행 중인 동안 다른 하나가 안될 일은 없다.
batch 사이즈를 설정할 수 있는데 이것은 배치의 최대 크기를 설정한다.
대기 시간을 주면 그 시간만큼 기다리니까 그 기다리는 시간동안 배치에 메시지가 쌓이게 되고 한 번에 더 많은 메시지를 보낼 수 있는 여지가 생김.
KafkaFuture<RecordMetadata> f = producer.send(new ProducerRecord<>("topic","value"));
try{
RecordMetadata metadata = f.get(); //블로킹
}catch(ExecutionException ex){
}
하나의 메시지를 보내고 블로킹 되고 하나를 보내고 블로킹 되서 배치에 메시지가 쌓이지 않고 한 개씩만 들어간다.
배치 효과 떨어짐 -> 처리량 저하
대신에 건별로 확실하게 결과를 알 수 있다.
처리량이 낮아도 되는 경우에만 사용
producer.send(new ProducerRecord<>("simple", "value"),
new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception ex){
}
});
send 메서드에 callback 객체를 전달하는 한다.
onCompletion 메서드로 결과를 받는데 Exception 객체를 받게되면 전송에 실패 했다고 볼 수 있다.
배치가 쌓이지 않는다는 단점이 사라지고 처리량 저하가 없다.
프로듀서가 리더에 요청 전송을 하고 리더가 성공적으로 저장을 했고, 두 팔로워 중 하나가 성공적으로 저장을 하면 동기화된 리플리카 개수가 2가 된다.
즉 최소 개수가 충족을 하기 때문에 성공 응답을 보낸다.
재시도 가능 에러는 재시도 처리 (예 : 브로커 응답 타임 아웃, 일시적인 리더 없음 등)
프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능 한 에러에 대해 재전송 시도 (retries 속성)
send() 메서드에서 exception 발생시 exception 타입에 따라 send() 재호출
콜백 메서드에서 exception 받으면 ㅏ입에 따라 send() 재호출
아주아주 특별한 이유가 없다면 재시도 x
별도 파일, DB 등을 이용해서 실패한 메시지 기록
추후에 수동(또는 자동) 보정 작업 진행
send( ) 메서드에서 exception 발생시
send( ) 메서드에 전달한 콜백에서 exception 받는 경우
send( ) 메서드가 리턴한 Future의 get() 메서드에서 exception 발생시