카프카 프로듀서

최진규·2023년 6월 11일
0

카프카

목록 보기
6/7

카프카의 토픽으로 메시지를 보내는 역할을 맡는 어플리케이션

어떤 파티션으로 메시지를 보낼것인가 ?

파티션 선정

파티셔닝이라는 개념이 있는데, 파티셔너는 메시지의 키값을 보고 파티셔닝을 진행한다.
쉽게 말해서 키값을 기준으로 해싱을 진행하고 그 해싱값에 따라서 특정 파티셔닝으로 보낸다.
따라서 특정 메시지를 특정 파티션에 보내야 한다면 키값을 동일하게 유지하면 된다.

랜덤 파티션

키값을 지정하지 않으면 라운드 로빈으로 파티션으로 메시지가 분배된다.

콘솔로 메시지 보내기

일단 터미널에서 카프카 주키퍼와 카프카 브로커를 띄워보자.

  • 주키퍼 시작 : bin/zookeeper-server-start.sh zookeeper.properties
  • 브로커 시작 : bin/kafka-server-start.sh config/server.properties

auto.create.topics.enable

servers.properties에 해당 옵션으로 특정 메시지를 보냈을때 메시지가 전송될 토픽이 존재하지 않는다면 자동생성이 가능하다.
하지만 여기서는 false로 꺼두자.

auto.create.topics.enable=false
실제로 안전하게 개발하기 위해서는 꺼두는게 옳을것 같다.

토픽 생성

  • 토픽 이름 : hello-sunfish
  • 파티션 : 2
  • replication factor : 1

뜯어보면, 파티션이 2개이고, replication factor가 1이니, 리더 파티션 1개로 하겠다는 의미이다.

bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 --partitions 2 --replication-factor 1 --topic hello-sunfish

이렇게 생성을 하고 토픽에 대한 옵션이 어떻게 되는지 알고싶다면, 다음의 명령어로 알 수 있다.

bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic hello-sunfish --describe

결과는 다음과 같다.

Topic: hello-sunfish	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: hello-sunfish	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello-sunfish	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
  • 파티션 갯수는 2 (0번, 1번)
  • replication-factor는 1 -> 브로커 서버가 하나라서 그렇다.
  • 각 파티션의 리더는 0번 브로커에 위치해 있다.
  • ISR은 브로커가 하나라서 0번 하나만 존재한다.

데이터 전송

일단은 카프카 프로듀싱을 위한 쉘 스크립트로 해보자.

다음의 스크립트를 실행한다.

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello-sunfish

>hello
>hello2
>hello3
>hello4
>hello5

이렇게 5개의 메시지를 보내본다.

그리고 제대로 데이터가 들어갔는지를 체크하기 위해서 컨슈밍을 해보자.

데이터 컨슈밍

다음의 스크립트를 실행한다.

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic hello-sunfish --from-beginning

hello2
hello4
hello
hello3
hello5

다음과 같이 나온다.

잘 보면 순서가 보장되지 않는다.

하나의 파티션은 하나의 컨슈머가 담당한다.
즉 2개의 파티션당 하나의 컨슈머가 전담하게 되는데,
우리는 컨슈머 스크립트를 통해서 2개의 파티션에서 동시에 데이터를 읽은것이다.
그렇기 때문에 파티션이 복수인 경우에 파티션별 순서보장은 되지 않는다.

그래도 잘 보면, 2,4이 붙어있고, 0,3,5가 붙어있고 이 묶음끼리는 순서가 보장이 된다.

즉 2,4가 파티션1번으로 0,3,5가 파티션2번으로 들어갔음을 알 수 있고,
각 파티션 내부의 데이터는 순서보장이 된다.
차례대로 파티션에서 컨슈밍하기 때문이다.

자바 어플리케이션으로 프로듀싱

기본

public class KafkaTestProducer {  
  
	public static void main(String[] args) {  
		Properties properties = new Properties();  
		properties.setProperty("bootstrap.servers", "my-kafka:9092");  
		properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
		properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
		  
		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);  
		producer.send(new ProducerRecord<String, String>("hello-sunfish", "java kafka producer test"));  
		producer.close();  
	}  
}

실행해보고 컨슈밍해보자.

hello2
hello4
hello
hello3
hello5
java kafka producer test

다음과 같이 밑에 한줄이 추가된다.

프로듀서에서 send()를 하는 방법은 총 3가지 이다.

동기 전송

위의 방법은 메시지를 보내는데에 실패했는지 알수없다.
그리고 send()future 객체를 반환한다.

