Kafka 메세지 쓰기

dev.jhjhj·2022년 3월 20일
0

3.1 프로듀서 개요

프로듀서가 데이터가 데이터를 전송할 때 내부적으로 처리되는 단계는 다음과 같다.
  

    1. 카프카에 쓰려는 메시지를 갖는 ProducerRecord를 생성 (전송할 토픽과 값을 포함, 선택적으로 Key 값과 파티션 지정 가능)
    1. 메세지 객체들이 네트워크로 전송될 수 있도록 바이트 배열로 직렬화  진행, 직렬처리기(serializer)와 컴포넌트(클래스)가 처리
    1. 해당 데이터는 파티셔너와 컴포넌트(클래스)에 전달

파티션 지정 않았다면, ProducerRecord의 키를 기준으로 파티셔너가 하나의 파티션 선택
파티션이 선택되면 해당 레코드(ProducerRecord 객체)의 메시지가 저장될 토픽과 파티션을 프로듀서가 알게됨
토픽과 파티션으로 전송될 레코드들을 모은 레코드 배치 추가, 별개의 스레드가 그 배치를 카프카 브로커에 전송

    1. 브로커 수신된 메시지 처리 후 응답

성공 → RecodrMetadata 객체 반환(토픽, 파티션, 패티션 내부의 메시지 오프셋 제공)
실패 → 에러 반환 (에러 수신한 프로듀서는 메시지 쓰기 포기하고 에러 반환전에 몇번 더 재전송 시도 할 수 있음)

3.2 카프카 프로듀서 구성하기

  • 카프카 프로듀서를 수행하는 객체는 ProducerRecord
  • 카프카에 메시지를 쓰려면 ProducerRecord 객체를 먼저 생성
  • ProducerRecord 은 세개의 필수 속성을 갖음 (이 외에도 원하는 속성 설정 가능)
    bootstrap.server
    - 카프카 클러스터에 최초 연결을 위해 프로듀서가 사용하는 브로커들의 host:port 목록을 설정
    key.serializer
    - 프로듀서가 생성하는 레코드(ProducerRecord 객체)의 메시지 키를 직렬화하기 위한 클래스 이름을 이 속성에 설정 (key 없이 값만 전송할 때도 key.serializer 설정 필요)
    - key.serializer에 설정하는 클래스는 "org.apache.kafka.common.serialization.Serializer" 인터페이스를 구현해야
    - 카프카 클라이언트 패키지에는 다음과 같은 3가지 Serializer가 포함되어 있다.
    ByteArraySerializer, StringSerializer,IntegerSerializer

따라서 많이 사용하는 타입들을 직렬화 할때는 따로 직렬처리기를 구현하지 않아도 됨
* value.serializer
- 레코드 메시지 값 직렬화 하는 데 사용되는 클래스 이름을 여기서 설정
- 직렬화 방법은 key.serializer와 동일

private Properties kafkaProps = new Properties ();		// 1. 카프카 Properties 객체 생성
kafkaProps.put("bootstrap.servers", "brokerl:9892,broker:9892");

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //  메세지 key와 value 모두 문자열 타입 사용으로 StringSerializer 사용
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Producer<String, String> producer = new KafkaProducer <String, String>(kafkaProps);		// 새로운 프로듀서 객체 생성, 메세지 key, value (예시는 String)을 설정하고 Properties 객체를 생성자 인자로 전달

자세한 설정 참조 :  http://kafka.apache.org/documentation.html#producerconfigs

메세지 전송 방법에는 3가지

  • Fire-and-forget(전송 후 망각)
    • send() 메서드로 메시지 전송만 (성공 여부 후속 조치 X)
    • 전송 실패할 경우, 프로듀서가 자동으로 재전송 시도
    • but, 메시지 유실 가능성 있음
  • Synchronous send(동기식 전송)
    • send() 메서드로 메시지 전송하면 Java의 Future 객체 반환 → Future 객체의 get() 메소드 호출 후, 작업 완료 대기 → 브로커로부터 처리결과 반환
  • Asynchronous send(비동기식 전송)
    • send() 메서드 호출할 때, callback 메서드를 구현한 객체를 매개변수로 전달(뒤에 설명)
    • 이 객체가 구현된 콜백 메서드는 카프카 브로커로부터 응답을 받을 때 자동 호출 →send()가 성공적으로 수행되었는지 확인 가능

3.3 카프카에 메시지 전송하기

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France");  // step 1
 
