Kafka 세팅
Spring 세팅
Kafka과 Spring의 연결에 앞서
배포보다는 Kafka의 여러 설정들과 Spring에서 어떻게 다룰 수 있는지를
확인하기 위해 비교적 쉽게 Kafka는 도커 컴포즈로 세팅하고
Spring은 인텔리제이를 통해 호스트 머신에서 작동하게 세팅했다.
구글에 kafka를 검색하면 두 가지 Confluent Kafka와 Apache kafka가 소개된다.
Apache Kafka
Confluent Kafka두 가지의 차이점은
Apache Kafka가 오리지널 오픈소스 Kafka이고
Confluent Kafka는 위 오리지널 kafka에서 편의 기능 등을 개조, 확장한 버전의 kafka이다.
기존에 카프카를 다뤄봤을때는 아무것도 모르는 상태에서 Confluent버전의 카프카를 다뤄봤기 때문에
이번에는 오리지널 버전의 Apache Kafka를 다뤄봤다.
Conflunet, Apache 둘 모두 최신 버전 사용시 Zookeeper없이 사용이 가능하다.
(Kraft로 대체됨)
도커 컴포즈로 실행하고자 할때 yml을 통해 설정 할 수 있는 값들을 공식문서에서 찾아보면 그 수가 엄청나다.
Apache Kafka 공식 문서(Docker 부분)
이해가 안되는 설정값들도 존재하고 양이 너무 많기에 크게 연결, 보안 외에는 크게 건들지 않고 진행했다.
services:
kafka:
image: apache/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_KRAFT_MODE: 'true'
KAFKA_LISTENERS: PLAINTEXT://kafka:9092, CONTROLLER://kafka:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_PROCESS_ROLES: broker, controller
KAFKA_NODE_ID: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- ./data:/var/lib/kafka/data
추후 SSL 인증 통신도 설정해볼 예정이다.
ports:
- "9092:9092"
#해당 카프카는 도커 내부의 9092포트를 사용하고
#도커 외부에서 도커의 9092포트로 통신을 시도할 경우 카프카 9092포트로 포워딩한다.
KAFKA_KRAFT_MODE: 'true'
#기존 zookeeper대신 사용할 kraft를 위한 설정값이다.
KAFKA_LISTENERS: PLAINTEXT://kafka:9092, CONTROLLER://kafka:9093
#카프카의 브로커가 수신할 URI 리스트
#PLAINTEXT://kafka:9092 별다른 암호화없이 도커 내부 기준 kafka:9092로 들어오는 요청 수신 대기
#CONTROLLER://kafka:9093 kraft 사용시 필요한 컨트롤러 리스너로 현재 ports에 9093을 등록하지 않았기에
#사실 의미없는 URI지만 kraft 사용시 위 값이 없으면 실행이 되지 않는다.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
#도커 외부에서 노출될 URI로 "도커의 카프카에 접속하고 싶다면
#도커 외부 호스트 머신의 localhost:9092로 접속해야 한다"와 같은 의미이다.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, CONTROLLER:PLAINTEXT
#리스너 별 보안 프로토콜 매핑인데 리스너에서의 보안 프로토콜과
#내부에서 받아들여질 프로토콜이 일치하여야한다.
#ex) PLAINTEXT:SSL같은 경우는 불가능하다. 컨트롤러는 예외로 CONTROLLER:SSL 가능
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
#컨트롤러로 담당할 리스너의 이름이다.
#임의로 지정이 가능하나 보안 프로토콜 매핑, 리스너도 일괄되게 수정해줘야 한다.
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
#Kraft에서 필요한 설정값으로 카프카 브로커 컨트롤러 투표자 목록이다.
#현재는 브로커가 하나밖에 없기에 하나의 후보밖에 없다.
#형태는 <node_id>@<hostname>:<port>
KAFKA_PROCESS_ROLES: broker, controller
#해당 kafka노드가 무슨 역할을 하는 애플리케이션인지를 나타낸다.
#현재 브로커이면서 컨트롤러 역할도 수행중
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
#토픽이 없으면 토픽 자동 생성 설정값
이 정도가 카프카를 실행시키고 스프링과 연결시키기에 필요한 최소한의 설정값들인 것 같다.
위 설정대로 실행하면 싱글 브로커로 실행된다.
스프링에서는 카프카와의 연결을 위해 Producer와 Consumer 그리고 필요하다면 Admin까지의 세팅이 필요하다.
각각
Producer
토픽에 데이터를 보내게 되는데
Spring의 특성상 클래스 형태를 json형태로 직렬화 시켜 보내게 된다.
Consumer
특정 토픽을 구독하고 해당 토픽에 추가되는 메세지를 역직렬화 시켜서 스프링에서 다룰 수 있다.
Admin
어드민의 경우 메세지의 전송, 수신을 제외한 나머지 기능들을 다룰 수 있다.
우선 카프카와의 통신을 위해 전반적인 yml 데이터가 필요하다.
spring:
...
...
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: single
auto-offset-reset: earliest
producer:
retries: 3
acks: all
topic:
name: test-topic
partitions: 3
replication-factor: 1
bootstrap-servers: localhost:9092
#카프카 서버의 주소이다. 카프카 yml에서 KAFKA_ADVERTISED_LISTENERS에서 지정해둔 주소와 일치시킨다.
# Conumser #
group-id: single
#컨슈머의 그룹 아이디로 여러개의 컨슈머가 같은 그룹 아이디를 소유 할 수 있다.
#이렇게 된다면 하나의 컨슈머만 데이터를 받게된다.
#컨슈머 아이디를 같게 하는 경우는 다 똑같은 역할을 하는 컨슈머를 한대로 묶을때 그룹 아이디를 묶는다.
#아이디의 값은 맘대로 설정할 수 있다.
auto-offset-rset: earliest
#컨슈머가 데이터를 수신할때 최신순으로 수신한다.
# Consumer #
# Producer #
retries: 3
데이터 전달할때 최대 몇번까지 재시도할지 정한다.
acks: all
#스프링이 브로커로부터 어떤 응답을 받을지에 대한 설정값이다.
#0 = 브로커의 응답을 받지 않음, 1 = 리더 브로커의 응답만 기다림, all = 모든 브로커의 응답을 기다림
# Producer #
# Topic #
#토픽의 값들은 임의로 정한것들인데 스프링 내부에서 처리해도 상관없다.
카프카와의 통신에 필요한 데이터들이다.
//KafkaProperties
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
private String bootstrapServers;
private ConsumerProperties consumer;
private ProducerProperties producer;
private TopicProperties topic;
@Getter
@Setter
public static class ConsumerProperties {
private String groupId;
private String autoOffsetReset;
}
@Getter
@Setter
public static class ProducerProperties {
private int retries ;
private String acks;
}
@Getter
@Setter
public static class TopicProperties {
private String name;
private int partitions;
private short replicationFactor;
}
}
사용하기 유용하게 @ConfigurationProperties를 사용하여 가져와서 사용했다.
찾으면서 알게 된 내용
Consumer와 Producer 모두 사용하기전에 직접 Factory를 Bean으로 등록해서 사용해야 한다고 알고 있었는데, 실제로는 Spring-Kafka가 자동으로 Factory를 등록해준다고 한다.
Spring-Kafka에 KafkaAutoConfiguration이라는 클래스가 존재하는데 이 클래스가 그 역할을 하고 있었다.
따라서 기본적인 연결 및 작동을 원할때는 각각 따로 Factory를 Bean으로 등록하지 않아도 문제가 없다.
다른 추가적인 Consumer를 등록하고 싶다면 아래와 같이 등록하면 된다.
@Bean
public ConsumerFactory<String, String> consumerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
yml에 값을 잘 기재하였다면 이미 자동으로 등록하여 문제가 없지만 수동적으로 등록하고 싶다면
위와 같이 Bean으로 등록하고 나면 스프링에서 컨슈머를 사용할 수 있다.
컨슈머를 사용하는 방법으로는 크게 두 가지가 있다.
사용 방법으로는
@KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "kafkaListenerContainerFactory")
public void getTopicTestTopic(ConsumerRecord<String, String> record) {
System.out.println(record.value());
}
구독하고 싶은 토픽을 구독하고 그룹아이디를 설정하면 해당 메소드를 통해 원하는 작업을 진행 할 수 있다.
원하는 컨슈머가 있다면 containerFactory에서 원하는 컨슈머를 선택하면 된다.
사용 방법은
private KafkaConsumer<String, String> createConsumerProps(String topic) {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> newKafkaConsumer = new KafkaConsumer<>(consumerProps);
Collection<String> topicList = List.of(topic);
newKafkaConsumer.subscribe(topicList);
return newKafkaConsumer;
}
위 코드에서는 토픽만 넘겨받아 새로운 컨슈머를 만들고 있지만 아예 원하는 대로 컨슈머를 생성하여 사용할 수도 있다.
만들어진 컨슈머는 다음과 같이 사용할 수 있다.
public KafkaConsumerTopicMsgDto getTopicMessages(String topic) {
KafkaConsumer<String, String> newKafkaConsumer = createConsumerProps(topic);
List<String> messageList = new ArrayList<>();
try {
newKafkaConsumer.poll(Duration.ofSeconds(1));
Set<TopicPartition> assignedPartitions = newKafkaConsumer.assignment();
while (assignedPartitions.isEmpty()) {
newKafkaConsumer.poll(Duration.ofMillis(100));
assignedPartitions = newKafkaConsumer.assignment();
}
newKafkaConsumer.seekToBeginning(assignedPartitions);
int maxEmptyPolls = 3;
int emptyPollCount = 0;
while (emptyPollCount < maxEmptyPolls) {
ConsumerRecords<String, String> messages = newKafkaConsumer.poll(Duration.ofSeconds(1));
if (messages.isEmpty()) {
emptyPollCount++;
} else {
messages.forEach(record -> messageList.add(record.value()));
emptyPollCount = 0;
}
}
newKafkaConsumer.commitSync();
} catch (Exception e) {
return new KafkaConsumerTopicMsgDto(List.of("Error: " + e.getMessage()));
} finally {
newKafkaConsumer.close();
}
return new KafkaConsumerTopicMsgDto(messageList.isEmpty() ? List.of("No messages found") : messageList);
}
이 코드는 새로만든 컨슈머를 통해 해당 컨슈머가 구독하고 있는 토픽의 모든 메세지를 리턴하고 있다.
하지만 이 방법은 새로운 컨슈머를 생성하고 해제를 수동적으로 진행하기 때문에
특별한 상황이 아니라면 정해진 컨슈머만 사용하는 방법을 추구해야 할 것 같다.
카프카 프로듀서도 컨슈머와 마찬가지로 기본적인 factory는 자동 생성이 된다.
수동적으로 만들고 싶다면 다음과 같이 하면 된다.
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
값들은 Consumer와 중복되는 값들도 있다.
Producer KafkaTemplate를 활용하기 때문에 Template도 같이 등록해야 한다.
사용 방법은 간단한다.
public String sendMessage(KafkaProducerSendMsgDto body) {
String topic = body.getTopic();
String message = body.getMessage();
kafkaTemplate.send(topic, message);
return "success to send message " + message;
}
kafkaTemplate.send() 를 통해 원하는 토픽에 원하는 데이터를 넣으면 된다.
카프카는 기본적으로 1MB의 데이터만 보낼 수 있기에 큰 데이터를 보낸다면
배치, 압축, 청크의 방법으로 새롭게 ProducerFactory를 정의하여 해당 Template을 사용하면 된다.
카프카 어드민은 AdminClient라는 클래스를 사용하게 되는데,
이 AdminClient는 ProducerFactory, ConsumerFactory와는 달리 자동 생성이 되지 않음으로
수동적으로 등록해줘야 한다.
@Bean
public AdminClient createKafkaAdminClient(KafkaAdmin kafkaAdmin) {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
위와 같이 등록하고
public List<String> getTopicList() throws ExecutionException, InterruptedException {
return adminClient.listTopics()
.listings()
.get()
.stream()
.map(TopicListing::name)
.toList();
}
public void createNewTopic(String topic) {
KafkaProperties.TopicProperties topicProperties = kafkaProperties.getTopic();
NewTopic newTopic = new NewTopic(
topic,
topicProperties.getPartitions(),
topicProperties.getReplicationFactor()
);
Collection<NewTopic> topicList = new ArrayList<>();
topicList.add(newTopic);
adminClient.createTopics(topicList);
}
이렇게 토픽 리스트를 받아온다거나 토픽을 만드는 것과 같은 작업을 수행할 수 있다.