이번 포스팅에서는 Amazon SQS
에 저장된 메시지들을 Spring Batch
를 이용하여 배치 처리 작업을 해보는 시간을 가져보도록 하겠습니다.
먼저 Evertrip
프로젝트에 대해서 왜 Amazon SQS
와 Spring Batch
를 사용했는지에 대해 말씀드리겠습니다.
해당 프로젝트에서는 게시글을 조회하는 사용자의 이벤트들을 기록하여 게시글 작성자에게 통계를 보여주는 기능을 제공하고 있습니다.
예를 들어, 특정 게시글을 조회하는 사용자들이 해당 게시글에 머문 시간이나 내린 스크롤 깊이, 댓글 작성 등에 대한 이벤트들을 DB에 저장하고 게시글 작성자가 통계 페이지를 조회할 때 해당 이벤트들의 평균값을 조회할 수 있도록 설계를 해야했습니다.
이러한 이벤트들은 빈번하게 발생하기 때문에 이벤트 발생마다 서버에 직접적으로 전달한다면 부하가 크게 발생하게 됩니다.
또한 해당 이벤트들은 실시간성이 중요하지 않습니다. 즉, 즉각적으로 바로 DB에 반영하지 않더라도 서비스 이용에 크게 방해가 되지 않기 때문에 일정 주기에 따라 한꺼번에 DB에 반영해주는 편이 성능적으로 훨씬 이점을 많이 줄 것이라 판단하였습니다.
게시글 조회에서 발생된 이벤트들을 저장하고 있는 공간이 따로 있고 서버에서는 일정 주기에 따라 해당 저장 공간에서 기록된 이벤트들을 꺼내와서 한꺼번에 배치 작업을 하는 식으로 설계를 해야겠다 판단하였고 그에 적합한 기술들이 바로 Amazon SQS
와 Spring Batch
였습니다. 일정 주기에 따른 작업을 처리하기 위해 Spring Scheduler
도 사용하였습니다.
해당 기술 관련 내용은 아래를 참고하시면 좋을 것 같습니다.
[Spring] Spring Batch란?
[AWS] Amazon SQS란?
[Spring] Spring Scheduler 적용해보기
다음으로 아키텍처 설계에 대해서 살펴보도록 하겠습니다. 현재 사용자 이벤트 관련 처리에 대한 흐름은 아래와 같습니다.
크게 세 가지 흐름으로 볼 수 있습니다
특정 게시글을 조회하는 사용자들은 게시글 조회 페이지를 나갈 때 각종 이벤트를 SQS
에 전달하게 됩니다.
예를 들어, 다음과 같은 형식으로 SQS
에 JSON 형태로 메시지를 전송하게 됩니다.
{
"postId" : 1,
"eventType" : "HISTORY",
"memberId" : 1,
"eventContent" : "MEMBER IS VISETED",
"createdAt" : "2024-05-09 13:13:00"
}
백앤드 서버에서는 Spring Scheduling
설정을 통해 일정 주기를 설정하여 SQS
에 메시지 전송 요청을 보내게 됩니다.
SQS
에서 수신한 메시지들을 Spring Batch
를 사용하여 Chunk
단위로 데이터를 처리하여 DB에 저장해줍니다.
이런 흐름으로 사용자의 이벤트들이 주기적으로 DB에 저장하며 대용량 데이터 처리를 하고 있습니다.
그리고 위 흐름도의 녹색으로 색칠된 과정에 대해서도 설명해보도록 하겠습니다.
SQS
에서 메시지 처리를 실패할 시 해당 메시지는 DLQ
로 이동하게 됩니다.
해당 사진의 최대 수신 수(MaximumReceives)로 설정된 최대 재시도 횟수를 초과할 시 해당 메시지가 DLQ
로 이동하도록 설정이 되어있는 상태입니다.
DLQ
에 쌓여진 실패 메시지들은 AWS Lambda
를 통해 다시 SQS
로 전달할 수 있습니다.
AWS Lambda
에 대한 설정은 아래 글에서 확인하실 수 있습니다.
이제 작성한 코드를 살펴보도록 하겠습니다.
dependencies {
// 스프링 배치 의존성 설정
implementation 'org.springframework.boot:spring-boot-starter-batch'
// 메시지 큐 의존성 추가, Spring 3.0 버전 이상
implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'
}
@Configuration
@RequiredArgsConstructor
public class AwsSQSConfig {
@Value("${cloud.aws.credentials.accessKey}")
private String AWS_ACCESS_KEY;
@Value("${cloud.aws.credentials.secretKey}")
private String AWS_SECRET_KEY;
@Value("${cloud.aws.region.static}")
private String AWS_REGION;
private ObjectMapper objectMapper;
// 클라이언트 설정: region과 자격증명
@Bean
public SqsAsyncClient sqsAsyncClient() {
return SqsAsyncClient.builder()
.credentialsProvider(() -> new AwsCredentials() {
@Override
public String accessKeyId() {
return AWS_ACCESS_KEY;
}
@Override
public String secretAccessKey() {
return AWS_SECRET_KEY;
}
})
.region(Region.of(AWS_REGION))
.build();
}
// 메시지 수신을 받기 위한 SqsTemplate 빈 생성
@Bean
public SqsTemplate sqsTemplate() {
return SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient())
.messageConverter(sqsMessagingMessageConverter(objectMapper)).build();
}
@Bean
public SqsMessagingMessageConverter sqsMessagingMessageConverter(ObjectMapper objectMapper) {
SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
TextPlainJsonMessageConverter payloadConverter = new TextPlainJsonMessageConverter(objectMapper);
converter.setPayloadMessageConverter(payloadConverter);
return converter;
}
}
해당 AwsSQSConfig
클래스에서는 Amazon SQS
와의 통신을 위한 클라이언트 및 SqsTemplate
을 설정하고 있습니다.
ACCESS_KEY
와 SECRET_KEY
, REGION
정보를 바탕으로 SQS
와의 통신을 담당하는 SqsAsyncClient
를 스프링 빈으로 등록해주고 있습니다.
실제 Amazon SQS
와 통신하여 메시지를 수신을 담당하는 SqsTemplate
도 SqsAsyncClient
과 SqsMessagingMessageConverter
스프링 빈 객체를 의존성 주입하여 스프링 빈으로 등록해줍니다.
SqsMessagingMessageConverter
는 SQS
에서 읽어온 JSON
형식의 데이터를 자바 객체로 역직렬화 해주는 역할을 담당합니다. 실제 SqsTemplate
에서 SQS
메시지를 처리해주는 messageConverter
를 설정해줄 수 있는데, MessagingMessageConverter
인터페이스를 상속받은 메시지 컨버터만 등록해줄 수 있기 때문에 자식 클래스인 SqsMessagingMessageConverter
를 사용해야합니다.
@Slf4j
public class TextPlainJsonMessageConverter extends AbstractMessageConverter {
private ObjectMapper objectMapper;
public TextPlainJsonMessageConverter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
protected boolean supports(Class<?> clazz) {
return true;
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
if (message.getPayload() instanceof String) {
String payload = (String) message.getPayload();
try {
return objectMapper.readValue(payload, PostLogDto.class);
} catch (IOException e) {
log.error("Unable to convert message", e);
return null;
}
}
return null;
}
}
해당 커스텀 메시지 컨버터는 JSON
형태의 SQS
메시지의 본문(payload
) 값을 꺼내 자바 객체로 역직렬화 해주는 역할을 하고 있습니다.
해당 커스텀 메시지 컨버터를 SqsMessagingMessageConverter.setPayloadMessageConverter()
설정을 통해 주입해주고 SqsTemplate.messageConverter()
설정에 해당 SqsMessagingMessageConverter
를 등록해주면 SqsTemplate
을 통하여 SQS
에 메시지를 읽어오는 경우 JSON
형태의 데이터를 원하는 자바 객체로 변환해주는 작업을 합니다.
@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {
private JobRepository jobRepository;
private PlatformTransactionManager platformTransactionManager;
private SqsWriter sqsWriter;
private SqsProcessor sqsProcessor;
private SqsAsyncClient sqsAsyncClient;
private String queueName;
private SqsTemplate sqsTemplate;
public BatchConfig(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
SqsWriter sqsWriter,
SqsProcessor sqsProcessor,
SqsAsyncClient sqsAsyncClient,
SqsTemplate sqsTemplate,
@Value("${cloud.aws.sqs.queue-name}") String queueName) {
this.jobRepository = jobRepository;
this.platformTransactionManager = platformTransactionManager;
this.sqsWriter = sqsWriter;
this.sqsProcessor = sqsProcessor;
this.sqsTemplate = sqsTemplate;
this.sqsAsyncClient = sqsAsyncClient;
this.queueName = queueName;
}
@Bean
public Job sqsBatchJob(Step step) {
return new JobBuilder("sqsBatchJob", this.jobRepository)
.start(step)
.build();
}
@Bean
public Step sqsStep() {
return new StepBuilder("sqsBatchStep", this.jobRepository)
.<PostLogDto, PostLog>chunk(10, platformTransactionManager)
.reader(sqsReader())
.processor(sqsProcessor)
.writer(sqsWriter)
.build();
}
@Bean
@StepScope
public ItemReader<PostLogDto> sqsReader() {
log.info("SQS 메시지 읽기 작업 호출");
List<PostLogDto> inputDataList = new ArrayList<>();
boolean continuePolling = true;
while (continuePolling) {
try {
// SQS 폴링 작업
Collection<Message<PostLogDto>> messages = sqsTemplate.receiveMany(from -> from.queue(queueName)
.maxNumberOfMessages(10)
.pollTimeout(Duration.ofSeconds(10)), PostLogDto.class);
if (messages == null || messages.isEmpty()) {
continuePolling = false; // 메시지가 없으면 반복 종료
} else {
List<PostLogDto> extractedData = messages.stream()
.map(Message::getPayload)
.collect(Collectors.toList());
inputDataList.addAll(extractedData);
log.info("SQS 읽은 데이터 목록 :");
for (PostLogDto inputData : inputDataList) {
log.info("{}", inputData);
}
}
} catch (Exception e) {
continuePolling = false;
}
}
return new ListItemReader<>(inputDataList);
}
}
해당 BatchConfig
클래스에서는 배치 프로세스를 진행해야할 Job
과 Step
에 대해서 정의하고 있습니다.
Job
에서 하나의 Step
에 대한 배치 처리를 정의하고 있고 Step
에서는 청크 사이즈를 10개로 지정하고 있으며 ItemReader
, ItemProcessor
, ItemWriter
를 등록해주고 있습니다.
ItemReader
는 SQS
에서 메시지를 수신 요청하여 저장되어 있는 모든 메시지를 읽어오는 역할을 하고 있습니다.
의존성 주입을 받은 SqsTemplate
의 receiveMany()
메서드를 호출하여 SQS
에 메시지 수신 요청을 하게 됩니다. 한 번에 수신할 최대 메시지 갯수는 10개로 설정하고 폴링 시간을 10초로 설정하였습니다.
while
반복문을 통하여 SQS
의 모든 메시지를 읽어오고 난 후 ItemProcessor
에게 해당 메시지(PostLogDto 객체
)를 전달하게 됩니다.
@Component
@Slf4j
public class SqsProcessor implements ItemProcessor<PostLogDto, PostLog> {
@Override
public PostLog process(PostLogDto dto) throws Exception {
PostLog postLogEntity = new PostLog(dto);
return postLogEntity;
}
}
해당 SqsProcessor
클래스는 ItemReader
에서 전달받은 PostLogDto
객체들을 엔티티 객체인 PostLog
로 변환해주는 작업을 담당합니다.
변환이 완료된 (새로 생성된) 엔티티 목록을 다음 단계인 ItemWriter
에게 전달하게 됩니다.
@Component
@RequiredArgsConstructor
@Slf4j
public class SqsWriter implements ItemWriter<PostLog> {
private final PostLogRepository postLogRepository;
@Override
public void write(Chunk<? extends PostLog> chunk) throws Exception {
List<? extends PostLog> items = chunk.getItems();
postLogRepository.saveAll(items);
log.info("SqsWriter 호출 :" + items.size());
}
}
해당 SqsWriter
클래스에서는 ItemProcessor
로부터 받은 PostLog
엔티티 객체를 실제 DB에 저장하는 역할을 맡고 있습니다.
청크 단위로 Item
들을 받아 Repository
를 통해 DB에 INSERT
하게 됩니다.
@Configuration
@EnableScheduling
@Slf4j
public class BatchScheduler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job sqsBatchJob;
@Scheduled(cron = "0 */15 * * * ?") // 매 15분마다 실행
public void runBatchJob() throws JobExecutionException {
log.info("SQS 스케줄링 동작");
jobLauncher.run(sqsBatchJob, new JobParametersBuilder()
.addLong("uniqueness", System.nanoTime()).toJobParameters());
}
}
해당 BatchScheduler
클래스에서는 일정 주기를 설정하여 스프링 빈으로 등록해두었던 Job
을 실행시켜주는 역할을 하고 있습니다.
크론 방식을 사용하여 매 15분마다 SQS
에 저장된 메시지를 읽어와 DB에 저장해주는 Job
을 실행시켜주게 됩니다.
Amazon SQS
에서 메시지를 수신할 수 있는 다른 방법을 공식 문서에서 확인할 수 있었습니다. 아래와 같이 @SqsListener
어노테이션을 사용하여 간단하게 메시지를 수신하는 방법이 빈번하게 활용된다고 하더라구요.
@Component
public class SqsTransferListener {
@SqsListener(value = "${cloud.aws.sqs.queue-name}",factory = "defaultSqsListenerContainerFactory")
public void messageListener(PostLogDto message) {
System.out.println("Listener : " + message);
}
}
위의 코드에서 @SqsListener
어노테이션은 Amazon SQS
대기열 큐에 메시지가 도착하는 대로 자동으로 메시지를 수신합니다.
Evertrip
프로젝트에서는 Amazon SQS
에서 메시지 수신 요청을 원하는 시점(일정 주기에 따른)에 보내야하기 때문에 위의 @SqsListener
어노테이션을 사용한 메시지 수신 방법은 적합하지 않았습니다.
그래도 @SqsListener
어노테이션을 활용하는 방법에 대해 공부하기 위해 공식 사이트를 보면서 찾아보았습니다.
@SqsListener
어노테이션 설정에서 value
값과 factory
값을 설정할 수 있습니다.
value
설정에는 메시지 수신 요청을 할 queue name
을 입력하고 factory
에는 메시지 리스너 컨테이너 팩토리
를 입력하게 됩니다.
메시지 리스너 컨테이너 팩토리
를 설정하는 코드를 살펴보겠습니다.
@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
return SqsMessageListenerContainerFactory
.builder()
.configure(options -> options
.maxMessagesPerPoll(10)
.listenerMode(ListenerMode.BATCH)
.acknowledgementMode(AcknowledgementMode.ALWAYS)
.acknowledgementInterval(Duration.ofSeconds(3))
.acknowledgementThreshold(5)
.acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
.messageConverter(sqsMessagingMessageConverter(objectMapper))
)
.sqsAsyncClient(sqsAsyncClient)
.build();
}
이번 포스팅에서는 배치에 관련된 옵션에 대해서 설명드리도록 하겠습니다. 다른 옵션들이 궁금하신 분들은 아래 링크를 확인해주세요!
리스너 컨테이너 팩토리에 SqsContainerOptions
옵션에서 ListenerMode.BATCH
로 설정하면 해당 팩토리를 사용하여 @SqsListener
메서드에 대한 단일 메시지 및 배치 컨테이너 모두 생성할 수 있다고 합니다.
ListenerMode.BATCH
모드로 설정하면 리스너는 여러 메시지를 한 번에 수신하게 됩니다. 즉, SQS에서 메시지를 배치로 가져와 한 번의 호출로 여러 메시지를 처리하게 됩니다.
AcknowledgementMode.ALWAYS
모드 설정은 리스너가 메시지를 성공적으로 수신하면 항상 자동으로 메시지를 SQS에게 확인하도록 지시합니다. 즉, 메시지 처리 후 자동으로 SQS에서 해당 메시지가 삭제되도록 구성됩니다.
listenerMode(BATCH)
와 acknowledgementMode(ALWAYS)
가 함께 사용될 때, 리스너는 메시지를 배치로 수신하고, 각 배치의 처리가 성공적으로 완료되면, 해당 배치의 모든 메시지에 대해 자동으로 SQS에 확인 응답을 보냅니다.
해당 설정은 대량의 메시지를 효과적으로 처리하면서도, 메시지 처리에 대한 신뢰성을 보장하기 위해 자동으로 확인 처리를 수행한다고 합니다.
Spring Cloud AWS
공식 사이트에 기재된 BATCH
처리 관련 내용입니다.
배치 모드를 활성화하면 프레임 워크는 메시지 폴링의 전체 결과를 리스너에게 제공하게 됩니다.
SQS의 제한으로 인해 실제로 한 번의 폴링에 10개까지만 메시지를 수신할 수 있습니다. 따라서 maxMessagesPerPoll이 10을 초과하는 경우, 여러 폴링 작업의 결과가 결합되어 리스너에 전달된다고 하네요.
또한 SqsListener
를 사용하여 일괄 처리를 활성화하려면 단일 List 또는 List<Message> 형태의 파리미터가 리스너 메서드에 제공되어야 한다고 적혀있습니다.
@Component
public class SqsTransferListener {
@SqsListener(value = "${cloud.aws.sqs.queue-name}",factory = "defaultSqsListenerContainerFactory")
public void messageListener(List<PostLogDto> message) {
System.out.println("Listener : " + message);
}
}
이런식으로 파라미터에 List 형태로 넣어주고 factory
설정에서 옵션에 ListenerMode.BATCH
모드를 추가해주시면 메시지를 한꺼번에 읽어온다고 하네요.
참고 자료
Spring Cloud AWS 공식 사이트