토픽의 레코드를 전송할 때 특정 파티션을 지정하여 전송할 수 있다.

이번 포스팅을 통해 여러 파티션으로 토픽이 전송되는 것을 확인해볼 예정이므로, server.properties의 num.partitions를 3 정도로 설정해둔다.
ProducerWithPartitionTest.java 전체코드
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerWithPartitionTest {
private final static Logger logger = LoggerFactory.getLogger(ProducerWithPartitionTest.class);
private final static String TOPIC_NAME = "topic-with-partition";
private final static String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for(int i = 1; i <= 2; i++) {
for(int j = 1; j <= 5; j++) {
ProducerRecord<String, String> record;
if(j == 1) {
/**
* 1. 토픽의 값만 보낼 경우
* 레코드 생성 시 생성자의 파라미터로 1. 토픽명, 2. 메시지값 설정
* (토픽명, 메시지값은 필수 설정값)
*/
record = new ProducerRecord<>(TOPIC_NAME, "NoKey" + i + "-Value" + j);
} else if(j == 2) {
/**
* 2. 토픽의 파티션 번호 지정, 키,값 쌍으로 보낼 경우
* 레코드 생성 시 생성자의 파라미터로 1. 토픽명, 2. 파티션 번호, 3. 메시지키, 4. 메시지값 설정
*/
record = new ProducerRecord<>(TOPIC_NAME, 2, "Key" + i, "Key" + i + "-Value" + j);
} else {
/**
* 3. 토픽의 키,값 쌍으로 보낼 경우
* 레코드 생성 시 생성자의 파라미터로 1. 토픽명, 2. 메시지키, 3. 메시지값 설정
*/
record = new ProducerRecord<>(TOPIC_NAME, "Key" + i, "Key" + i + "-Value" + j);
}
producer.send(record);
logger.info("{}", record);
}
}
producer.flush();
producer.close();
}
}
1부터 5까지 2번씩 반복하며, 레코드를 생성하고 전송하는 코드이다.
if(j == 1) {
/**
* 1. 토픽의 값만 보낼 경우
* 레코드 생성 시 생성자의 파라미터로 1. 토픽명, 2. 메시지값 설정
* (토픽명, 메시지값은 필수 설정값)
*/
record = new ProducerRecord<>(TOPIC_NAME, "NoKey" + i + "-Value" + j);
}
j가 1일 경우 값만 전송한다.
파라미터로 1. 토픽명 2. 메시지값 만을 설정해준다.
else if(j == 2) {
/**
* 2. 토픽의 파티션 번호 지정, 키,값 쌍으로 보낼 경우
* 레코드 생성 시 생성자의 파라미터로 1. 토픽명, 2. 파티션 번호, 3. 메시지키, 4. 메시지값 설정
*/
record = new ProducerRecord<>(TOPIC_NAME, 2, "Key" + i, "Key" + i + "-Value" + j);
}
j가 2일 경우 2번 파티션에 메시지 키-값 쌍으로 보낸다.
이 때는 1. 토픽명, 2. 파티션 번호, 3. 메시지 키, 4. 메시지값 순으로 설정해준다.
else {
/**
* 3. 토픽의 키,값 쌍으로 보낼 경우
* 레코드 생성 시 생성자의 파라미터로 1. 토픽명, 2. 메시지키, 3. 메시지값 설정
*/
record = new ProducerRecord<>(TOPIC_NAME, "Key" + i, "Key" + i + "-Value" + j);
}
j가 3~5일 경우 파티션 지정없이 키-값 쌍의 레코드를 전송한다.
이 경우 1. 토픽명, 2. 메시지키, 3.메시지값 순으로 설정한다.

그럼 이제 이런 레코드들이 차례대로 생성될 것인데, 각각의 레코드들이 어느 파티션으로 전송되는지에 주목하여 확인해보자.

어플리케이션을 기동하고, 카프카 설정에 대한 로그를 확인한다.

로그로 찍어준 레코드 정보를 확인해보자.
키를 설정하지 않은 레코드는 key=null, 키를 설정해준 레코드는 key=각 키값을 로그로 확인할 수 있다.
더불어 파티션을 지정하지 않은 레코드는 partition=null, 파티션을 지정한 레코드는 지정해준 파티션 번호를 확인할 수 있다.

그렇다면 실제로도 토픽에 이 내용이 반영되었는지 확인해보자.
server.properties 설정에 맞게 토픽 생성 시 3개의 파티션이 각각 생성되었다.

0번 파티션을 확인한다.
key1의 3~5번 레코드들이 저장되어 있다.

1번 파티션을 확인한다.
key2의 3~5번 레코드들과 키가 null인 레코드가 저장되어 있다.

2번 파티션을 확인한다.
key1의 2번, key2의 2번 레코드와 키가 null인 레코드가 저장되어 있다.

어떤 규칙에 의해 각각의 파티션에 분배된 것일까?
메시지 키가 존재하는 경우에는 해당 키의 해시값을 이용하여 파티션에 분배된다. 이로 인해 key가 동일한 경우 동일한 파티션으로 전송된다.
그러므로 같은 키로 지정된 Key1의 레코드들은 연달아 0번 파티션으로, Key2의 레코드들은 연달아 1번 파티션으로 분배된 것이다.
메시지 키를 지정하지 않은 경우 메시지 키가 null로 설정된다.
이 경우 프로듀서가 파티션으로 전송할 때 레코드 배치 단위(레코드 전송 묶음)로 라운드 로빈 방식으로 전송된다.
그러므로 키를 지정해주지 않아 키가 null인 레코드들은 1번, 2번 파티션에 나뉘어 저장된 것이다.
그리고 키를 지정하였으나, 파티션을 특정한 경우의 레코드들은
해시값에 의한 파티션 분배 원칙을 따르는 것이 아니라 지정해준 파티션 번호인 2번 파티션으로 저장된다.
[KAFKA] kafka-console-producer.sh 포스팅 참고
https://velog.io/@namyj97/KAFKA-kafka-console-producer.sh