kafka 적용하기

초보개발·2022년 12월 20일
0

kafka를 적용하게 된 이유

일단, 새롭게 추가된 도메인이 하나 있다. Group이라는 도메인인데, 사용자(user)가 속하게되는 그룹을 말한다. 사용자는 초대를 받아 그룹에 속하거나 새롭게 그룹을 만들 수 있다.
여기서 만약, 회원이 우리 서비스를 탈퇴하게 되었을 경우, 속한 group에도 회원이 사라져야한다. 이전처럼 feign을 이용하여 http 통신으로 구현할 수도 있겠지만, 별도의 응답이 필요없고 변경된 사항이 빠르게 반영되어야 하므로 카프카를 사용하게 되었다.

구현 방법

회원가입과 탈퇴를 담당하는 마이크로서비스, join service에서 사용자의 탈퇴 요청을 받고, 다시 user service로 요청을 보내 user 테이블에서 데이터가 최종적으로 삭제된다. user service는 탈퇴한 user의 id로 메시지를 만들어 deleted-user 토픽에 발행하게 된다. group service는 해당 토픽을 구독하고 있는 상태이므로 들어온 메시지를 consume 하여 group 테이블에도 반영되도록 하였다.

  1. user service의 kafka producer configuration
    kafka를 사용할 수 있도록 @EnableKafka 어노테이션을 추가해준다. 카프카 서버는 개발용도로 local에 띄워둔 서버를 사용했고 key, value가 String 타입이므로 StringSerializer를 추가해주었다.
  @Configuration
  @EnableKafka
  public class KafkaProducerConfig {

      @Bean
      public ProducerFactory<String, String> producerFactory() {
          Map<String, Object> properties = new HashMap<>();
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

          return new DefaultKafkaProducerFactory<>(properties);
      }

      @Bean
      public KafkaTemplate<String, String> kafkaTemplate() {
          return new KafkaTemplate<>(producerFactory());
      }
  }
  1. user service의 kafka producer
    topic에 삭제처리된 userId를 담은 메시지를 보내도록 구현했다.
  @Service
  @Slf4j
  @RequiredArgsConstructor
  public class KafkaProducer {

      private final KafkaTemplate<String, String> kafkaTemplate;

      private final ObjectMapper mapper;

      private final List<Field> fields = Collections.singletonList(new Field(FieldType.STRING, true, "userId"));

      private final Schema schema = Schema.builder()
              .type("struct")
              .fields(fields)
              .optional(false)
              .name("users")
              .build();

      public Long send(String topic, Long userId) {
          Payload payload = Payload.builder()
                  .userId(userId)
                  .build();

          KafkaUserDto userDto = KafkaUserDto.builder()
                  .schema(schema)
                  .payload(payload)
                  .build();

          String data = null;
          try {
              data = mapper.writeValueAsString(userDto);
          } catch (JsonProcessingException e) {
              e.printStackTrace();
          }

          kafkaTemplate.send(topic, data);
          log.info("[Kafka Producer SENT data - User service] : " + userDto);

          return userId;
      }
	}

아래와 같이 메시지가 생성될 것이다.

  1. group service의 kafka consumer configuaration
  @Configuration
  @EnableKafka
  public class KafkaProducerConfig {

      @Bean
      public ProducerFactory<String, String> producerFactory() {
          Map<String, Object> properties = new HashMap<>();
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

          return new DefaultKafkaProducerFactory<>(properties);
      }

      @Bean
      public KafkaTemplate<String, String> kafkaTemplate() {
          return new KafkaTemplate<>(producerFactory());
      }
  }
  1. group service의 kafka consumer
  @Service
  @Slf4j
  @RequiredArgsConstructor
  public class KafkaConsumer {

      private final GroupRepository groupRepository;

      private final GroupUsersRepository groupUsersRepository;

      private final ObjectMapper objectMapper;

      @KafkaListener(topics = "deleted-user")
      @Transactional
      public void deleteUserInGroup(String kafkaMessage) throws JsonProcessingException {
          Map<Object, Object> map = new HashMap<>();
          JsonNode jsonNode = objectMapper.readTree(kafkaMessage);
          Long userId = jsonNode.get("payload").get("userId").longValue();
          GroupUsers groupUser = groupUsersRepository.findByUserId(userId);
          
          if (groupUser.getGroupEntity() != null) {
              groupUsersRepository.delete(groupUser);
              GroupEntity group = groupUser.getGroupEntity();
              
              if (groupUsersRepository.countByGroupEntity(group) ==  0) {
                  groupRepository.delete(group);
              }
          }
      }
  }

느낀점

예전에 한번 카프카가 어떤 것인지 배운적이 있었는데 실제 프로젝트에 적용해본 것은 이번이 처음이다! 카프카는 이 밖에도 여러 곳에서 쓰인다는데, 책 한권 사서 내부 동작과 현업에는 어떻게 사용되는지 깊이있게 배워보고 나중에 개발할 때에도 적절한 곳에 카프카를 잘 써보고 싶다.

0개의 댓글