[AWS][Tutorial] SQS

jijang·2021년 3월 2일
1

AWS

목록 보기
1/1
post-thumbnail

Amazon SQS Tutorial

이번 주제는 Amazon의 SQS(Simple Queue Service) 튜토리얼 입니다.
전체 소스는 github 에 있으니 참고 바랍니다.

여기서 대기열이란 말이 많이 나오는데요. 큐 라고 생각하면 됩니다.

개요

Amazon SQS는 마이크로 서비스, 분산 시스템 및 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있도록 지원하는 완전관리형 메시지 대기열 서비스 입니다.

SQS를 사용하면 메시지를 손실하거나 다른 서비스를 가동할 필요 없이 소프트웨어 구성 요소 간에 어떤 볼륨의 메시지든 전송, 저장 및 수신할 수 있습니다.

AWS 콘솔, CLI 또는 SDK를 사용하여 SQS를 시작할 수 있습니다.
우리는 개발자니 SDK를 사용하여 SQS를 구성하도록 하겠습니다.

SQS 구성

대기열 유형

대기열 유형에는 두 가지 유형이 존재합니다.

  • 표준대기열

    순서보장이 되지 않는 일반적인 대기열 입니다.
    순서보장은 되지 않지만 처리량은 빠릅니다?..
    메시지가 가끔 2개 이상의 메시지가 중복이 될 수 있습니다.
    거의 무제한의 API 호출 수를 지원합니다.

  • FIFO 대기열 (순서보장이 되는 대기열 입니다.)

    순서보장이 되는 대기열 입니다. (FIFO 선입선출)
    메시지는 정확히 한번 처리가 됩니다. 중복이란 없습니다.
    배치 처리 시 API 호출 수는 초당 3,000개의 트랜잭션을 지원합니다.
    배치 처리가 아닌 경우 초당 최대 300개의 API 호출을 지원합니다.

사전 인프라 설정

docker에 localstack을 사용하여 개발환경에서도 AWS를 사용할 수 있게 설정합니다.

  • docker-compose.yml
version: '3.3'

services:
  localstack:
    image: localstack/localstack:0.11.3
    privileged: true
    ports:
      - 28080:28080
      - 4576:4576
      - 4575:4575
      #- 4567:4567   # apigateway
      #- 4568:4568  # kinesis
      #- 4569:4569   # dynamodb
      #- 4570:4570  # dynamodbstreams
      #- 4571:4571  # elasticache
      #- 4572:4572   # s3
      #- 4573:4573  # firehose
      #- 4574:4574   # lambda
      #- 4597:4597   # ec2
    environment:
      - DATA_DIR=/tmp/localstack/data
      - DEBUG=1
      - DEFAULT_REGION=ap-southeast-2
      - DOCKER_HOST=unix:///var/run/docker.sock
      - LAMBDA_EXECUTOR=docker-reuse
      - PORT_WEB_UI=28080
      - SERVICES=sqs,sns
      - HOSTNAME=localstack
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - localstack:/tmp/localstack/data

volumes:
  localstack:

docker 실행

docker-compose up -d

docker 종료

docker-compose down

프로젝트 의존성 추가 (gradle)

dependencies {
  ...
  implementation platform('software.amazon.awssdk:bom:2.15.20')
  implementation 'software.amazon.awssdk:sqs'
  ...
}

SQS 대기열 처리

SQS Client Config

@Bean
  public SqsAsyncClient awsSqsAsyncClient() {
    return SqsAsyncClient.builder()
        .endpointOverride(URI.create("http://localhost:4576"))
        .region(Region.AP_NORTHEAST_2)
        .build();
  }

대기열 생성

public String createQueue(String qName) {
    System.out.println("Create Queue");

    try {
      CreateQueueRequest createQueueRequest = CreateQueueRequest.builder().queueName(qName).build();

      sqsAsyncClient.createQueue(createQueueRequest);

      System.out.println("return queue Url");

      GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(qName).build();
      CompletableFuture<GetQueueUrlResponse> getQueueUrlResponse = sqsAsyncClient.getQueueUrl(getQueueUrlRequest);

      return getQueueUrlResponse.get().queueUrl();

    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      throw new RuntimeException("Create Queue");
    }
  }

대기열 조회

public List<String> listQueue(String qName) {

    System.out.println("List Queues");

    try {
      ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(qName).build();
      CompletableFuture<ListQueuesResponse> listQueuesResponse = sqsAsyncClient.listQueues(listQueuesRequest);

      listQueuesResponse.get().queueUrls().forEach(System.out::println);

      return listQueuesResponse.get().queueUrls();

    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      throw new RuntimeException("List Queues");
    }
  }

SQS 메시지 처리

메시지 보내기

public String sendMessage(String qName, String message) {

    System.out.println("Send Message");

    try {
      GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(qName).build();
      CompletableFuture<GetQueueUrlResponse> getQueueUrlResponse = sqsAsyncClient.getQueueUrl(getQueueUrlRequest);

      SendMessageRequest sendMessageRequest = SendMessageRequest
          .builder()
          .queueUrl(getQueueUrlResponse.get().queueUrl())
          .messageBody(message)
          .delaySeconds(5)
          .build();

      return sqsAsyncClient.sendMessage(sendMessageRequest).get().messageId();

    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      throw new RuntimeException("");
    }
  }

메시지 수신 후 삭제

public List<ReceiveMessage> receiveMessages(String qName) {

    System.out.println("Receive messages");

    try {
      GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(qName).build();
      CompletableFuture<GetQueueUrlResponse> getQueueUrlResponse = sqsAsyncClient.getQueueUrl(getQueueUrlRequest);

      String queueUrl = getQueueUrlResponse.get().queueUrl();

      ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
          .queueUrl(queueUrl)
          .maxNumberOfMessages(10)
//          .visibilityTimeout(30)
//          .waitTimeSeconds(10)
          .build();

      List<ReceiveMessage> receiveMessages = sqsAsyncClient.receiveMessage(receiveMessageRequest).get().messages()
          .stream()
          .map(message ->
              ReceiveMessage.builder().body(message.body())
                  .messageId(message.messageId())
                  .receiptHandle(message.receiptHandle()).build())
          .collect(Collectors.toList());

      receiveMessages.forEach(
          receiveMessage -> {
            System.out.println("Delete Messages : " + receiveMessage.getMessageId());
              sqsAsyncClient.deleteMessage(
                  DeleteMessageRequest.builder()
                      .receiptHandle(receiveMessage.getReceiptHandle())
                      .queueUrl(queueUrl)
                      .build());
          }
      );

      return receiveMessages;

    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      throw new RuntimeException("Receive messages");
    }
  }

참고
AmazonSQS
AmazonSQS SDK

0개의 댓글