[Spring] Spring Batch + Amazon SQS 적용해보기

김강욱·2024년 5월 15일
0

Project-Evertrip

목록 보기
12/19
post-thumbnail

이번 포스팅에서는 Amazon SQS에 저장된 메시지들을 Spring Batch를 이용하여 배치 처리 작업을 해보는 시간을 가져보도록 하겠습니다.

📝 해당 기술을 왜 사용하는지?

먼저 Evertrip 프로젝트에 대해서 왜 Amazon SQSSpring Batch를 사용했는지에 대해 말씀드리겠습니다.

해당 프로젝트에서는 게시글을 조회하는 사용자의 이벤트들을 기록하여 게시글 작성자에게 통계를 보여주는 기능을 제공하고 있습니다.

예를 들어, 특정 게시글을 조회하는 사용자들이 해당 게시글에 머문 시간이나 내린 스크롤 깊이, 댓글 작성 등에 대한 이벤트들을 DB에 저장하고 게시글 작성자가 통계 페이지를 조회할 때 해당 이벤트들의 평균값을 조회할 수 있도록 설계를 해야했습니다.

이러한 이벤트들은 빈번하게 발생하기 때문에 이벤트 발생마다 서버에 직접적으로 전달한다면 부하가 크게 발생하게 됩니다.

또한 해당 이벤트들은 실시간성이 중요하지 않습니다. 즉, 즉각적으로 바로 DB에 반영하지 않더라도 서비스 이용에 크게 방해가 되지 않기 때문에 일정 주기에 따라 한꺼번에 DB에 반영해주는 편이 성능적으로 훨씬 이점을 많이 줄 것이라 판단하였습니다.

게시글 조회에서 발생된 이벤트들을 저장하고 있는 공간이 따로 있고 서버에서는 일정 주기에 따라 해당 저장 공간에서 기록된 이벤트들을 꺼내와서 한꺼번에 배치 작업을 하는 식으로 설계를 해야겠다 판단하였고 그에 적합한 기술들이 바로 Amazon SQSSpring Batch였습니다. 일정 주기에 따른 작업을 처리하기 위해 Spring Scheduler도 사용하였습니다.

해당 기술 관련 내용은 아래를 참고하시면 좋을 것 같습니다.

[Spring] Spring Batch란?
[AWS] Amazon SQS란?
[Spring] Spring Scheduler 적용해보기



📝 아키텍처 설계

다음으로 아키텍처 설계에 대해서 살펴보도록 하겠습니다. 현재 사용자 이벤트 관련 처리에 대한 흐름은 아래와 같습니다.

크게 세 가지 흐름으로 볼 수 있습니다

1. 사용자의 이벤트 전달(Produce)

특정 게시글을 조회하는 사용자들은 게시글 조회 페이지를 나갈 때 각종 이벤트를 SQS에 전달하게 됩니다.

예를 들어, 다음과 같은 형식으로 SQS에 JSON 형태로 메시지를 전송하게 됩니다.

{
   "postId" : 1,
   "eventType" : "HISTORY",
   "memberId" : 1,
   "eventContent" : "MEMBER IS VISETED",
   "createdAt" : "2024-05-09 13:13:00"
}

2. Spring Scheduling을 통해 SQS에서 메시지 수신

백앤드 서버에서는 Spring Scheduling 설정을 통해 일정 주기를 설정하여 SQS에 메시지 전송 요청을 보내게 됩니다.

3. 수신한 메시지들을 Spring Batch를 사용하여 배치 처리

SQS에서 수신한 메시지들을 Spring Batch를 사용하여 Chunk 단위로 데이터를 처리하여 DB에 저장해줍니다.

이런 흐름으로 사용자의 이벤트들이 주기적으로 DB에 저장하며 대용량 데이터 처리를 하고 있습니다.

그리고 위 흐름도의 녹색으로 색칠된 과정에 대해서도 설명해보도록 하겠습니다.

