[MSA] 리액티브 마이크로서비스 개발 - 2부

min.c00·2023년 4월 10일
0

MSA + Spring Boot

목록 보기
6/7
post-thumbnail

이책은 스프링으로 하는 마이크로서비스 구축(스프링 부트와 스프링 클라우드를 이용한 도커/쿠버네티스 마이크로서비스) 책을 읽고 학습한 내용을 정리한 글입니다.

✏️ [MSA] 리액티브 마이크로서비스 개발 - 1부 에서는 리액티브 프로그래밍과 관련된 개념에 대해 정리했습니다. 2부에서는 책에서 설명하고 있는 리액티브 마이크로서비스의 설계와 프로세스에 대해 정리해보려고합니다. 소스코드는 여기를 참고해주세요


1. 포스팅 개요


  • Products, Recommendations, Reviews를 등록(Create)하는 프로세스를 다룬다.
  • 전체 프로세스는 1번부터 ~N번까지 번호로 구분하여 프로세스 단계를 제가 정리한 기준에 따라 표시할 예정입니다.
  • 설명 중간에 주요 개념에 대해 설명할 수도 있습니다.
  • 해당 포스터는 교재의 1부 - 스프링 부트를 사용한 마이크로서비스 개발에 대한 내용까지만을 포함합니다 .
  • 해당 포스터에서 정리할 등록(Create)기능외에 조회, 삭제 기능은 비슷한 로직이기에 다루지 않았습니다.

2. 전체 프로젝트 설계 및 프로세스 정리


1. Request(Product, Recommendations, Reviews 등록 요청)


2. ProductCompositeSerivce

public interface ProductCompositeService 

@PostMapping(
	value    = "/product-composite",
    consumes = "application/json")
void createCompositeProduct(@RequestBody ProductAggregate body);

}
  • /product-composite url path로 들어온 요청을 ProductCompisteService의 createCompositeProduct 메서드에서 받는다.

3. ProductCompositeSerivceImpl

@RestController
public class ProductCompositeServiceImpl implements ProductCompositeService {

    private static final Logger LOG = LoggerFactory.getLogger(ProductCompositeServiceImpl.class);

    private final ServiceUtil serviceUtil;
    private final ProductCompositeIntegration integration;

    @Autowired
    public ProductCompositeServiceImpl(ServiceUtil serviceUtil, ProductCompositeIntegration integration) {
        this.serviceUtil = serviceUtil;
        this.integration = integration;
    }

    @Override
    public void createCompositeProduct(ProductAggregate body) {

        try {

            LOG.debug("createCompositeProduct: creates a new composite entity for productId: {}", body.getProductId());

            Product product = new Product(body.getProductId(), body.getName(), body.getWeight(), null);
            integration.createProduct(product);

            if (body.getRecommendations() != null) {
                body.getRecommendations().forEach(r -> {
                    Recommendation recommendation = new Recommendation(body.getProductId(), r.getRecommendationId(), r.getAuthor(), r.getRate(), r.getContent(), null);
                    integration.createRecommendation(recommendation);
                });
            }

            if (body.getReviews() != null) {
                body.getReviews().forEach(r -> {
                    Review review = new Review(body.getProductId(), r.getReviewId(), r.getAuthor(), r.getSubject(), r.getContent(), null);
                    integration.createReview(review);
                });
            }

            LOG.debug("createCompositeProduct: composite entities created for productId: {}", body.getProductId());

        } catch (RuntimeException re) {
            LOG.warn("createCompositeProduct failed: {}", re.toString());
            throw re;
        }
    }
}
  • ProductCompositeService를 구현한 클래스
  • DI
    • ServiceUtil -> http 요청 처리 서포트
    • ProductCompositeIntegration -> 핵심 서비스로 처리를 위임
  • Products, Recommendations, Reviews 를 등록하는 로직을 ProductCompositeIntegration로 위임

4. ProductCompositeIntegration

