이 글은
Spring Kafka
사용하기를 다루고있습니다. Kafka 설치 및 실행에 관한 글은 Ubuntu에 Kafka 서버 배포 를 참고해주세요.
dependency
implementation 'org.springframework.kafka:spring-kafka'
producer server
application.yml
spring:
kafka:
producer:
bootstrap-servers: 3.34.172.38:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
kafkatemplate
// KafkaTemplate.java
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}
@Slf4j
@RequiredArgsConstructor
@Service
public class UserProducerService {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendCreateUserRequest(UserConsumerRequestDto userConsumerRequest) {
log.info("[Kafka producer] >> create user document");
kafkaTemplate.send(Topic.FEED_USER_CREATE, userConsumerRequest.getUserId());
kafkaTemplate.send(Topic.SEARCH_USER_CREATE, userConsumerRequest);
}
public void sendUpdateUserRequest(UserConsumerRequestDto userConsumerRequest) {
log.info("[Kafka producer] >> update user document");
kafkaTemplate.send(Topic.SEARCH_USER_UPDATE, userConsumerRequest);
}
public void sendUpdateListRequest(UpdateListRequestDto updateListRequest) {
log.info("[Kafka producer] >> update follow/subscribe list");
kafkaTemplate.send(Topic.FEED_USER_LIST_UPDATE, updateListRequest);
}
public void sendDeleteUserRequest(Long userId) {
log.info("[Kafka producer] >> delete user document");
kafkaTemplate.send(Topic.FEED_USER_DELETE, userId);
kafkaTemplate.send(Topic.SEARCH_USER_DELETE, userId);
}
}
consumer server
application.yml
spring:
kafka:
consumer:
bootstrap-servers: 3.34.172.38:9092
group-id: group-id-ncns
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring:
json:
trusted:
packages: "*"
deserializer:
value:
delegate:
class: org.springframework.kafka.support.serializer.JsonDeserializer
@kafkalistener
@Slf4j
@RequiredArgsConstructor
@Service
public class FeedConsumerService {
private static final String GROUP_ID = "group-id-ncns";
private final FeedService feedService;
@KafkaListener(topics = Topic.FEED_USER_CREATE, groupId = GROUP_ID)
public void consumeUser(Long userId) {
log.info("[Kafka consumer] >> create user document");
feedService.createFeedDocument(userId);
}
@KafkaListener(topics = Topic.FEED_USER_DELETE, groupId = GROUP_ID)
public void consumeUserDelete(Long userId) {
log.info("[Kafka consumer] >> delete user document");
feedService.deleteFeedDocument(userId);
}
@KafkaListener(topics = Topic.FEED_USER_LIST_UPDATE, groupId = GROUP_ID)
public void consumeFollow(UpdateListRequestDto updateListRequestDto) {
log.info("[Kafka consumer] >> update follow/subscribe list");
feedService.updateList(updateListRequestDto);
}
}
❗️ERROR
spring:
kafka:
...
key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:
kafka:
consumer:
...
properties:
spring:
json:
trusted:
packages: "*"
spring:
kafka:
...
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
json.type.mapping 으로 aliad:dto 형식
kafka:
consumer:
properties:
spring:
json:
type:
mapping: list:dev.ncns.sns.feed.dto.request.UpdateListRequestDto,
like:dev.ncns.sns.feed.dto.response.LikeResponseDto
https://waspro.tistory.com/647
https://ryumodrn.tistory.com/13
https://kobumddaring.tistory.com/50
[Kafka] class is not in the trusted packages 에러
jsonserializer