SQS 적용해보기 & AWS SQS, DLQ, CloudWatch 한번에 파헤치기

파이 ఇ·2024년 7월 17일
1

SQS (Simple Queue Service) ?

SQS는 마이크로서비스, 분산 시스템 및 서버리스 애플리케이션을 위한 완전 관리형 메시지 대기열 입니다. 간단히 설명하자면 애플리케이션 간의 메세지를 주고받기 위한 아주 간단한 Queue(또는 메세지 브로커)라고 생각하시면 됩니다.

AWS SQS의 주요 개념

  1. Queue (대기열) : SQS는 메세지를 저장하는 데 사용되는 대기열 입니다. 대기열은 메세지 저장소이며, 메세지가 비동기적으로 보내지고 처리됩니다. 각 대기열은 고유한 이름을 가지며, 여러 프로듀서 및 컨슈머가 동시에 대기열에 접근할 수 있습니다.
  2. Message (메세지) : 메세지는 프로듀서가 컨슈머에게 전송하는 단위입니다. 메세지는 최대 256KB까지의 페이로드를 가질 수 있습니다.
  3. Producer (프로듀서) : 프로듀서는 메세지를 SQS 대기열에 보내는 애플리케이션 또는 서비스입니다. 프로듀서는 메세지를 대기열에 전송하고, 필요에 따라 메세지 속성과 디바운스(딜레이) 시간을 설정할 수 있습니다.
  4. Consumer (컨슈머) : 컨슈머는 SQS 대기열에서 메세지를 받아 처리하는 애플리케이션 또는 서비스입니다. 컨슈머는 대기열에서 메세지를 읽고, 해당 메세지에 대한 작업을 수행한 후 메세지를 삭제합니다. 여러 컨슈머가 대기열에 동시에 작업을 처리할 수 있으며, SQS는 메세지의 가용성과 처리 성능을 보장합니다.

SQS의 처리 과정

위 그림은 일반적인 Queue의 처리과정 입니다.

  • Procucer는 메세지를 생성하여 Queue로 메세지를 전달합니다.
  • Queue는 메세지를 일정 기간 가지고 있게 됩니다.
  • Consumer는 주기적으로 Queue를 Polling하면서 신규 메세지가 있다면 가져가서 처리합니다.
  • 처리가 끝나면 Queue로 Ack를 전송합니다. (메세지 아이디에 해당하는 Ack를 받으면 Queue에서 메세지를 제거합니다.)

이렇게 프로듀서와 컨슈머를 Queue라는 미들웨어로 분리하면, 각 시스템에 영향을 받지 않고 원하는 작업을 수행할 수 있습니다.

SQS 대기열

표준 대기열 (Standard Queue)

장점

  • 무제한에 가까운 메세지 전송을 지원(최대 처리량) 합니다.
  • 최소 1회 전달을 보장합니다. 단, 중복 수신이 될 수 있습니다.
  • Best-Effort-Ordering : 최대한 순서를 보장하고자 노력합니다. (하지만 신뢰할 수 는 없습니다.)

단점

  • 메세지의 순서를 보장할 수 없습니다.
  • 반드시 1번만 읽기 보장을 할 수 없습니다. (중복 읽기 가능성이 존재합니다.)

FIFO 대기열 (First In First Out Queue)

장점

  • 메세지의 순서를 보장합니다.
  • Exactly-Once Processing : 1번의 전송, 1번의 수신을 보장할 수 있습니다.

단점

  • Consumer가 하나여야 합니다.
  • Limited Throughput : 순서를 위해 초당 300TPS 제한이 있습니다.

1. SQS 적용하기

Overview

현재 본인은 e-commerce 도메인의 MSA 환경을 고려한 주문 & 선물하기 프로젝트를 진행하던 중 다음과 같은 시나리오에서 양방향 통신에 문제가 발생했습니다.

  1. Gift 프로젝트에서 Order 프로젝트로 주문 생성 API 요청을 전달합니다.
  2. Order 프로젝트는 해당 요청을 받아 주문건을 성공적으로 처리합니다.
  3. Gift 프로젝트는 또 한번 Order 프로젝트로 해당 주문건에 대한 결제를 요청합니다.
  4. Order 프로젝트는 결제를 완료한 후, Gift 프로젝트로 결제에 대한 응답을 보내줘야 합니다. 이 과정에서 양방향 통신 문제가 발생합니다.