Lambda와 DLQ(Dead Letter Queue) 사용

SQS에서 메시지 처리를 실패할 시 해당 메시지는 DLQ로 이동하게 됩니다.

해당 사진의 최대 수신 수(MaximumReceives)로 설정된 최대 재시도 횟수를 초과할 시 해당 메시지가 DLQ로 이동하도록 설정이 되어있는 상태입니다.

DLQ에 쌓여진 실패 메시지들은 AWS Lambda를 통해 다시 SQS로 전달할 수 있습니다.

AWS Lambda에 대한 설정은 아래 글에서 확인하실 수 있습니다.

[AWS] AWS Lambda를 사용하여 DLQ에서 SQS로 메시지 재전송하기



📝 실제 코드 적용해보기

이제 작성한 코드를 살펴보도록 하겠습니다.

Build.gradle 설정

dependencies {
    // 스프링 배치 의존성 설정
	implementation 'org.springframework.boot:spring-boot-starter-batch'
	// 메시지 큐 의존성 추가, Spring 3.0 버전 이상
	implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
	implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'
}

AWS SQS 설정 파일

@Configuration
@RequiredArgsConstructor
public class AwsSQSConfig {

    @Value("${cloud.aws.credentials.accessKey}")
    private String AWS_ACCESS_KEY;

    @Value("${cloud.aws.credentials.secretKey}")
    private String AWS_SECRET_KEY;

    @Value("${cloud.aws.region.static}")
    private String AWS_REGION;

    private ObjectMapper objectMapper;



    // 클라이언트 설정: region과 자격증명
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .credentialsProvider(() -> new AwsCredentials() {
                    @Override
                    public String accessKeyId() {
                        return AWS_ACCESS_KEY;
                    }

                    @Override
                    public String secretAccessKey() {
                        return AWS_SECRET_KEY;
                    }
                })
                .region(Region.of(AWS_REGION))
                .build();
    }


    // 메시지 수신을 받기 위한 SqsTemplate 빈 생성
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient())
                .messageConverter(sqsMessagingMessageConverter(objectMapper)).build();

    }

    @Bean
    public SqsMessagingMessageConverter sqsMessagingMessageConverter(ObjectMapper objectMapper) {
        SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
        TextPlainJsonMessageConverter payloadConverter = new TextPlainJsonMessageConverter(objectMapper);
        converter.setPayloadMessageConverter(payloadConverter);
        return converter;
    }

}

해당 AwsSQSConfig 클래스에서는 Amazon SQS와의 통신을 위한 클라이언트 및 SqsTemplate을 설정하고 있습니다.

ACCESS_KEYSECRET_KEY, REGION 정보를 바탕으로 SQS와의 통신을 담당하는 SqsAsyncClient를 스프링 빈으로 등록해주고 있습니다.

실제 Amazon SQS와 통신하여 메시지를 수신을 담당하는 SqsTemplateSqsAsyncClientSqsMessagingMessageConverter 스프링 빈 객체를 의존성 주입하여 스프링 빈으로 등록해줍니다.

SqsMessagingMessageConverterSQS에서 읽어온 JSON 형식의 데이터를 자바 객체로 역직렬화 해주는 역할을 담당합니다. 실제 SqsTemplate에서 SQS 메시지를 처리해주는 messageConverter를 설정해줄 수 있는데, MessagingMessageConverter 인터페이스를 상속받은 메시지 컨버터만 등록해줄 수 있기 때문에 자식 클래스인 SqsMessagingMessageConverter를 사용해야합니다.


메시지 컨버터 설정

@Slf4j
public class TextPlainJsonMessageConverter extends AbstractMessageConverter {

    private ObjectMapper objectMapper;

    public TextPlainJsonMessageConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return true;
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof String) {
            String payload = (String) message.getPayload();
            try {
                return objectMapper.readValue(payload, PostLogDto.class);
            } catch (IOException e) {
                log.error("Unable to convert message", e);
                return null;
            }
        }
        return null;
    }
}

