Kafka Producer의 역할
- Topic에 해당하는 메시지를 생성
- 특정 Topic으로 데이터를 publish
- 처리 실패 / 재시도
1) 코드에서 살펴보기
의존성 추가: Gradle
- 주의사항: Broker 버전과 Client 버전의 하위 호환이 완벽하지 않으므로 둘의 버전을 맞추는 것이 좋다.
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
코드 작성
- Kafka 에서는 되도록 2개 이상의 IP 와 PORT 를 설정하도록 권장하고 있다.
- key는 메시지를 보낼때 토픽의 파티션이 지정될 때 쓰인다.
- Kafka는 Key를 해시화해서 각 파티션과 1:1 로 매칭시킨다.
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.server", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer <String, String> producer = new KakfkaProducer<>(configs);
ProducerRecord record = new ProducerRecord<String, String>("click_log", "login");
producer.send(record);
producer.close();
}
}
public class Producer {
public static void main(String[] args) throw IOException {
Properties configs = new Properties();
configs.put("bootstrap.server", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer <String, String> producer = new KakfkaProducer<>(configs);
ProducerRecord record1 = new ProducerRecord<String, String>("click_log", "1", "buy");
ProducerRecord record2 = new ProducerRecord<String, String>("click_log", "2", "login");
producer.send(record1);
producer.send(record2);
producer.close();
}
}
2) 이미지로 살펴보기
- KEY가 없는 경우
- KEY가 있는 경우
토픽에 파티션을 추가하는 순간 키와 파티션의 일관성을 보장할 수 없다.
3) 선택옵션
필수 옵션 - 반드시 입력
bootstrap.servers
: 카프카 클러스터에 연결하기 위한 브로커 목록
key.serializer
: 메시지 키 직렬화에 사용되는 클래스
value.serializer
: 메시지 값을 직렬화하는데 사용되는 클래스
선택 옵션 - Default 값 존재
acks
: 레코드 전송 신뢰도 조절(레플리카)
compression.type
: snappy, gzip, lz4 중 하나로 압축하여 전송
retries
: 클러스터 장애에 대응하여 메시지 전송을 재시도 하는 횟수
buffer.memory
: 브로커에 전송될 메시지의 버퍼로 사용될 메모리양
batch.size
: 현재의 배치를 전송하기 전까지 기다리는 시간
client.id
: 어떤 클라이언트인지 구분하는 식별자