외부 서비스 요청 결과에 따라 리뷰 업로드 요청 결과가 달라지는 강결합 구조를 개선해보자.

sckwon770·2023년 11월 2일
1

스프링 부트

목록 보기
10/10
post-thumbnail

동기

리뷰는 본 프로젝트의 핵심으로서, 최대한 많은 리뷰를 입력받고 이를 분석해 가치있는 정보를 제공하고 있습니다. 이러한 리뷰의 업로드는 다음과 같은 과정을 통해 진행됩니다.

AWS S3를 통한 사진 업로드와 AI 리뷰 분석 모델을 통한 리뷰 분석 요청이라는 2개의 외부 서비스를 거친 후 ㄹㅣ뷰 데이터가 DB에 저장됩니다. 즉, 동기로 작동하고 하나의 트랜잭션으로 묶이기 때문에, 클라이언트가 리뷰를 업로드할 때 세 가지 과정이 모두 완료되는 긴 시간을 기다려야 하고 외부 서비스 중 하나라도 실패하면 리뷰 업로드가 실패하는 강결합 구조이다.


개선

따라서 트랜잭션을 분리해 안정성을 확보하고, 필요한 작업이 완료되면 클라이언트에게 반환하고 비동기로 나머지 작업을 진행하도록 변경해야 한다.


ThreadPoolTaskExecutor

ThreadPoolTaskExecutor 설정

@EnableAsync
@Configuration
public class AsyncConfig {

    private static final int EXECUTOR_CORE_POOL_SIZE = 50;
    private static final int EXECUTOR_QUEUE_CAPACITY = 5000;
    private static final int EXECUTOR_MAX_POOL_SIZE = Integer.MAX_VALUE;
    private static final String EXECUTOR_THREAD_NAME_PREFIX = "async-task-executor-";

    @Bean
    public Executor asyncTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(EXECUTOR_CORE_POOL_SIZE);
        executor.setQueueCapacity(EXECUTOR_QUEUE_CAPACITY);
        executor.setMaxPoolSize(EXECUTOR_MAX_POOL_SIZE);
        executor.setThreadNamePrefix(EXECUTOR_THREAD_NAME_PREFIX);
        executor.initialize();
        return executor;
    }
}
  • @EnableAsync : 비동기 처리 기능을 활성화한다
  • CorePoolSize : 동시에 실행시킬 쓰레드의 개수 (default: 1)
  • MaxPoolSize : 쓰레드 풀의 최대 사이드 (default: Integer.MAX_VALUE)
  • QueueCapacity : 큐의 사이즈 (default: Integer.MAX_VALUE)

ThreadPoolTaskExecutor의 기본적인 동작원리

  • CorePoolSize만큼 이미 쓰레드가 점유되고 있으면, 넘치는 쓰레드 요청은 큐에 넣는다.
  • 쓰레드가 모두 점유되어 있고 QueueCapacity만큼 쓰레드가 찼으면, 넘치는 쓰레드 요청을 MaxPoolSize만큼 쓰레드 풀에 저장한다.
  • 쓰레드 풀까지 가득찬다면, java.util.concurrent.RejectedExecutionException 예외가 발생된다.

적용

비동기 처리할 코드를 Runnable로 선언한 후 Executor.execute(runnable)를 사용하는 것이 정석이겠지만, 스프링의 특징인 어노테이션을 활용한다면 굉장히 쉽고 코드가 깔끔해집니다. 단지, 비동기 처리를 할 함수에 @Async를 붙이고 실행하면 된다. 만약 비동기 처리된 메소드의 반환값을 받고 싶으면, Future, CompleteFuture, ListenableFuture을 사용하고 반환값 없이 실행만 하고 싶다면 void를 사용하면 된다.

public class ReviewService {
	~~
    
	@Transactional
	public Long create(String partnerDomain, String reservationPartnerCustomId, ReviewCreateRequest reviewCreateRequest, List<MultipartFile> reviewImageFiles) {
        // Get reservation
        final Reservation reservation = reservationService.findByPartnerDomainAndPartnerCustomId(partnerDomain, reservationPartnerCustomId);
        validateAlreadyReviewedByReservationId(reservation.getId());

        // Create review
        Review review = reviewCreateRequest.toEntity(reservation);
        reviewRepository.save(review);
        reservation.getTravelProduct().addReviewInfo(review.getRating());

        // Create review images
        if (reviewImageFiles != null) {
            reviewImageService.createAll(reviewImageFiles, review);
        }

        // Create review tag
        reviewTagService.createAll(review);
        // Impl Requesting review inference through kafka

        return review.getId();
}
public class ReviewImageService {
	~~
    
    @Async
    public void createAll(List<MultipartFile> reviewImageFiles, Review review) {
        for (MultipartFile reviewImageFile : reviewImageFiles) {
            ReviewImage reviewImage = ReviewImage.builder()
                    .fileName(uploadReviewImageFilesOnS3(reviewImageFile))
                    .review(review)
                    .build();

            reviewImage = reviewImageRepository.save(reviewImage);
            review.addReviewImage(reviewImage);
        }
    }
}
public class ReviewTagService {
	~~
    
