[Spring] SpringBoot Kafka

나르·2022년 2월 17일
0

Spring

목록 보기
20/25
post-thumbnail

이 글은 Spring Kafka 사용법을 다루고 있습니다. Kafka 설치 및 실행에 관한 내용은 「Ubuntu에 Kafka 서버 배포」 글을 참고해 주세요.

dependency

Spring-Kafka

implementation 'org.springframework.kafka:spring-kafka'

producer server

application.yml

spring:
  kafka:
    producer:
      # Broker address (host:port) the producer connects to.
      # NOTE(review): hard-coded address — consider externalizing it per environment.
      bootstrap-servers: 3.34.172.38:9092
      # Record keys are written with Kafka's StringSerializer.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Record values are written with Spring Kafka's JsonSerializer.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

kafkatemplate

// KafkaTemplate.java — the two-argument send(topic, data) overload used by the producer service below.
/**
 * Builds a {@code ProducerRecord} from the topic and payload and passes it to {@code doSend}.
 *
 * @param topic the topic the record is addressed to
 * @param data  the record value; declared {@code @Nullable}
 * @return whatever {@code doSend} returns for the created record
 */
@Override
	public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
		ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
		return doSend(producerRecord);
	}
/**
 * Publishes user-related events to Kafka topics.
 *
 * <p>Every public method logs one info line and then sends one or more records
 * through the injected {@link KafkaTemplate}. The value returned by
 * {@code KafkaTemplate.send} is not inspected by this class.
 */
@Slf4j
@RequiredArgsConstructor
@Service
public class UserProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Announces a newly created user: the user id goes to the feed topic and the
     * whole request goes to the search topic, in that order.
     *
     * @param userConsumerRequest the user payload; its {@code getUserId()} is read
     */
    public void sendCreateUserRequest(UserConsumerRequestDto userConsumerRequest) {
        log.info("[Kafka producer] >> create user document");
        publish(Topic.FEED_USER_CREATE, userConsumerRequest.getUserId());
        publish(Topic.SEARCH_USER_CREATE, userConsumerRequest);
    }

    /**
     * Sends the updated user payload to the search update topic.
     *
     * @param userConsumerRequest the user payload to forward unchanged
     */
    public void sendUpdateUserRequest(UserConsumerRequestDto userConsumerRequest) {
        log.info("[Kafka producer] >> update user document");
        publish(Topic.SEARCH_USER_UPDATE, userConsumerRequest);
    }

    /**
     * Sends a follow/subscribe list update to the feed list-update topic.
     *
     * @param updateListRequest the list update payload to forward unchanged
     */
    public void sendUpdateListRequest(UpdateListRequestDto updateListRequest) {
        log.info("[Kafka producer] >> update follow/subscribe list");
        publish(Topic.FEED_USER_LIST_UPDATE, updateListRequest);
    }

    /**
     * Announces a deleted user by sending the id to the feed delete topic and then
     * to the search delete topic.
     *
     * @param userId id of the deleted user
     */
    public void sendDeleteUserRequest(Long userId) {
        log.info("[Kafka producer] >> delete user document");
        publish(Topic.FEED_USER_DELETE, userId);
        publish(Topic.SEARCH_USER_DELETE, userId);
    }

    /** Single place where records are handed to the template. */
    private void publish(String topic, Object payload) {
        kafkaTemplate.send(topic, payload);
    }

}

consumer server

application.yml

spring:
  kafka:
    consumer: 
      # Broker address (host:port) the consumer connects to.
      # NOTE(review): hard-coded address — consider externalizing it per environment.
      bootstrap-servers: 3.34.172.38:9092
      # Consumer group; same value as the GROUP_ID constant in FeedConsumerService.
      group-id: group-id-ncns
      # Kafka auto.offset.reset policy.
      auto-offset-reset: earliest
      # Record keys are read with Kafka's StringDeserializer.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Values go through ErrorHandlingDeserializer, which wraps the delegate configured below.
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json:
            trusted:
              # Addresses the "class is not in the trusted packages" error.
              # NOTE(review): "*" trusts every package — consider listing only the DTO packages.
              packages: "*"
          deserializer:
            value:
              delegate:
                # The deserializer that ErrorHandlingDeserializer delegates value decoding to.
                class: org.springframework.kafka.support.serializer.JsonDeserializer

