This post is a quick look at how to send a different object type to each of two topics on a single Kafka instance. It does not explain Kafka concepts or walk through the code in detail, and some of the settings may not be appropriate for a real production environment. The examples were built with KafkaTemplate in a Spring application.
I ran Kafka with Docker. The docker-compose.yml file I used looks like this:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
docker-compose up -d
After starting the Compose file with this command, I created the topics:
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t1
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t2
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t3
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t4
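If you want to confirm the topics were created, the same CLI's --list option shows them:

docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --list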
Let's start with the simple case: sending an object to a single topic.
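The examples use two payload classes, Dto and Dto2, which are never shown; below is a minimal sketch inferred from the constructor calls in the producer code. The field names name and id are my assumption (only the (String, Long) constructor shape is visible), and a no-args constructor is needed so JsonDeserializer can instantiate the objects.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Dto.java — hypothetical payload class; field names are assumptions.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Dto {
    private String name;
    private Long id;
}

// Dto2.java — identical shape, kept as a separate class/file so the two
// topics carry genuinely different types.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Dto2 {
    private String name;
    private Long id;
}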
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class ProducerConfigBasic {

    @Bean
    public ProducerFactory<String, Dto> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // spring-kafka's JsonSerializer writes the payload as JSON and adds type headers
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Dto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerBasic {

    private final KafkaTemplate<String, Dto> kafkaTemplate1;

    public void produce() {
        log.info("Basic Config Kafka Producer Starts");
        kafkaTemplate1.send("t1", new Dto("topic1", 1L));
    }
}
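The post doesn't show how produce() gets called; one minimal way to trigger it (my sketch, assuming a Spring Boot application) is a CommandLineRunner:

import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical trigger class: fires the producer once at application startup.
@Component
@RequiredArgsConstructor
public class ProducerRunner implements CommandLineRunner {

    private final ProducerBasic producerBasic;

    @Override
    public void run(String... args) {
        producerBasic.produce();
    }
}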
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class ConsumerConfigBasic {

    @Bean
    public ConsumerFactory<String, Dto> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");

        // The JsonDeserializer reads the type headers written by JsonSerializer;
        // trusting all packages is convenient here but too permissive for production.
        JsonDeserializer<Dto> deserializer = new JsonDeserializer<>(Dto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        // The deserializer instances are passed to the factory directly, so no
        // *_DESERIALIZER_CLASS_CONFIG entries are needed in the config map.
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Dto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Dto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
@Slf4j
@Component
public class ConsumerBasic {

    // The groupId here overrides the group.id set in the consumer factory config
    @KafkaListener(topics = "t1", groupId = "group-1", containerFactory = "kafkaListenerContainerFactory")
    public void listener(Dto dto) {
        log.info("Listener for topic t1 - Basic. Consumed Data is {}", dto);
    }
}
Now let's exchange two different objects over two topics. As the single-topic example shows, a KafkaTemplate is bound to a specific value type, so the existing template alone cannot publish two different object types to two topics. The fix is simple: give both the producer and the consumer one more factory, so that each type has its own.
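To make the type constraint concrete, here is a small illustration of mine (not from the original post); the second send is commented out because it would not compile:

@Slf4j
@Component
@RequiredArgsConstructor
public class SingleTemplateLimit {

    private final KafkaTemplate<String, Dto> kafkaTemplate;

    public void demo() {
        kafkaTemplate.send("t1", new Dto("topic1", 1L));     // OK: value type matches
        // kafkaTemplate.send("t2", new Dto2("topic2", 2L)); // compile error: Dto2 is not a Dto
    }
}

The configuration below adds that second factory on each side.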
@Configuration
public class ProducerConfigWithIndividual {

    @Bean
    public ProducerFactory<String, Dto> producerFactory1() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ProducerFactory<String, Dto2> producerFactory2() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Dto> kafkaTemplate1() {
        return new KafkaTemplate<>(producerFactory1());
    }

    @Bean
    public KafkaTemplate<String, Dto2> kafkaTemplate2() {
        return new KafkaTemplate<>(producerFactory2());
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerWithIndividual {

    private final KafkaTemplate<String, Dto> kafkaTemplate1;
    private final KafkaTemplate<String, Dto2> kafkaTemplate2;

    public void produce() {
        log.info("Individual Config Kafka Producer Starts");
        kafkaTemplate1.send("t1", new Dto("topic1", 1L));
        kafkaTemplate2.send("t2", new Dto2("topic2", 2L));
    }
}
@Configuration
public class ConsumerConfigWithIndividual {

    @Bean
    public ConsumerFactory<String, Dto> consumerFactory1() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");

        JsonDeserializer<Dto> deserializer = new JsonDeserializer<>(Dto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConsumerFactory<String, Dto2> consumerFactory2() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_2");

        JsonDeserializer<Dto2> deserializer = new JsonDeserializer<>(Dto2.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Dto> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Dto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1());
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Dto2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Dto2> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory2());
        return factory;
    }
}
@Slf4j
@Component
public class ConsumerWithIndividual {

    @KafkaListener(topics = "t1", containerFactory = "kafkaListenerContainerFactory1")
    public void listener(Dto dto) {
        log.info("Listener for topic t1 - WithIndividual. Consumed Data is {}", dto);
    }

    @KafkaListener(topics = "t2", containerFactory = "kafkaListenerContainerFactory2")
    public void listener2(Dto2 dto) {
        log.info("Listener for topic t2 - WithIndividual. Consumed Data is {}", dto);
    }
}
Adding a nearly identical second set of beans solves the problem.
Alternatively, you can declare a single factory with Object as the value type and publish different objects to both topics through it.
@Configuration
public class ProducerConfigWithObject {

    @Bean
    public ProducerFactory<String, Object> producerFactory3() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate3() {
        return new KafkaTemplate<>(producerFactory3());
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerWithObject {

    private final KafkaTemplate<String, Object> kafkaTemplate3;

    public void produce() {
        log.info("Object Config Kafka Producer Starts");
        kafkaTemplate3.send("t3", new Dto("topic3", 3L));
        kafkaTemplate3.send("t4", new Dto2("topic4", 4L));
    }
}
@Configuration
public class ConsumerConfigWithObject {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory3() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_3");

        // Declared against Object: the type headers written by JsonSerializer tell
        // the deserializer whether a given record is a Dto or a Dto2.
        JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory3() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory3());
        return factory;
    }
}
@Slf4j
@Component
public class ConsumerWithObject {

    // With an Object parameter, the listener is handed the whole ConsumerRecord,
    // so it is cast here and the deserialized payload read via value().
    @KafkaListener(topics = "t3", containerFactory = "kafkaListenerContainerFactory3")
    public void listener3(Object dto) {
        ConsumerRecord<String, Dto> result = (ConsumerRecord<String, Dto>) dto;
        log.info("Listener for topic t3 - WithObject. Consumed Data is {}", result.value());
    }

    @KafkaListener(topics = "t4", containerFactory = "kafkaListenerContainerFactory3")
    public void listener4(Object dto) {
        ConsumerRecord<String, Dto2> result = (ConsumerRecord<String, Dto2>) dto;
        log.info("Listener for topic t4 - WithObject. Consumed Data is {}", result.value());
    }
}
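As a small variation of mine (not from the original post), you can declare the parameter as ConsumerRecord and skip the cast, since Spring Kafka injects the raw record directly:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConsumerWithRecord {

    // Hypothetical alternative listener: the record's value is already a
    // deserialized Dto/Dto2 thanks to the type headers, so no cast is needed.
    @KafkaListener(topics = "t3", containerFactory = "kafkaListenerContainerFactory3")
    public void listener(ConsumerRecord<String, Object> record) {
        log.info("Listener for topic t3 - record variant. Consumed Data is {}", record.value());
    }
}

Note that if this listener runs alongside listener3 above, both belong to group_3 and will split the partitions of t3 between them.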