AWS SQS와 AWS Lambda로 서버 간 통신 구현하기에서 이어지는 글이다!
먼저 build.gradle에 dependency를 추가해준다.
implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.0")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.0.0'
이 프로젝트에서는 spring boot가 3.1.4라서 spring cloud aws 3.0.0 버전을 사용하였다
그리고 SQS 설정에 필요한 클래스를 작성해준다.
@Configuration
public class SqsConfig {
@Value("${spring.cloud.aws.credentials.access-key}")
private String accessKey;
@Value("${spring.cloud.aws.credentials.secret-key}")
private String secretKey;
@Bean
public SqsAsyncClient sqsAsyncClient() {
return SqsAsyncClient.builder()
.credentialsProvider(() -> new AwsCredentials() {
@Override
public String accessKeyId() {
return accessKey;
}
@Override
public String secretAccessKey() {
return secretKey;
}
})
.region(Region.AP_NORTHEAST_2)
.build();
}
@Bean
public SqsTemplate sqsTemplate() {
return SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient())
.build();
}
}
참고로 @Value
를 사용해서 시크릿 키 값을 위한 설정 파일 application-cloud.yml을 작성할 때 띄어쓰기에 주의해야 한다. (관련 오류로 꽤 고생했었던 기억이 있다.)
spring:
mvc:
async:
request-timeout: 60000
servlet:
multipart:
max-file-size: 20MB # 최대 파일 사이즈
max-request-size: 20MB # 최대 요청 사이즈
cloud:
aws:
credentials:
access-key: ${AWS_ACCESS_KEY_ID} # IAM에서 생성한 access-key
secret-key: ${AWS_SECRET_ACCESS_KEY} # IAM에서 생성한 secret-key
region:
static: ap-northeast-2 # 버킷 region
s3:
bucket: ai-styling-s3 # 버킷 이름
sqs:
queue:
name: responseQueue # SQS 이름
url1: ${AWS_SQS_URL_WORDS} # SQS URL
url2: ${AWS_SQS_URL_SENTENCES} # SQS URL
stack:
auto: false
google:
search:
serp:
api:
key: ${GOOGLE_API_KEY} # 구글 API 키
application-cloud.yml은 이렇게 작성하고 ${ }값들은 환경변수로 지정해주었다.
이제 Spring에서 SQS로 메시지를 보내보자.
위 설정에서 등록한 SqsTemplate
으로 간단하게 메시지를 보낼 수 있다.
private void sendRequestAIServer(String inputs, List<String> imageUrls) {
sqsTemplate.send(requestWordsQueueUrl, PromptWithWordsRequest.of(inputs, imageUrls));
}
https://velog.velcdn.com/images/gda05189/post/55f50ab1-b0fb-4616-bd64-ce36653a008b/image.png
공식 문서에 잘 나와 있듯이 SQS 엔드포인트 이름이랑, 전달할 내용을 인자로 담아서 호출해주면 된다.
나는 사용자 이미지랑 텍스트 프롬프트를 담아서 요청 메시지를 보냈다.
SQS 메시지 수신도 매우 간단하다.
@SqsListener("responseQueue")
private void receiveMessage(final String message) throws JsonProcessingException {
}
이렇게 @SqsListener("SQS Endpoint Name)"
을 사용하면 매개변수인 String message를 통해서 메시지를 수신할 수 있다.
이때 수신한 메시지를 사용자의 요청을 처리한 메서드로 보낼 수 있을지 고민에 빠졌다.
비동기 방식인 메시지 큐를 잘 활용하려면 요청과 응답 처리 모두 비동기로 처리해야 효율적일 것 같은데, 이걸 어떻게 구현해야 할지 막막했다.
그래서 이것저것 삽질해보다가 SqsTemplate에 있는 메서드들을 하나씩 살펴보기 시작했다.
@Override
public <T> CompletableFuture<Optional<org.springframework.messaging.Message<T>>> receiveAsync(
Consumer<SqsReceiveOptions> from, Class<T> payloadClass) {
그 중 receiveAsync
라는 메서드를 발견했다.
이 메서드는 응답 값을 CompletableFuture
을 통해 전달하는 것을 알게 되었다.
추가로 CompletableFuture
를 찾아보면서 자바스크립트에 Promise
랑 비슷한 개념으로 Future
에 응답이 올 때까지 non-blocking 방식으로 기다리고 있다가 응답이 오면 Complete
해서 지연된 응답 처리를 해주는 방식이라는 것을 알게 되었다. (그래서 이름이 CompleteableFuture
...)
그래서 CompletableFuture
를 이용해서 비동기 방식을 구현해보기로 결정했다.
sqsTemplate.send(requestWordsQueueUrl, PromptWithWordsRequest.of(inputs, imageUrls));
이렇게 SQS로 요청 메시지를 보내고 나서
final CompletableFuture<String> future = new CompletableFuture<>();
queue.add(future);
CompletableFuture
객체를 생성한 후 Queue에 넣어주었다.
이렇게 한 이유는 응답 메시지를 받으면 CompletableFuture
에 complete
를 해서 지연된 응답을 처리하기 위해서였다.
또한 Queue를 사용한 이유는 메시지 큐에서 아이디어를 얻어서 마찬가지로 Queue를 통해 Service 레이어의 메서드들 간의 통신을 하기 위해서이다.
이때 사용한 자료구조는 ConcurrentLinkedQueue
이었다. (동시성 문제를 예방하기 위해)
private static final ConcurrentLinkedQueue<CompletableFuture<String>> queue = new ConcurrentLinkedQueue<>();
그래서 receiveMessage
로 SQS의 응답 메시지를 수신하면 CompletableFuture
에 complete한 후 이 메시지를 사용자의 요청을 처리하는 getImageWithWords
로 Queue를 통해 넘겨주었다.
// receiveMessage 메서드 코드 일부
final CompletableFuture<String> pendingFuture = queue.peek();
if (null != pendingFuture)
pendingFuture.complete(responseMessage);
// getImageWithWords 메서드 코드 일부
Objects.requireNonNull(queue.peek())
.thenCompose(s -> queue.poll());
여기서 thenCompose
은 CompletableFuture
가 complete
되면 지연된 응답을 위해 queue에서 꺼내온다.
설명이 복잡한데 완성된 코드는 아래와 같다.
@Slf4j
@RequiredArgsConstructor
@Transactional
@Service
public class StylingService {
@Value("${spring.cloud.aws.sqs.queue.url1}")
private String requestWordsQueueUrl;
private final SqsTemplate sqsTemplate;
private final UserRepository userRepository;
private final StylingRepository stylingRepository;
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ConcurrentLinkedQueue<CompletableFuture<String>> queue = new ConcurrentLinkedQueue<>();
public CompletableFuture<String> getImageWithWords(final StylingWordsRequest request, final Long id) throws JsonProcessingException {
final User user = userRepository.findById(id).orElseThrow(
() -> new Exception400(ErrorCode.USER_NOT_FOUND)
);
final List<String> imageUrls = deserializeImageUrls(user.getUserImages());
sendRequestAIServer(request.inputs(), imageUrls);
initializeResponseOrderManagement();
return waitForAIResponse();
}
private List<String> deserializeImageUrls(final String imgUrls) throws JsonProcessingException {
if (null == imgUrls) return List.of();
return objectMapper.readValue(imgUrls, new TypeReference<List<String>>() {
});
}
private void sendRequestAIServer(String inputs, List<String> imageUrls) {
sqsTemplate.send(requestWordsQueueUrl, PromptWithWordsRequest.of(inputs, imageUrls));
}
private void initializeResponseOrderManagement() {
final CompletableFuture<String> future = new CompletableFuture<>();
queue.add(future);
}
private CompletableFuture<String> waitForAIResponse() {
return Objects.requireNonNull(queue.peek())
.thenCompose(s -> queue.poll());
}
@SqsListener("responseQueue")
private void receiveMessage(final String message) throws JsonProcessingException {
if(message == null) return;
final String parseMessage = parseMessage(message);
completeAIResponse(parseMessage);
}
private String parseMessage(String message) throws JsonProcessingException {
return objectMapper.readValue(message, String.class);
}
private void completeAIResponse(String responseMessage) {
final CompletableFuture<String> pendingFuture = queue.peek();
if (null != pendingFuture)
pendingFuture.complete(responseMessage);
}
}
마틴 파울러의 리팩토링이라는 책을 읽었어서
나름대로 이해하기 쉽도록 메서드도 분리하고 메서드 이름도 고민해서 적어봤다.
Controller에서는 응답 API 스펙에 맞도록 thenApply()
를 사용해서 Mapping 해주었다.
@PostMapping("/styling/words")
@PreAuthorize("hasRole('ROLE_USER')")
@Operation(summary = "스타일 이미지 선택", description = "사용자가 이미지를 클릭하면 관련 단어를 서버로 전달")
@Parameter(name = "request.inputs", description = "이미지 관련 단어들")
public CompletableFuture<ApiResponse<String>> getImageWithWords(
@RequestBody @Valid final StylingWordsRequest request,
@AuthenticationPrincipal final CustomUserDetails userDetails) throws JsonProcessingException {
final var imageResponseFuture = stylingService.getImageWithWords(request, userDetails.getId());
return imageResponseFuture.thenApply(ApiResponse::ok);
}
전체 코드는 여기서 확인할 수 있다.
이 구현 과정에서 느낀점은 알고리즘을 풀면서 배운 자료구조를 실제로 활용하여 구현하게 되어서 조금 신기했다.
그리고 CompletableFuture
를 공부하면서 Spring Webflux
의 WebClient
에 대해 알게 되었는데, 놀랍게도 다음 기능인 외부 API 호출에서 활용하게 되었다.
이 외부 API 호출 구현 과정은 다음 글에 이어진다!