@EnableBinding(ProductCompositeIntegration.MessageSources.class)
@Component
public class ProductCompositeIntegration implements ProductService, RecommendationService, ReviewService {

    private static final Logger LOG = LoggerFactory.getLogger(ProductCompositeIntegration.class);

    private final WebClient webClient;
    private final ObjectMapper mapper;

    private final String productServiceUrl;
    private final String recommendationServiceUrl;
    private final String reviewServiceUrl;

    private MessageSources messageSources;

    public interface MessageSources {

        String OUTPUT_PRODUCTS = "output-products";
        String OUTPUT_RECOMMENDATIONS = "output-recommendations";
        String OUTPUT_REVIEWS = "output-reviews";

        @Output(OUTPUT_PRODUCTS)
        MessageChannel outputProducts();

        @Output(OUTPUT_RECOMMENDATIONS)
        MessageChannel outputRecommendations();

        @Output(OUTPUT_REVIEWS)
        MessageChannel outputReviews();
    }

    @Autowired
    public ProductCompositeIntegration(
        WebClient.Builder webClient,
        ObjectMapper mapper,
        MessageSources messageSources,

        @Value("${app.product-service.host}") String productServiceHost,
        @Value("${app.product-service.port}") int    productServicePort,

        @Value("${app.recommendation-service.host}") String recommendationServiceHost,
        @Value("${app.recommendation-service.port}") int    recommendationServicePort,

        @Value("${app.review-service.host}") String reviewServiceHost,
        @Value("${app.review-service.port}") int    reviewServicePort
    ) {

        this.webClient = webClient.build();
        this.mapper = mapper;
        this.messageSources = messageSources;

        productServiceUrl        = "http://" + productServiceHost + ":" + productServicePort;
        recommendationServiceUrl = "http://" + recommendationServiceHost + ":" + recommendationServicePort;
        reviewServiceUrl         = "http://" + reviewServiceHost + ":" + reviewServicePort;
    }

    @Override
    public Product createProduct(Product body) {
        messageSources.outputProducts().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
        return body;
    }
    
    @Override
    public Recommendation createRecommendation(Recommendation body) {
        messageSources.outputRecommendations().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
        return body;
    }
    
    @Override
    public Review createReview(Review body) {
        messageSources.outputReviews().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
        return body;
    }
}
  • WebClient(논블로킹 동기 REST API 요청 담당), MessageSource(이벤트 기반 비동기 서비스를 위한 MessageCannel 등록), 각 serviceURL(핵심 마이크로서비스 비즈니스 로직 요청 URL) DI.
  • Products, Recommendations, Reviews 엔티티 등록이벤티 기반 비동기 처리
  • messageSources 인스턴스는 MessageChannel과 @Output을 함께 사용해서 MessageChannel을 등록한다. 즉, 각 핵심 서비스별로 등록 요청이 오면 토픽에 메시지를 게시한다.
  • 그리고 각 핵심 서비는 게신된 메시지를 소비하여 데이터를 등록한다.
