필수 속성값
bootstrap.servers
key.serializer
, value.serializer
예제
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 정한 타입이 일치해야함
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
try {
// send() 메소드는 RecordMetadata를 포함한 자바 Future 객체를 리턴하지만, 여기서는 무시하기 때문에 전송 성공 여부를 알 수 없다.
producer.send(record);
} catch(Exception e) {
/**
전송할 때 혹은 브로커 자체 에러를 무시하더라도 프로듀서가 메시지를 보내기 전 에러가 발생할 경우 예외 발생 가능.
SerializationException -> 직렬화 실패 경우
TimeoutException -> 버퍼가 가득 찬 경우
InterruptException -> 스레드에 인터럽트가 걸리는 경우
**/
e.printStackTrace();
}
동기적 전송
작업이 몰리면 브로커가 쓰기 요청에 응답하기까지 최소 2ms 최대 몇 초가 지연 될 수 있다.
전송 요청하는 스레드는 위 시간동안 아무 작업도 할 수 없다.
(동기적 전송은 잘 안쓴다.)
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
try {
// 카프카로 응답이 올 때 까지 대기하기 위해 .get() 사용
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
비동기적 전송
카프카는 레코드를 쓴 뒤 해당 레코드의 토픽, 파티션 오프셋을 리턴하는데, 대부분의 애플리케이션은 이런 메타데이터가 필요가 없다.
메시지 전송이 실패했을 경우에는 해당 내용을 알아야 하는데, 이때는 콜백을 사용한다.
// org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스 필요.
// 해당 인터페이스에는 onCompletion() 단 하나의 메서드만 정의되어 있다.
private class TestProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
// 만약 에러를 리턴하면 onCompletion() 메서드가 null이 아닌 Exception 객체를 받게된다.
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "test2", "test3");
/// Callback 객체 매개변수로 전달
producer.send(record, new TestProducerCallback());
KafkaProducer 에러 종류
아래의 설정들은 프로듀서에서만 설정이 가능하다. (브로커에서 설정 불가능)
하기 옵션을 제외한 나머지 옵션은 기본으로 두고 쓰는걸 권장한다.
kafka 2.22 기준// 옵션 적용 방법 props.put("옵션 이름", "옵션 값"); props.put(ProducerConfig.옵션 이름, "옵션 값");
acks = 0 | acks = 1 | acks = all | |
---|---|---|---|
성공 기준 | 전송 후(브로커의 응답을 기다리지 않음) | 리더 브로커가 받으면 | 모든 복제가 완료 |
특징 | 처리량이 매우 중요하고, 유실이 되어도 상관없으면 설정 | 리더가 받고 복제 직전에 죽어버리면 메시지가 유실된다.리더가 그 전에 죽으면 재전송. | 최소 2개 이상의 브로커가 해당 메시지를 가지고 있고, 크래시가 나도 유실되지 않기 때문에 가장 안전하나 응답까지의 시간이 다른 설정보다 오래 걸린다 |
linger.ms
에 설정된 제한 시간이 되면 배치를 전송한다.message.max.bytes
설정과 동일하게 설정하는게 좋음.ProducerRecord를 보낼 때 걸리는 시간이 두 구간으로 나뉘어 있다.
send()
에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간send()
를 호출한 스레드는 블록send()에
대한 비동기 호출이 성공적으로 리턴한 시각부터 콜백이 호출될 때까지 걸리는 시간send()
를 호출했을 때partitionsFor
를 호출해서 명시적으로 메타데이터를 요청했을 때max.block.ms
만큼 시간이 흐르면 예외가 발생send()
가 정상적으로 리턴되고 레코드가 배치에 저장된 시점)부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간을 결정linger.ms, request.timeout.ms
보다 커야함.delivery.timeout.ms
가 넘어가면, 마지막으로 재시도 하기 전에 브로커가 리턴한 예외와 함께 콜백이 호출delivery.timeout.ms
가 넘어가버리면 타임아웃 예외와 함께 콜백이 호출retry.backoff.ms
설정을 통해 조정 가능delivery.timeout.ms
로 잡아주는걸 권장.linger.ms
에 설정된 제한 시간이 되었을 때 메시지 배치를 전송.