@kafkalistener

/**
 * Kafka listeners for the feed service.
 *
 * <p>Each listener logs one info line and then forwards the received payload to
 * {@link FeedService}. All listeners use the same consumer group.
 */
@Slf4j
@RequiredArgsConstructor
@Service
public class FeedConsumerService {

    /** Consumer group id applied to every listener in this class. */
    private static final String GROUP_ID = "group-id-ncns";

    private final FeedService feedService;

    /**
     * Handles a user-created event by creating that user's feed document.
     *
     * @param userId id received from the feed user-create topic
     */
    @KafkaListener(groupId = GROUP_ID, topics = Topic.FEED_USER_CREATE)
    public void consumeUser(final Long userId) {
        log.info("[Kafka consumer] >> create user document");
        feedService.createFeedDocument(userId);
    }

    /**
     * Handles a user-deleted event by deleting that user's feed document.
     *
     * @param userId id received from the feed user-delete topic
     */
    @KafkaListener(groupId = GROUP_ID, topics = Topic.FEED_USER_DELETE)
    public void consumeUserDelete(final Long userId) {
        log.info("[Kafka consumer] >> delete user document");
        feedService.deleteFeedDocument(userId);
    }

    /**
     * Handles a follow/subscribe list update by passing it to the feed service.
     *
     * @param updateListRequestDto payload received from the list-update topic
     */
    @KafkaListener(groupId = GROUP_ID, topics = Topic.FEED_USER_LIST_UPDATE)
    public void consumeFollow(final UpdateListRequestDto updateListRequestDto) {
        log.info("[Kafka consumer] >> update follow/subscribe list");
        feedService.updateList(updateListRequestDto);
    }

}

❗️ERROR

  • 값을 JSON으로 보내려면 value-serializer 에 org.springframework.kafka.support.serializer.JsonSerializer 를 써야 함
spring:
  kafka:
    producer:
      # ... (other producer settings omitted)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Values are sent as JSON, so the value serializer must be Spring Kafka's JsonSerializer.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  • class is not in the trusted packages 에러
spring:
  kafka:
    consumer: 
      ...
      properties:
        spring:
          json:
            trusted:
              # NOTE(review): "*" trusts every package — consider listing only the DTO packages.
              packages: "*"
  • This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'
spring:
  kafka:
    consumer:
      # ... (other consumer settings omitted)
      # Wrap the real deserializer so deserialization failures reach the error handler.
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                # The deserializer that ErrorHandlingDeserializer delegates to.
                class: org.springframework.kafka.support.serializer.JsonDeserializer
  • MSA 구조로 인해 각 서버의 dto 패키지가 상이함

spring.json.type.mapping 에 alias:클래스(dto) 형식으로 매핑을 지정해서 해결

  # (nested under the top-level "spring:" key)
  kafka:
    consumer:
      properties:
        spring:
          json:
            type:
              # Comma-separated alias:fully.qualified.ClassName pairs.
              # Kept on one line: the original continuation line was indented with tabs,
              # which YAML does not allow as indentation.
              mapping: "list:dev.ncns.sns.feed.dto.request.UpdateListRequestDto,like:dev.ncns.sns.feed.dto.response.LikeResponseDto"

https://stackoverflow.com/questions/70252047/this-error-handler-cannot-process-serializationexceptions-directly-please-con

공식문서

kafka option

https://waspro.tistory.com/647

https://ryumodrn.tistory.com/13
https://kobumddaring.tistory.com/50

[Kafka] class is not in the trusted packages 에러
jsonserializer

config.java 하는법

profile
💻 + ☕ = </>

0개의 댓글