[KAFKA] 프로듀서 애플리케이션

.·2024년 6월 24일

KAFKA

목록 보기
12/21

1. 프로듀서 애플리케이션

카프카에서 데이터의 시작점은 바로 프로듀서이다.
카프카 프로듀서 애플리케이션의 역할은 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 특정 리더 파티션에 전송해준다.
프로듀서는 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신한다.
프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.

프로듀싱할 때는 kafka-console-producer.sh 명령어를 사용하여 토픽에 직접 데이터를 넣을 수 있는데, 이는 주로 테스트 환경에서 사용한다.
일반적인 환경에서는 프로듀서 애플리케이션을 개발해서 토픽에 데이터를 넣어준다.

프로듀서 애플리케이션은 일반적으로 자바 애플리케이션으로 개발하는데 파이썬, 자바스크립트 등과 같은 언어로도 개발할 수 있다.
카프카는 공식적으로 자바 라이브러리만 제공하기 때문에, 자바가 아닌 언어로 프로듀서 애플리케이션을 구현할 경우 성능 또는 기능적인 면에서 제한적일 수 있다는 점을 유념해야 한다.

2. 프로듀서 애플리케이션의 내부 구조

  • ProducerRecord : 프로듀서에서 생성하는 레코드. 오프셋은 미포함
  • send() : 레코드 전송 요청 메소드
  • Partitioner : 어느 파티션으로 전송할 지 지정하는 파티셔너. 기본값으로 DefaultPartitioner
  • Accumulator : 배치로 묶어 전송할 데이터를 모으는 버퍼

3. 파티셔너

프로듀서 API 사용 시 UniformStickyPartitionerRoundRobinPartitioner가 제공된다.
카프카 클라이언트 라이브러리 2.5.0 버전에서는 파티셔너를 별도로 지정해주지 않으면 레코드 프로듀싱 시 UniformStickyPartitioner가 디폴트 파티셔너로 지정된다.

1) 메시지 키가 있을 경우 동작 원리

메시지 키가 있을 경우 UniformStickyPartitioner와 RoundRobinPartitioner 모두 메시지 키의 해시값을 파티션과 매칭하여 레코드를 전송한다.
그러므로 동일한 메시지 키를 가지는 레코드들은 동일한 파티션 번호로 저장된다.

토픽이 이미 생성된 후 중도에 파티션 개수를 줄이는 것은 불가하지만, 파티션 개수를 증설하는 것은 가능하다.
문제는 이렇게 파티션 개수가 변경될 경우 메시지 키의 해시값으로 파티션이 매칭되기 때문에 메시지 키와 파티션 번호의 매칭이 깨지게 된다는 것이다.
그러므로 메시지 키를 활용해야 한다면 프로듀서가 전송하는 데이터량과 컨슈머가 처리하는 데이터량을 고려하여 충분히 큰 개수로 파티션 개수를 지정해야 한다.

프로듀서가 초당 100개의 데이터를 보내고, 컨슈머가 초당 10개의 데이터를 처리할 수 있다면 10개의 컨슈머를 생성하면 될 것이다.
10개의 컨슈머를 띄우려면 10개의 파티션을 만들면 된다.
이 때 프로듀서가 보내는 데이터량이 많아질 수 있는 경우를 고려해 20~50개 사이로 파티션 개수를 넉넉하게 지정하는 것이 좋다.

2) 메시지 키가 null인 경우 동작 원리

메시지 키가 없을 경우는 파티션에 균등하게 분배하는 로직을 사용한다.

RoundRobinPartitioner는 말 그대로 라운드 로빈 방식으로 순회하는 방식이다.
프로듀서 레코드가 들어오는대로 파티션을 순회하면서 전송하는데, 어큐뮤레이터에서 배치로 묶이는 정도가 적다.
즉 한번에 보낼 수 있는 데이터량이 적기 때문에 전송 성능이 낮다는 단점이 있다.

UniformStickyPartitioner는 RoundRobinPartitioner의 단점을 개선하며 등장했고, 효율을 극대화한 파티셔너이다.
이 파티셔너는 어큐큐레이터에서 레코드들이 배치로 묶일 때 까지 기다렸다 전송한다.
배치로 묶일 뿐 결과적으로는 파티션을 순회하면서 보낸다는 것은 동일하기 때문에 모든 파티션에 골고루 분배하여 전송한다.
파티션에 균등하게 분배한다는 점을 지키면서 배치로 묶어 전송 효율을 높였기 때문에 RoundRobinPartitioner에 비해 향상된 성능을 가진다.

3) 프로듀서의 커스텀 파티셔너