future는 자바에서 비동기 처리에 사용된다.
future객체의 결과값을 알려면 get()을 사용해줘야 한다.

try catch로 감싸서 다음과 같이 처리해보자.

public class KafkaTestProducer {  
  
	public static void main(String[] args) {  
		Properties properties = new Properties();  
		properties.setProperty("bootstrap.servers", "my-kafka:9092");  
		properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
		properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
		  
		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);  
		try {  
		RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("hello-sunfish", "java kafka producer test")).get();  
		if (Objects.isNull(metadata)) {  
		producer.send(new ProducerRecord<String, String>("hello-sunfish", "java kafka producer test")).get();  
		}  
		} catch (Exception e) {  
		e.printStackTrace();  
		} finally {  
		producer.close();  
		}  
	}  
}

첫번째 send()에서 요청을 동기로 보내고 결과를 받아봤을때 실패했다면 다시 재전송한다.

비동기 전송

비동기로 전송하면서 에러처리가 가능하다.
콜백 메소드를 등록하는 방법이다.

KafkaTestProducer

public class KafkaTestProducer {  
  
	public static void main(String[] args) {  
		Properties properties = new Properties();  
		properties.setProperty("bootstrap.servers", "my-kafka:9092");  
		properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
		properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
		  
		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);  
		try {  
		producer.send(new ProducerRecord<String, String>("hello-sunfish", "java kafka producer test"), new KafkaTestProducerCallback());  
		} catch (Exception e) {  
		e.printStackTrace();  
		} finally {  
		producer.close();  
		}  
	}  
	  
}

KafkaTestProducerCallback

public class KafkaTestProducerCallback implements Callback {  
  
	@Override  
	public void onCompletion(RecordMetadata metadata, Exception exception) {  
		if (Objects.nonNull(exception) || Objects.isNull(metadata)) {  
			exception.printStackTrace();  
		}  
	}  
}

특정 파티션으로 지정해서 보내기

파티셔너는 메시지의 key값을 이용해서 파티셔닝을 진행한다.
Key를 활용해서 파티셔닝을 진행해보자.

public class KafkaPartitioningTestProducer {

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "my-kafka:9092");
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {

      String evenKey = "evenKey";
      String oddKey = "oddKey";
      IntStream
        .rangeClosed(0, 10)
          .forEach((i) -> {
            if (i % 2 != 0) {
              producer.send(new ProducerRecord<String, String>("hello-sunfish-odd-even-partitioned", oddKey, String.format("java kafka producer test odd = [%d]", i)));
            } else {
              producer.send(new ProducerRecord<String, String>("hello-sunfish-odd-even-partitioned", evenKey, String.format("java kafka producer test even = [%d]", i)));
            }
          });

      producer.send(new ProducerRecord<String, String>("hello-sunfish", "java kafka producer test"), new KafkaTestProducerCallback());
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer.close();
    }
  }
}

실행해보면, 다음과 같이 컨슈밍된다.

java kafka producer test odd = [1]
java kafka producer test odd = [3]
java kafka producer test odd = [5]
java kafka producer test odd = [7]
java kafka producer test odd = [9]
java kafka producer test even = [0]
java kafka producer test even = [2]
java kafka producer test even = [4]
java kafka producer test even = [6]
java kafka producer test even = [8]
java kafka producer test even = [10]

producer 주요 옵션

bootstrap.servers

카프카 클러스터는 클러스터 마스터라는 개념이 없다.
클러스터 내의 모든 서비스 (브로커)가 클라이언트의 요청을 받을 수 있다.

해당 옵션은 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보를 나타냅니다.

호스트 이름:포트

이렇게 사용하면 된다.

my-kafka:8081,my-kafka:8082,...

그런데 전체 리스트를 입력하는것이 항상 좋다.
카프카 클러스터가 살아있는 상태이긴 하지만, 해당 호스트에만 장애가 발생하는 경우 접속이 불가능하다.

클라이언트는 리스트로 주어진 서버중 하나에 장애가 발생해도 다른 호스트로 재시도를 하기 때문이다.

acks

프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack의 수입니다.
해당 옵션의 수가 적으면 성능이 좋지만, 메시지 손실 가능성이 있고, 반대로 숫자가 크면 성능이 좋지 않지만 메시지 손실 가능성은 낮아진다.

acks=0

