[트러블슈팅] BatchEntryIdsNotDistinctException 발생

김강욱·2024년 5월 17일
0

Project-Evertrip

목록 보기
13/19
post-thumbnail

Amazon SQS에서 메시지를 읽어올 때 다음과 같은 에러가 발생하였습니다.

Caused by: java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.BatchEntryIdsNotDistinctException: Id b4c3204a-3ccc-45a0-b76b-b6ea9bbeb43f repeated. (Service: Sqs, Status Code: 400, Request ID: c3059f6e-10ca-58e9-a068-c1ae8fc32d5f)

Caused by: software.amazon.awssdk.services.sqs.model.BatchEntryIdsNotDistinctException: Id b4c3204a-3ccc-45a0-b76b-b6ea9bbeb43f repeated. (Service: Sqs, Status Code: 400, Request ID: c3059f6e-10ca-58e9-a068-c1ae8fc32d5f)

해당 에러는 SQS에서 메시지를 읽어오는 과정에서 발생하였습니다. 해당 에러를 구글링해도 나오지 않았고 원인을 분석하기 쉽지 않았습니다.

스프링 스케줄링과 스프링 배치를 활용하여 주기적으로 SQS 대기열에서 메시지를 읽어와서 처리하는 로직을 작성하기 위해 ItemReader의 코드를 작성해주었는데 아래와 같이 작성해주었습니다.

 @Bean
    @StepScope
    public ItemReader<PostLogDto> sqsReader() {
        log.info("SQS 메시지 읽기 작업 호출");
        List<PostLogDto> inputDataList = new ArrayList<>();
        boolean continuePolling = true;

        while (continuePolling) {
            System.out.println("폴링 작업 호출 : 메시지 수신 중");
            try {
                // SQS 폴링 작업
                Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
                        .visibilityTimeout(Duration.ofSeconds(300))
                        .maxNumberOfMessages(2)  // 한 번에 최대 2개의 메시지를 가져옴
                        .pollTimeout(Duration.ofSeconds(10)), PostLogDto.class);

                if (messages == null || messages.isEmpty()) {
                    continuePolling = false;  // 메시지가 없으면 반복 종료
                } else {
                        List<PostLogDto> extractedData = messages.stream()
                                .map(Message::getPayload)
                                .collect(Collectors.toList());
                        inputDataList.addAll(extractedData);

                        log.info("SQS 읽은 데이터 목록 :");
                        for (PostLogDto inputData : inputDataList) {
                            log.info("{}", inputData);
                        }
                    }
                } catch (Exception e) {
                log.error("Error receiving messages from SQS", e);
                continuePolling = false;
            }
        }


        return new ListItemReader<>(inputDataList);
    }

해당 코드를 돌려보니

SQS 메시지 읽기 작업 로그가 호출되고나서 에러가 발생하였습니다.

해당 메시지를 읽어오는 도중에 메시지 ID 중복으로 인해 수신 불가 에러가 발생하는 것을 확인할 수 있었습니다.

이를 해결하기 위해 sqsTemplate의 설정 옵션을 수정하면서 테스트를 진행해보았습니다.

@RestController
@RequiredArgsConstructor
public class SqsTestController {

    private final SqsTemplate sqsTemplate;


    @GetMapping("/test/test2")
    ResponseEntity pull2(@Value("${cloud.aws.sqs.queue-name}") String queueName) throws ExecutionException, InterruptedException {
        
        // 1. 기본 옵션 설정으로 메시지 읽어오기
        Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(queueName, PostLogDto.class);

        // 2. visibilityTimeout과 최대 메시지 수신 갯수 설정
        Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
                .visibilityTimeout(Duration.ofSeconds(300))
                .maxNumberOfMessages(2), PostLogDto.class);

        // 3. pollTimeout과 최대 메시지 수신 갯수 설정
        Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
                        .maxNumberOfMessages(2)
                        .pollTimeout(Duration.ofSeconds(5)), PostLogDto.class);

        List<PostLogDto> extractedData = messages.stream()
                .map(Message::getPayload)
                .collect(Collectors.toList());
        
        return ResponseEntity.ok(extractedData);
    }
}

sqsTemplate의 설정 옵션들에 대해 간단히 설명하자면

1. visibilityTimeout

메시지의 유효 가시성 시간을 설정할 수 있는 부분입니다.

예를 들어, SQS 입장에서 하나의 컨슈머가 메시지 A를 가져갔을 시, 큐에 메시지 A가 남아있기는 하지만, 다른 컨슈머가 메시지 A를 (또) 가져가지 않도록 일정시간동안(visibility timeout) 수신요청에 드러나지 않게 할 수 있습니다.

2. maxNumberOfMessages

한 번에 수신하는 메시지 최대 갯수를 설정하는 부분입니다. Amazon SQS에서 한 번에 최대로 수신할 수 있는 메시지는 10개입니다.

3. pollTimeout

메시지를 수신할 때 대기할 시간을 설정할 수 있는 부분입니다.
예를 들어 pollTimeout을 10초로 설정한다면 컨슈머는 10초 동안 폴링을 시도합니다.