try {
    producer.send(record);      // step 2
} catch (Exception e){
    e.printStackTrace();        // step 3
}
  • step 1
    - 프로듀서는 전송할 메시지를 갖는 레코드 객체를 생성 (ProducerRecord는 생성자를 여러 개 갖을 수 있음)
    - 예시에서는 데이터를 저장할 토픽 이름(항상 String type), 카프카로 전송할 key와 값(예제에서는 String type)을 인자로 받는 생성자 사용

    • key과 값 타입은 직렬처리기와 프로듀서 객체(producer)에서 사용하는 타입과 같아야 함
  • step 2
    - 프로듀서 객체의 send()메서드를 사용해서 레코드 전송
    - 전송된 메시지는 버퍼(배치)에 수록된 후, 스레드에서 브로커로 전송
    - 브로커가 수신된 메시지 성공적으로 쓰면 → RecodeMetadata 반환 → send()메서드가 이를 받아서 자바의 Future 객체를 반환
    (예시에서는 메서드 반환값 무시 → 메세지 전송 성공/실패 모름, 실제로는 이렇게 안함)

  • step 3
    - 카프카에게 메시지 전송 전에 프로듀서에 에러가 생기면 예외 처리

3.3.1 동기식으로 메시지 전송하기

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France");
 
try {
    producer.send(record).get();        // step 1
} catch (Exception e){
    e.printStackTrace();        // step 2
}
  • step 1
    - Future 객체의 get() 메서드를 사용해서 카프카 응답을 대기
    - ProducerRecord 객체가 전송 성공 시, RecodeMetadata 를 받게 되고, 이 객체를 사용해서 카프카에 쓴 메시지의 오프셋 (offset)을 알 수 있다.
    - ProducerRecord 객체가 전송 실패하면 예외 처리 발생

  • step2
    - 예외 처리

    • 카프카 메시지 전송 전 에러
    • 전송 중 카프카 브로커가 전송 재시도 불가능 에러
    • 사용 가능한 전송 재시도 횟수 전부 소진
  • 카프카 프로듀서 사용할 때 발생하는 에러
    - 재시도 가능(retriable) 에러
    - 연결 에러 : 다시 연결을 시도하여 성공하면 해결 됨
    - no lea der(리더 없음) 에러 : 해당 파티션의 새로운 리더가 선출되면 해결 됨
    재시도 불가능한 에러
    - 메세지가 너무 클 때 : 카프카 프로듀서는 재시도 않고 즉시 예외 반환

3.3.2 비동기식으로 메시지 전송하기

비동기식으로 메시지 전송할 때, 에러를 처리하기 위해 프로듀서에 콜백(callback)을 추가

private class DemoProducerCallback implements Callback {        // step 1
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e){
        if(e != null) {
            e.printStackTrace();        // step 2
        }
    }
}
 
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Biomedical Materials", "USA");        // step 3
producer.send(record, new DemoProducerCallback());      // step 4
  • step 1
    * 콜백 구현 시, org.apache.kafka.clients.producer.Callback 인터페이스 상속
  • step 2
    * 카프카가 에러 반환 시, onCompletion() 메서드에서 예외 처리
  • step 3
    * 전송할 메세지 객체 생성
  • step 4
    * send() 메서드로 메시지 전송 시, 콜백 객체에 인자 전달
    • 따라서 메시지 쓴 후, 응답을 반환할 때, 이 객체의 onCompletion() 메서드가 자동 호출

3.4 프로듀서 구성하기

프로듀서를 구성하는 다양한 매개 변수 중, 프로듀서의 메모리 사용, 성능, 신뢰성 등에 영향을 주는 매개변수에 대해 설명

acks

전송된 레코드(ProducerRecord객체)를 수신하는 파티션 리플리카(복제 서버로 동작하는 브로커)의 수

메시지 유실될 가능성에 큰 영향을 준다.

acks=0일 때
프로듀서는 브로커의 응답을 기다리지 않는다. (서버 응답 대기 없어 성능은 빠르고, 높은 처리량 필요시 사용 / 단, 메시지 유실 가능성 높음)
acks=1일 때
리더 리플리카가 메시지 수신할 때, 프로듀서는 브로커로 부터 수신 성공 응답 받음
리더에 메시지를 쓸 수 없다면... → 프로듀서는 에러 응답 → 메시지 재전송
리더가 중단되어 새로운 리플리카가 리더로 선출된 경우 → 메시지 유실 가능성
acks=all 일 때
동기화된 모든 리플리카가 메시지 수신 시, 프로듀서는 브로커의 성공 응답을 받음
가장 안전, 단, 브로커의 메시지를 수신할 때 까지 기다려야 해서 느림