spring.cloud.stream:
  defaultBinder: rabbit
  default.contentType: application/json
  bindings:
    output-products:
      destination: products
      producer:
        required-groups: auditGroup
    output-recommendations:
      destination: recommendations
      producer:
        required-groups: auditGroup
    output-reviews:
      destination: reviews
      producer:
        required-groups: auditGroup
  • 위의 코드는 Spring Cloud Stream 프레임워크를 사용하여 메시지 기반 애플리케이션을 구성하는 설정 파일입니다.

    • spring.cloud.stream은 Spring Cloud Stream 프레임워크에서 사용되는 속성을 설정하는 데 사용됩니다. defaultBinder는 바인더(binder)의 이름을 설정하며, 여기서는 RabbitMQ를 기본 바인더로 사용하도록 구성되어 있습니다. default.contentType는 메시지를 직렬화하는 데 사용되는 기본 콘텐츠 유형을 설정합니다. 이 경우에는 JSON 형식의 메시지를 사용하도록 설정되어 있습니다.
    • bindings는 Spring Cloud Stream 프레임워크에서 메시지 소스(source) 및 메시지 sink를 정의하는 데 사용됩니다. output-products는 이 구성에서 정의된 메시지 sink의 이름입니다. destination은 해당 sink가 구독하는 RabbitMQ 큐의 이름을 지정합니다. 이 경우 products 큐를 대상으로 하도록 구성되어 있습니다.
    • 마지막으로, producer.required-groups는 메시지를 생성하는 데 사용되는 그룹 이름을 정의합니다. 이 그룹은 RabbitMQ의 exchange와 바인딩되어 있으며, exchange를 통해 메시지가 큐로 전달됩니다. 따라서 이 설정은 auditGroup 그룹에 속한 프로듀서가 메시지를 생성할 때, 해당 메시지가 auditGroup에 속한 exchange로 전달됨을 의미합니다.

💡 논블로킹(Non-Blocking) 동기 서비스이벤트 기반 비동기 처리 서비스의 개념과 효과적으로 사용하는 방법

  • 논블로킹 동기 서비스는 요청이 들어오면 해당 요청에 대한 응답을 반환할 때까지 블로킹하지 않습니다. 즉, 다른 요청을 처리할 수 있으며 요청 처리가 완료될 때까지 기다릴 필요가 없습니다. 이 방식은 단일 스레드에서 많은 요청을 처리할 수 있으므로 서버의 확장성을 향상시킬 수 있습니다. 또한, 이 방식은 동기화 오버헤드가 적으며 코드가 간결하고 이해하기 쉽습니다. 그러나, 하나의 요청이 처리되는 동안 다른 요청을 처리할 수 있도록 블로킹되지 않기 때문에, 해당 서비스에서 발생한 오류가 다른 요청에 영향을 미칠 수 있습니다.
  • 반면에, 이벤트 기반 비동기 서비스는 요청을 받은 후 해당 요청을 처리하는 이벤트를 등록하고, 이벤트 처리기가 이벤트를 처리할 때까지 기다립니다. 이 방식은 비동기적으로 동작하므로 서버는 요청 처리 완료를 기다리지 않고 다른 요청을 처리할 수 있습니다. 이러한 방식은 많은 양의 요청을 처리할 때 효율적입니다. 또한, 이 방식은 오류가 발생하더라도 해당 요청을 처리하는 동안 다른 요청을 처리할 수 있습니다. 그러나 이벤트 기반 비동기 서비스는 다수의 이벤트 및 콜백 함수가 필요하므로 코드가 복잡해질 수 있습니다. 또한, 이 방식은 처리된 결과가 언제 도착할지 예측하기 어렵기 때문에, 일부 상황에서는 처리된 결과를 기다리는 것이 불가피합니다.

📌 따라서, 논블로킹 동기 서비스는 간결하고 이해하기 쉬운 코드를 작성하고 서버 확장성을 향상시키는 데 유용합니다. 반면에, 이벤트 기반 비동기 서비스는 대규모 요청 처리와 오류 처리가 필요한 경우에 유용합니다.

  • 논블로킹 동기 서비스조회(Select) 작업에 특히 유용합니다. 블로킹 동기 방식은 일반적으로 서버에 대한 단일 연결(connection)에서 한 번에 하나의 요청을 처리하므로, 대용량의 조회 작업에서는 서버 성능이 떨어질 수 있습니다. 하지만 논블로킹 동기 서비스는 다중 연결(multi-connection)을 통해 다수의 요청을 동시에 처리할 수 있기 때문에 대용량의 조회 작업에 효율적입니다.
  • 반면에, 이벤트 기반 비동기 서비스등록이나 삭제 처리와 같은 이벤트 기반 작업에 유용합니다. 이러한 작업은 일반적으로 상태 변경을 기다리지 않아도 되는 비동기 작업이기 때문입니다. 이벤트 기반 비동기 서비스는 이러한 작업에 최적화되어 있으며, 서버가 대규모의 동시 요청을 처리하는 데 필요한 확장성을 제공합니다.