참고로 일반적으로 클라이언트 설정이 대기열 기본 설정보다 우선하므로, sqsTemplate에서 설정한 값이 우선 적용됩니다.

위의 코드로 테스트를 돌린 결과 1번과 3번에서는 메시지를 정상적으로 수신하는 반면 2번의 경우에서 똑같은 에러가 발생하는 것을 확인할 수 있었습니다.

결국 visibilityTimeout 설정 때문에 메시지 가시성에 대한 문제가 발생한 것으로 추정됩니다. 클라이언트가 백앤드 서버 하나뿐인데 왜 이런 문제가 발생하는지에 대해 의문이 들었습니다.

그래서 혹시 스프링 배치를 사용하면서 SQS 메시지 요청이 멀티 쓰레드에서 동시 요청을 하는 문제인가 의심이 들었습니다.

ItemReader 단일 쓰레드화

@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;
    private final SqsWriter sqsWriter;
    private final SqsProcessor sqsProcessor;
    private final SqsAsyncClient sqsAsyncClient;
    private final SqsTemplate sqsTemplate;
    private final String queueName;

    public BatchConfig(JobRepository jobRepository,
                       PlatformTransactionManager platformTransactionManager,
                       SqsWriter sqsWriter,
                       SqsProcessor sqsProcessor,
                       SqsAsyncClient sqsAsyncClient,
                       SqsTemplate sqsTemplate,
                       @Value("${cloud.aws.sqs.queue-name}") String queueName) {
        this.jobRepository = jobRepository;
        this.platformTransactionManager = platformTransactionManager;
        this.sqsWriter = sqsWriter;
        this.sqsProcessor = sqsProcessor;
        this.sqsAsyncClient = sqsAsyncClient;
        this.sqsTemplate = sqsTemplate;
        this.queueName = queueName;
    }

    @Bean
    public Job sqsBatchJob() {
        return new JobBuilder("sqsBatchJob", this.jobRepository)
                .start(sqsReadStep())
                .next(sqsProcessWriteStep())
                .build();
    }

    @Bean
    public Step sqsReadStep() {
        return new StepBuilder("sqsReadStep", this.jobRepository)
                .tasklet(sqsReadTasklet(), this.platformTransactionManager)
                .build();
    }

    @Bean
    public Step sqsProcessWriteStep() {
        return new StepBuilder("sqsProcessWriteStep", this.jobRepository)
                .<PostLogDto, PostLog>chunk(10, this.platformTransactionManager)
                .reader(synchronizedSqsItemReader())
                .processor(sqsProcessor)
                .writer(sqsWriter)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
        asyncTaskExecutor.setConcurrencyLimit(5); // 동시에 실행할 최대 스레드 수
        return asyncTaskExecutor;
    }

    @Bean
    @StepScope
    public Tasklet sqsReadTasklet() {
        return (contribution, chunkContext) -> {
            log.info("SQS 메시지 읽기 작업 호출");
        List<PostLogDto> inputDataList = new ArrayList<>();
        boolean continuePolling = true;

        while (continuePolling) {
            System.out.println("폴링 작업 호출 : 메시지 수신 중");
            try {
                // SQS 폴링 작업
                Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
                        .visibilityTimeout(Duration.ofSeconds(300))
                        .maxNumberOfMessages(2)  // 한 번에 최대 2개의 메시지를 가져옴
                        .pollTimeout(Duration.ofSeconds(10)), PostLogDto.class);

                if (messages == null || messages.isEmpty()) {
                    continuePolling = false;  // 메시지가 없으면 반복 종료
                } else {
                        List<PostLogDto> extractedData = messages.stream()
                                .map(Message::getPayload)
                                .collect(Collectors.toList());
                        inputDataList.addAll(extractedData);

                        log.info("SQS 읽은 데이터 목록 :");
                        for (PostLogDto inputData : inputDataList) {
                            log.info("{}", inputData);
                        }
                    }
                } catch (Exception e) {
                log.error("Error receiving messages from SQS", e);
                continuePolling = false;
            }
        }

            chunkContext.getStepContext().getStepExecution().getJobExecution()
                    .getExecutionContext().put("sqsMessages", inputDataList);

            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    @StepScope
    public ItemStreamReader<PostLogDto> sqsItemReader() {
        return new ItemStreamReader<PostLogDto>() {
            private ListItemReader<PostLogDto> delegate;

            @Override
            public void open(org.springframework.batch.item.ExecutionContext executionContext) throws ItemStreamException {
                List<PostLogDto> sqsMessages = (List<PostLogDto>) StepSynchronizationManager.getContext()
                        .getJobExecutionContext().get("sqsMessages");
                this.delegate = new ListItemReader<>(sqsMessages);
            }           
        };
    }

    @Bean
    public SynchronizedItemStreamReader<PostLogDto> synchronizedSqsItemReader() {
        SynchronizedItemStreamReader<PostLogDto> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        synchronizedItemStreamReader.setDelegate(sqsItemReader());
        return synchronizedItemStreamReader;
    }
}

멀티 쓰레드 환경에서의 동시 요청이 원인일 거라 생각하여 ItemReader 만 단일 쓰레드로 처리하고 ItemProcessor, ItemWriter는 멀티 쓰레드로 구동하는 환경을 조성해보았습니다.

