Dangil project(19)

Junyoung·2024년 5월 27일

Dangil Project

목록 보기
19/20

프로젝트에 Kafka를 도입하기 이전에 먼저 Spring Server 2개를 가지고 테스트를 해보려 한다.

가장 먼저 EC2 환경에 쉽게 이식할수 있도록 로컬에서도 도커를 활용해 띄우려고 한다.

services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - '9092:9092'
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

해당 docker-compose를 통해서 도커를 활용해 kafka를 띄워놓았다.

이후 Spring Server를 로컬에서 가동시킨다.

  1. gradle 의존성 추가 (server 1, 2)
implementation 'org.springframework.kafka:spring-kafka'

최신 버전으로 spring에 의존성을 추가해줬는지

  1. application.yml 설정 추가 (server 1, 2)
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

프로듀서와 컨슈머의 데이터 변환 설정을 추가했다.
serialization
deserializer

  1. Kafka 프로듀서 구현 (server 1)
package org.example.server1;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class Producer {
    private static final String TOPIC = "test";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        log.info("프로듀서가 메세지 보냅니다.");
        this.kafkaTemplate.send(TOPIC, message);
    }
}

프로듀서의 설정으로 어떤 토픽에 연결할지 지정한다.

  1. Kafka 컨슈머 구현 (server 2)
package org.example.server2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Consumer {

    @KafkaListener(topics = "test", groupId = "junGroup")
    public void listen(String message) {
        log.info("컨슈머가 메세지 poll 합니다.");
        log.info("junGroup 이 받은 message: " + message);
    }
}

컨슈머는 해당 토픽의 이름과 데이터를 전달받는 그룹의 이름을 지정한다.

  1. API 만들어서 카프카 통신해보기 (server 1)
package org.example.server1;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
@Slf4j
public class Controller {
    private final Producer producer;
    @PostMapping("kafka")
    public String testApi(@RequestBody String message) {
        log.info("API 수신 했습니다.");
        producer.sendMessage("Jun 님이 message 보냅니다");
        return "api 통신 성공";
    }
}
  1. 포스트맨으로 server 1 에 요청 보내고 연쇄 작용으로 프로듀서 메세지 통신

    server1 에서 api 수신하고 프로듀서가 메세지를 보냈다.

    server 2에서 해당 메세지를 정상적으로 수신했다.

이처럼 카프카를 사용해 메세지를 주고 받았다.


실제 서비스에서는 Json 형식으로 데이터를 주고 받아야 하기 때문에, Json 타입 처리를 진행해보려고 한다.

1. 컨슈머 Deserializer Json 형식

카프카 객체 Json 통신

카프카에서 이동시에 Json으로 값을 주고받기 위해서는

yml의 value 직렬화 부분에 Json 직렬화를 명시해야한다.

프로듀서

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

컨슈머

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*'

컨슈머의 경우에는

JsonDeserializer 는 기본적으로 모든 Java 패키지의 클래스 로딩을 허용하지 않는다

악의적인 공격을 방지하기 위함이다 !

따라서

properties:
spring.json.trusted.packages: '*'

or

spring.json.trusted.packages: 'org.example.domain’

직접적인 클래스 명시가 가능하다 !

처리가 필요하다 !

이렇게 사용하면 만드는 카프카 프로듀서가 모두 같은 설정으로 만들어지기에 클래스에서 직접 명시해줄수도 있다.

하지만 여기에선 그냥 이렇게 사용해보기로 한다 !

→ 따라서 해당 설정을 통해서 Json 연결을 해놓았다.

또한 이전처럼 Yml으로 전역의 카프카 설정을 해놓으면 추후에 더 많은 프로듀서 and 컨슈머가 생길시에 사용할수가 없다는 문제점으로 -> 객체에 직접 카프카 프로듀서 컨슈머 설정을 진행할려고 한다.

Producer 와 Consumer 클래스를 제작한다

package org.example.server1;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value("${kafka.server}")
    private String KafkaServerIp;
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Broker 서버 설정
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
        // Key & Value 직렬화 설정
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package org.example.server1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    @Value("${kafka.server}")
    private String KafkaServerIp;

    @Value("${kafka.group}")
    private String KafkaSpringGroup;
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Broker 서버 설정
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
        // consumer 그룹 설정
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
        // Key & Value 직렬화 설정
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

