SQS는 마이크로서비스, 분산 시스템 및 서버리스 애플리케이션을 위한 완전 관리형 메시지 대기열 입니다. 간단히 설명하자면 애플리케이션 간의 메세지를 주고받기 위한 아주 간단한 Queue(또는 메세지 브로커)라고 생각하시면 됩니다.
위 그림은 일반적인 Queue의 처리과정 입니다.
이렇게 프로듀서와 컨슈머를 Queue라는 미들웨어로 분리하면, 각 시스템에 영향을 받지 않고 원하는 작업을 수행할 수 있습니다.
장점
단점
장점
단점
현재 본인은 e-commerce 도메인의 MSA 환경을 고려한 주문 & 선물하기 프로젝트를 진행하던 중 다음과 같은 시나리오에서 양방향 통신에 문제가 발생했습니다.
주문 서비스와 선물하기 서비스 간의 메세지 브로커를 두고 메세지 기반 비동기 통신을 하며 Gift → Order의 방향성을 제거하기 위해, SQS 도입을 결정했습니다.
해당 프로젝트에선 메세지의 순서를 지켜야 하기때문에 FIFO 유형을 선택했습니다. FIFO 유형의 SQS를 만들고 싶다면 이름 뒤에 꼭 .fifo를 붙여줘야 합니다
기본값으로 설정했습니다. 해당 구성은 메세지의 크기, 보존 기간 등을 설정할 수 있습니다.
SQS 메세지에 대한 암호화입니다. 비활성화 해주었습니다.
액세스 정책도 기본값으로 진행했습니다. 액세스 정책은 이 대기열에 액세스 할 수 있는 계정 및 사용자와 허용되는 작업을 정의합니다.
비활성화로 설정했습니다.
작성하지 않고 하단의 대기열 생성 버튼을 눌러줍니다.
오른쪽 상단의 사용자 생성 버튼을 눌러줍니다.
사용자 이름을 설정해 준 뒤 오른쪽 하단의 다음 버튼을 눌러줍니다.
직접 정책 연결 → AmazonSQSFullAccess 권한을 체크한 뒤 오른쪽 하단의 다음 버튼을 눌러줍니다.
태그 추가는 입력없이 skip한 후 사용자 생성 버튼을 눌러줍니다.
새로 만든 사용자에 들어가 액세스 키1 아래에 있는 액세스 키 만들기를 눌러줍니다.
Command Line Interface(CLI)를 선택 후 다음 버튼 클릭합니다.
액세스 키 만들기 버튼을 클릭합니다.
csv 파일 다운로드 후 액세스 키 생성을 완료합니다.
Java 11
SpringBoot 2.7.16
// aws sqs
implementation platform('software.amazon.awssdk:bom:2.15.0')
implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.1.RELEASE'
example.order:
base-url: http://localhost:8080/
cloud:
aws:
access-key: ${aws.access-key}
secret-key: ${aws.secret-key}
AWS IAM에서 만든 access-key와 secret-key를 .yml에 작성해줍니다. access-key와 secret-key는 노출되면 안되기 때문에 application-secret.yml 파일을 따로 작성하여 저장했습니다.
@Configuration
public class AwsSqsConfig {
@Value("${cloud.aws.access-key}")
private String awsAccessKey;
@Value("${cloud.aws.secret-key}")
private String awsSecretKey;
@Bean
public AmazonSQSAsync amazonSQSAsync() {
AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(
new BasicAWSCredentials(awsAccessKey, awsSecretKey));
return AmazonSQSAsyncClientBuilder
.standard()
.withRegion(Regions.AP_NORTHEAST_2)
.withCredentials(awsCredentialsProvider)
.build();
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonAsync());
}
}
QueueMessageTemplate를 사용하여 SQS Queue에 메세지를 보낼 수 있습니다.
public class AwsSqsSender {
private final String SQS_QUEUE_NAME = "SQS를 생성할 당시 작성한 이름.fifo";
private final QueueMessagingTemplate queueMessagingTemplate;
public void messageSender(Message message) {
try {
Map<String, Object> headers = new HashMap<>();
headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "item-queues");
headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
queueMessagingTemplate.convertAndSend(SQS_QUEUE_NAME, message, headers);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
FIFO Queue로 전송한다면 메세지 헤더에 FIFO Queue를 나타내는 값을 실어서 보낼 수 있습니다. FIFO Queue는 SQS_GROUP_ID_HEADER로 메세지 그룹핑이 필요합니다. 또한 SQS_DEDUPLICATION_ID_HEADER를 지정하여 중복을 제거합니다. 중복을 구분할 내용은 메세지 중복 키를 받아서 convertAndSend() 메서드로 메세지를 전송합니다.
@Component
public class AwsSqsListnerConfig {
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQSAsync);
factory.setAutoStartup(true); // 초기화 후 컨테이너를 자동으로 시작할 지 여부를 결정합니다.
return factory;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory,
QueueMessageHandler queueMessageHandler,
ThreadPoolTaskExecutor messageThreadPoolTaskExecutor) {
SimpleMessageListenerContainer container = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer();
container.setMessageHandler(queueMessageHandler);
container.setTaskExecutor(messageThreadPoolTaskExecutor);
return container;
}
@Bean
@ConditionalOnMissingBean(QueueMessageHandler.class)
public QueueMessageHandlerFactory queueMessageHandlerFactory(AmazonSQSAsync amazonSQSAsync) {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(amazonSQSAsync);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setStrictContentTypeMatch(false);
factory.setArgumentResolvers(Collections.singletonList(new PayloadMethodArgumentResolver(messageConverter)));
return factory;
}
@Bean
@ConditionalOnMissingBean(QueueMessageHandler.class)
public QueueMessageHandler queueMessageHandler(QueueMessageHandlerFactory queueMessageHandlerFactory) {
return queueMessageHandlerFactory.createQueueMessageHandler();
}
@Bean
public ThreadPoolTaskExecutor messageThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadNamePrefix("sqs-");
taskExecutor.setCorePoolSize(8);
taskExecutor.setMaxPoolSize(100);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
메세지를 수신을 위한 핸들러와 리스너에 대한 정보를 설정해줍니다.
public class AwsSqsMessageListner {
@SqsListner(value = "SQS 이름.fifo", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void readMessage(String message) {
log.info("message" : {} ", message);
}
}
물론 그런 경우는 매우 희박하겠지만, 다양한 이유로 메세지를 못 받을 가능성 역시 배제할 수 없습니다. 이런 경우를 위해 AWS SQS는 DLQ라는 기능을 제공합니다.
DLQ(Dead Letter Queue)는 소프트웨어 시스템에서 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 특수한 유형의 메시지 대기열입니다.
DLQ에 쌓인 메시지들을 보면 왜 이 메시지들이 컨슈머에 의해 처리되지 못했는지를 알 수 있습니다. 만약 컨슈머 애플리케이션의 버그를 찾아서 수정했다면 AWS 콘솔 화면에서 redrive를 수행함으로써 해당 메시지들을 다시 소스 큐에 집어넣을 수 있습니다.
DLQ에 메시지가 쌓인다면, 컨슈머 어플리케이션을 디버깅해서 왜 컨슘이 실패했는지를 분석하고 패치한 다음, DLQ redrive를 하면 다시 메시지들을 소스 큐로 편하게 클릭 한 방으로 전송할 수 있습니다.
주의사항
- 메세지 최대 보관 기간은 원래 Queue의 보관 기간보다 길어야 합니다.
- FIFO 큐의 DLQ 또한 FIFO 큐여야 하며, 마찬가지로 Standard 큐의 DLQ 역시 Standard여야 합니다.
AWS의 SQS로 들어가 대기열을 새로 생성해줍니다. SQS를 FIFO로 설정해두었으니 DLQ 또한 FIFO로 설정해줍니다.
여기서 메세지 보존기간은 꼭 SQS에서 설정한 시간보다 길게 잡아줘야 합니다. 본인은 SQS에서 4일 SQS-DLQ에서는 8일로 잡았습니다.
DLQ에 들어온 메세지는 DLQ에 들어온 시간부터 타임을 체크하는 것이 아닌 메세지가 생성된 기준이기 때문입니다.
SQS와 같이 암호화는 비활성화 해줬습니다.
엑세스 정책은 기본값으로 설정합니다.
모두 기본값으로 설정 해준 뒤 오른쪽 하단 대기열 생성 버튼을 눌러줍니다.
이제 처음에 만들었던 SQS로 돌아가서 상단의 편집 버튼을 눌러줍니다.
배달 못한 편지 대기열로 들어가 활성화 버튼을 눌러주고 방금 생성했던 dlq를 선택합니다. 최대 수신 수란 실패 시 재시도하는 수를 말합니다. 기존의 Queue에서 메시지 폴링 작업을 행할 시 해당 메시지의 수신 수가 증가하게 되는데 최대 수신 수가 넘어갈 시 기존 Queue에 저장되었던 메시지가 DLQ로 넘어가게 됩니다.
설정이 끝나면 저장 버튼을 눌러주면 끝입니다.
만약 DLQ에 메세지가 들어온다면 처리되지 못한 메세지가 있다는 뜻이니, 메세지가 전달되지 못한 이유를 빨리 해결하기 위해 CloudWatch를 통해 알람 설정을 해보겠습니다.
Amazon CloudWatch는 AWS 리소스와 AWS에서 실시간으로 실행 중인 애플리케이션을 모니터링 하는 서비스 입니다. 지표를 감시해 알림을 보내거나 임계값을 위반한 경우 모니터링 중인 리소스를 자동으로 변경하는 경보를 생성할 수 있습니다. 예를 들어 경보는 인스턴스 중지, auto scaling 및 Amazon SNS 작업 시작, 종료 등으로 구성할 수 있습니다.
AWS의 CloudWatch로 들어가 왼쪽 메뉴바에 있는 경보 → 모든 경보로 들어가 줍니다.
오른쪽 상단에 있는 경보 생성 버튼을 눌러줍니다.
맨 처음 지표 및 조건 지정에서 지표를 선택 버튼을 눌러줍니다.
SQS → 대기열 지표를 눌러줍니다.
만들어둔 DLQ가 여러개 뜨는데 지표 이름이 NumberOfMessagesReceived인 DLQ를 체크해준 뒤 지표 선택 버튼을 눌러줍니다.
통계 → 합계로 선택해줍니다. 그래야 DLQ에 메세지가 한개라도 들어오면 경보를 받을 수 있습니다.
조건에서는 보다 크거나 같음 선택 → … 보다에서 1을 입력해줍니다. 이 또한 마찬지로 메세지가 1개라도 들어오면 경보 울립니다. 다음 버튼을 눌러줍니다.
알림에서는 새 주제 생성을 체크해주고 해당 주제의 이름을 정해줍니다. 알림을 수신할 이메일 엔드포인트는 알림을 받을 이메일의 주소를 적어주면 됩니다.
이름 및 설명 추가 부분에서 경보 이름을 지정해 주고 해당 경보에 대한 설명을 작성해 준 뒤 다음 버튼을 누르고 그 다음 생성 버튼을 눌러줍니다.
이후 DLQ에 메세지가 들어오게 되면 아래와 같은 알람이 들어옵니다.