
필수 속성값
bootstrap.serverskey.serializer, value.serializer예제
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//  정한 타입이 일치해야함
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
try {
// send() 메소드는 RecordMetadata를 포함한 자바 Future 객체를 리턴하지만, 여기서는 무시하기 때문에 전송 성공 여부를 알 수 없다.
    producer.send(record);
} catch(Exception e) {
/** 
 전송할 때 혹은 브로커 자체 에러를 무시하더라도 프로듀서가 메시지를 보내기 전 에러가 발생할 경우 예외 발생 가능.
 SerializationException -> 직렬화 실패 경우
 TimeoutException -> 버퍼가 가득 찬 경우
 InterruptException -> 스레드에 인터럽트가 걸리는 경우
**/
    e.printStackTrace();
}동기적 전송
작업이 몰리면 브로커가 쓰기 요청에 응답하기까지 최소 2ms 최대 몇 초가 지연 될 수 있다.
전송 요청하는 스레드는 위 시간동안 아무 작업도 할 수 없다.
(동기적 전송은 잘 안쓴다.)
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
try {
// 카프카로 응답이 올 때 까지 대기하기 위해 .get() 사용
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}비동기적 전송
카프카는 레코드를 쓴 뒤 해당 레코드의 토픽, 파티션 오프셋을 리턴하는데, 대부분의 애플리케이션은 이런 메타데이터가 필요가 없다.
메시지 전송이 실패했을 경우에는 해당 내용을 알아야 하는데, 이때는 콜백을 사용한다.
// org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스 필요.
// 해당 인터페이스에는 onCompletion() 단 하나의 메서드만 정의되어 있다.
private class TestProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
// 만약 에러를 리턴하면 onCompletion() 메서드가 null이 아닌 Exception 객체를 받게된다.
            e.printStackTrace();
        }
    }
}
 
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
/// Callback 객체 매개변수로 전달
producer.send(record, new TestProducerCallback());KafkaProducer 에러 종류
아래의 설정들은 프로듀서에서만 설정이 가능하다. (브로커에서 설정 불가능)
하기 옵션을 제외한 나머지 옵션은 기본으로 두고 쓰는걸 권장한다.
kafka 2.22 기준// 옵션 적용 방법 props.put("옵션 이름", "옵션 값"); props.put(ProducerConfig.옵션 이름, "옵션 값");
| acks = 0 | acks = 1 | acks = all | |
|---|---|---|---|
| 성공 기준 | 전송 후(브로커의 응답을 기다리지 않음) | 리더 브로커가 받으면 | 모든 복제가 완료 | 
| 특징 | 처리량이 매우 중요하고, 유실이 되어도 상관없으면 설정 | 리더가 받고 복제 직전에 죽어버리면 메시지가 유실된다.리더가 그 전에 죽으면 재전송. | 최소 2개 이상의 브로커가 해당 메시지를 가지고 있고, 크래시가 나도 유실되지 않기 때문에 가장 안전하나 응답까지의 시간이 다른 설정보다 오래 걸린다 | 
linger.ms 에 설정된 제한 시간이 되면 배치를 전송한다.message.max.bytes 설정과 동일하게 설정하는게 좋음.ProducerRecord를 보낼 때 걸리는 시간이 두 구간으로 나뉘어 있다.
send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간send()를 호출한 스레드는 블록send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터 콜백이 호출될 때까지 걸리는 시간
send()를 호출했을 때partitionsFor를 호출해서 명시적으로 메타데이터를 요청했을 때max.block.ms만큼 시간이 흐르면 예외가 발생send()가 정상적으로 리턴되고 레코드가 배치에 저장된 시점)부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간을 결정linger.ms, request.timeout.ms보다 커야함.delivery.timeout.ms가 넘어가면, 마지막으로 재시도 하기 전에 브로커가 리턴한 예외와 함께 콜백이 호출delivery.timeout.ms가 넘어가버리면 타임아웃 예외와 함께 콜백이 호출retry.backoff.ms 설정을 통해 조정 가능delivery.timeout.ms로 잡아주는걸 권장.linger.ms에 설정된 제한 시간이 되었을 때 메시지 배치를 전송.