buffer.memory

브로커에게 전송될 메시지의 버퍼로 사용할 메모리의 양 설정

compression.type

이 매개변수 설정 시, 메시지 압축되어 전송

retries

에러가 발생하기 전에 프로듀서가 메시지를 재전송하는 횟수를 제어

retry.backoff.ms

프로듀서가 메시지 재전송 시, 기본적으로 100밀리초를 대기하는 데, 이 매개변수를 사용해서 시간 간격 조정 가능

batch.size

다수의 레코드가 전송될 때, 프로듀서가 batch에 메시지를 모으는 데, 이 매개변수를 사용해서 각 batch에 사용될 메모리양을 제어

linger.ms

현재 배치를 전송하기 전까지 기다리는 시간(밀리초)를 나타냄

client.id

어떤 클라이언트에 전송된 메시지인지 식별하기 위해 브로커가 사용하는 매개변수(문자열 가능)

max.in.flight.requests.per.connection

서버의 응답을 받지 않고, 프로듀서가 전송하는 메세지의 개수를 제어

값이 클 수록, 메모리 사용은 증가하지만 처리량은 좋아짐

단, 너무 큰 값으로 사용하면 배치 처리가 비효율 → 오히려 처리량 감소

timeout.ms / request.time.out / metadata.fetch.timeout.ms

  • 동기화된 리플리카들이 메시지를 인지하는 동안, 브로커가 대기하는 시간 제어

  • 데이터 전송 시 프로듀서가 서버의 응답을 대기 시간 제어

  • 메타데이터 요청 시 프로듀서가 서버의 응답 대기 시간 제어

max.block.ms

프로듀서 전송 버퍼가 가능차거나, 메타 데이터를 요청했지만 사용할 수 없을 때, 해당 매개변수의 값만큼 일시 중단

max.request.size

프로듀서가 전송하는 쓰기 요청의 크기를 제어

recieve.buffer.bytes와 send.buffer.bytes

데이터를 읽고 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기

3.5 직렬처리기

3.5.1 커스텀 직렬처리기

  • 카프카로 전송하는 객체가 단순 문자열, 정수가 아닐 때, Avro, Thrift, Protobuf 같은 범용 직렬화 라이브러리 사용해서 커스텀 직렬처리기 생성 가능

  • 하지만, 범용 직렬화 라이브러리의 사용을 적극 권장

  • 전송하려는 객체 필드의 타입이 변경되거나 새로 추가된다면, 기존 메세지와 새 메세지 간의 호환 유지에 문제가 생김

-> 범용직렬처리기(JSON, 아파치 Avro, Thrift, Protobuf) 사용을 권장

3.5.2 아파치 Avro를 사용해서 직렬화하기

아파치 Avro

  • 언어 중립적인 데이터 직렬화 시스템
  • 언어 독립적인 스키마로 데이터 구조를 표한하는 데 주로 JSON 형식
  • 직렬화 역시 JSON을 지원하지만, 주로 이진 파일 사용
{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id","type": "int"},
        {"name": "name","type": "string"},
        // 스키마 변경 전 (구버전)
        // {"name": "faxNumber","type": ["null","string"],"default": "null"}]
        // 스키마 변경 후 (신버전)
        , {"name": "email","type": ["null","string"],"default": "null"}]
}

Avro가 메시지 시스템에 사용하는 데 적합한 이유

  • 메시지를 쓰는 애플리케이션이 새로운 스키마로 전환하더라고, 애플리케이션은 지속해서 변경 없이 메시지 처리 가능

  • 팩스번호를 없애고 email 필드 추가할 경우 → 스키마 변경 전, 후의 애플리케이션 모두 카프카 이상없이 사용하는 방법 고려 필요

  • 업그레이드 이전 데이터를 읽는 애플리케이션
    * getName(), getId(), getFaxNumber() 메서드 존재 / 신버전의 스키마를 사용해서 쓴 메시지를 읽으면 getFaxNumber()는 null을 반환할 것

  • 업그레이드 이후 데이터를 읽는 애플리케이션
    * getFaxNumber()는 없고 getEmail() 메서드 존재 / 구버전의 스키마를 사용해서 쓴 메시지를 읽으면 getEmail()은 null을 반환할 것

  • _즉, 데이터를 읽는 모든 애플리케이션 변경 X, 기존 데이터 변경 X

주의사항

