이책은 스프링으로 하는 마이크로서비스 구축(스프링 부트와 스프링 클라우드를 이용한 도커/쿠버네티스 마이크로서비스) 책을 읽고 학습한 내용을 정리한 글입니다.
✏️ [MSA] 리액티브 마이크로서비스 개발 - 1부 에서는 리액티브 프로그래밍과 관련된 개념에 대해 정리했습니다. 2부에서는 책에서 설명하고 있는 리액티브 마이크로서비스의 설계와 프로세스에 대해 정리해보려고합니다. 소스코드는 여기를 참고해주세요
public interface ProductCompositeService
@PostMapping(
value = "/product-composite",
consumes = "application/json")
void createCompositeProduct(@RequestBody ProductAggregate body);
}
@RestController
public class ProductCompositeServiceImpl implements ProductCompositeService {
private static final Logger LOG = LoggerFactory.getLogger(ProductCompositeServiceImpl.class);
private final ServiceUtil serviceUtil;
private final ProductCompositeIntegration integration;
@Autowired
public ProductCompositeServiceImpl(ServiceUtil serviceUtil, ProductCompositeIntegration integration) {
this.serviceUtil = serviceUtil;
this.integration = integration;
}
@Override
public void createCompositeProduct(ProductAggregate body) {
try {
LOG.debug("createCompositeProduct: creates a new composite entity for productId: {}", body.getProductId());
Product product = new Product(body.getProductId(), body.getName(), body.getWeight(), null);
integration.createProduct(product);
if (body.getRecommendations() != null) {
body.getRecommendations().forEach(r -> {
Recommendation recommendation = new Recommendation(body.getProductId(), r.getRecommendationId(), r.getAuthor(), r.getRate(), r.getContent(), null);
integration.createRecommendation(recommendation);
});
}
if (body.getReviews() != null) {
body.getReviews().forEach(r -> {
Review review = new Review(body.getProductId(), r.getReviewId(), r.getAuthor(), r.getSubject(), r.getContent(), null);
integration.createReview(review);
});
}
LOG.debug("createCompositeProduct: composite entities created for productId: {}", body.getProductId());
} catch (RuntimeException re) {
LOG.warn("createCompositeProduct failed: {}", re.toString());
throw re;
}
}
}
@EnableBinding(ProductCompositeIntegration.MessageSources.class)
@Component
public class ProductCompositeIntegration implements ProductService, RecommendationService, ReviewService {
private static final Logger LOG = LoggerFactory.getLogger(ProductCompositeIntegration.class);
private final WebClient webClient;
private final ObjectMapper mapper;
private final String productServiceUrl;
private final String recommendationServiceUrl;
private final String reviewServiceUrl;
private MessageSources messageSources;
public interface MessageSources {
String OUTPUT_PRODUCTS = "output-products";
String OUTPUT_RECOMMENDATIONS = "output-recommendations";
String OUTPUT_REVIEWS = "output-reviews";
@Output(OUTPUT_PRODUCTS)
MessageChannel outputProducts();
@Output(OUTPUT_RECOMMENDATIONS)
MessageChannel outputRecommendations();
@Output(OUTPUT_REVIEWS)
MessageChannel outputReviews();
}
@Autowired
public ProductCompositeIntegration(
WebClient.Builder webClient,
ObjectMapper mapper,
MessageSources messageSources,
@Value("${app.product-service.host}") String productServiceHost,
@Value("${app.product-service.port}") int productServicePort,
@Value("${app.recommendation-service.host}") String recommendationServiceHost,
@Value("${app.recommendation-service.port}") int recommendationServicePort,
@Value("${app.review-service.host}") String reviewServiceHost,
@Value("${app.review-service.port}") int reviewServicePort
) {
this.webClient = webClient.build();
this.mapper = mapper;
this.messageSources = messageSources;
productServiceUrl = "http://" + productServiceHost + ":" + productServicePort;
recommendationServiceUrl = "http://" + recommendationServiceHost + ":" + recommendationServicePort;
reviewServiceUrl = "http://" + reviewServiceHost + ":" + reviewServicePort;
}
@Override
public Product createProduct(Product body) {
messageSources.outputProducts().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
return body;
}
@Override
public Recommendation createRecommendation(Recommendation body) {
messageSources.outputRecommendations().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
return body;
}
@Override
public Review createReview(Review body) {
messageSources.outputReviews().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
return body;
}
}
spring.cloud.stream:
defaultBinder: rabbit
default.contentType: application/json
bindings:
output-products:
destination: products
producer:
required-groups: auditGroup
output-recommendations:
destination: recommendations
producer:
required-groups: auditGroup
output-reviews:
destination: reviews
producer:
required-groups: auditGroup
위의 코드는 Spring Cloud Stream 프레임워크를 사용하여 메시지 기반 애플리케이션을 구성하는 설정 파일입니다.
💡 논블로킹(Non-Blocking) 동기 서비스와 이벤트 기반 비동기 처리 서비스의 개념과 효과적으로 사용하는 방법
📌 따라서, 논블로킹 동기 서비스는 간결하고 이해하기 쉬운 코드를 작성하고 서버 확장성을 향상시키는 데 유용합니다. 반면에, 이벤트 기반 비동기 서비스는 대규모 요청 처리와 오류 처리가 필요한 경우에 유용합니다.
📌 따라서, 논블로킹 동기 서비스는 대용량의 조회 작업에, 이벤트 기반 비동기 서비스는 등록이나 삭제 처리와 같은 이벤트 기반 작업에 특히 유용합니다. 그러나 이는 일반적인 패턴일 뿐, 구체적인 상황에 따라 다르게 적용될 수도 있습니다.
@EnableBinding(Sink.class)
public class MessageProcessor {
private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
private final ProductService productService;
@Autowired
public MessageProcessor(ProductService productService) {
this.productService = productService;
}
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Product> event) {
LOG.info("Process message created at {}...", event.getEventCreatedAt());
switch (event.getEventType()) {
case CREATE:
Product product = event.getData();
LOG.info("Create product with ID: {}", product.getProductId());
productService.createProduct(product);
break;
case DELETE:
int productId = event.getKey();
LOG.info("Delete recommendations with ProductID: {}", productId);
productService.deleteProduct(productId);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
LOG.warn(errorMessage);
throw new EventProcessingException(errorMessage);
}
LOG.info("Message processing done!");
}
}
spring.cloud.stream:
defaultBinder: rabbit
default.contentType: application/json
bindings.input:
destination: products
group: productsGroup
@RestController
public class ProductServiceImpl implements ProductService {
private static final Logger LOG = LoggerFactory.getLogger(ProductServiceImpl.class);
private final ServiceUtil serviceUtil;
private final ProductRepository repository;
private final ProductMapper mapper;
@Autowired
public ProductServiceImpl(ProductRepository repository, ProductMapper mapper, ServiceUtil serviceUtil) {
this.repository = repository;
this.mapper = mapper;
this.serviceUtil = serviceUtil;
}
@Override
public Product createProduct(Product body) {
if (body.getProductId() < 1) throw new InvalidInputException("Invalid productId: " + body.getProductId());
ProductEntity entity = mapper.apiToEntity(body);
Mono<Product> newEntity = repository.save(entity)
.log()
.onErrorMap(
DuplicateKeyException.class,
ex -> new InvalidInputException("Duplicate key, Product Id: " + body.getProductId()))
.map(e -> mapper.entityToApi(e));
return newEntity.block();
}
}
spring.data.mongodb:
host: localhost
port: 27017
database: product-db
여기까지 리액티브 마이크로서비스 개발 중 "등록" 프로세스의 전반적인 프로세스에 대해 정리했습니다. 설명이 부족했다면 댓글로 남겨주시면 감사하겠습니다. 추가로 전체 코드는 첨부 해놨으니 실습해보면 좋을것 같습니다.