카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리는
카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공하는 카프카 클라이언트를 사용하여 애플리케이션을 개발한다.
카프카 클라이언트는 라이브러리이기 때문에 라이프사이클을 가진 프레임워크나 애플리케이션 위에서 구현하고 실행해야한다.
카프카에서 데이터의 시작점이라고 할 수 있다.
프로듀서 애플리케이션은 카프카에서 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전ㅅ송한다.
프로듀서는 파티션을 가지고 있는 브로커와 직접 통신한다.
plugins {
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
test {
useJUnitPlatform()
}
카프카를 설치하기 위해서 build.gradle에 카프카를 설치해준다.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "54.241.156.46:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
프로듀서는 브로커에 레코드를 전송하기 위해 전송하고자하는 토픽을 알고 있어야한다.
토픽을 지정하지 않고서는 브로커에 데이터를 전송할 수 없다.
Properties 객체에는 KafkaProducer 인스턴스를 생성하기 위한 프로듀서 옵션들을 key / value 값으로 선언한다.
프로듀서에 필수옵션은 반드시 선언을 해야하며, 선택옵션은 선언 하지 않아도 되지만, 기본값으로 설정되어 동작한다.
직렬화를 통해 데이터를 전달하기 때문에 직렬화 클래스를 선언한다.
KafkaProducer를 선언하는데 위에서 설정하였던 Properties를 삽입한다.
브로커로 데이터를 전달하기 위해 ProduceRecord를 선언해주고 토픽이름과, 메시지를 넣고 전송을 진행한다.
send 메서드에 record를 삽입하는데 바로 전송되는 것이 아닌, 프로듀서 내부에서 가지고 있다가 배치 형태로 묶어서 브로커에 전송한다. 해당 방식을 배치 전송이라고 한다.
그 후 flush를 통해 버퍼에 있는 레코드 배치를 브로커로 전송한다.
close를 통해 producer 인스턴스 리소스를 안전하게 종료한다.
프로듀서가 브로커에 데이터를 전송하기 전에 토픽을 생성하여 받을 수 있도록 구현해보자
bin/kafka-topics.sh --bootstrap-server {aws IP} --create \
> --topic test \
> --partitions 3
Created topic test.
test라는 이름의 토픽을 생성하고 파티션을 3개로 설정한다.
토픽이 생성되면 상단의 코드를 실행하여 브로커에 메시지를 전송한다.
정상적으로 보내졌다면 설정한 토픽에 데이터가 적재됬을 것이다.
bin/kafka-console-consumer.sh --bootstrap-server 54.241.156.46:9092 --topic test --from-beginning
testMessage
kafka-console-consumer.sh를 통해 확인 할 수 있으며 --from-beginning 옵션을 통해 해당 토픽의 모든 레코드를 확인 할 수 있다.
프로듀서에서 브로커로 레코드를 전송할 때는 파티셔너, 배치 생성 단계를 거친다.
전송 레코드에는 토픽, 파티션, 타임스탬프, 메시지 키, 메시지 값을 넣을 수 있다.
타임스탬프는 기본적으로 브로커 시간을 기준으로 설정되지만, 설정을 통해 레코드 생성시간 또는 임의로도 설정이 가능하다.
KafkaRecorder 인스턴스의 send()메서드가 호출되면 ProducerRecord는 파티셔너에서 어느 토픽의 파티션에 전송될 것인지 정해준다. 따로 설정하지 않으면 DefaultPartitioner로 설정되어 파티션이 정해진다.
파티션이 정해진 레코드는 버퍼에 쌓이고 이후 배치로 묶어서 전송함으로써 프로듀서의 처리량을 향상시키는데 도움을 준다.
프로듀서 API에는 2가지의 파티셔너가 존재한다.
1. UniformStickyPartitioner
2. RoundRobinPartitioner
UniformStickyPartitioner는 프로듀서의 동작에 특화되어 높은 처리량과 낮은 리소스 사용률을 가지는 특징이 있다.
데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터는 모두 동일한 파티션에 전송함으로써 향상된 성능을 보장한다.
RoundRobinPartitioner는 카프카 2.4.0이전에 기본 파티셔너로 지정되있었다.
라운드로빈 특징으로 ProducerRecord가 들어오는 대로 파티션을 순회하면서 전송하기 때문에 배치로 묶이는 빈도가 적다.
또한 사용자 지정 파티셔너를 생성하기위해 paritioner인터페이스를 제공하며 파티셔너를 통해 지정된 데이터는 어큐물레이터에 버퍼로 쌓인다. 센더는 어큐물레이터에 쌓인 배치 데이터를 가져가 카프카 브로커로 전송한다.
추가적으로 프로듀서는 압축옵션을 통해 브로커로 전송시 압축방식을 정할 수 있는데, 압축옵션을 미지정하면 미압축 상태로 데이터가 전달된다.
압축옵션은 gzip, snappy, lz4, zstd를 지원한다. 압축을하면 네트워크 처리량에서는 이득을 볼 수 있지만, 압축을 하는데에 CPU 또는 메모리 리소스를 사용함으로 상황에 따라 옵션을 사용하는것이 중요하다. 또한 컨슈머도 압축을 해제 할 때 리소스를 사용하기 때문에 주의해야한다.
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름 : 포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에서 이슈가 발생하더라도 접속하는데에 이슈가 없도록 설정가능
key.serializer : 레코드의 메시지 키를 직렬화 하는 클래스를 지정한다.
value.serializer : 레코드의 메시지 값을 직렬화하는 클래스를 지정한다.
ack : 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인 하는데에 사용
0, 1, -1(all) 중에 하나로 설정할 수 있다.
1은 기본값으로 리더 파티션에 데이터가 저장되면 전송 성공으로 판단
0은 프로듀서가 전송한 즉시 브로커에 데이터 저장 여부와 상관 없이 성공으로 판단한다.
-1이나 all은 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공한 것으로 판단.
buffer.memory : 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 지정한다. 기본값 33554432(32MB)이다.
retires : 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다. 기본값 2147483647
batch.size : 배치로 전송할 레코드 최대 용량을 지정한다. 너무 작게 설정하면 프로듀서가 브로커로 더 자주 보내기 때문에 네트워크에 부담을 줄 수 있고, 너무 크게 설정하면 메모리를 더 많이 이용하게 되는 점을 유의하며 사용. 기본값 16384
linger.ms : 배치를 전송하기 전까지 기다리는 최소 시간이다. 기본값 0
partitioner.class : 레코드 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정
enable.idempotence : 멱등성 프로듀서로 동작할 지 여부를 설정. 기본값 false
transactional.id : 프로듀서가 레코드로 전달할 때 레코드를 트렌젝션 단위로 묶을지 여부를 설정한다.
프로듀서의 고유한 트렌젝션 아이디를 설정할 수 있다.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "hodu", "deuk");
메시지 전송시 토픽, 키, 값 순서대로 ProducerRecord 파라메터에 넣고 생성하여 사용가능하다.
bin/kafka-console-consumer.sh --bootstrap-server {aws IP} \
> --topic test \
> --property print.key=true \
> --property key.separator="-" \
> --from-beginning
null-testMessage
hodu-deuk
메시지 키가 지정된 데이터는 kafka-console-consumer 명령을 통해 확인 가능하다.
int partitionNo = 0;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, "deuk", "ja");
파티션을 직접 지정하기 위해서는 토픽이름 바로 옆에 키를 지정 해주면 된다.
파티션 번호는 토픽에 존재하는 파티션 번호로 설정해야한다.
프로듀서 환경에 따라 특정 데이터를 가지는 레코드는 특정 파티션으로 보내야 할 경우가 생긴다.
기본 설정 파티셔너를 사용할 경우 메시지 키의 해시값을 파티션에 매칭하여 데이터를 전송하기 때문에 어디 파티션으로 들어가는지 알 수 가 없다. 때문에 사용자 정의 파티셔너를 사용하면 특정 파티션으로 보낼 수 있다.
package custompartitions;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
throw new InvalidRecordException("Need message Key");
}
if (((String) key).equals("Deuk")) {
return 0; //특정 키워드 발견 시 0 파티션으로 삽입
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; //특정 키워드가 아닐경우 해시값 파티션 매칭
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
위 코드 처럼 특정 데이터 발견시 파티션 0번으로 저장하게 설정하고, 나머지 데이터는 메시지 키의 해시값을 파티션에 매칭한다.
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: 3dFgtTS8RQuraYRep3-TvQ
[main] INFO custompartitions.CustomProducer - test-0@3
[main] INFO custompartitions.CustomProducer - ProducerRecord(topic=test, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=Deuk, value=ja, timestamp=null)
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
중간의 결과를 보면 test 토픽의 0번째 파티션의 오프셋 번호 3이라는 것을 알 수 있다.
KafkaProducer의 send() 메서드는 Future 객체를 반환한다. 해당 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 정상 적으로 브로커에 적재가 되었는지에 대한 데이터가 포함되어 있다.
get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있다.
package custompartitions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
private final static Logger logger = LoggerFactory.getLogger(CustomProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "54.241.156.46:9092";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer .class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//커스텀 파티셔너 등록
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
int partitionNo = 0;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, "Deuk", "ja");
RecordMetadata meta = producer.send(record).get();
logger.info(meta.toString());
logger.info("{}", record);
producer.flush();
producer.close();
}
}
핵심 코드는 다음과 같다.
RecordMetadata meta = producer.send(record).get();
logger.info(meta.toString());
하지만 위의 코드 처럼 동기식으로 전송결과를 받는것은 빠른 전송에 적합하지 않다. 때문에 Callback을 사용하여 결과를 비동기로 확인 할 수 있다. 프로듀서는 Callback 인터페이스를 제공하고 있다.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaAsyncCallback implements Callback {
private final static Logger logger = LoggerFactory.getLogger(KafkaAsyncCallback.class);
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
logger.error(exception.getMessage(), exception);
}else{
logger.info("Callback result => " + metadata.toString());
}
}
}
프로듀서의 코드에서 변경되는 부분은 send에서 다음과 같다.
RecordMetadata meta = producer.send(record, new KafkaAsyncCallback());
[main] INFO custompartitions.CustomProducer - ProducerRecord(topic=test, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=Deuk, value=ja, timestamp=null)
[kafka-producer-network-thread | producer-1] INFO custompartitions.KafkaAsyncCallback - Callback result => test-0@6
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
정상적으로 콜백을 받는 것을 확인 할 수 있다.