주문 서비스와 선물하기 서비스 간의 메세지 브로커를 두고 메세지 기반 비동기 통신을 하며 Gift → Order의 방향성을 제거하기 위해, SQS 도입을 결정했습니다.

  • Order : SqsMessageSender
  • Gift : SqsMessageListener

SQS 대기열 생성하기

해당 프로젝트에선 메세지의 순서를 지켜야 하기때문에 FIFO 유형을 선택했습니다. FIFO 유형의 SQS를 만들고 싶다면 이름 뒤에 꼭 .fifo를 붙여줘야 합니다

메세지 Config 설정

기본값으로 설정했습니다. 해당 구성은 메세지의 크기, 보존 기간 등을 설정할 수 있습니다.

  • 표시 제한 기간
    • 한 컨슈머가 대기열에서 수신한 메세지가 다른 메세지 컨슈머에게 보이지 않게 되는 시간
    • 컨슈머가 메세지를 가져가고, 처리 및 삭제하지 못하면 다른 컨슈머에게 메세지가 보이게 됩니다. (즉, 1번만 보여야 하는 메세지라면 시간 내 삭제가 필요합니다.)
    • 기본값은 30초 입니다. (컨슈머가 처리하는 시간에 따라 조정하시면 됩니다.)
    • 한번 설정하면 큐 내부의 모든 메세지에 대해 적용됩니다.
    • 최적의 성능을 위해 표시 제한 시간 초과는 AWS SDK 읽기 제한 시간보다 길게 설정해야 합니다.
  • 메세지 보존 기간
    • Amazon SQS가 삭제되지 않은 메세지를 보관하는 시간입니다.
    • 보존기간 범위 1분 → 14일까지 가능합니다.
    • 대기열, 배달하지 못한 메세지 대기열 통합 시간입니다. (배달되지 못한 메세지 대기열의 보존 기간을 원래 대기열의 보존 기간보다 길게 잡아야 합니다.)
  • 전송 지연
    • 이 대기열에 추가된 각 메세지의 첫 번째 전송에 대한 지연 시간입니다.
    • 지정된 시간동안 메세지는 컨슈머에게 소비되지 않고 지연된다.
    • 0초~15분까지 설정 가능합니다.
    • Standard 는 한번 설정된 시간은 계속 흘러 지연되었다가 진행합니다.
    • FIFO 는 설정 변경하면 이후 메시지부터 적용됩니다.
  • 최대 메세지 크기
    • 이 대기열에 최대 메시지 크기입니다.
    • 1바이트 ~ 256KB까지 가능합니다.
    • 256KB보다 큰 메시지 전송시에는 Amazon SQS Extended Client Library 를 이용하여 전송 가능합니다.
    • Amazon S3에 메시지 로드에 대한 참조가 포함된 메시지 전송합니다.
    • 최대 크기는 2GB까지 가능합니다.
  • 메세지 수신 대기 시간
    • 폴링이 메시지를 사용할 수 있을 때까지 기다리는 최대 시간입니다.
    • 0초 ~ 20초까지 가능합니다.
    • 긴 폴링은 빈 응답수 (ReceiveMessage 요청에 사용할 수 메시지가 없는경우)와 잘못된 빈 응답(메시지를 사용할 수 있지만 응답에 포함되지 않은경우)를 제거 하여 Amazon SQS 사용 비용 줄이기 가능합니다.
    • 수신 요청이 최대 메시지 수를 수집하면 즉시 반환합니다.
    • 0으로 설정하면 짧은 폴링이 됩니다.
  • 콘텐츠 기반 중복 제거
    • Amazon SQS 는 메시지 본문에 기반하여 중복 제거 ID를 자동 생성 가능합니다.

암호화

SQS 메세지에 대한 암호화입니다. 비활성화 해주었습니다.

액세스 정책

액세스 정책도 기본값으로 진행했습니다. 액세스 정책은 이 대기열에 액세스 할 수 있는 계정 및 사용자와 허용되는 작업을 정의합니다.

리드라이브와 배달 못한 편지 대기열

