프로듀서가 데이터가 데이터를 전송할 때 내부적으로 처리되는 단계는 다음과 같다.
파티션 지정 않았다면, ProducerRecord의 키를 기준으로 파티셔너가 하나의 파티션 선택
파티션이 선택되면 해당 레코드(ProducerRecord 객체)의 메시지가 저장될 토픽과 파티션을 프로듀서가 알게됨
토픽과 파티션으로 전송될 레코드들을 모은 레코드 배치 추가, 별개의 스레드가 그 배치를 카프카 브로커에 전송
성공 → RecodrMetadata 객체 반환(토픽, 파티션, 패티션 내부의 메시지 오프셋 제공)
실패 → 에러 반환 (에러 수신한 프로듀서는 메시지 쓰기 포기하고 에러 반환전에 몇번 더 재전송 시도 할 수 있음)
따라서 많이 사용하는 타입들을 직렬화 할때는 따로 직렬처리기를 구현하지 않아도 됨
* value.serializer
- 레코드 메시지 값 직렬화 하는 데 사용되는 클래스 이름을 여기서 설정
- 직렬화 방법은 key.serializer와 동일
private Properties kafkaProps = new Properties (); // 1. 카프카 Properties 객체 생성
kafkaProps.put("bootstrap.servers", "brokerl:9892,broker:9892");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 메세지 key와 value 모두 문자열 타입 사용으로 StringSerializer 사용
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer <String, String>(kafkaProps); // 새로운 프로듀서 객체 생성, 메세지 key, value (예시는 String)을 설정하고 Properties 객체를 생성자 인자로 전달
자세한 설정 참조 : http://kafka.apache.org/documentation.html#producerconfigs
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France"); // step 1
try {
producer.send(record); // step 2
} catch (Exception e){
e.printStackTrace(); // step 3
}
step 1
- 프로듀서는 전송할 메시지를 갖는 레코드 객체를 생성 (ProducerRecord는 생성자를 여러 개 갖을 수 있음)
- 예시에서는 데이터를 저장할 토픽 이름(항상 String type), 카프카로 전송할 key와 값(예제에서는 String type)을 인자로 받는 생성자 사용
step 2
- 프로듀서 객체의 send()메서드를 사용해서 레코드 전송
- 전송된 메시지는 버퍼(배치)에 수록된 후, 스레드에서 브로커로 전송
- 브로커가 수신된 메시지 성공적으로 쓰면 → RecodeMetadata 반환 → send()메서드가 이를 받아서 자바의 Future 객체를 반환
(예시에서는 메서드 반환값 무시 → 메세지 전송 성공/실패 모름, 실제로는 이렇게 안함)
step 3
- 카프카에게 메시지 전송 전에 프로듀서에 에러가 생기면 예외 처리
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France");
try {
producer.send(record).get(); // step 1
} catch (Exception e){
e.printStackTrace(); // step 2
}
step 1
- Future 객체의 get() 메서드를 사용해서 카프카 응답을 대기
- ProducerRecord 객체가 전송 성공 시, RecodeMetadata 를 받게 되고, 이 객체를 사용해서 카프카에 쓴 메시지의 오프셋 (offset)을 알 수 있다.
- ProducerRecord 객체가 전송 실패하면 예외 처리 발생
step2
- 예외 처리
카프카 프로듀서 사용할 때 발생하는 에러
- 재시도 가능(retriable) 에러
- 연결 에러 : 다시 연결을 시도하여 성공하면 해결 됨
- no lea der(리더 없음) 에러 : 해당 파티션의 새로운 리더가 선출되면 해결 됨
재시도 불가능한 에러
- 메세지가 너무 클 때 : 카프카 프로듀서는 재시도 않고 즉시 예외 반환
비동기식으로 메시지 전송할 때, 에러를 처리하기 위해 프로듀서에 콜백(callback)을 추가
private class DemoProducerCallback implements Callback { // step 1
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e){
if(e != null) {
e.printStackTrace(); // step 2
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Biomedical Materials", "USA"); // step 3
producer.send(record, new DemoProducerCallback()); // step 4
프로듀서를 구성하는 다양한 매개 변수 중, 프로듀서의 메모리 사용, 성능, 신뢰성 등에 영향을 주는 매개변수에 대해 설명
acks
전송된 레코드(ProducerRecord객체)를 수신하는 파티션 리플리카(복제 서버로 동작하는 브로커)의 수
메시지 유실될 가능성에 큰 영향을 준다.
acks=0일 때
프로듀서는 브로커의 응답을 기다리지 않는다. (서버 응답 대기 없어 성능은 빠르고, 높은 처리량 필요시 사용 / 단, 메시지 유실 가능성 높음)
acks=1일 때
리더 리플리카가 메시지 수신할 때, 프로듀서는 브로커로 부터 수신 성공 응답 받음
리더에 메시지를 쓸 수 없다면... → 프로듀서는 에러 응답 → 메시지 재전송
리더가 중단되어 새로운 리플리카가 리더로 선출된 경우 → 메시지 유실 가능성
acks=all 일 때
동기화된 모든 리플리카가 메시지 수신 시, 프로듀서는 브로커의 성공 응답을 받음
가장 안전, 단, 브로커의 메시지를 수신할 때 까지 기다려야 해서 느림
buffer.memory
브로커에게 전송될 메시지의 버퍼로 사용할 메모리의 양 설정
compression.type
이 매개변수 설정 시, 메시지 압축되어 전송
retries
에러가 발생하기 전에 프로듀서가 메시지를 재전송하는 횟수를 제어
retry.backoff.ms
프로듀서가 메시지 재전송 시, 기본적으로 100밀리초를 대기하는 데, 이 매개변수를 사용해서 시간 간격 조정 가능
batch.size
다수의 레코드가 전송될 때, 프로듀서가 batch에 메시지를 모으는 데, 이 매개변수를 사용해서 각 batch에 사용될 메모리양을 제어
linger.ms
현재 배치를 전송하기 전까지 기다리는 시간(밀리초)를 나타냄
client.id
어떤 클라이언트에 전송된 메시지인지 식별하기 위해 브로커가 사용하는 매개변수(문자열 가능)
max.in.flight.requests.per.connection
서버의 응답을 받지 않고, 프로듀서가 전송하는 메세지의 개수를 제어
값이 클 수록, 메모리 사용은 증가하지만 처리량은 좋아짐
단, 너무 큰 값으로 사용하면 배치 처리가 비효율 → 오히려 처리량 감소
timeout.ms / request.time.out / metadata.fetch.timeout.ms
동기화된 리플리카들이 메시지를 인지하는 동안, 브로커가 대기하는 시간 제어
데이터 전송 시 프로듀서가 서버의 응답을 대기 시간 제어
메타데이터 요청 시 프로듀서가 서버의 응답 대기 시간 제어
max.block.ms
프로듀서 전송 버퍼가 가능차거나, 메타 데이터를 요청했지만 사용할 수 없을 때, 해당 매개변수의 값만큼 일시 중단
max.request.size
프로듀서가 전송하는 쓰기 요청의 크기를 제어
recieve.buffer.bytes와 send.buffer.bytes
데이터를 읽고 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기
카프카로 전송하는 객체가 단순 문자열, 정수가 아닐 때, Avro, Thrift, Protobuf 같은 범용 직렬화 라이브러리 사용해서 커스텀 직렬처리기 생성 가능
하지만, 범용 직렬화 라이브러리의 사용을 적극 권장
전송하려는 객체 필드의 타입이 변경되거나 새로 추가된다면, 기존 메세지와 새 메세지 간의 호환 유지에 문제가 생김
-> 범용직렬처리기(JSON, 아파치 Avro, Thrift, Protobuf) 사용을 권장
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id","type": "int"},
{"name": "name","type": "string"},
// 스키마 변경 전 (구버전)
// {"name": "faxNumber","type": ["null","string"],"default": "null"}]
// 스키마 변경 후 (신버전)
, {"name": "email","type": ["null","string"],"default": "null"}]
}
메시지를 쓰는 애플리케이션이 새로운 스키마로 전환하더라고, 애플리케이션은 지속해서 변경 없이 메시지 처리 가능
팩스번호를 없애고 email 필드 추가할 경우 → 스키마 변경 전, 후의 애플리케이션 모두 카프카 이상없이 사용하는 방법 고려 필요
업그레이드 이전 데이터를 읽는 애플리케이션
* getName(), getId(), getFaxNumber() 메서드 존재 / 신버전의 스키마를 사용해서 쓴 메시지를 읽으면 getFaxNumber()는 null을 반환할 것
업그레이드 이후 데이터를 읽는 애플리케이션
* getFaxNumber()는 없고 getEmail() 메서드 존재 / 구버전의 스키마를 사용해서 쓴 메시지를 읽으면 getEmail()은 null을 반환할 것
_즉, 데이터를 읽는 모든 애플리케이션 변경 X, 기존 데이터 변경 X
데이터를 쓰는 데 사용되는 스키마와
읽는 애플리케이션에서 기대하는 스키마가 호환되어야 함
역직렬처리기는 데이터를 쓸 때 사용되었던 스키마를 사용해야 한다
스키마 전체를 레코드에 담기에는 크기 부담
따라서, 카프카에서 사용되는 모든 스키마를 레지스트리에 저장 → 스키마 레지스트리를 사용
모든 데이터는 레지스트리에 저장, 카프카 레코드에는 사용하는 스키마의 식별자 (id)만 저장하면 된다.
컨슈머는 해당 식별자(id)를 이용해서 스키마 레지스트리의 스키마를 가져온 후, 이를 맞춰서 데이터를 역직렬화 가능
이때 모든 작업은 직렬처리기와 역직렬처리기에서 수행된다.
카프카에 데이터를 쓰는 코드에서 스키마 레지스트리 관련 코드를 추가하지 않고, 직렬처리기를 사용하듯이 Avro를 사용하면 된다.
Avro 사용 방법 참고 : http://avro.apache.org/docs/current
만약, 다수의 프로듀서 중, 하나의 프로듀서가 스키마를 변경하여 메시지를 발행 → 토픽에 순차적으로 쌓이면...
(그림, 스키마 저장 및 검색을 위한 Confluent Schema Registry)
스키마 레지스트리는 스키마 생성, 조회, 관리에 대한 HTTP API를 제공
API 상세는 https://docs.confluent.io/home/overview.html 에서 참조
참고 ) https://docs.confluent.io/platform/current/schema-registry/index.html#schemas-subjects-and-topics
ProducerRecord 객체는 토픽 이름과 key, 값을 포함 (대부분)
카프카 메시지는 key와 값의 쌍으로 구성되지만, 기본값이 null로 설정된 키와 함께 토픽과 값만 갖는 ProducerRecord 객체도 생성이 가능
key의 목적
- 메시지를 식별하는 추가 정보를 가짐
- 메시지를 쓰는 토픽의 여러 파티션 중 하나를 결정하는 역할
- 같은 key를 가진 모든 메시지는 같은 파티션에 저장됨
이 장에서는 org.apache.kafka.client 패키지에 포함된 자바 프로듀서 클라이언트에 대해 알아봤지만 여전히 스칼라 언어로 작성된 구버전 프로듀서가 존재
kafka.producer 패키지와 핵심 카프카 모듈(SyncProducers, AsyncProducer)에 포함
- SyncProducers
메시지 추가 전송하기 전에 이미 전송된 메시지가 서버에서 인지되는 것을 기다림
- AsyncProducer
백그라운드로 실행되어 메시지 수집 후, 별도 스레드에서 전송, 전송 성공여부 프로듀서 클라이언트에 알려주지 않음
하지만, 현재 버전의 카프카에서 SyncProducers, AsyncProducer 모두 지원