해당 커스텀 메시지 컨버터는 JSON 형태의 SQS 메시지의 본문(payload) 값을 꺼내 자바 객체로 역직렬화 해주는 역할을 하고 있습니다.

해당 커스텀 메시지 컨버터를 SqsMessagingMessageConverter.setPayloadMessageConverter() 설정을 통해 주입해주고 SqsTemplate.messageConverter() 설정에 해당 SqsMessagingMessageConverter를 등록해주면 SqsTemplate을 통하여 SQS에 메시지를 읽어오는 경우 JSON 형태의 데이터를 원하는 자바 객체로 변환해주는 작업을 합니다.


Batch 설정 파일 생성

@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {

    private JobRepository jobRepository;

    private PlatformTransactionManager platformTransactionManager;

    private SqsWriter sqsWriter;
    private SqsProcessor sqsProcessor;

    private SqsAsyncClient sqsAsyncClient;

    private String queueName;



    private SqsTemplate sqsTemplate;

    public BatchConfig(JobRepository jobRepository,
                       PlatformTransactionManager platformTransactionManager,
                       SqsWriter sqsWriter,
                       SqsProcessor sqsProcessor,
                       SqsAsyncClient sqsAsyncClient,
                       SqsTemplate sqsTemplate,
                       @Value("${cloud.aws.sqs.queue-name}") String queueName) {
        this.jobRepository = jobRepository;
        this.platformTransactionManager = platformTransactionManager;
        this.sqsWriter = sqsWriter;
        this.sqsProcessor = sqsProcessor;
        this.sqsTemplate = sqsTemplate;
        this.sqsAsyncClient = sqsAsyncClient;
        this.queueName = queueName;
    }


    @Bean
    public Job sqsBatchJob(Step step) {
        return new JobBuilder("sqsBatchJob", this.jobRepository)
                .start(step)
                .build();
    }

    @Bean
    public Step sqsStep() {
        return new StepBuilder("sqsBatchStep", this.jobRepository)
                .<PostLogDto, PostLog>chunk(10, platformTransactionManager)
                .reader(sqsReader())
                .processor(sqsProcessor)
                .writer(sqsWriter)
                .build();
    }



    
    @Bean
    @StepScope
    public ItemReader<PostLogDto> sqsReader() {
        log.info("SQS 메시지 읽기 작업 호출");
        List<PostLogDto> inputDataList = new ArrayList<>();
        boolean continuePolling = true;

        while (continuePolling) {
            try {
                // SQS 폴링 작업
                Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
                        .maxNumberOfMessages(10)
                        .pollTimeout(Duration.ofSeconds(10)), PostLogDto.class);

                if (messages == null || messages.isEmpty()) {
                    continuePolling = false;  // 메시지가 없으면 반복 종료
                } else {
                        List<PostLogDto> extractedData = messages.stream()
                                .map(Message::getPayload)
                                .collect(Collectors.toList());
                        inputDataList.addAll(extractedData);

                        log.info("SQS 읽은 데이터 목록 :");
                        for (PostLogDto inputData : inputDataList) {
                            log.info("{}", inputData);
                        }
                    }
                } catch (Exception e) {
                continuePolling = false;
            }
        }


        return new ListItemReader<>(inputDataList);
    }

}

해당 BatchConfig 클래스에서는 배치 프로세스를 진행해야할 JobStep에 대해서 정의하고 있습니다.

Job에서 하나의 Step에 대한 배치 처리를 정의하고 있고 Step에서는 청크 사이즈를 10개로 지정하고 있으며 ItemReader, ItemProcessor, ItemWriter를 등록해주고 있습니다.

ItemReaderSQS에서 메시지를 수신 요청하여 저장되어 있는 모든 메시지를 읽어오는 역할을 하고 있습니다.