비활성화로 설정했습니다.

  • 리드라이브 허용 정책 설정은 메시지 처리 실패 시 대응 방법을 정의하는 것이고, 그중 하나의 옵션이 배달 못한 편지 대기열 설정입니다. 이 설정을 통해 실패한 메시지를 어떻게 처리할지 결정할 수 있습니다.
  • 배달 못한 편지 대기열 설정은 실패한 메시지를 위한 DLQ(Dead Letter Queue)를 지정하는 부분입니다. 메시지가 설정된 재시도 횟수를 초과하여 여전히 처리되지 못했을 때, 해당 메시지를 DLQ로 보내서 관리할 수 있게 해 줍니다. 이렇게 DLQ를 사용하면 실패한 메시지를 분리하여 문제 해결을 위한 분석이나 후속 조치를 취할 수 있습니다.

태그

작성하지 않고 하단의 대기열 생성 버튼을 눌러줍니다.

IAM 설정하기

오른쪽 상단의 사용자 생성 버튼을 눌러줍니다.

사용자 이름을 설정해 준 뒤 오른쪽 하단의 다음 버튼을 눌러줍니다.

직접 정책 연결 → AmazonSQSFullAccess 권한을 체크한 뒤 오른쪽 하단의 다음 버튼을 눌러줍니다.

태그 추가는 입력없이 skip한 후 사용자 생성 버튼을 눌러줍니다.

액세스 키 만들기

새로 만든 사용자에 들어가 액세스 키1 아래에 있는 액세스 키 만들기를 눌러줍니다.

Command Line Interface(CLI)를 선택 후 다음 버튼 클릭합니다.

액세스 키 만들기 버튼을 클릭합니다.

csv 파일 다운로드 후 액세스 키 생성을 완료합니다.

프로젝트 구성

Java 11
SpringBoot 2.7.16

build.gradle

// aws sqs
implementation platform('software.amazon.awssdk:bom:2.15.0')
implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.1.RELEASE'

application.yml

example.order:
  base-url: http://localhost:8080/

cloud:
  aws:
    access-key: ${aws.access-key}
    secret-key: ${aws.secret-key}

AWS IAM에서 만든 access-key와 secret-key를 .yml에 작성해줍니다. access-key와 secret-key는 노출되면 안되기 때문에 application-secret.yml 파일을 따로 작성하여 저장했습니다.

AwsSqsConfig 작성

@Configuration
public class AwsSqsConfig {
	@Value("${cloud.aws.access-key}")
	private String awsAccessKey;

	@Value("${cloud.aws.secret-key}")
	private String awsSecretKey;
	
	@Bean
	public AmazonSQSAsync amazonSQSAsync() {
		AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(
			new BasicAWSCredentials(awsAccessKey, awsSecretKey));

		return AmazonSQSAsyncClientBuilder
			.standard()
			.withRegion(Regions.AP_NORTHEAST_2)
			.withCredentials(awsCredentialsProvider)
			.build();
	}
	
	@Bean
	public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
		return new QueueMessagingTemplate(amazonAsync());
	}
}

QueueMessageTemplate를 사용하여 SQS Queue에 메세지를 보낼 수 있습니다.

AwsSqsSender 작성

public class AwsSqsSender {
	private final String SQS_QUEUE_NAME = "SQS를 생성할 당시 작성한 이름.fifo";
	private final QueueMessagingTemplate queueMessagingTemplate;
	
	public void messageSender(Message message) {
		try {
				Map<String, Object> headers = new HashMap<>();
				headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "item-queues");
				headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
				queueMessagingTemplate.convertAndSend(SQS_QUEUE_NAME, message, headers);
		} catch (Exception e) {
				throw new RuntimeException(e);
		}
	}
}

FIFO Queue로 전송한다면 메세지 헤더에 FIFO Queue를 나타내는 값을 실어서 보낼 수 있습니다. FIFO Queue는 SQS_GROUP_ID_HEADER로 메세지 그룹핑이 필요합니다. 또한 SQS_DEDUPLICATION_ID_HEADER를 지정하여 중복을 제거합니다. 중복을 구분할 내용은 메세지 중복 키를 받아서 convertAndSend() 메서드로 메세지를 전송합니다.

