kafka 사용해보기

이재철·2021년 10월 17일
0

MSA

목록 보기
9/13

Catalog-Service (Consumer)

  • pom.xml
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
  • KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

  // 접속 정보
  @Bean
  public ConsumerFactory<String, String> consumerFactory(){
    Map<String, Object> properties = new HashMap<>();

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
    /*
      그룹 아이디는 카프카에서 토픽에 쌓여있는 메세지를 가져가는 Consumer 들을 그룹핑할 수 있음.
      나중에 여러개의 Consumer가 데이터를 가져갈 때 특정한 Consumer 그룹을 만들어 놓고 전달하고자 하는 그룹을 지정할 수 있음.
     */
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);


    return new DefaultKafkaConsumerFactory<>(properties);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
      = new ConcurrentKafkaListenerContainerFactory<>();

    kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

    return kafkaListenerContainerFactory;
  }

}
  • KafkaConsumer
// 리스너를 이용해서 데이터를 가져오고 그 데이터를 데이터베이스에 업데이트 할려 함
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
  private final CatalogRepository catalogRepository;

  @KafkaListener(topics = "example-catalog-topic")
  public void updateQty(String kafkaMessage) {
    log.info("Kafka Message : -> {}", kafkaMessage);

    Map<Object, Object> map = new HashMap<>();
    ObjectMapper mapper = new ObjectMapper();

    try {
      map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
    }catch (JsonProcessingException e){
      e.printStackTrace();;
    }
    CatalogEntity entity = catalogRepository.findByProductId((String)map.get("productId"));

    if(entity != null){
      entity.setStock(entity.getStock() - (Integer) map.get("qty"));
      catalogRepository.save(entity);
    }
  }
}

Order-Service(Producer)

  • pom.xml 동일
  • KafkaProducerConfig
@EnableKafka
@Configuration
public class KafkaProducerConfig {

  // 접속 정보
  @Bean
  public ProducerFactory<String, String> producerFactory(){
    Map<String, Object> properties = new HashMap<>();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    return new DefaultKafkaProducerFactory<>(properties);
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
  }

}
  • KafkaProducer
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {
  private final KafkaTemplate<String, String> kafkaTemplate;

  public OrderDto send(String topic, OrderDto orderDto) {
    ObjectMapper mapper = new ObjectMapper();
    String jsonInString = "";

    try {
      jsonInString = mapper.writeValueAsString(orderDto);
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }

    kafkaTemplate.send(topic, jsonInString);
    log.info("Kafka Producer sent data from Order Micro Service : {}", orderDto);

    return orderDto;
  }
}

Order-Service 가 두 개 이상일 경우

  • 각각의 오더 서비스가 한번씩 실행(라운드 로빈)을 한다. -> 데이터가 동기화 되지 않는다
  • Order Service에 요청된 주문 정보를 DB가 아니라 Kafka의 Topic으로 전송하고, Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB에 저장
  • Field
@Data
@AllArgsConstructor
public class Field {
  private String type;
  private boolean optional;
  private String field;
}
  • Schema
@Data
@Builder
public class Schema {
  private String type;
  private List<Field> fields;
  private boolean optional;
  private String name;
}
  • Payload
@Data
@Builder
public class Payload {
  private String order_id;
  private String user_id;
  private String product_id;
  private int qty;
  private int unit_price;
  private int total_price;
}
  • KafkaOrderDto
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
  private Schema schema;
  private Payload payload;
}
  • OrderProducer

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderProducer {
  private final KafkaTemplate<String, String> kafkaTemplate;

  List<Field> fields = Arrays .asList(
    new Field("string", true, "order_id"),
    new Field("string", true, "user_id"),
    new Field("string", true, "product_id"),
    new Field("int32", true, "qty"),
    new Field("int32", true, "unit_price"),
    new Field("int32", true, "total_price")
  );
  Schema schema = Schema.builder()
                        .type("struct")
                        .fields(fields)
                        .optional(false)
                        .name("orders")
                        .build();

  public OrderDto send(String topic, OrderDto orderDto) {
    Payload payload = Payload.builder()
                              .order_id(orderDto.getOrderId())
                              .user_id(orderDto.getUserId())
                              .product_id(orderDto.getProductId())
                              .qty(orderDto.getQty())
                              .unit_price(orderDto.getUnitPrice())
                              .total_price(orderDto.getTotalPrice())
                              .build();

    KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

    ObjectMapper mapper = new ObjectMapper();
    String jsonInString = "";

    try {
      jsonInString = mapper.writeValueAsString(kafkaOrderDto);
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }

    kafkaTemplate.send(topic, jsonInString);
    log.info("Order Producer sent data from Order Micro Service : {}", kafkaOrderDto);

    return orderDto;
  }
}

결론

  • Order-Service 하고 Catalog-Service는 각각의 데이터를 사용하면서 수량 부분에 대해서 동기화시켜 주는 부분을 Kafka Topic을 사용
  • Order-Service가 두개 이상일 때 디비가 서로 달라서 데이터가 동기화 되지 않는다.
    그래서 주문이 들어오면 카프카로 메세지를 보내고 카프카에 저장된(토픽에 저장되어 있는) 값을
    단일 데이터베이스로 읽어 오기 위해서싱크 커낵트를 연결 해서 데이터를 동기화 했음
  • 응용해서 확장하고 싶으면
    • 데이터베이스에 저장되는 메시지 큐잉 서버를 이벤트 소싱이라고 해서 데이터가 전달되는 부분을 저장하는 파트와 저장된 데이터를 읽어오는 파트를 두개로 분리해서 만드는 cqrs라는 패턴을 이용하면 좀더 효율적으로 메시지징 기반의 시스템을 이용하고 시간 순서에 의해 메시지가 기록된 것을 데이터베이스에 업데이트구현할 수 있음

0개의 댓글