Apache Kafka - Producer

현시기얌·2022년 2월 10일
0

Apache Kafka

목록 보기
4/12

Producer, Consumer, Consumer Group

  • Producer : 메시지를 생산(Produce)해서 kafka의 Topic으로 메시지를 보내는 애플리케이션
  • Consumer : Topic의 메시지를 가져와서 소비(Consume)하는 애플리케이션
  • Consumer Group : Topic의 메시지를 사용하기 위해 협력하는 Consumer들의 집합
    하나의 Consumer는 하나의 Consumer Group에 포함되며 Consumer Group내의 Consumer들은 협력하여 Topic 메시지를 병렬 처리 한다.

Record(Message) 구조

  • Producer가 보내는 Record(Message) 크게보면 Header, Key, Value로 구성되어 있다.
  • Header 부분에 Topic명을 지정할 수 있고 Key와 Value에는 실제 보내고자 하는 데이터, 비즈니스에 연관되어 있는 데이터, Consumer에서 Read해야 하는 데이터들이 들어가 있다.
  • Key와 Value는 Avro, JSON 등의 다양한 형태가 가능하다.

Serializer/Deserializer

  • Kafka는 이렇게 보내지는 Record(데이터)를 Byte Array로만 저장한다.
  • Producer에서 JSON, String, Avro 같은 형태를 Record 형태로 만들어서 Key와 Value 형태로 Send하면 그 내부에서 Producer안에 있는 Publish&Subscribe 라이브러리에서 Serializer(직렬화)가 이루어지게 되고 직렬화를 통해서 Byte Array로 변환되어 Kafka에게 전달된다.
  • Consumer는 Kafka에서 Byte Array 데이터를 Deserializer(역직렬화)하여 원본 데이터로 활용한다.

Producer Sample Code With Serializer

private Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker101:9092,broker102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

KafkaProducer producer = new KafkaProducer(props);
  • KEY_SERIALIZER_CLASS_CONFIG 와 VALUE_SERIALIZER_CLASS_CONFIG 파라미터를 사용해서 실제 Serializer 클래스를 지정할 수 있다.
  • 위의 코드를 보면 Key는 String Serializer Class로 Value는 KafkaAvro Serializer Class로 지정하여 데이터를 직렬화 하여 ByteArray로 변환하는 것을 볼 수 있다.

Producing to Kafka

  • Producer에서 Record를 만들고 send() 메소드를 통해 Record를 보내면 Producer Application 내에 있는 Kafka 라이브러리가 그 다음 과정들을 진행해준다.
  • Serializer를 통해서 Key와 Value가 ByteArray로 변환되고 Partitioner라고 하는 내부 요소 컴포넌트를 통해서 어느 Partition으로 보낼건지가 결정이 된다.
  • 선택사항으로 Compress(압축옵션)이 있다면 압축이 진행이 되고 RecordAccumulator라고 하는 곳으로 보내져서 배치형태로 모여지거나 건바이건으로 Kafka로 전송이 된다.
  • 그리고 Kafka는 이에 대한 응답을 주는데 성공을 하면 metadata가 성공했다고 리턴을 해주고 실패했다면 재시도를 할거냐 말거냐 하는 옵션에 따라서 자동으로 재시도를 하고 말고가 정해진다.
  • 재시도를 하다가 Retry 할 수 없는 상황이 발생하면 Exception이 떨어지면서 종료된다.
  • 이 때 우리가 해야 할 일은 Record를 만들고 어떤 Serializer Class로 직렬화 하여 Send() 할지만 정하면 나머지는 알아서 라이브러리에서 진행이된다.

Partitioner의 역할


  • Partitioner는 메시지를 Topic의 어떤 Partition으로 보낼지를 결정한다.
    Partitioner를 사용할 때 보통 Default Partitioner를 사용한다.
  • Default Partitioner은 Producer에서 보내는 메시지의 Key값을 Hash 알고리즘을 이용해 숫자로 만든 다음에 Partiton의 개수를 가지고 나머지를 구한다. 그리고 나온 나머지 값에 해당하는 Partition으로 메시지를 보낸다.
  • 이 때 전제 조건은 Key가 Null이 아닐 때다.

Default Partitioner 에서 Key가 Null일 때

  • Kafka 2.4 이전에는 Key값이 Null이면 Round Robin 형태로 Partition에 메시지를 보냈다.
  • Kafka 2.4 이후에는 Sticky 정책으로 Batch가 닫힐 때 까지 Partition으로 메시지를 계속 보낸다. (시작은 0번부터 0번 이후에는 랜덤으로)

요약

  • Message = Record = Event = Data
  • Message는 Header와 Key, Value로 구성된다.
  • Kafka는 Record(데이터)를 Byte Array로만 저장한다.
  • Producer는 Serializer, Consumer는 Deserializer를 사용한다.
  • Producer는 Message의 Key 존재 여부에 따라서 Partitioner를 통한 메시지 처리 방식이 달라진다.
profile
현시깁니다

0개의 댓글