AwsSqsListnerConfig 작성

@Component
public class AwsSqsListnerConfig {
	
	@Bean
	public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
			SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
			factory.setAmazonSqs(amazonSQSAsync);
			factory.setAutoStartup(true); // 초기화 후 컨테이너를 자동으로 시작할 지 여부를 결정합니다.
			return factory;
	}
	
	@Bean
	public SimpleMessageListenerContainer simpleMessageListenerContainer(
		SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory,
		QueueMessageHandler queueMessageHandler,
		ThreadPoolTaskExecutor messageThreadPoolTaskExecutor) {
		SimpleMessageListenerContainer container = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer();
		container.setMessageHandler(queueMessageHandler);
		container.setTaskExecutor(messageThreadPoolTaskExecutor);
		return container;
	}

	@Bean
	@ConditionalOnMissingBean(QueueMessageHandler.class)
	public QueueMessageHandlerFactory queueMessageHandlerFactory(AmazonSQSAsync amazonSQSAsync) {
		QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
		factory.setAmazonSqs(amazonSQSAsync);

		MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
		messageConverter.setStrictContentTypeMatch(false);
		factory.setArgumentResolvers(Collections.singletonList(new PayloadMethodArgumentResolver(messageConverter)));
		return factory;
	}
	
	@Bean
	@ConditionalOnMissingBean(QueueMessageHandler.class)
	public QueueMessageHandler queueMessageHandler(QueueMessageHandlerFactory queueMessageHandlerFactory) {
		return queueMessageHandlerFactory.createQueueMessageHandler();
	}

	@Bean
	public ThreadPoolTaskExecutor messageThreadPoolTaskExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		taskExecutor.setThreadNamePrefix("sqs-");
		taskExecutor.setCorePoolSize(8);
		taskExecutor.setMaxPoolSize(100);
		taskExecutor.afterPropertiesSet();
		return taskExecutor;
	}
}

메세지를 수신을 위한 핸들러와 리스너에 대한 정보를 설정해줍니다.

AwsSqsMessageListner

public class AwsSqsMessageListner {
	