0으로 설정하는 경우 프로듀서는 서버로부터 어떠한 ack도 기다리지 않는다.
따라서 가장 빠르게 메시지를 보낼 수 있다.
이 경우 서버가 데이터를 받았는지 보장하지 않고 클라이언트는 전송 실패에 대한 결과를 알지 못한다.
전체 브로커가 다운되었는지, 혹은 특정 브로커가 다운되었을때를 고려하지 않고 데이터를 보낸다.
따라서 데이터 전송이 실패했음을 알지 못하기 때문에 재요청 설정도 적용되지 않는다.

acks=1

1로 설정하는 경우, 프로듀서는 ack을 기다리게 된다.
하지만 리더 파티션이 제대로 데이터를 받고 ack을 보내고 ack을 1번 받았기 때문에 팔로워 파티션에서도 제대로 데이터를 받았는지는 알지 못한다.
따라서 이 값은 리더만 제대로 값을 받았는지만 알 수 있다.
만약에 ack이 오지 않는다면, 재요청 설정이 동작하게 된다.

메세지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우 사용한다.

하지만 무조건 손실이 없다고는 할 수 없다.
1. 프로듀서가 메시지 acks=1로 메시지 보냄.
2. 리더 파티션은 데이터를 정상적으로 받고 저장 후 ack을 프로듀서에게 보낸다.
3. 팔로워는 리더 파티션을 보고 데이터를 복제해가야한다. (리더 파티션의 consumer_offset을 참조한다.)
4. 하지만 복제가 필요한 순간에 리더 파티션이 위치한 브로커에 장애가 발생한다.
5. 팔로워 파티션은 데이터를 복제하지 못했는데 카프카 브로커 다운 대응으로 다음 리더로 선출된다.
즉 리더는 ack을 날렸지만 새로운 리더는 데이터가 없는 상황이 될 수 있다.

acks=all or -1

이 경우 모든 팔로워 파티션까지 데이터를 제대로 받았는지 ack을 기다린다.
만약에 팔로워 하나라도 데이터를 받지 못했다면 재요청 설정이 동작한다.

예를 들어서 replication-factor가 10이라면, 프로듀서는 총 10번의 ack을 기다리게 된다.

가장 강력하게 데이터 무손실을 보장하게 된다.

하지만 이 옵션을 제대로 활용하려면 브로커 설정인 min.insync.replicas에 대해서 제대로 이해하고 있어야 한다.

해당 값은 리더가 ack를 보내기전에 확인하는 최소한의 replicas의 숫자이다.

이 값을 잘 이해하고 설정해야 한다.
만약에 replication-factor가 3이고, min.insync.replicas가 1인 상황에서는
acks=all인게 의미가 없어진다.
min.insync.replicas의 값이 1이기 때문에, 즉 리더 하나만 제대로 데이터를 받았으니
복제가 제대로 유지가 됐다고 판단하여 ack을 보내게 된다.

그럼 이 값을 replication-factor와 동일하게 맞추면 가장 안정적일까 ?
그렇지도 않다.

replication-factor가 3이고, min.insync.replicas가 3인 경우에는
하나의 팔로워 파티션이 위치한 브로커에 장애가 발생한다면 모든 프로듀서가 보내는 메시지가 실패하는 상황이 발생한다.
즉 복제가 유지되지 않았으니 무조건 리더 파티션은 ack을 날리지 않는다.

가장 안정적인 케이스는 min.insync.replicas가 2인 경우이다.
적어도 1개의 팔로워가 제대로 복제를 했고, 하나의 브로커가 죽어도 전체 장애로는 이어지지 않는다.

추가적으로는 브로커의 수가 3대가 아닌 5대의 경우에도 min.insync.replicas를 2로 맞추는게 best practice이다.

그 이유는 리더가 다운되어도 적어도 한대의 파티션은 동기화가 되어있을것이고
다음 리더로 선출되는 파티션은 동기화가 잘된 우선순위로 선출되기 때문이다.

buffer.memory

프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트이다.

compression.type

프로듀서가 데이터를 압축해서 보낼 수 있는데 어떤 타입으로 압축할지를 정할 수 있다.

retries

일시적인 오류로 인해서 실패한 데이터를 다시 보내는 횟수

batch.size

프로듀서는 같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 한다.
이 설정으로 배치 크기 바이트 단위를 조정할 수 있다.
즉 이 단위보다 큰 데이터는 배치 형태로 데이터를 보내지 않는다.

linger.ms

배치 형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정한다.
배치로 지정된 사이즈보다 작은 메시지는 buffer에서 대기하게 되는데,
이 옵션으로 지정된 시간이 넘어가면 배치 사이즈가 되지 않았어도 즉시 전송하게 된다.
0이 기본값인데, 0이면 즉시 전송을 의미한다.

profile
개발하는 개복치

0개의 댓글