1 : 1 프로세스 + 1 쓰레드(컨슈머)
2 : 1 프로세스 + n 쓰레드(동일 컨슈머 그룹)
3 : 1 프로세스 + n 쓰레드(다수 컨슈머 그룹)
2 : 1 프로세스 + n 쓰레드(동일 컨슈머 그룹)
main
package com.example.kafkatester;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Runs {@code CONSUMER_COUNT} Kafka consumers inside a single process, one thread per
 * consumer, all configured with the same group id.
 *
 * <p>A JVM shutdown hook signals every worker to stop and then waits for the worker threads
 * to finish, so that each worker gets the chance to run its own cleanup before the JVM halts.
 */
@SpringBootApplication
public class KafkaTesterApplication {
    private static final String TOPIC_NAME = "hello-kafka";
    private static final String GROUP_ID = "testgroup";
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
    private static final int CONSUMER_COUNT = 3;

    /**
     * Upper bound on how long the shutdown hook waits for the workers to stop.
     * NOTE(review): chosen to match KafkaConsumer.close()'s documented default timeout — confirm.
     */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    /** Workers created by {@link #main}; guarded by its own monitor because the hook thread reads it. */
    private static final List<ConsumerWorker> workerThreads = new ArrayList<>();

    /** Pool running the workers; a field so the shutdown hook can wait for it to drain. */
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Registers the shutdown hook, builds the shared consumer configuration and starts one
     * {@link ConsumerWorker} per consumer on the thread pool.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());

        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Offsets are committed explicitly by the workers.
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            synchronized (workerThreads) {
                workerThreads.add(worker);
            }
            executorService.execute(worker);
        }
    }

    /** Shutdown hook: signals all workers, then blocks until they have finished (bounded). */
    static class ShutdownThread extends Thread {
        @Override
        public void run() {
            // Snapshot under the lock so main() adding workers cannot break the iteration.
            List<ConsumerWorker> snapshot;
            synchronized (workerThreads) {
                snapshot = new ArrayList<>(workerThreads);
            }

            for (ConsumerWorker worker : snapshot) {
                try {
                    worker.shutdown();
                } catch (RuntimeException e) {
                    // One failing worker must not prevent the others from being signalled.
                    System.err.println("Failed to signal consumer worker shutdown: " + e);
                }
            }

            // The JVM halts as soon as the hooks return, so wait here for the workers to
            // finish their own cleanup instead of returning immediately.
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    System.err.println("Consumer workers did not stop within "
                            + SHUTDOWN_TIMEOUT_SECONDS + " seconds");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Bye");
        }
    }
}
ConsumerWorker
package com.example.kafkatester;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Runnable that owns one {@link KafkaConsumer}: it subscribes to a single topic, prints every
 * record value and commits offsets synchronously after each poll.
 *
 * <p>{@link #shutdown()} may be called from another thread, at any time (including before
 * {@link #run()} has started), to make the poll loop terminate.
 */
public class ConsumerWorker implements Runnable {
    private final Properties prop;
    private final String topic;
    private final String threadName;

    /** Created in {@link #run()}; volatile because {@link #shutdown()} reads it from another thread. */
    private volatile KafkaConsumer<String, String> consumer;

    /** Set by {@link #shutdown()} so a request that arrives before the consumer exists is not lost. */
    private volatile boolean shutdownRequested;

    /**
     * @param prop   consumer configuration passed to the {@link KafkaConsumer} constructor
     * @param topic  topic to subscribe to
     * @param number index used to build this worker's log prefix
     */
    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    /**
     * Polls the topic until woken up via {@link #shutdown()}, then performs a final commit
     * and always closes the consumer.
     */
    @Override
    public void run() {
        KafkaConsumer<String, String> localConsumer = new KafkaConsumer<>(prop);
        consumer = localConsumer;
        // shutdown() may have run before the consumer was published; in that case it could
        // not wake us up, so arm the wakeup ourselves and let the first poll abort.
        if (shutdownRequested) {
            localConsumer.wakeup();
        }
        try {
            localConsumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = localConsumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                localConsumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            try {
                // Best-effort final commit: covers records that were processed when the
                // in-loop commit was interrupted by the wakeup.
                localConsumer.commitSync();
            } catch (RuntimeException e) {
                // Do not let a failed commit skip close() or hide the exception that ended the loop.
                System.out.println(threadName + " final commit failed: " + e);
            } finally {
                localConsumer.close();
            }
        }
    }

    /**
     * Requests termination of the poll loop. Safe to call from any thread and before
     * {@link #run()} has created the consumer.
     */
    public void shutdown() {
        // Order matters: set the flag first, then read the consumer. run() publishes the
        // consumer first, then reads the flag, so at least one side issues the wakeup.
        shutdownRequested = true;
        KafkaConsumer<String, String> current = consumer;
        if (current != null) {
            current.wakeup();
        }
    }
}