Apache Kafka - Producer 1

kkmdevel·2024년 5월 15일
0

kafka

목록 보기
3/10
post-thumbnail

ProducerRecord

ProdcuerRecord에 담긴 정보에 따라 메시지를 보냄 (Topic,value 필수)
어떤 브로커의 파티션으로 메시지를 보내야 할지 성능 가용성등을 고려하여 결정
여러개의 Record로 구성된 Batch level로 메세지 전송


Serializer 직렬화

이동, 저장, 복원을 자유롭게 하기 위해서 바이트 배열 형태로 저장

ProdcuerRecord 객체를 Byte Array로 변경하여 Broker에 전송
Consumer가 받은 Byte Array를 Deserializer를 통해 ConsumerRecord 객체로 역직렬화

props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer. class.getName());

Partitioner

  • key != null
    메시지는 Partitioner를 통해 토픽의 어떤 파티션으로 전송되어야 할 지 미리 결정됨
    특정 Key 값을 가지는 메시지는 특정 파티션으로 고정 전송, 순서 보장

  • key == null

    Round Robin : 최대한 메시지를 파티션에 균일하게 분배하려는 전략
    배치 데이터를 빨리 채우지 못함 -> 전송 늦어짐
    배치를 다 채우지 못하고 전송 -> 전송 성능이 떨어지는 문제 발생

    Sticky Partitioning : 하나의 배치에 메시지를 먼저 채워서 보내는 방식 -> 라운드 로빈의 성능 개선

Sender

별도의 Thread

기본적으로 Thread 간 Async(비동기) 전송
Main Thread가 send()메소드를 호출하여 메시지 전송 후
내부 Buffer(Batch)에 메시지를 저장 후에 별도의 Thread가 Kafka Broker에 전송


Sync 동기

Broker로 부터 성공적으로 받았다는 Ack 메시지를 받은 후 다음 메시지를 전송 안전 중시

RecordMetaData recordMetadata = KafkaProducer.send().get();

호출하여 브로커로 부터 Ack 메시지를 받을 때까지 대기 함 -> 성능이 떨어짐


ASync 비동기

해당 메시지를 성공적으로 받았다는 Ack 메시지를 기다리지 않고 전송 성능 중시
Broker로 부터 Ack 메시지를 비동기로 Producer에 받기 위해 Callback 사용
동기 호출은 성능이 많이 떨어짐 -> 기본적으로 비동기 호출 사용

Callback

다른 코드의 인수로서 넘겨주는 실행 가능한 코드
callback을 넘겨받는 코드는 callback을 즉시 실행할 수도 있고 , 나중에 실행할 수도 있음

Future<RecordMetaData> future = KafkaProducer.send()
  1. Callback을 Interface 로 구성하고 , 호출되어질 메소드를 선언
  2. 해당 Callback을 구현하는 객체 생성 . 즉 호출 되어질 메소드를 구체적으로 구현
  3. 다른 함수의 인자로 해당 Callback을 인자로 전달
  4. 해당 함수는 특정 이벤트 발생 시 Callback에 선언된 메소드를 호출
  • Callback 과정

  1. Main Thread에서 send를 호출하여 sender에서의 Network Thread에 callback을 가지는 객체 전달
  2. Network Thread에서 callback객체는 가지고 message는 Broker에게 전달
  3. ASync 방식이기 때문에 message 1의 ACK를 기다리지 않고 바로 다음 message 전달
  4. Broker가 message 1의 ACK를 보내면 Network Thread는 callback 1에 메타데이터를 채워줌
  5. Main Thread에서 callback 1을 참조하여 Exception 또는 RecordMetadata 확인
  6. ACK가 오지않거나 Retry 가능한 Exception인 경우 재전송 (acks 설정에 따라 다름)
  • 코드
producer.send(producerRecord,new Callback() {
	@Override
	public void onCompletion RecordMetadata metadata, Exception exception) {
		if (exception == null) {

		} else{
			exception.printStackTrace();
        }
    }
});
profile
25/08/12

0개의 댓글