이번 주제는 Amazon의 SQS(Simple Queue Service) 튜토리얼 입니다.
전체 소스는 github 에 있으니 참고 바랍니다.
여기서 대기열이란 말이 많이 나오는데요. 큐 라고 생각하면 됩니다.
Amazon SQS는 마이크로 서비스, 분산 시스템 및 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있도록 지원하는 완전관리형 메시지 대기열 서비스 입니다.
SQS를 사용하면 메시지를 손실하거나 다른 서비스를 가동할 필요 없이 소프트웨어 구성 요소 간에 어떤 볼륨의 메시지든 전송, 저장 및 수신할 수 있습니다.
AWS 콘솔, CLI 또는 SDK를 사용하여 SQS를 시작할 수 있습니다.
우리는 개발자니 SDK를 사용하여 SQS를 구성하도록 하겠습니다.
대기열 유형에는 두 가지 유형이 존재합니다.
표준대기열
순서보장이 되지 않는 일반적인 대기열 입니다.
순서보장은 되지 않지만 처리량은 빠릅니다?..
메시지가 가끔 2개 이상의 메시지가 중복이 될 수 있습니다.
거의 무제한의 API 호출 수를 지원합니다.
FIFO 대기열 (순서보장이 되는 대기열 입니다.)
순서보장이 되는 대기열 입니다. (FIFO 선입선출)
메시지는 정확히 한번 처리가 됩니다. 중복이란 없습니다.
배치 처리 시 API 호출 수는 초당 3,000개의 트랜잭션을 지원합니다.
배치 처리가 아닌 경우 초당 최대 300개의 API 호출을 지원합니다.
docker에 localstack을 사용하여 개발환경에서도 AWS를 사용할 수 있게 설정합니다.
version: '3.3'
services:
localstack:
image: localstack/localstack:0.11.3
privileged: true
ports:
- 28080:28080
- 4576:4576
- 4575:4575
#- 4567:4567 # apigateway
#- 4568:4568 # kinesis
#- 4569:4569 # dynamodb
#- 4570:4570 # dynamodbstreams
#- 4571:4571 # elasticache
#- 4572:4572 # s3
#- 4573:4573 # firehose
#- 4574:4574 # lambda
#- 4597:4597 # ec2
environment:
- DATA_DIR=/tmp/localstack/data
- DEBUG=1
- DEFAULT_REGION=ap-southeast-2
- DOCKER_HOST=unix:///var/run/docker.sock
- LAMBDA_EXECUTOR=docker-reuse
- PORT_WEB_UI=28080
- SERVICES=sqs,sns
- HOSTNAME=localstack
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- localstack:/tmp/localstack/data
volumes:
localstack:
docker 실행
docker-compose up -d
docker 종료
docker-compose down
dependencies {
...
implementation platform('software.amazon.awssdk:bom:2.15.20')
implementation 'software.amazon.awssdk:sqs'
...
}
@Bean
public SqsAsyncClient awsSqsAsyncClient() {
return SqsAsyncClient.builder()
.endpointOverride(URI.create("http://localhost:4576"))
.region(Region.AP_NORTHEAST_2)
.build();
}
public String createQueue(String qName) {
System.out.println("Create Queue");
try {
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder().queueName(qName).build();
sqsAsyncClient.createQueue(createQueueRequest);
System.out.println("return queue Url");
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(qName).build();
CompletableFuture<GetQueueUrlResponse> getQueueUrlResponse = sqsAsyncClient.getQueueUrl(getQueueUrlRequest);
return getQueueUrlResponse.get().queueUrl();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException("Create Queue");
}
}
public List<String> listQueue(String qName) {
System.out.println("List Queues");
try {
ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(qName).build();
CompletableFuture<ListQueuesResponse> listQueuesResponse = sqsAsyncClient.listQueues(listQueuesRequest);
listQueuesResponse.get().queueUrls().forEach(System.out::println);
return listQueuesResponse.get().queueUrls();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException("List Queues");
}
}
public String sendMessage(String qName, String message) {
System.out.println("Send Message");
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(qName).build();
CompletableFuture<GetQueueUrlResponse> getQueueUrlResponse = sqsAsyncClient.getQueueUrl(getQueueUrlRequest);
SendMessageRequest sendMessageRequest = SendMessageRequest
.builder()
.queueUrl(getQueueUrlResponse.get().queueUrl())
.messageBody(message)
.delaySeconds(5)
.build();
return sqsAsyncClient.sendMessage(sendMessageRequest).get().messageId();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException("");
}
}
public List<ReceiveMessage> receiveMessages(String qName) {
System.out.println("Receive messages");
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(qName).build();
CompletableFuture<GetQueueUrlResponse> getQueueUrlResponse = sqsAsyncClient.getQueueUrl(getQueueUrlRequest);
String queueUrl = getQueueUrlResponse.get().queueUrl();
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
// .visibilityTimeout(30)
// .waitTimeSeconds(10)
.build();
List<ReceiveMessage> receiveMessages = sqsAsyncClient.receiveMessage(receiveMessageRequest).get().messages()
.stream()
.map(message ->
ReceiveMessage.builder().body(message.body())
.messageId(message.messageId())
.receiptHandle(message.receiptHandle()).build())
.collect(Collectors.toList());
receiveMessages.forEach(
receiveMessage -> {
System.out.println("Delete Messages : " + receiveMessage.getMessageId());
sqsAsyncClient.deleteMessage(
DeleteMessageRequest.builder()
.receiptHandle(receiveMessage.getReceiptHandle())
.queueUrl(queueUrl)
.build());
}
);
return receiveMessages;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException("Receive messages");
}
}