데이터를 쓰는 데 사용되는 스키마와
읽는 애플리케이션에서 기대하는 스키마가 호환되어야 함
역직렬처리기는 데이터를 쓸 때 사용되었던 스키마를 사용해야 한다

3.5.4 카프카에서 Avro 레코드 사용하기

스키마 전체를 레코드에 담기에는 크기 부담

따라서, 카프카에서 사용되는 모든 스키마를 레지스트리에 저장 → 스키마 레지스트리를 사용

모든 데이터는 레지스트리에 저장, 카프카 레코드에는 사용하는 스키마의 식별자 (id)만 저장하면 된다.

컨슈머는 해당 식별자(id)를 이용해서 스키마 레지스트리의 스키마를 가져온 후, 이를 맞춰서 데이터를 역직렬화 가능

이때 모든 작업은 직렬처리기와 역직렬처리기에서 수행된다.

카프카에 데이터를 쓰는 코드에서 스키마 레지스트리 관련 코드를 추가하지 않고, 직렬처리기를 사용하듯이 Avro를 사용하면 된다.

Avro 사용 방법 참고 : http://avro.apache.org/docs/current

스키마 레지스트리란?

  • 카프카 클라이언트 사이에서 메시지의 스키마를 저장하고 관리하는 웹 애플리케이션
등장배경
  • 다수의 프로듀서가 존재할 때, 컨슈머 입장에서는 어떤 프로듀서가 메시지를 보냈는 지 모른다.
  • 또한, 프로듀서의 쓰기 작업은 메시지 큐의 가장 말단에서만 실행 중간에 수정 작업을 진행할 수 없다.

만약, 다수의 프로듀서 중, 하나의 프로듀서가 스키마를 변경하여 메시지를 발행 → 토픽에 순차적으로 쌓이면...

  • 컨슈머는 변경된 스키마에 대해 적절히 대응하지 못한다.
스키마 레지스트리의 목적
  • 토픽별로 메시지 key와 value 스키마 버전 관리
  • 스키마 호환성 규칙 강제
  • 스키마 버전 별 호환성을 강제하여, 스키마가 변경되어도 컨슈머 입장에서 메시지를 적절하게 구독할 수 있게 한다.
스키마 버전 조회

             (그림, 스키마 저장 및 검색을 위한 Confluent Schema Registry)

스키마 레지스트리는 스키마 생성, 조회, 관리에 대한 HTTP API를 제공

API 상세는 https://docs.confluent.io/home/overview.html 에서 참조

참고 ) https://docs.confluent.io/platform/current/schema-registry/index.html#schemas-subjects-and-topics

3.6 파티션

기본 파티션

  • ProducerRecord 객체는 토픽 이름과 key, 값을 포함 (대부분)

  • 카프카 메시지는 key와 값의 쌍으로 구성되지만, 기본값이 null로 설정된 키와 함께 토픽과 값만 갖는 ProducerRecord 객체도 생성이 가능

  • key의 목적
    - 메시지를 식별하는 추가 정보를 가짐
    - 메시지를 쓰는 토픽의 여러 파티션 중 하나를 결정하는 역할
    - 같은 key를 가진 모든 메시지는 같은 파티션에 저장됨

기본 파티셔너의 특징

  • 키가 null이고, 카프카의 기본 파티셔너를 사용하면
    - 사용 가능한 토픽의 파티션들 중 하나가 무작위로 선택됨
    - 각 메시지에 저장된 메시지 개수의 균형을 맞추기 위해 라운드 로빈 알고리즘을 사용함
  • 키가 있고 기본 파티셔너를 사용하면
    - 카프카에서 key의 해시값을 구한 후, 그 값에 따라 특정 파티션에 메시지 저장

3.7 구버전의 프로듀서 API들

이 장에서는 org.apache.kafka.client 패키지에 포함된 자바 프로듀서 클라이언트에 대해 알아봤지만 여전히 스칼라 언어로 작성된 구버전 프로듀서가 존재

  • kafka.producer 패키지와 핵심 카프카 모듈(SyncProducers, AsyncProducer)에 포함
    - SyncProducers
    메시지 추가 전송하기 전에 이미 전송된 메시지가 서버에서 인지되는 것을 기다림
    - AsyncProducer
    백그라운드로 실행되어 메시지 수집 후, 별도 스레드에서 전송, 전송 성공여부 프로듀서 클라이언트에 알려주지 않음

  • 하지만, 현재 버전의 카프카에서 SyncProducers, AsyncProducer 모두 지원

profile
어제보다 더 나은

0개의 댓글