프로듀서의 파티셔너로 UniformStickyPartitioner와 RoundRobinPartitioner가 제공되지만, 파티셔너를 따로 생성하여 사용할 수도 있다.
카프카 클라이언트 라이브러리에서 제공되는 Partitioner 인터페이스를 구현하여 커스텀 파티셔너를 생성할 수 있다.
커스텀 파티셔너 클래스에서 메시지 키 또는 메시지 값에 따른 파티션 지정 로직을 만들어줄 수 있다.

4. 프로듀서 주요 옵션

1) 필수 옵션

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

프로듀서 애플리케이션을 운영할 때 default 값이 없어서 필수로 지정해줘야 하는 옵션들이 있다.
만약 필수 옵션에 대한 정보를 넣어주지 않으면 프로듀서 애플리케이션을 실행할 수 없다.

bootstrap.servers

프로듀서가 데이터를 어느 서버로 전송할 지에 대한 설정이다.
프로듀서가 데이터를 전송할 대상이 되는 카프카 클러스터에 속한 브로커의 호스트이름:포트를 1개 이상 작성한다.
상용환경에서는 일반적으로 2개 이상의 브로커 정보를 작성하여 일부 브로커에 장애가 발생하더라도 접속하는 데 이슈가 없도록 설정해준다.

key.serializer

레코드의 메시지 키를 직렬화하는 클래스를 지정한다.

value.serializer

레코드의 메시지 값을 직렬화하는 클래스를 지정한다.

프로듀서에서 레코드를 보낼 때는 직렬화를 진행한다.
이 직렬화를 통해 데이터를 저장할 때 모든 종류의 데이터를 저장할 수 있게 만들어준다.
컨슈머에서는 직렬화된 데이터를 풀기 위해 역직렬화를 진행해야 하므로, 프로듀서가 보낼 데이터를 어떻게 역직렬화할 지는 상호 협의되어 있어야 한다.

float이나 int형으로 이루어진 데이터를 무조건 String으로 직렬화 처리해버린다면 그만큼의 데이터 사용량을 발생시킨다.
프로듀서에서 브로커로 데이터를 보낼 때에도 네트워크 사용량이 더 많아지고, 로그 세그먼트로 데이터를 저장할 때도 더 많은 메모리 공간을 사용하므로 효율에 좋지 않을 수 있다.
그러므로 상황에 따라서 float이나 int의 직렬화 또는 커스텀 직렬화 클래스를 사용하는 것도 좋은 방법이다.

그런데 String으로 직렬화하지 않을 경우 문제가 있다.
첫번째는 kafka-console-consumer에서 데이터를 보지 못할 수도 있다.
kafka-console-consumer에서 데이터를 확인할 때 byte array로 역직렬화하는데 String으로도 같이 볼 수 있다.
그러므로 String이 아닌 방식으로 직렬화한 경우 kafka-console-consumer를 사용해서 데이터를 확인하지 못할 수도 있다.
디버깅을 위해 kafka-console-consumer를 사용할 경우가 있으므로 특별한 경우가 아니라면 String 직렬화 방식을 사용한다.

두번째는 특정한 방식으로 직렬화해서 데이터를 보냈으나, 컨슈머 쪽에서 역직렬화하는지 모르거나, 역직렬화 하더라도 작동이 잘 안될 수 있다는 것이다.

그러므로 String 직렬화로 통일하게 되면 네트워크 혹은 데이터 사용량 측면에서 불리할 수 있으나 여러 운영상의 이점이 있으므로 보통은 String Serializer를 사용한다.

2) 선택 옵션

선택 옵션은 default 값이 있어서 필요할 경우에만 따로 지정해줄 수 있는 옵션이다.

acks

기본값: 1 (리더 파티션에 데이터가 잘 적재되면 성공)

프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는 데 사용하는 옵션이다.
0, 1, -1(all) 중 하나로 설정할 수 있다.

프로듀서가 데이터를 리더 브로커에 보내고, 팔로워 파티션이 이 리더 브로커에서 데이터를 복제해놓는다.
이 때 리더 파티션에 잘 적재될 경우 성공으로 볼 지, 리더 파티션과 팔로워 파티션에 모두 잘 적재될 경우 성공으로 볼 지, 적재 여부를 확인하지 않고 전송만 하면 성공으로 볼 지에 대한 설정이다.
그러므로 acks 옵션에 따라 데이터에 대한 신뢰도 정도가 결정된다.

linger.ms

기본값: 0 (즉각적으로 데이터 전송)

어큐뮬레이터에 배치를 전송하기 전 까지 기다리는 최소 시간이다.
기본값이 0이면 send()를 통해 데이터를 보낼 때 기다리지 않고 즉각적으로 sender를 통해 브로커로 데이터가 전달된다.
일부 지연이 발생하더라도 배치로 모아서 데이터를 전송하고 싶을 경우 10(0.01초) 이나 100(0.1초) 정도로 설정해줄 수 있다.

retries

기본값: 2147483647

브로커로부터 에러를 받고 난 후 재전송을 시도하는 횟수를 지정한다.

