하나의 카프카에 존재하는 두 토픽에 각각 다른 타입의 객체 전송하기

최창효·2023년 10월 7일
0
post-thumbnail
post-custom-banner

들어가기 전에

해당 글에서는 하나의 카프카에 존재하는 두 토픽에 각각 다른 타입의 객체 전송하는 방법을 빠르게 살펴봅니다. 카프카의 개념이나 작성된 코드의 의미에 대해 설명하지 않으며, 일부 세팅이 실제 운영환경에는 적합하지 않을 수 있습니다. 예제는 Spring환경에서 KafkaTemplate을 활용해 만들었습니다.

환경 설정

  • 주키퍼와 카프카를 실행한 뒤 해당 브로커에 4개의 토픽을 만들어야 합니다.

저는 도커를 이용해 kafka를 실행시켰습니다. 활용한 docker-compose.yml파일의 내용은 다음과 같습니다.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

docker-compose up -d명령어로 compose파일을 실행시킨 뒤 토픽을 생성했습니다.

docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t1
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t2
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t3
docker exec -i kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t4

하나의 토픽으로 객체 전달하기

간단하게 하나의 토픽에 객체를 전달하는 것부터 해보겠습니다

Producer

config파일

@Configuration
public class ProducerConfigBasic {
    @Bean
    public ProducerFactory<String, Dto> producerFactory() {
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String,Dto> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
}
  • 객체를 데이터로 전달할 때는 JsonSerializer로 인코딩 합니다.

producerService

@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerBasic {
    private final KafkaTemplate<String, Dto> kafkaTemplate1;

    public void produce(){
        log.info("Basic Config Kafka Producer Starts");
        kafkaTemplate1.send("t1",new Dto("topic1",1L));
    }

}

Consumer

config파일

@Configuration
public class ConsumerConfigBasic {
    @Bean
    public ConsumerFactory<String, Dto> consumerFactory() {
        Map<String,Object> config = new HashMap<>();

        JsonDeserializer<Dto> deserializer = new JsonDeserializer<>(Dto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Dto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Dto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

consumerService

@Slf4j
@Component
public class ConsumerBasic {
    @KafkaListener(topics = "t1",groupId = "group-1",containerFactory = "kafkaListenerContainerFactory")
    public void listener(Dto dto){
        log.info("Listener for topic t1 - Basic. Consumed Data is {}",dto);
    }
}

두 개의 토픽에 서로 다른 객체를 넣기

이번에는 두 개의 토픽으로 서로 다른 객체를 주고받아 보겠습니다. 하나의 토픽을 이용한 예시를 보면 KafkaTemplate은 특정 타입의 Value를 가지고 있습니다. 그렇기 때문에 기존의 KafkaTemplate만으로는 두 개의 토픽에 서로 다른 객체를 넣을 수 없습니다.

해결 방법은 간단합니다. producer와 consumer모두 각각의 타입을 받을 수 있는 factory를 하나 더 만들어주면 됩니다!

Producer

config파일

@Configuration
public class ProducerConfigWithIndividual {
    @Bean
    public ProducerFactory<String, Dto> producerFactory1() {
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }
    
    @Bean
    public ProducerFactory<String, Dto2> producerFactory2() {
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String,Dto> kafkaTemplate1() {
        return new KafkaTemplate(producerFactory1());
    }

    @Bean
    public KafkaTemplate<String,Dto2> kafkaTemplate2() {
        return new KafkaTemplate(producerFactory2());
    }

}

producerService

@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerWithIndividual {
    private final KafkaTemplate<String, Dto> kafkaTemplate1;
    private final KafkaTemplate<String, Dto2> kafkaTemplate2;

    public void produce(){
        log.info("Individual Config Kafka Producer Starts");
        kafkaTemplate1.send("t1",new Dto("topic1",1L));
        kafkaTemplate2.send("t2",new Dto2("topic2",2L));
    }

}

Consumer

config파일

@Configuration
public class ConsumerConfigWithIndividual {

    @Bean
    public ConsumerFactory<String, Dto> consumerFactory1() {
        Map<String,Object> config = new HashMap<>();

        JsonDeserializer<Dto> deserializer = new JsonDeserializer<>(Dto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }
    
    @Bean
    public ConsumerFactory<String, Dto2> consumerFactory2() {
        Map<String,Object> config = new HashMap<>();

        JsonDeserializer<Dto2> deserializer = new JsonDeserializer<>(Dto2.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_2");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Dto> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Dto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1());
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Dto2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Dto2> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory2());
        return factory;
    }

}

consumerService

@Slf4j
@Component
public class ConsumerWithIndividual {

    @KafkaListener(topics = "t1",containerFactory = "kafkaListenerContainerFactory1")
    public void listener(Dto dto){
        log.info("Listener for topic t1 - WithIndividual. Consumed Data is {}",dto);
    }

    @KafkaListener(topics = "t2",containerFactory = "kafkaListenerContainerFactory2")
    public void listener2(Dto2 dto){
        log.info("Listener for topic t2 - WithIndividual. Consumed Data is {}",dto);
    }

}

거의 동일한 코드를 하나 더 추가함으로써 문제를 해결할 수 있습니다.


Object형태로 값 넘기기

하나의 factory에서 Object 타입으로 값을 선언해 두 개의 토픽에 서로 다른 객체를 넣는 것도 가능합니다.

Producer

config파일

@Configuration
public class ProducerConfigWithObject {
    @Bean
    public ProducerFactory<String,Object> producerFactory3() {
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate3() {
        return new KafkaTemplate(producerFactory3());
    }

}

producerService

@Slf4j
@Component
@RequiredArgsConstructor
public class ProducerWithObject {
    private final KafkaTemplate<String,Object> kafkaTemplate3;

    public void produce(){
        log.info("Object Config Kafka Producer Starts");
        kafkaTemplate3.send("t3",new Dto("topic3",3L));
        kafkaTemplate3.send("t4",new Dto2("topic4",4L));
    }

}

Consumer

config파일

@Configuration
public class ConsumerConfigWithObject {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory3() {
        Map<String,Object> config = new HashMap<>();

        JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_3");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Object> kafkaListenerContainerFactory3() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory3());
        return factory;
    }
}

consumerService

@Slf4j
@Component
public class ConsumerWithObject {

    @KafkaListener(topics = "t3",containerFactory = "kafkaListenerContainerFactory3")
    public void listener3(Object dto){
        ConsumerRecord<String, Dto> result = (ConsumerRecord<String, Dto>) dto;
        log.info("Listener for topic t3 - WithObject. Consumed Data is {}",result.value());
    }

    @KafkaListener(topics = "t4",containerFactory = "kafkaListenerContainerFactory3")
    public void listener4(Object dto){
        ConsumerRecord<String, Dto2> result = (ConsumerRecord<String, Dto2>) dto;
        log.info("Listener for topic t4 - WithObject. Consumed Data is {}",result.value());
    }

}

References

profile
기록하고 정리하는 걸 좋아하는 개발자.
post-custom-banner

0개의 댓글