의존성 주입을 받은 SqsTemplatereceiveMany() 메서드를 호출하여 SQS에 메시지 수신 요청을 하게 됩니다. 한 번에 수신할 최대 메시지 갯수는 10개로 설정하고 폴링 시간을 10초로 설정하였습니다.

while 반복문을 통하여 SQS의 모든 메시지를 읽어오고 난 후 ItemProcessor에게 해당 메시지(PostLogDto 객체)를 전달하게 됩니다.


ItemProcessor 설정

@Component
@Slf4j
public class SqsProcessor implements ItemProcessor<PostLogDto, PostLog> {

    @Override
    public PostLog process(PostLogDto dto) throws Exception {
        PostLog postLogEntity = new PostLog(dto);
        return postLogEntity;
    }
}

해당 SqsProcessor 클래스는 ItemReader에서 전달받은 PostLogDto 객체들을 엔티티 객체인 PostLog로 변환해주는 작업을 담당합니다.

변환이 완료된 (새로 생성된) 엔티티 목록을 다음 단계인 ItemWriter에게 전달하게 됩니다.


ItemWriter 설정

@Component
@RequiredArgsConstructor
@Slf4j
public class SqsWriter implements ItemWriter<PostLog> {

    private final PostLogRepository postLogRepository;

    @Override
    public void write(Chunk<? extends PostLog> chunk) throws Exception {
        List<? extends PostLog> items = chunk.getItems();
        postLogRepository.saveAll(items);
        log.info("SqsWriter 호출 :" + items.size());
    }
}

해당 SqsWriter 클래스에서는 ItemProcessor로부터 받은 PostLog 엔티티 객체를 실제 DB에 저장하는 역할을 맡고 있습니다.

청크 단위로 Item들을 받아 Repository를 통해 DB에 INSERT하게 됩니다.


BatchScheduler 설정

@Configuration
@EnableScheduling
@Slf4j
public class BatchScheduler {


    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job sqsBatchJob;

    @Scheduled(cron = "0 */15 * * * ?")  // 매 15분마다 실행
    public void runBatchJob() throws JobExecutionException {
        log.info("SQS 스케줄링 동작");
        jobLauncher.run(sqsBatchJob, new JobParametersBuilder()
                .addLong("uniqueness", System.nanoTime()).toJobParameters());
    }
}

해당 BatchScheduler 클래스에서는 일정 주기를 설정하여 스프링 빈으로 등록해두었던 Job을 실행시켜주는 역할을 하고 있습니다.

크론 방식을 사용하여 매 15분마다 SQS에 저장된 메시지를 읽어와 DB에 저장해주는 Job을 실행시켜주게 됩니다.



📝 참고사항

Amazon SQS에서 메시지를 수신할 수 있는 다른 방법을 공식 문서에서 확인할 수 있었습니다. 아래와 같이 @SqsListener 어노테이션을 사용하여 간단하게 메시지를 수신하는 방법이 빈번하게 활용된다고 하더라구요.

@Component
public class SqsTransferListener {

    @SqsListener(value = "${cloud.aws.sqs.queue-name}",factory = "defaultSqsListenerContainerFactory")
    public void messageListener(PostLogDto message) {
        System.out.println("Listener : " + message);
    }

}

위의 코드에서 @SqsListener 어노테이션은 Amazon SQS 대기열 큐에 메시지가 도착하는 대로 자동으로 메시지를 수신합니다.

Evertrip 프로젝트에서는 Amazon SQS에서 메시지 수신 요청을 원하는 시점(일정 주기에 따른)에 보내야하기 때문에 위의 @SqsListener 어노테이션을 사용한 메시지 수신 방법은 적합하지 않았습니다.

그래도 @SqsListener 어노테이션을 활용하는 방법에 대해 공부하기 위해 공식 사이트를 보면서 찾아보았습니다.

@SqsListener 어노테이션 설정에서 value 값과 factory 값을 설정할 수 있습니다.