📌 따라서, 논블로킹 동기 서비스는 대용량의 조회 작업에, 이벤트 기반 비동기 서비스는 등록이나 삭제 처리와 같은 이벤트 기반 작업에 특히 유용합니다. 그러나 이는 일반적인 패턴일 뿐, 구체적인 상황에 따라 다르게 적용될 수도 있습니다.


5. MessageProcessor


@EnableBinding(Sink.class)
public class MessageProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);

    private final ProductService productService;

    @Autowired
    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

        LOG.info("Process message created at {}...", event.getEventCreatedAt());

        switch (event.getEventType()) {

        case CREATE:
            Product product = event.getData();
            LOG.info("Create product with ID: {}", product.getProductId());
            productService.createProduct(product);
            break;

        case DELETE:
            int productId = event.getKey();
            LOG.info("Delete recommendations with ProductID: {}", productId);
            productService.deleteProduct(productId);
            break;

        default:
            String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
            LOG.warn(errorMessage);
            throw new EventProcessingException(errorMessage);
        }

        LOG.info("Message processing done!");
    }
}
  • @EnableBinding(Sink.class)은 토픽에 게시된 메시지를 소비하려면 메시지를 게시하고자 MessageChannel을 바인딩 했던 것 처럼 SubscribalChannel을 바인딩 해야한다.
  • 각 핵심 서비스의 메시지 프로세서(MessageProcessor)는 하나의 토픽만 수신하므로 Sink 인터페이스로 토픽을 바인딩한다.
  • @StreamListener(target=Sink.INPUT) 어노테이션이 붙은 process 메서드는 switch case로 등록/삭제 요청인지 구분한다. 등록 요청이 오면productService.createProduct(product)를 실행한다.
  • api 마이크로서비스로 등록을 요청한다.
spring.cloud.stream:
  defaultBinder: rabbit
  default.contentType: application/json
  bindings.input:
    destination: products
    group: productsGroup
  • 위의 코드로 토픽을 바인딩 한다. (products 토픽을 방인딩한다.)

6. ProductService


@RestController
public class ProductServiceImpl implements ProductService {

    private static final Logger LOG = LoggerFactory.getLogger(ProductServiceImpl.class);

    private final ServiceUtil serviceUtil;

    private final ProductRepository repository;

    private final ProductMapper mapper;

    @Autowired
    public ProductServiceImpl(ProductRepository repository, ProductMapper mapper, ServiceUtil serviceUtil) {
        this.repository = repository;
        this.mapper = mapper;
        this.serviceUtil = serviceUtil;
    }

    @Override
    public Product createProduct(Product body) {

        if (body.getProductId() < 1) throw new InvalidInputException("Invalid productId: " + body.getProductId());

        ProductEntity entity = mapper.apiToEntity(body);
        Mono<Product> newEntity = repository.save(entity)
            .log()
            .onErrorMap(
                DuplicateKeyException.class,
                ex -> new InvalidInputException("Duplicate key, Product Id: " + body.getProductId()))
            .map(e -> mapper.entityToApi(e));

        return newEntity.block();
    }
}
  • mongodb에 Products 등록(Recommendations, Reviews도 동일함)
  • Mono객체로 감싸는 이유는 비동기로 처리하기 때문.
spring.data.mongodb:
  host: localhost
  port: 27017
  database: product-db
  • application.yml 파일내에 mongodb 설정

여기까지 리액티브 마이크로서비스 개발 중 "등록" 프로세스의 전반적인 프로세스에 대해 정리했습니다. 설명이 부족했다면 댓글로 남겨주시면 감사하겠습니다. 추가로 전체 코드는 첨부 해놨으니 실습해보면 좋을것 같습니다.

0개의 댓글