max.in.flight.requests.per.connection

기본값: 5

한번에 요청하는 최대 커넥션 개수이다.
sender를 통해 데이터를 보낼 때 그 쓰레드가 몇 개인지에 대한 설정이다.
설정된 값만큼 동시에 전달 요청을 수행한다.
데이터 처리량이 많을 경우 이 설정값을 높게 변경하는 것도 고려해볼 수 있다.

partitioner.class

기본값: org.apache.kafka.clients.producer.internals.DefaultPartitioner

레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다.
파티셔너 클래스는 메시지 키에 따라 레코드를 어느 파티션으로 보낼 지 지정해주는 역할을 한다.
이 기본 파티셔너로 UniformStickyPartitioner가 지정되며, 커스텀 파티셔너 클래스를 사용할 수도 있다.

enable.idempotence

기본값: false

멱등성 프로듀서로 동작할지 여부를 설정한다.
프로듀서와 브로커가 통신할 때 네트워크 이슈 발생 시 중복해서 데이터를 전송할 경우가 있는데, 이를 방지하기 위한 용도로 사용한다.
2.5.0 버전에서는 기본값이 false이지만 3.x 버전부터는 기본값이 true로 변경되었다.
true로 설정 시 데이터 전송에 대한 신뢰도를 높일 수 있다.

transactional.id

기본값: null

프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지의 여부를 설정한다.
transactional.id를 설정하게 되면 enable.idempotence는 자동으로 true로 변경된다.
null은 트랜잭션으로 동작하는 것이 아닌 레코드를 지속적으로 보내는 용도로 지정한다.

5. ISR과 acks

1) ISR

프로듀서는 레코드를 리더 파티션으로 전송한다.

팔로워 파티션은 리더 파티션을 주기적으로 감시한다.
새로운 레코드가 들어와서 오프셋이 증가하게 되면 팔로워 파티션은 리더 파티션으로부터 부족한 오프셋만큼 레코드를 복제해온다.

이렇게 리더 파티션과 팔로워 파티션의 레코드가 서로 동기화된다.
ISR(In-Sync-Replicas)은 리더 파티션과 팔로워 파티션의 레코드 오프셋 개수가 동일하여 모두 싱크된 상태를 의미한다.
이런 동기화를 통해 리더 파티션에 장애가 발생할 경우팔로워 파티션을 리더 파티션으로 승격시켜 사용하더라도 정합성을 보장받을 수 있다.

ISR이라는 용어가 나온 이유는 팔로워 파티션이 리더 파티션으로부터 데이터를 복제해오는 데는 시간이 소요되기 때문이다.
리더 파티션에 레코드가 적재된 후 팔로워 파티션이 복제해오는 시간차로 인해 리더 파티션과 팔로워 파티션 간 오프셋 차이가 발생하게 된다.

이렇게 레코드에 차이가 날 때는 ISR에는 리더 파티션만 포함되고, 팔로워 파티션이 리더 파티션으로부터 모든 데이터를 복제해오면 그때 팔로워 파티션까지 ISR에 포함된다.

참고 포스팅
https://velog.io/@namyj97/KAFKA-%EC%B9%B4%ED%94%84%EC%B9%B4-%EB%B8%8C%EB%A1%9C%EC%BB%A4%EC%99%80-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0

2) acks

카프카 프로듀서의 acks옵션은 0, 1, all(-1) 값을 가질 수 있다.
acks 옵션을 어떤 값으로 지정하느냐에 따라 데이터를 얼마나 신뢰성있게 저장할 지의 정도가 결정된다.
그러므로 처리량의 중요도, 데이터 정합성의 중요도에 따라 acks 옵션을 적절히 설정하는 것이 중요하다.
acks 옵션에 따라 성능이 달라질 수 있으므로 acks 옵션에 따른 카프카 동작 방식을 인지하고 옵션값을 설정해야 한다.
acks 옵션은 신뢰성을 높이고 성능을 낮출지, 성능을 높이고 신뢰성을 낮출지에 대한 선택이 된다.
리더 파티션과 팔로워 파티션에 모두 동일하게 레코드가 저장된 것을 신뢰도가 높다고 본다.

acks=0

acks를 0으로 설정하는 것은 프로듀서가 리더 파티션으로 레코드를 전송하였을 때 리더 파티션으로 데이터가 잘 적재되었는지 확인하지 않는다는 의미이다.
전송 도중 장애가 발생하여 레코드가 적재되지 않더라도 아무것도 확인하지 않고 send() 메소드를 호출하기만 하면 전송 성공으로 판단한다는 것이다.

