프로젝트에 Kafka를 도입하기 이전에 먼저 Spring Server 2개를 가지고 테스트를 해보려 한다.
가장 먼저 EC2 환경에 쉽게 이식할수 있도록 로컬에서도 도커를 활용해 띄우려고 한다.
services:
kafka:
image: bitnami/kafka:latest
ports:
- '9092:9092'
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
해당 docker-compose를 통해서 도커를 활용해 kafka를 띄워놓았다.
이후 Spring Server를 로컬에서 가동시킨다.

implementation 'org.springframework.kafka:spring-kafka'
최신 버전으로 spring에 의존성을 추가해줬는지
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
bootstrap-servers: localhost:9092
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
프로듀서와 컨슈머의 데이터 변환 설정을 추가했다.
serialization
deserializer
package org.example.server1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class Producer {
private static final String TOPIC = "test";
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
log.info("프로듀서가 메세지 보냅니다.");
this.kafkaTemplate.send(TOPIC, message);
}
}
프로듀서의 설정으로 어떤 토픽에 연결할지 지정한다.
package org.example.server2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class Consumer {
@KafkaListener(topics = "test", groupId = "junGroup")
public void listen(String message) {
log.info("컨슈머가 메세지 poll 합니다.");
log.info("junGroup 이 받은 message: " + message);
}
}
컨슈머는 해당 토픽의 이름과 데이터를 전달받는 그룹의 이름을 지정한다.
package org.example.server1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
@Slf4j
public class Controller {
private final Producer producer;
@PostMapping("kafka")
public String testApi(@RequestBody String message) {
log.info("API 수신 했습니다.");
producer.sendMessage("Jun 님이 message 보냅니다");
return "api 통신 성공";
}
}



이처럼 카프카를 사용해 메세지를 주고 받았다.
실제 서비스에서는 Json 형식으로 데이터를 주고 받아야 하기 때문에, Json 타입 처리를 진행해보려고 한다.
카프카 객체 Json 통신
카프카에서 이동시에 Json으로 값을 주고받기 위해서는
yml의 value 직렬화 부분에 Json 직렬화를 명시해야한다.
프로듀서
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
컨슈머
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: '*'
컨슈머의 경우에는
JsonDeserializer 는 기본적으로 모든 Java 패키지의 클래스 로딩을 허용하지 않는다
악의적인 공격을 방지하기 위함이다 !
따라서
properties:
spring.json.trusted.packages: '*'
or
spring.json.trusted.packages: 'org.example.domain’
직접적인 클래스 명시가 가능하다 !
처리가 필요하다 !
이렇게 사용하면 만드는 카프카 프로듀서가 모두 같은 설정으로 만들어지기에 클래스에서 직접 명시해줄수도 있다.
하지만 여기에선 그냥 이렇게 사용해보기로 한다 !
→ 따라서 해당 설정을 통해서 Json 연결을 해놓았다.
또한 이전처럼 Yml으로 전역의 카프카 설정을 해놓으면 추후에 더 많은 프로듀서 and 컨슈머가 생길시에 사용할수가 없다는 문제점으로 -> 객체에 직접 카프카 프로듀서 컨슈머 설정을 진행할려고 한다.
Producer 와 Consumer 클래스를 제작한다
package org.example.server1;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.server}")
private String KafkaServerIp;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// Key & Value 직렬화 설정
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package org.example.server1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.server}")
private String KafkaServerIp;
@Value("${kafka.group}")
private String KafkaSpringGroup;
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// consumer 그룹 설정
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
// Key & Value 직렬화 설정
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
받을떄 객체를 지정
package org.example.server2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
String KafkaServerIp = "127.0.0.1:9092";
String KafkaSpringGroup = "junGroup";;
@Bean
public ConsumerFactory<String, KafkaDto> consumerFactory() {
JsonDeserializer<KafkaDto> deserializer = new JsonDeserializer<>(KafkaDto.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// consumer 그룹 설정
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
// Key & Value 직렬화 설정
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
아예 받을때 객체로 변환해서 받을수 있으나
이런 경우에는 해당 컨슈머는 그냥 해당 객체 전용으로만 사용해야한다 !
따라서 송신측에선 Json 으로 변환해서 보내구
수신측에선 String으로 받아서 객체로 변환하는것이 더 좋을거 같다는 판단이 들었다 !
우선 결과적으로 -> 해당 방법으로 수신하니

정상적으로 수신이 되는것을 확인할수 있다

server:
port: 8080
spring:
application:
name: server1
package org.example.server1;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class KafkaDto {
private String sender;
private String title;
private String content;
}
config
package org.example.server1;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
String KafkaServerIp = "127.0.0.1:9092";
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// Key & Value 직렬화 설정
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package org.example.server1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
String KafkaServerIp = "127.0.0.1:9093";
String KafkaSpringGroup = "junGroup";
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// Broker 서버 설정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// consumer 그룹 설정
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
// Key & Value 직렬화 설정
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
producer
package org.example.server1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class Producer {
// private static final String TOPIC1 = "test";
// private final KafkaTemplate<String, String> kafkaTemplate1;
//
// public void sendMessage(String message) {
// log.info("프로듀서가 메세지 보냅니다.");
// this.kafkaTemplate1.send(TOPIC1, message);
// }
private static final String TOPIC2 = "json";
private final KafkaTemplate<String, Object> kafkaTemplate2;
public void sendObject(KafkaDto kafkaDto) {
log.info("프로듀서가 객체 보냅니다.");
this.kafkaTemplate2.send(TOPIC2, kafkaDto);
}
}
consumer
package org.example.server2;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class Consumer {
// @KafkaListener(topics = "test", groupId = "junGroup")
// public void listen1(String message) {
// log.info("컨슈머가 메세지 poll 합니다.");
// log.info("junGroup 이 받은 message: " + message);
// }
@KafkaListener(topics = "json", groupId = "objectGroup")
public void listen2(String str) {
log.info("컨슈머가 객체 poll 합니다.");
log.info("junGroup 이 받은 객체: " + str);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
KafkaDto kafkaDto;
try {
kafkaDto = mapper.readValue(str, KafkaDto.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
log.info(kafkaDto.toString());
}
}

성공적으로 데이터를 전달받았다.
이처럼 String 형식으로 받는다면 컨슈머 입장에서 자신이 원하는 형식의 DTO로 변환해서 사용할수 있고 하나의 토픽에 프로듀서가 같은 형식의 데이터를 준다는 보장이 없기에 ->
String 형식이 더 적합하다고 생각한다.

Json 으로 받을때 에러가 발생했다
직렬화 오류라고 한다
Kafka-(de)Serialize 에러, ErrorHandlingDeserializer
ErrorHandlingDeserializer 설정 | 폭간의 기술블로그
[여러개의 카프카가 존재할때 구성하는 설정] Spring Kafka 구성(Broker, Producer, Consumer)