받을떄 객체를 지정

package org.example.server2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    String KafkaServerIp = "127.0.0.1:9092";

    String KafkaSpringGroup = "junGroup";;
    @Bean
    public ConsumerFactory<String, KafkaDto> consumerFactory() {
        JsonDeserializer<KafkaDto> deserializer = new JsonDeserializer<>(KafkaDto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        Map<String, Object> config = new HashMap<>();
        // Broker 서버 설정
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
        // consumer 그룹 설정
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
        // Key & Value 직렬화 설정
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

아예 받을때 객체로 변환해서 받을수 있으나
이런 경우에는 해당 컨슈머는 그냥 해당 객체 전용으로만 사용해야한다 !

따라서 송신측에선 Json 으로 변환해서 보내구

수신측에선 String으로 받아서 객체로 변환하는것이 더 좋을거 같다는 판단이 들었다 !

우선 결과적으로 -> 해당 방법으로 수신하니

정상적으로 수신이 되는것을 확인할수 있다


2. 컨슈머 Deserializer String 형식 -> ObjectMapper

server:
  port: 8080

spring:
  application:
    name: server1
package org.example.server1;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class KafkaDto {
  private String sender;
  private String title;
  private String content;
}

config

package org.example.server1;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    String KafkaServerIp = "127.0.0.1:9092";
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Broker 서버 설정
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
        // Key & Value 직렬화 설정
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package org.example.server1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    String KafkaServerIp = "127.0.0.1:9093";

    String KafkaSpringGroup = "junGroup";
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Broker 서버 설정
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
        // consumer 그룹 설정
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
        // Key & Value 직렬화 설정
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

producer

package org.example.server1;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class Producer {

//    private static final String TOPIC1 = "test";
//    private final KafkaTemplate<String, String> kafkaTemplate1;
//
//    public void sendMessage(String message) {
//        log.info("프로듀서가 메세지 보냅니다.");
//        this.kafkaTemplate1.send(TOPIC1, message);
//    }

    private static final String TOPIC2 = "json";
    private final KafkaTemplate<String, Object> kafkaTemplate2;

    public  void sendObject(KafkaDto kafkaDto) {
        log.info("프로듀서가 객체 보냅니다.");
        this.kafkaTemplate2.send(TOPIC2,  kafkaDto);
    }
}

consumer

package org.example.server2;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Consumer {

//    @KafkaListener(topics = "test", groupId = "junGroup")
//    public void listen1(String message) {
//        log.info("컨슈머가 메세지 poll 합니다.");
//        log.info("junGroup 이 받은 message: " + message);
//    }

    @KafkaListener(topics = "json", groupId = "objectGroup")
    public void listen2(String str) {
        log.info("컨슈머가 객체 poll 합니다.");
        log.info("junGroup 이 받은 객체: " + str);

        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        KafkaDto kafkaDto;
        try {
            kafkaDto = mapper.readValue(str, KafkaDto.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        log.info(kafkaDto.toString());
    }
}

성공적으로 데이터를 전달받았다.

이처럼 String 형식으로 받는다면 컨슈머 입장에서 자신이 원하는 형식의 DTO로 변환해서 사용할수 있고 하나의 토픽에 프로듀서가 같은 형식의 데이터를 준다는 보장이 없기에 ->
String 형식이 더 적합하다고 생각한다.


  • 트러블 슈팅

    Json 으로 보내고

Json 으로 받을때 에러가 발생했다

직렬화 오류라고 한다

  1. 송신측 헤더에서 받는 패키지를 허용을 지정해줬고,
  2. 수신측에서 DTO를 제작하여 받을때 클래스를 명시해서 받았고,
    이 에러를 해결할수 있었다.

Kafka-(de)Serialize 에러, ErrorHandlingDeserializer

ErrorHandlingDeserializer 설정 | 폭간의 기술블로그


  • 참고

카프카 기본 설정

[여러개의 카프카가 존재할때 구성하는 설정] Spring Kafka 구성(Broker, Producer, Consumer)

@KafkaListener

[리스너 여러가지 설정법] Consume Messages

profile
라곰

0개의 댓글