	@SqsListner(value = "SQS 이름.fifo", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
	public void readMessage(String message) {
		log.info("message" : {} ", message);
	}
}
  • deletionPolicy : 메세지를 받은 이후의 삭제 정책 입니다.
    • ALWAYS : 메서드로 message가 들어오면 무조건 삭제 요청을 보낸다.
    • NEVER : 절대 삭제 요청을 보내지 않는다.
    • NO_REDRIVE : redrive policy(DLQ)가 정의되지 않았으면 메세지를 삭제한다.
    • ON_SUCCESS : SqsListner 애노테이션이 붙은 메서드에서 에러가 나지 않으면 메세지를 삭제한다.
      (에러가 난 경우는 삭제하지 않음.)

하지만 Listner가 SQS에서 메세지를 제대로 전달받지 못하면 어떡하지?

물론 그런 경우는 매우 희박하겠지만, 다양한 이유로 메세지를 못 받을 가능성 역시 배제할 수 없습니다. 이런 경우를 위해 AWS SQS는 DLQ라는 기능을 제공합니다.

2. DLQ 적용하기

DLQ (Dead Letter Queue)란 ?

DLQ(Dead Letter Queue)는 소프트웨어 시스템에서 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 특수한 유형의 메시지 대기열입니다.

DLQ 어떻게 활용할까요 ?

DLQ에 쌓인 메시지들을 보면 왜 이 메시지들이 컨슈머에 의해 처리되지 못했는지를 알 수 있습니다. 만약 컨슈머 애플리케이션의 버그를 찾아서 수정했다면 AWS 콘솔 화면에서 redrive를 수행함으로써 해당 메시지들을 다시 소스 큐에 집어넣을 수 있습니다.

DLQ에 메시지가 쌓인다면, 컨슈머 어플리케이션을 디버깅해서 왜 컨슘이 실패했는지를 분석하고 패치한 다음, DLQ redrive를 하면 다시 메시지들을 소스 큐로 편하게 클릭 한 방으로 전송할 수 있습니다.

주의사항

  • 메세지 최대 보관 기간은 원래 Queue의 보관 기간보다 길어야 합니다.
  • FIFO 큐의 DLQ 또한 FIFO 큐여야 하며, 마찬가지로 Standard 큐의 DLQ 역시 Standard여야 합니다.

DLQ를 위한 대기열 생성

AWS의 SQS로 들어가 대기열을 새로 생성해줍니다. SQS를 FIFO로 설정해두었으니 DLQ 또한 FIFO로 설정해줍니다.

여기서 메세지 보존기간은 꼭 SQS에서 설정한 시간보다 길게 잡아줘야 합니다. 본인은 SQS에서 4일 SQS-DLQ에서는 8일로 잡았습니다.
DLQ에 들어온 메세지는 DLQ에 들어온 시간부터 타임을 체크하는 것이 아닌 메세지가 생성된 기준이기 때문입니다.

SQS와 같이 암호화는 비활성화 해줬습니다.

엑세스 정책은 기본값으로 설정합니다.

모두 기본값으로 설정 해준 뒤 오른쪽 하단 대기열 생성 버튼을 눌러줍니다.

SQS 설정

이제 처음에 만들었던 SQS로 돌아가서 상단의 편집 버튼을 눌러줍니다.

배달 못한 편지 대기열로 들어가 활성화 버튼을 눌러주고 방금 생성했던 dlq를 선택합니다. 최대 수신 수란 실패 시 재시도하는 수를 말합니다. 기존의 Queue에서 메시지 폴링 작업을 행할 시 해당 메시지의 수신 수가 증가하게 되는데 최대 수신 수가 넘어갈 시 기존 Queue에 저장되었던 메시지가 DLQ로 넘어가게 됩니다.

설정이 끝나면 저장 버튼을 눌러주면 끝입니다.

3. CloudWatch를 이용하여 DLQ에 메세지가 들어온다면 알람 보내기

만약 DLQ에 메세지가 들어온다면 처리되지 못한 메세지가 있다는 뜻이니, 메세지가 전달되지 못한 이유를 빨리 해결하기 위해 CloudWatch를 통해 알람 설정을 해보겠습니다.

CloudWatch란 ?

Amazon CloudWatch는 AWS 리소스와 AWS에서 실시간으로 실행 중인 애플리케이션을 모니터링 하는 서비스 입니다. 지표를 감시해 알림을 보내거나 임계값을 위반한 경우 모니터링 중인 리소스를 자동으로 변경하는 경보를 생성할 수 있습니다. 예를 들어 경보는 인스턴스 중지, auto scaling 및 Amazon SNS 작업 시작, 종료 등으로 구성할 수 있습니다.

CloudWatch 적용하기

AWS의 CloudWatch로 들어가 왼쪽 메뉴바에 있는 경보 → 모든 경보로 들어가 줍니다.

오른쪽 상단에 있는 경보 생성 버튼을 눌러줍니다.

맨 처음 지표 및 조건 지정에서 지표를 선택 버튼을 눌러줍니다.

SQS → 대기열 지표를 눌러줍니다.

만들어둔 DLQ가 여러개 뜨는데 지표 이름이 NumberOfMessagesReceived인 DLQ를 체크해준 뒤 지표 선택 버튼을 눌러줍니다.

통계 → 합계로 선택해줍니다. 그래야 DLQ에 메세지가 한개라도 들어오면 경보를 받을 수 있습니다.

조건에서는 보다 크거나 같음 선택 → … 보다에서 1을 입력해줍니다. 이 또한 마찬지로 메세지가 1개라도 들어오면 경보 울립니다. 다음 버튼을 눌러줍니다.

알림에서는 새 주제 생성을 체크해주고 해당 주제의 이름을 정해줍니다. 알림을 수신할 이메일 엔드포인트는 알림을 받을 이메일의 주소를 적어주면 됩니다.

이름 및 설명 추가 부분에서 경보 이름을 지정해 주고 해당 경보에 대한 설명을 작성해 준 뒤 다음 버튼을 누르고 그 다음 생성 버튼을 눌러줍니다.

이후 DLQ에 메세지가 들어오게 되면 아래와 같은 알람이 들어옵니다.

끝!

profile
⋆。゚★⋆⁺₊⋆ ゚☾ ゚。⋆ ☁︎。₊⋆

0개의 댓글