    @Async
    public void createAll(Review review) {
        // Ask inference review tag
        ReviewTagInferenceRequest reviewTagInferenceRequest = new ReviewTagInferenceRequest(review.getContent());
        ReviewTagInferenceResponse reviewTagInferenceResponse = reviewTagInferenceClient.inferenceReview(reviewTagInferenceRequest);

        // Create review tags
        List<ReviewTagCreateRequest> reviewTagCreateRequests = reviewTagInferenceResponse.getBody().getResults().stream().map(ReviewTagCreateRequest::new).toList();
        for (ReviewTagCreateRequest reviewTagCreateRequest : reviewTagCreateRequests) {
            ReviewTag reviewTag = reviewTagCreateRequest.toEntity(review);
            reviewTag = reviewTagRepository.save(reviewTag);
            review.addReviewTag(reviewTag);
        }
    }
}

ThreadSafe

비동기와 멀티쓰레드를 아는 사람은 본 코드가 Thread Safe하지 않은 코드처럼 보인다.

AWS S3에 접근하기 위한 AWSS3Client의 인스턴스 변수가 여러 쓰레드로부터 공유가 되고 있지만, 이미 ThreadSafe처리되어 어노테이션으로 문서화되어 있다.

AI 리뷰분석 모델 서버에 API 요청하기 위해 사용하는 OpenFeign 또한 커스텀 Decoder, Encoder, Logger, Retryer, ParamEncoder, ErrorDecoder를 사용하지 않는 이상 Thread Safe하다.

feign 컨트리뷰터 kdavisk6의 답변이다 (https://github.com/OpenFeign/feign/issues/875)

@Retryable

하지만 문제는 사진 업로드와 리뷰 분석을 비동기로 처리하므로 성공을 보장할 수 없다. 만약 잠깐의 네트워크 오류로 외부 서비스 작업이 실패한다면 정보의 손실이 클 것이다. 이를 위한 최소한의 안정장치를 조사해보던 도중 OpenFeign과 비동기 처리에 함께 쓰이는 시리즈 중 하나로 Retryer가 있었다. 특정 상황에서 함수를 재시도하도록 설정하는 어노테이션이다.

Spring Retry와 AOP관련된 의존성을 추가하고, 비동기 작업을 위해 추가한 것이니까 AsyncConfig에@EnableRetry를 추가한다.

implementation 'org.springframework.retry:spring-retry:1.3.4'
implementation 'org.springframework:spring-aspects:5.3.30'
@EnableRetry
@EnableAsync
@Configuration
public class AsyncConfig {

적용

  • @Retryable : 재시도할 함수 설정
  • value : 재시도 타켓의 Exception type
  • maxAttempt : 최대 재시도 횟수 (default : 3)
  • backoff : 각 재시도 전 대기하는 것에 대한 설정
    - delay : 대기 시간, ms (defaut : 1000ms)

AWS 예외를 캐치한 ExternalServiceException 예외의 경우 재시도

@Async
@Retryable(
        value = {ExternalServiceException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 10000)
)
private String uploadReviewImageFilesOnS3(MultipartFile reviewImage) {
        try {
            String fileName = System.currentTimeMillis() + "_" + reviewImage.getOriginalFilename();
            ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentType(reviewImage.getContentType());
            metadata.setContentLength(reviewImage.getSize());

            amazonS3Client.putObject(s3ImageBucketName, fileName, reviewImage.getInputStream(), metadata);
            return fileName;

        } catch (SdkClientException e) {
            throw new ExternalServiceException(AWS_S3_CLIENT_ERROR);
        } catch (IOException e) {
            throw new DomainLogicException(REVIEW_IMAGE_FILE_IO_ERROR);
        }
}

Feign 예외가 발생할 경우 재시도

public class ReviewTagService {
	~~
    
	@Async
    @Retryable(
            value = {FeignClientException.class, FeignServerException.class},
            maxAttempts = 3,
            backoff = @Backoff(delay = 10000)
    )
    public void createAll(Review review) {
        // Ask inference review tag
        ReviewTagInferenceRequest reviewTagInferenceRequest = new ReviewTagInferenceRequest(review.getContent());
        ReviewTagInferenceResponse reviewTagInferenceResponse = reviewTagInferenceClient.inferenceReview(reviewTagInferenceRequest);

        // Create review tags
        List<ReviewTagCreateRequest> reviewTagCreateRequests = reviewTagInferenceResponse.getBody().getResults().stream().map(ReviewTagCreateRequest::new).toList();
        for (ReviewTagCreateRequest reviewTagCreateRequest : reviewTagCreateRequests) {
            ReviewTag reviewTag = reviewTagCreateRequest.toEntity(review);
            reviewTag = reviewTagRepository.save(reviewTag);
            review.addReviewTag(reviewTag);
        }
    }
public interface ReviewTagInferenceClient {

    @GetMapping
    ReviewTagInferenceResponse inferenceReview(@RequestBody ReviewTagInferenceRequest reviewTagInferenceRequest) throws FeignException;
}

마무리

이 과정을 통해 리뷰 업로드 과정이 목표로 했던 비동기 약결합 구조로 변경되었다. 외부 시스템에 영향을 받지 않는 안정적인 구조이며, API 반환 속도도 8.5 sec -> 0.2 sec으로 단축할 수 있었다. 외부 시스템을 WAS 서버에 연결하는 과정에서 고민해야 하는 점을 겪고 개선하는 경험을 할 수 있었고, 정답인지는 모르겠지만 나름대로의 문제 분석과 해결을 통한 개선을 하였다.

이보다 더 좋은 방식이 있는지가 궁금하고, 기능이 외부 시스템 간의 통신을 통해 구현하는 MSA는 어떻게 굴러가는 지도 궁금해졌다.

profile
늘 학습하고 적용하고 개선하는 개발자

0개의 댓글