리뷰는 본 프로젝트의 핵심으로서, 최대한 많은 리뷰를 입력받고 이를 분석해 가치있는 정보를 제공하고 있습니다. 이러한 리뷰의 업로드는 다음과 같은 과정을 통해 진행됩니다.
AWS S3를 통한 사진 업로드와 AI 리뷰 분석 모델을 통한 리뷰 분석 요청이라는 2개의 외부 서비스를 거친 후 ㄹㅣ뷰 데이터가 DB에 저장됩니다. 즉, 동기로 작동하고 하나의 트랜잭션으로 묶이기 때문에, 클라이언트가 리뷰를 업로드할 때 세 가지 과정이 모두 완료되는 긴 시간을 기다려야 하고 외부 서비스 중 하나라도 실패하면 리뷰 업로드가 실패하는 강결합 구조이다.
따라서 트랜잭션을 분리해 안정성을 확보하고, 필요한 작업이 완료되면 클라이언트에게 반환하고 비동기로 나머지 작업을 진행하도록 변경해야 한다.
@EnableAsync
@Configuration
public class AsyncConfig {
private static final int EXECUTOR_CORE_POOL_SIZE = 50;
private static final int EXECUTOR_QUEUE_CAPACITY = 5000;
private static final int EXECUTOR_MAX_POOL_SIZE = Integer.MAX_VALUE;
private static final String EXECUTOR_THREAD_NAME_PREFIX = "async-task-executor-";
@Bean
public Executor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(EXECUTOR_CORE_POOL_SIZE);
executor.setQueueCapacity(EXECUTOR_QUEUE_CAPACITY);
executor.setMaxPoolSize(EXECUTOR_MAX_POOL_SIZE);
executor.setThreadNamePrefix(EXECUTOR_THREAD_NAME_PREFIX);
executor.initialize();
return executor;
}
}
@EnableAsync
: 비동기 처리 기능을 활성화한다CorePoolSize
: 동시에 실행시킬 쓰레드의 개수 (default: 1)MaxPoolSize
: 쓰레드 풀의 최대 사이드 (default: Integer.MAX_VALUE)QueueCapacity
: 큐의 사이즈 (default: Integer.MAX_VALUE)java.util.concurrent.RejectedExecutionException
예외가 발생된다.비동기 처리할 코드를 Runnable로 선언한 후 Executor.execute(runnable)
를 사용하는 것이 정석이겠지만, 스프링의 특징인 어노테이션을 활용한다면 굉장히 쉽고 코드가 깔끔해집니다. 단지, 비동기 처리를 할 함수에 @Async
를 붙이고 실행하면 된다. 만약 비동기 처리된 메소드의 반환값을 받고 싶으면, Future
, CompleteFuture
, ListenableFuture
을 사용하고 반환값 없이 실행만 하고 싶다면 void를 사용하면 된다.
public class ReviewService {
~~
@Transactional
public Long create(String partnerDomain, String reservationPartnerCustomId, ReviewCreateRequest reviewCreateRequest, List<MultipartFile> reviewImageFiles) {
// Get reservation
final Reservation reservation = reservationService.findByPartnerDomainAndPartnerCustomId(partnerDomain, reservationPartnerCustomId);
validateAlreadyReviewedByReservationId(reservation.getId());
// Create review
Review review = reviewCreateRequest.toEntity(reservation);
reviewRepository.save(review);
reservation.getTravelProduct().addReviewInfo(review.getRating());
// Create review images
if (reviewImageFiles != null) {
reviewImageService.createAll(reviewImageFiles, review);
}
// Create review tag
reviewTagService.createAll(review);
// Impl Requesting review inference through kafka
return review.getId();
}
public class ReviewImageService {
~~
@Async
public void createAll(List<MultipartFile> reviewImageFiles, Review review) {
for (MultipartFile reviewImageFile : reviewImageFiles) {
ReviewImage reviewImage = ReviewImage.builder()
.fileName(uploadReviewImageFilesOnS3(reviewImageFile))
.review(review)
.build();
reviewImage = reviewImageRepository.save(reviewImage);
review.addReviewImage(reviewImage);
}
}
}
public class ReviewTagService {
~~
@Async
public void createAll(Review review) {
// Ask inference review tag
ReviewTagInferenceRequest reviewTagInferenceRequest = new ReviewTagInferenceRequest(review.getContent());
ReviewTagInferenceResponse reviewTagInferenceResponse = reviewTagInferenceClient.inferenceReview(reviewTagInferenceRequest);
// Create review tags
List<ReviewTagCreateRequest> reviewTagCreateRequests = reviewTagInferenceResponse.getBody().getResults().stream().map(ReviewTagCreateRequest::new).toList();
for (ReviewTagCreateRequest reviewTagCreateRequest : reviewTagCreateRequests) {
ReviewTag reviewTag = reviewTagCreateRequest.toEntity(review);
reviewTag = reviewTagRepository.save(reviewTag);
review.addReviewTag(reviewTag);
}
}
}
비동기와 멀티쓰레드를 아는 사람은 본 코드가 Thread Safe하지 않은 코드처럼 보인다.
AWS S3에 접근하기 위한 AWSS3Client의 인스턴스 변수가 여러 쓰레드로부터 공유가 되고 있지만, 이미 ThreadSafe처리되어 어노테이션으로 문서화되어 있다.
AI 리뷰분석 모델 서버에 API 요청하기 위해 사용하는 OpenFeign 또한 커스텀 Decoder, Encoder, Logger, Retryer, ParamEncoder, ErrorDecoder를 사용하지 않는 이상 Thread Safe하다.
하지만 문제는 사진 업로드와 리뷰 분석을 비동기로 처리하므로 성공을 보장할 수 없다. 만약 잠깐의 네트워크 오류로 외부 서비스 작업이 실패한다면 정보의 손실이 클 것이다. 이를 위한 최소한의 안정장치를 조사해보던 도중 OpenFeign과 비동기 처리에 함께 쓰이는 시리즈 중 하나로 Retryer가 있었다. 특정 상황에서 함수를 재시도하도록 설정하는 어노테이션이다.
Spring Retry와 AOP관련된 의존성을 추가하고, 비동기 작업을 위해 추가한 것이니까 AsyncConfig에@EnableRetry
를 추가한다.
implementation 'org.springframework.retry:spring-retry:1.3.4'
implementation 'org.springframework:spring-aspects:5.3.30'
@EnableRetry
@EnableAsync
@Configuration
public class AsyncConfig {
@Retryable
: 재시도할 함수 설정value
: 재시도 타켓의 Exception typemaxAttempt
: 최대 재시도 횟수 (default : 3)backoff
: 각 재시도 전 대기하는 것에 대한 설정delay
: 대기 시간, ms (defaut : 1000ms)@Async
@Retryable(
value = {ExternalServiceException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 10000)
)
private String uploadReviewImageFilesOnS3(MultipartFile reviewImage) {
try {
String fileName = System.currentTimeMillis() + "_" + reviewImage.getOriginalFilename();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType(reviewImage.getContentType());
metadata.setContentLength(reviewImage.getSize());
amazonS3Client.putObject(s3ImageBucketName, fileName, reviewImage.getInputStream(), metadata);
return fileName;
} catch (SdkClientException e) {
throw new ExternalServiceException(AWS_S3_CLIENT_ERROR);
} catch (IOException e) {
throw new DomainLogicException(REVIEW_IMAGE_FILE_IO_ERROR);
}
}
public class ReviewTagService {
~~
@Async
@Retryable(
value = {FeignClientException.class, FeignServerException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 10000)
)
public void createAll(Review review) {
// Ask inference review tag
ReviewTagInferenceRequest reviewTagInferenceRequest = new ReviewTagInferenceRequest(review.getContent());
ReviewTagInferenceResponse reviewTagInferenceResponse = reviewTagInferenceClient.inferenceReview(reviewTagInferenceRequest);
// Create review tags
List<ReviewTagCreateRequest> reviewTagCreateRequests = reviewTagInferenceResponse.getBody().getResults().stream().map(ReviewTagCreateRequest::new).toList();
for (ReviewTagCreateRequest reviewTagCreateRequest : reviewTagCreateRequests) {
ReviewTag reviewTag = reviewTagCreateRequest.toEntity(review);
reviewTag = reviewTagRepository.save(reviewTag);
review.addReviewTag(reviewTag);
}
}
public interface ReviewTagInferenceClient {
@GetMapping
ReviewTagInferenceResponse inferenceReview(@RequestBody ReviewTagInferenceRequest reviewTagInferenceRequest) throws FeignException;
}
이 과정을 통해 리뷰 업로드 과정이 목표로 했던 비동기 약결합 구조로 변경되었다. 외부 시스템에 영향을 받지 않는 안정적인 구조이며, API 반환 속도도 8.5 sec -> 0.2 sec으로 단축할 수 있었다. 외부 시스템을 WAS 서버에 연결하는 과정에서 고민해야 하는 점을 겪고 개선하는 경험을 할 수 있었고, 정답인지는 모르겠지만 나름대로의 문제 분석과 해결을 통한 개선을 하였다.
이보다 더 좋은 방식이 있는지가 궁금하고, 기능이 외부 시스템 간의 통신을 통해 구현하는 MSA는 어떻게 굴러가는 지도 궁금해졌다.