Kafka: Producer

xellos·2022년 7월 9일
0

Kafka

목록 보기
2/3

Kafka Producer의 역할

  • Topic에 해당하는 메시지를 생성
  • 특정 Topic으로 데이터를 publish
  • 처리 실패 / 재시도

1) 코드에서 살펴보기

의존성 추가: Gradle

  • 주의사항: Broker 버전과 Client 버전의 하위 호환이 완벽하지 않으므로 둘의 버전을 맞추는 것이 좋다.
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'

코드 작성

  • Kafka 에서는 되도록 2개 이상의 IP 와 PORT 를 설정하도록 권장하고 있다.
  • key는 메시지를 보낼때 토픽의 파티션이 지정될 때 쓰인다.
  • Kafka는 Key를 해시화해서 각 파티션과 1:1 로 매칭시킨다.
//키가 없는 경우
public class Producer {
	public static void main(String[] args) throws IOException {
    	Properties configs = new Properties();
        configs.put("bootstrap.server", "localhost:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       	configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer <String, String> producer = new KakfkaProducer<>(configs);
        ProducerRecord record = new ProducerRecord<String, String>("click_log", "login");
        
        producer.send(record);
        producer.close();
    }
}
//키가 있는 경우
public class Producer {
	public static void main(String[] args) throw IOException {
    	Properties configs = new Properties();
        configs.put("bootstrap.server", "localhost:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       	configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer <String, String> producer = new KakfkaProducer<>(configs);
        ProducerRecord record1 = new ProducerRecord<String, String>("click_log", "1", "buy");
        ProducerRecord record2 = new ProducerRecord<String, String>("click_log", "2", "login");
        
        producer.send(record1);
        producer.send(record2);
        producer.close();
    }
}

2) 이미지로 살펴보기

  1. KEY가 없는 경우
  1. KEY가 있는 경우
    토픽에 파티션을 추가하는 순간 키와 파티션의 일관성을 보장할 수 없다.

3) 선택옵션

필수 옵션 - 반드시 입력

  • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
  • key.serializer : 메시지 키 직렬화에 사용되는 클래스
  • value.serializer : 메시지 값을 직렬화하는데 사용되는 클래스

선택 옵션 - Default 값 존재

  • acks : 레코드 전송 신뢰도 조절(레플리카)
  • compression.type : snappy, gzip, lz4 중 하나로 압축하여 전송
  • retries : 클러스터 장애에 대응하여 메시지 전송을 재시도 하는 횟수
  • buffer.memory : 브로커에 전송될 메시지의 버퍼로 사용될 메모리양
  • batch.size : 현재의 배치를 전송하기 전까지 기다리는 시간
  • client.id : 어떤 클라이언트인지 구분하는 식별자

0개의 댓글