[AWS] Amazon SQS 적용해보기 - AWS SQS 메시지 받아오기

김강욱·2024년 5월 9일
0

Project-Evertrip

목록 보기
11/19
post-thumbnail

이번 포스팅에서는 SQS 대기열에 저장된 메시지를 받아오는 작업을 정리해보겠습니다.

Spring Cloud Aws 공식문서를 보면서 정리하였으니 자세한 내용은 아래 공식 문서에서 확인하세요.

Spring Cloud AWS 공식 문서

1. AwsSQSConfig 설정

@Configuration
@RequiredArgsConstructor
public class AwsSQSConfig {

    @Value("${cloud.aws.credentials.accessKey}")
    private String AWS_ACCESS_KEY;

    @Value("${cloud.aws.credentials.secretKey}")
    private String AWS_SECRET_KEY;

    @Value("${cloud.aws.region.static}")
    private String AWS_REGION;


    // 클라이언트 설정: region과 자격증명
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .credentialsProvider(() -> new AwsCredentials() {
                    @Override
                    public String accessKeyId() {
                        return AWS_ACCESS_KEY;
                    }

                    @Override
                    public String secretAccessKey() {
                        return AWS_SECRET_KEY;
                    }
                })
                .region(Region.of(AWS_REGION))
                .build();
    }

    // Listener Factory 설정 (Listener 쪽)
    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options
                        .acknowledgementMode(AcknowledgementMode.ALWAYS)
                        .acknowledgementInterval(Duration.ofSeconds(3))
                        .acknowledgementThreshold(5)
                        .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
                )
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

    // 메시지 발송을 위한 SQS 템플릿 설정 (Sender 쪽)
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }

}

해당 Config 파일은 이전 포스팅에서 다룬 그대로 적용해주었습니다.

이전 포스팅

SqsMessageListenerContainerFactory<Object>의 제네릭 타입에 변환하고자 하는 객체 타입을 집어넣으려 했으나 컴파일 에러가 발생했습니다.

그래서 공식 문서를 뒤져보니 반환 타입을 지정하는 내용은 존재하지 않았습니다. 대신 messageConverter를 지정해주는 메서드는 존재했습니다.

이런 식으로 SqsMessagingMessageConverter 객체에 MappingJackson2MessageConverter 객체를 세팅해서 내부적으로 JSON 타입의 메시지를 자바 객체로 역직렬화하는 구조인 거 같았습니다.

이를 적용하기 위해 설정 파일을 다음과 같이 수정해주었습니다.

@Configuration
public class AwsSQSConfig {

    @Value("${cloud.aws.credentials.accessKey}")
    private String AWS_ACCESS_KEY;

    @Value("${cloud.aws.credentials.secretKey}")
    private String AWS_SECRET_KEY;

    @Value("${cloud.aws.region.static}")
    private String AWS_REGION;




    // 클라이언트 설정: region과 자격증명
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .credentialsProvider(() -> new AwsCredentials() {
                    @Override
                    public String accessKeyId() {
                        return AWS_ACCESS_KEY;
                    }

                    @Override
                    public String secretAccessKey() {
                        return AWS_SECRET_KEY;
                    }
                })
                .region(Region.of(AWS_REGION))
                .build();
    }

    // Listener Factory 설정 (Listener 쪽)
    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options
                        .acknowledgementMode(AcknowledgementMode.ALWAYS)
                        .acknowledgementInterval(Duration.ofSeconds(3))
                        .acknowledgementThreshold(5)
                        .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
                        .messageConverter(messageConverter())
                )
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

    // 메시지 발송을 위한 SQS 템플릿 설정 (Sender 쪽)
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }

   @Bean
	public SqsMessagingMessageConverter messageConverter() {
    SqsMessagingMessageConverter messageConverter = new SqsMessagingMessageConverter();

    // JavaType 설정이 아닌 클래스 타입으로 직접 설정
    messageConverter.setPayloadTypeHeader(PostLogDto.class);

    MappingJackson2MessageConverter payloadConverter = new MappingJackson2MessageConverter();
    payloadConverter.setPrettyPrint(true);
    messageConverter.setPayloadMessageConverter(payloadConverter);

    return messageConverter;
}

2. @SqsListener를 사용하여 메시지 받기

공식 문서를 살펴보면

SQS 메시지를 받아올 수 있는 간단한 방법이 @SqsListener를 사용하는 것이고 해당 메서드를 구현한 클래스에서는 스프링 빈으로 등록해주어야 합니다.

그러면 스프링 프레임워크는 MessageListenerContainer를 생성하고 메시지가 수신될 때 해당 @SqsListener가 붙은 메서드를 호출하도록 MessagingMessageListenerAdapter를 설정한다고 합니다.