value 설정에는 메시지 수신 요청을 할 queue name을 입력하고 factory에는 메시지 리스너 컨테이너 팩토리를 입력하게 됩니다.

메시지 리스너 컨테이너 팩토리를 설정하는 코드를 살펴보겠습니다.

@Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options
                        .maxMessagesPerPoll(10)
  						.listenerMode(ListenerMode.BATCH)
                        .acknowledgementMode(AcknowledgementMode.ALWAYS)
                        .acknowledgementInterval(Duration.ofSeconds(3))
                        .acknowledgementThreshold(5)
                        .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
                        .messageConverter(sqsMessagingMessageConverter(objectMapper))
                )
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

이번 포스팅에서는 배치에 관련된 옵션에 대해서 설명드리도록 하겠습니다. 다른 옵션들이 궁금하신 분들은 아래 링크를 확인해주세요!

[AWS] Amazon SQS 적용해보기 - 스프링 SQS 설정 및 AWS SQS로 메시지 보내기

리스너 컨테이너 팩토리에 SqsContainerOptions 옵션에서 ListenerMode.BATCH로 설정하면 해당 팩토리를 사용하여 @SqsListener 메서드에 대한 단일 메시지 및 배치 컨테이너 모두 생성할 수 있다고 합니다.

ListenerMode.BATCH 모드로 설정하면 리스너는 여러 메시지를 한 번에 수신하게 됩니다. 즉, SQS에서 메시지를 배치로 가져와 한 번의 호출로 여러 메시지를 처리하게 됩니다.

AcknowledgementMode.ALWAYS 모드 설정은 리스너가 메시지를 성공적으로 수신하면 항상 자동으로 메시지를 SQS에게 확인하도록 지시합니다. 즉, 메시지 처리 후 자동으로 SQS에서 해당 메시지가 삭제되도록 구성됩니다.

listenerMode(BATCH)acknowledgementMode(ALWAYS)가 함께 사용될 때, 리스너는 메시지를 배치로 수신하고, 각 배치의 처리가 성공적으로 완료되면, 해당 배치의 모든 메시지에 대해 자동으로 SQS에 확인 응답을 보냅니다.

해당 설정은 대량의 메시지를 효과적으로 처리하면서도, 메시지 처리에 대한 신뢰성을 보장하기 위해 자동으로 확인 처리를 수행한다고 합니다.

Spring Cloud AWS 공식 사이트에 기재된 BATCH 처리 관련 내용입니다.

배치 모드를 활성화하면 프레임 워크는 메시지 폴링의 전체 결과를 리스너에게 제공하게 됩니다.

SQS의 제한으로 인해 실제로 한 번의 폴링에 10개까지만 메시지를 수신할 수 있습니다. 따라서 maxMessagesPerPoll이 10을 초과하는 경우, 여러 폴링 작업의 결과가 결합되어 리스너에 전달된다고 하네요.

또한 SqsListener를 사용하여 일괄 처리를 활성화하려면 단일 List 또는 List<Message> 형태의 파리미터가 리스너 메서드에 제공되어야 한다고 적혀있습니다.

  @Component
public class SqsTransferListener {

    @SqsListener(value = "${cloud.aws.sqs.queue-name}",factory = "defaultSqsListenerContainerFactory")
    public void messageListener(List<PostLogDto> message) {
        System.out.println("Listener : " + message);
    }

}

이런식으로 파라미터에 List 형태로 넣어주고 factory 설정에서 옵션에 ListenerMode.BATCH 모드를 추가해주시면 메시지를 한꺼번에 읽어온다고 하네요.


이번 포스팅 내용은 여러 가지 기술들을 사용해서 내용이 많이 복잡하고 길었습니다. 읽으시느라 고생 많으셨습니다 ㅎㅎ

참고 자료
Spring Cloud AWS 공식 사이트

profile
TO BE DEVELOPER

0개의 댓글

관련 채용 정보