하지만 똑같은 에러가 발생하였고, 결국에는 sqsTemplate의 설정 옵션에서 visibilityTimeout에 대한 설정을 뺌으로써 해결할 수 있었습니다.

최종 코드

1. BatchConfig
@Configuration
//@EnableBatchProcessing
@EnableConfigurationProperties(BatchProperties.class)
@Slf4j
public class BatchConfig {

    private JobRepository jobRepository;

    private PlatformTransactionManager platformTransactionManager;

    private SqsWriter sqsWriter;
    private SqsProcessor sqsProcessor;

    private SqsAsyncClient sqsAsyncClient;

    private String queueName;



    private SqsTemplate sqsTemplate;

    public BatchConfig(JobRepository jobRepository,
                       PlatformTransactionManager platformTransactionManager,
                       SqsWriter sqsWriter,
                       SqsProcessor sqsProcessor,
                       SqsAsyncClient sqsAsyncClient,
                       SqsTemplate sqsTemplate,
                       @Value("${cloud.aws.sqs.queue-name}") String queueName) {
        this.jobRepository = jobRepository;
        this.platformTransactionManager = platformTransactionManager;
        this.sqsWriter = sqsWriter;
        this.sqsProcessor = sqsProcessor;
        this.sqsTemplate = sqsTemplate;
        this.sqsAsyncClient = sqsAsyncClient;
        this.queueName = queueName;
    }


    @Bean
    public Job sqsBatchJob(Step step) {
        return new JobBuilder("sqsBatchJob", this.jobRepository)
                .start(step)
                .build();
    }

    @Bean
    public Step sqsStep() {
        return new StepBuilder("sqsBatchStep", this.jobRepository)
                .<PostLogDto, PostLog>chunk(10, platformTransactionManager)
                .reader(sqsReader())
                .processor(sqsProcessor)
                .writer(sqsWriter)
                .build();
    }



    // 최신꺼
    @Bean
    @StepScope
    public ItemReader<PostLogDto> sqsReader() {
        log.info("SQS 메시지 읽기 작업 호출");
        List<PostLogDto> inputDataList = new ArrayList<>();
        boolean continuePolling = true;

        while (continuePolling) {
            System.out.println("폴링 작업 호출 : 메시지 수신 중");
            try {
                // SQS 폴링 작업
                Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
                        .maxNumberOfMessages(5)
                        .pollTimeout(Duration.ofSeconds(10)), PostLogDto.class);

                if (messages == null || messages.isEmpty()) {
                    continuePolling = false;  // 메시지가 없으면 반복 종료
                } else {
                        List<PostLogDto> extractedData = messages.stream()
                                .map(Message::getPayload)
                                .collect(Collectors.toList());
                        inputDataList.addAll(extractedData);

                        log.info("SQS 읽은 데이터 목록 :");
                        for (PostLogDto inputData : inputDataList) {
                            log.info("{}", inputData);
                        }
                    }
                } catch (Exception e) {
                log.error("Error receiving messages from SQS", e);
                continuePolling = false;
            }
        }


        return new ListItemReader<>(inputDataList);
    }

}

2. SqsProcessor
@Component
@Slf4j
public class SqsProcessor implements ItemProcessor<PostLogDto, PostLog> {

    @Override
    public PostLog process(PostLogDto dto) throws Exception {
        PostLog postLogEntity = new PostLog(dto);
        log.info("SqsProcessor 호출 :" + dto);
        return postLogEntity;
    }
}

3. SqsWriter
@Component
@RequiredArgsConstructor
@Slf4j
public class SqsWriter implements ItemWriter<PostLog> {

    private final PostLogRepository postLogRepository;

    @Override
    public void write(Chunk<? extends PostLog> chunk) throws Exception {
        List<? extends PostLog> items = chunk.getItems();
        postLogRepository.saveAll(items);
        log.info("SqsWriter 호출 :" + items.size());
    }
}

SQS 메시지 대기열에 메시지를 12개를 넣고 해당 코드를 실행하니 스케줄링이 동작할 때 스프링 배치 Job을 실행하고 ItemReader에서 SQS 대기열에 메시지를 요청하여 수신하는 것을 확인할 수 있었습니다.

while 문을 통해 메시지를 5개씩(최대 10개까지 설정가능) 받아오고 SQS 대기열에 메시지가 없을 시에 요청을 종료하게 됩니다. 아래와 같이 최종적으로 메시지를 다 읽어오게 됩니다.

ItemReader에서 읽어온 데이터들을 ItemProcessor에서 받아 정상적으로 엔티티로 변환하는 작업 또한 확인할 수 있습니다.

ItemWriter


총 두 번 호출되었으며 청크 사이즈를 10개로 설정해놨기 때문에 12개의 메시지를 처리하는데 10개, 2개씩 처리했음을 확인할 수 있습니다.

실제 DB에도 12개의 메시지 데이터가 알맞게 들어간 것을 확인할 수 있었습니다.

profile
TO BE DEVELOPER

0개의 댓글

관련 채용 정보