토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 크게 2가지가 존재.
예를들어 컨슈머가 파티션 개수보다 많을 경우, 파티션은 컨슈머에 1대1로 할당되고 남는 컨슈머는 데이터를 처리하지 못하고 불필요한 스레드로 남게 된다.
_consumer_offsets
토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있다.Poll()
메소드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정되어있다.비명시 오프셋 커밋
이라고 부른다.auto.commit.interval.ms
에 설정된 값과 함께 사용되는데, poll
메소드가 auto.commit.interval.ms
에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋한다.poll()
메소드를 호출할 때 커밋을 수행하므로 코드상에서 따로 커밋 관련 코드를 작성할 필요없음.비 명시적 오프셋 커밋
은 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있는 취약한 구조를 가지고 있다.명시적으로 오프셋을 커밋하려면 poll()
메소드 호출 이후에 반환받은 데이터의 처리가 완료되고 commitSync()
메소드를 호출하면 된다.
commitSync()
메소드는 poll() 메소드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다.commitSync()
메소드는 브로커에 커밋 요청을 하고 커밋이 정상적으로 처리되었는지 응답하기까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다.이를 해결하기 위해 commitAsync() 메소드를 사용하여 커밋 요청을 전송하고 응답이 오기전까지 데이터 처리를 수행할 수 있다.
Fetcher
인스턴스가 생성되어 poll()메소드를 호출하기 전에 미리 레코드들을 내부 큐로 가져온다.poll()
메소드를 호출하면 컨슈머는 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.옵션 명 | 설명 |
---|---|
bootstrap.servers | 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름: 포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정가능한다. |
key.deserializer | 레코드의 메시지 키를 역직렬화하는 클래스를 지정한다 |
value.deserializer | 레코드의 메시지 값을 역직렬화하는 클래스를 지정한다. |
옵션 명 | 설명 |
---|---|
group.id | 컨슈머 그룹 id를 지정한다. subscribe() 메소드로 토픽을 구독하여 사용할 때는 이 옵션을 필수로 넣어야 한다. 기본값은 null 이다. |
auto.offset.reset | 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다. 이미 컨슈머 오프셋이 있다면 이 옵션 값은 무시된다. 이 옵션은 latest ,earliest , none 중 설정가능하다.latest : 가장 최근에 넣은(높은) 오프셋부터 읽기 시작 earilest : 가장 처음에 넣은(낮은) 오프셋부터 읽기 시작 none : 컨슈머 그룹이 커밋한 기록이 있는지 찾아보고 커밋 기록이 없으면 오류 반환, 존재한다면 기존 커밋 기록 이후 오프셋부터 읽기시작한다 |
enable.auto.commit | 자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true 다 |
auto.commit.interval.ms | 자동 커밋(enable.auto.commit=true)일 경우 오프셋 커밋 간격을 지정한다. 기본값은 5000(5초)이다. |
max.poll.records | Poll() 메소드를 통해 반환되는 레코드 개수를 지정한다 기본값은 500이다. |
session.timeout.ms | 컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 이 시간 내에 하트 비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작한다. 보통 하트비트 시간 간격의 3배로 설정한다. |
heartbeat.interval.ms | 하트 비트를 전송하는 시간 간격이다. 기본값은 3000(3초)이다. |
max.poll.interval.ms | Poll() 메소드를 호출하는 간격의 최대 시간을 지정한다. poll() 메소드를 호출한 이후에 데이터를 처리하는데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작 |
isolation.level | 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다. read_committed, read_uncommited로 설정할 수 있다. read_commited : 커밋이 완료된 레코드만 읽는다. read_uncommited : 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다. |
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
logger.info("{}","start");
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record : records){
logger.info("{}",record.toString());
}
consumer.commitSync();
}
commitSync()
메소드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있다.commitSync()
메소드를 호출해야 한다.자동 커밋
이나 비동기 오프셋 커밋
보다는 동일 시간당 데이터 처리량이 적다는 특징이 있다.commitSync()
에 파라미터가 들어가지 않으면 poll()로 반환된 가장 마지막 레코드의 오프셋을 기준으로 커밋된다. 만약 개별 레코드 단위로 매번 오프셋을 커밋하고 싶다면 Map<TopicPartition,OffsetAndMetaData>
인스턴스를 파라미터로 넣으면된다. configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
logger.info("{}","start");
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record : records){
logger.info("{}",record.toString());
}
consumer.commitAsync();
}
물론 callback 함수를 따로 구현해줘야한다.
consumer.commitAsync(new OffsetCommitcallback() {
public void onComplete(Map<TopicPartiton, OffsetAndMetadata> offsets, Exception e){
/// 코드 서술
}
})
컨슈머 그룹에서 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스 가 일어난다.
poll() 메소드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복처리할 수도 있다.
리밸런스 발생시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야한다.
ConsumerrebalanceListener
인터페이스를 지원한다.ConsumerRebalanceListener
인터페이스로 구현된 클래스는 onPartitionAssigned()
메소드와 onPartitionRevoked()
메소드로 이루어져있다.따라서 마지막으로 처리한 레코드를 기준으로 커밋을 하기 위해서는 리밸런스가 시작하기 직전에 커밋을 하면되므로
onPartitonRevoked()
메소드에 커밋을 구현하여 처리할 수 있다.
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Collections.singletonList(TOPIC_NAME),new RebalanceListener());
logger.info("{}","start");
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record : records){
logger.info("{}",record.toString());
}
consumer.commitSync();
}
}
private static class RebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.warn("Partitions are assigned");
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.warn("Partitions are revoked");
consumer.commitSync();
}
}
컨슈머를 운영할 때 subscribe()
메소드를 사용하여 구독형태로 사용하는 것 외에도 직접 파티션을 컨슈머에 명시적으로 할당하여 운영할 수 있다.
TopicPartition
인스턴스를 지닌 자바 컬렉션 타입을 파라미터로 받는다.TopicPartition
클래스는 카프카 라이브러리 내/외부에서 사용되는 토픽, 파티션의 정보를 담는 객체로 사용된다.private final static int TOPIC_NUMBER = 0;
public static void main(String ...args){
//...
consumer = new KafkaConsumer<String, String>(configs);
// consumer.subscribe(Collections.singletonList(TOPIC_NAME),new RebalanceListener());
// 컨슈머가 특정 파티션을 바라보도록
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
//...
}
assign()
메소드가 사용된 것을 볼 수 있다.subscribe()
와는 다르게 직접 컨슈머가 특정 토픽, 특정파티션에 할당되므로 리밸런싱 하는 과정이 없다.assignment()
메소드는 Set<TopicPartition> 인스턴스를 반환한다.토픽이름
과 파티션 번호
가 포함된 객체이다. // 컨슈머가 특정 파티션을 바라보도록
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
Set<TopicPartition> assignedTopicPartition = consumer.assignment();
logger.info("topicInfo = {}", assignedTopicPartition);
컨슈머 애플리케이션은 안전하게 종료되어야 한다.
세션 타임아웃
이 발생할때까지 컨슈머 그룹에 남게된다.컨슈머 랙이 일어나면 데이터 처리 지연이 발생하게 된다
컨슈머를 안전하게 종료하기 위해 KafkaConsumer
클래스는 wakeup() 메소드를 지원한다.
wakeup()
메소드가 실행된 이후 poll() 메소드가 호출되면 WakeupException 예외가 발생한다.마지막에는 close()
메소드를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료되었음을 명시적으로 알려주면 종료가 완료되었다고 볼수 있다.
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record.toString());
}
consumer.commitSync();
}
}catch (WakeupException we){
logger.warn("Wakeup consumer");
} finally{
consumer.close();
}
그렇다면 wakeup() 메소드는 어디서 호출하면 될까?
자바 어플리케이션의 경우 코드 내부에 셧다운 훅(shotdown hook)
을 구현하여 안전한 종료를 명시적으로 구현할 수 있다
셧다운 훅이란 사용자 또는 운영체제로부터 종료 요청을 받으면 실행하는 스레드를 뜻한다.
public class Consumer {
private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
//...
public static void main(String ...args){
//...
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
//...
}
//...
static class ShutdownThread extends Thread{
@Override
public void run() {
logger.info("Shutdown hook");
consumer.wakeup();
}
}
kill -TERM {프로세스 번호}
를 호출하여 셧다운 훅을 발생시킬 수 있다.https://bigsun84.tistory.com/355 보면 kill에 대한 자세한 옵션 나와있음
실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는 것만큼 카프카에 설정된 내부 옵션을 확인하는 것도 중요함. 내부옵션을 가장확인하기 쉬운방법은 브로커 중 한대에 접속하여 카프카 브로커 옵션을 확인
하는 것이지만 매우 번거로운 작업
ACL(Access Control List)
이 적용된 클러스터의 리소스 접근 권한 규칙 추가를 할 수 있다.AdminClient
클래스로 해당 토픽의 파티션을 늘릴 수 있다.public class Admin {
public static void main(String ...args){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(properties);
}
}
프로듀서 API
와 컨슈머 API
와는 다르게 추가설정 없이 클러스터 정보에 대한 설정만 하면 된다.KafkaAdminClient
를 반환받는다.메소드명 | 설명 |
---|---|
describeCluster(DescribeClusterOptions options) | 브로커의 정보조회 |
listTopics(ListTopicsOptions options) | 토픽 리스트 조회 |
listConsumerGroups(ListConsumerGroupsOptions options) | 컨슈머 그룹 조회 |
createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options ) | 신규 토픽생성 |
createPartitions(Map<String, NewPartition> newPartitions, CreatePartitionsOptions options) | 파티션 개수 변경 |
createAcls(Collection<AclBinding> acls, CreateAclsOptions options) | 접근 제어 규칙 생성 |
public class Admin {
static final Logger logger = LoggerFactory.getLogger(Admin.class);
public static void main(String ...args){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(properties);
logger.info("==GET Broker information");
try {
for (Node node : admin.describeCluster().nodes().get()){
logger.info("log={}",node);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigsResult = admin.describeConfigs(Collections.singleton(configResource));
describeConfigsResult.all().get().forEach((broker,config)->{
config.entries().forEach(configEntry -> logger.info(configEntry.name() + "=" + configEntry.value()));
});
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singleton("test")).all().get();
logger.info("{}",topicInformation);
어드민 API 종료
admin.close();