또한 @SqsListener 어노테이션의 설정 값으로 QueueNameMessageListenerContainer를 설정할 수 있으며 SpEL(#{…​}) 또는 속성 자리 표시자(${…​}) 문법을 사용할 수 있다고 합니다.


SqsTransferListener 코드

@Component
public class SqsTransferListener {

    @SqsListener(value = "${cloud.aws.sqs.queue-name}",factory = "defaultSqsListenerContainerFactory")
    public void messageListener(PostLogDto message) {
        System.out.println("Listener : " + message);
    }
    
}

3. 테스트 해보기

이제 설정값들을 토대로 테스트를 해보도록 하겠습니다. 우선 제가 메시지를 받을 PostLogDto 클래스 형태에 맞는 메시지를 SQS에 저장해보겠습니다.

@Getter
@NoArgsConstructor
@ToString
public class PostLogDto {

    private Long postId;

    private ConstantPool.EventType eventType;

    private Long memberId;

    private String token;

    private String eventContent;

    private String createdAt;

}

AWS 홈페이지에서 SQS 메시지를 전송해주었고 대기하였습니다.

하지만 Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.evertrip.post.dto.sqs.PostLogDto] for GenericMessage ... 에러가 발생하였습니다.

String 타입을 PostLogDto 타입으로 변경할 수 없다고 뜨네요...

원인을 분석해보기 위해 디버깅 해보았습니다.

convert 과정에서 mimeType 체크

에러가 발생한 부분을 유심히 살펴보니 요청 받은 MimeType과 current MimeType이 맞지 않는 것을 확인할 수 있었습니다.

확실한 건 아니지만 추측해보면 SQS 메세지를 원하는 타입으로 변경하기 위해 지정해주었던 MappingJackson2MessageConverter는 JSON 형태의 메시지를 Java 객체로 변환하기 위한 Converter입니다.

SQS 메시지 요청의 MimeType은 text/plain 타입이고 내부적으로 사용하고 있는 MappingJackson2MessageConverter가 처리할 수 있는 MimeType은 application/json 타입이므로 컨버터 적용을 할 수 없는 게 아닐까 의심이 되었습니다.

⚙️ Converter 수정하기

해결 방법이 떠오르지 않아 커스텀 컨버터를 생성하여 넣어주기로 결정하였습니다.

CustomMessageConverter 코드

@Slf4j
public class TextPlainJsonMessageConverter extends AbstractMessageConverter {

    private ObjectMapper objectMapper;

    public TextPlainJsonMessageConverter(ObjectMapper objectMapper) {
        super(new MimeType("text", "plain", Charset.forName("UTF-8")));
        this.objectMapper = objectMapper;
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return true;
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof String) {
            String payload = (String) message.getPayload();
            try {
                return objectMapper.readValue(payload, PostLogDto.class);
            } catch (IOException e) {
                log.error("Unable to convert message", e);
                return null;
            }
        }
        return null;
    }
}

AbstractMessageConverter를 상속받아 내부적으로 수신한 SQS 메시지의 payload를 ObjectMapper를 사용하여 PostLogDto로 변환해주는 로직으로 오버라이딩 해주었습니다.

수정된 AwsSQSConfig 파일

@Configuration
@RequiredArgsConstructor
public class AwsSQSConfig {

    @Value("${cloud.aws.credentials.accessKey}")
    private String AWS_ACCESS_KEY;

    @Value("${cloud.aws.credentials.secretKey}")
    private String AWS_SECRET_KEY;

    @Value("${cloud.aws.region.static}")
    private String AWS_REGION;

    private ObjectMapper objectMapper;



    // 클라이언트 설정: region과 자격증명
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .credentialsProvider(() -> new AwsCredentials() {
                    @Override
                    public String accessKeyId() {
                        return AWS_ACCESS_KEY;
                    }

                    @Override
                    public String secretAccessKey() {
                        return AWS_SECRET_KEY;
                    }
                })
                .region(Region.of(AWS_REGION))
                .build();
    }

    // Listener Factory 설정 (Listener 쪽)
    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options
                        .acknowledgementMode(AcknowledgementMode.ALWAYS)
                        .acknowledgementInterval(Duration.ofSeconds(3))
                        .acknowledgementThreshold(5)
                        .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
                        .messageConverter(sqsMessagingMessageConverter(objectMapper))
                )
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

    // 메시지 발송을 위한 SQS 템플릿 설정 (Sender 쪽)
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }

    @Bean
    public SqsMessagingMessageConverter sqsMessagingMessageConverter(ObjectMapper objectMapper) {
        SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
        TextPlainJsonMessageConverter payloadConverter = new TextPlainJsonMessageConverter(objectMapper);
        converter.setPayloadMessageConverter(payloadConverter);
        return converter;
    }

}

이후 똑같은 방법으로 테스트를 진행하였더니 다음과 같이 정상적으로 SQS 메시지를 원하는 타입으로 받아오는 것을 확인할 수 있었습니다 ㅎㅎ

이번 포스팅에서는 SQS 대기열의 메시지를 서버에서 수신하는 방법에 대해 알아보았습니다.

Evertrip 프로젝트에서는 해당 메시지를 즉각적으로 수신하는 것이 아니라 Spring Scheduling을 통해 정해진 시간에 Amazon SQS에 메시지 수신을 요청해야하는 방식이므로 다소 차이가 있습니다.

다음 시간에는 이제껏 포스팅한 내용들을 바탕으로 프로젝트에 맞게 커스터마이징하여 사용하는 시간을 가져보도록 하겠습니다. 감사합니다.


참고 자료
Spring Cloud AWS 공식 문서

profile
TO BE DEVELOPER

0개의 댓글

관련 채용 정보