이 경우 응답값을 받아 적재 성공 여부를 확인하는 작업이 없기 때문에 속도는 가장 빠르나, 정상 적재 결과를 알 수 없기 때문에 신뢰도는 가장 낮다.
그러므로 데이터 일부 유실을 감안하더라도 전송 속도가 중요할 경우 이 옵션값을 사용하면 된다.
예를 들어 GPS나 네비게이션 데이터와 같이 데이터 일부 유실보다는 전체적인 데이터를 파악하여 실시간으로 빠르게 판단해야 할 경우 이 옵션을 사용한다.

acks=1, min.insync.replicas=2

acks를 1로 설정하는 것은 프로듀서가 보낸 데이터가 리더 파티션에만 정상적으로 적재되었는지 확인한다는 것이다.
프로듀서가 리더 파티션이 있는 브로커와 통신하여 리더 파티션에 레코드가 잘 적재되었는지에 대한 응답값을 받는다.

만약 리더 파티션에 정상적으로 적재되지 않았다는 응답을 받으면, 팔로워 파티션에서 승급된 리더 파티션으로 레코드 재전송을 시도한다.
이 경우 akcs를 0으로 설정하는 것 보다 신뢰도가 향상되지만, 적재 성공 여부 응답값을 받기까지 시간이 소요되기 때문에 데이터 처리량과 같은 성능 측면에서는 불리하다.

acks=1 설정은 acks=0 보다 신뢰성이 높지만, 완벽한 신뢰성을 갖추는 것은 아니다.
replication factor(복제 개수)를 2개 이상으로 운영할 경우가 그러한데, 리더 파티션 적재 성공 여부만 확인하기 때문에 리더 파티션에 적재가 완료되었더라도 팔로워 파티션에는 아직 동기화되지 않을 가능성이 있다.
리더 파티션 적재 여부만 보고 성공으로 판단해버리기 때문에 팔로워 파티션이 데이터를 복제해가기 이전에 장애가 발생하면 리더 파티션과 팔로워 파티션 간 동기화가 이루어지지 못하고 데이터 유실이 일어날 수 있다.
하지만 아주 큰 장애가 일어나는 경우가 아니면 LAG 지연이 많이 발생하지 않기 때문에 일반적인 경우 보통 1로 설정하여 운영한다.

acks=all(-1)

acks를 all(또는 -1)로 설정하는 것은 프로듀서가 보낸 데이터가 리더 파티션과 팔로워 파티션에 모두 정상적으로 적재되었는지 확인한다는 의미이다.
프로듀서가 레코드를 전송한 후 리더 파티션에 적재되었는지, 팔로워 파티션에 적재되었는지를 모두 확인한 후 그에 대한 응답값을 받는다.

리더 파티션, 팔로워 파티션 모두 잘 적재되었는지 확인하고 응답값을 받는 시간이 소요되기 때문에 acks=0이나 acks=1에 비해 데이터 처리량은 가장 낮다.
all로 설정할 경우 성능이 현저히 낮아지기 때문에 데이터 처리량이 많을 경우 이 옵션값으로 사용하면 안된다.
성능 저하를 차치하더라도 데이터가 절대 유실되면 안될 경우에만 이 옵션을 사용한다.

replication factor가 3이라면 1개의 리더 파티션과 2개의 팔로워 파티션이 생성될 것이다.
이 경우 acks=all로 설정하면 리더 파티션 1개와 팔로워 파티션 2개를 모두 확인하게 되는 것일까?

이 때 min.insync.replicas 옵션에 따라 확인하는 파티션 개수가 결정된다.
이 옵션은 프로듀서가 리더 파티션과 팔로워 파티션에 데이터가 적재되었는지 확인하기 위한 ISR 그룹의 최소 파티션 개수이다.

min.insync.replicas 옵션값이 2라면 총 3개의 파티션이 존재하더라도 리더 파티션 1개, 팔로워 파티션 1개로 총 2개의 파티션만 확인하게 된다.
즉, acks=all이라도 나머지 하나의 팔로워 파티션까지는 확인하지 않는다는 것이다.

팔로워 파티션까지 적재여부를 확인하는 이유는 리더 파티션에 장애가 발생할 경우 리더 파티션과 동기화된 팔로워 파티션을 리더 파티션으로 승격시켜 사용하기 위해서이다.
그런데 2대 이상의 서버가 동시에 장애가 발생할 경우는 극히 드물다.
즉, 일부의 팔로워 파티션만 동기화 해놓더라도 데이터 신뢰성을 보장하며 운영할 수 있다.
acks=-1로 설정할 경우 보통 min.insync.replicas=2로 설정한다.

min.insync.replicas 옵션값을 1로 설정하면 ISR 중 하나의 파티션에 데이터가 적재되었음을 확인하는 것이다. 이 경우 acks=1로 설정하는 것과 동일하게 동작하며 리더 파티션 적재 성공 여부만 확인한다.
ISR 중 가장 처음 적재가 완료되는 파티션은 리더 파티션이기 때문이다.

0개의 댓글