Amazon SQS(Simple Queue Service)는 마이크로서비스 아키텍처에서 시스템 간의 결합도를 낮추고 비동기 통신을 구현하는 핵심 서비스입니다. 하지만 대용량 메시지를 안정적으로 처리하기 위해서는 SQS Consumer(소비자)의 동작 방식을 깊이 이해하고, 상황에 맞는 최적화 전략을 적용해야 합니다.

이 문서는 SQS Consumer의 핵심 설정을 이해하는 것부터 시작하여, 처리 시간이 긴 작업을 안정적으로 수행하기 위한 고급 패턴, 그리고 Spring Boot 프로젝트에 실제 적용하기 위한 Quick-Start 가이드까지 단계별로 상세하게 안내합니다.

📋 목차

  1. SQS Consumer의 핵심 설정 이해하기
  2. maxConcurrentMessages와 maxMessagesPerPoll의 관계
  3. 장기 실행 작업을 위한 동적 가시성 제한 시간(VT) 관리
  4. 주기적인 가시성 연장을 통한 안정적인 메시지 처리
  5. 가시성 연장 로직의 상세 분석 및 튜닝 포인트
  6. Spring Boot Quick-Start 가이드
  7. 최종 구현: 주기적인 가시성 연장 적용 코드
  8. 마무리하며

🔑 1. SQS Consumer의 핵심 설정 이해하기

SQS Consumer의 성능과 동작은 @SqsListener 어노테이션의 세 가지 핵심 설정에 의해 결정됩니다. 물류센터 운영에 비유하여 각 설정의 역할을 이해해 보겠습니다.

  • SQS 큐: 🚚 물건이 도착하는 컨베이어 벨트
  • maxMessagesPerPoll: 🛒 벨트에서 물건을 담아오는 카트의 크기
  • maxConcurrentMessages: 👷‍♂️ 물건을 포장하는 작업자(스레드)의 수

value / queueNames

역할: 어떤 큐(컨베이어 벨트)의 메시지를 처리할지 대상을 지정하는 가장 기본적인 설정입니다.

maxMessagesPerPoll

역할: 한 번의 폴링(polling)으로 SQS에서 최대 몇 개의 메시지를 가져올지 결정합니다. (기본값: 10, SQS API 최대값: 10)

참고: 최대값이 10인 이유는 AWS SQS의 ReceiveMessage API 호출 시 한 번에 수신할 수 있는 메시지 개수가 최대 10개로 제한되어 있기 때문입니다.

일괄 처리(Batching)와 비용 절감: AWS SQS는 메시지 개수가 아닌 API 요청 횟수를 기준으로 비용을 청구합니다. 예를 들어, 메시지 1개를 가져오는 요청을 10번 하면 10건의 요청으로 과금되지만, maxMessagesPerPoll=10으로 설정하여 한 번에 10개를 가져오면 1건의 요청으로 과금됩니다. 따라서 일괄 처리는 API 호출 수를 줄여 SQS 사용 비용을 직접적으로 절감하는 핵심적인 최적화 방법입니다.

영향: 이 값을 높이면 SQS API 호출 횟수가 줄어 네트워크 비용과 지연 시간이 감소합니다. 특별한 이유가 없다면 최대값인 10으로 설정하는 것이 가장 효율적입니다.

maxConcurrentMessages

역할: 가져온 메시지들을 동시에 몇 개나 처리할지 결정하는, 애플리케이션의 실질적인 처리량(Throughput)과 직결되는 설정입니다. (기본값: 10)

영향: 이 값을 높이면 처리 속도가 빨라지지만, 그만큼 CPU, 메모리, DB 커넥션 등의 리소스를 더 많이 사용합니다. 시스템의 처리 능력과 메시지 처리 로직의 성격(CPU-bound vs I/O-bound)을 고려하여 신중하게 튜닝해야 합니다.

🔗 2. maxConcurrentMessages와 maxMessagesPerPoll의 관계

두 설정은 독립적이지 않으며, 시스템의 안정성을 위해 반드시 지켜야 할 규칙이 있습니다.

핵심 검증 규칙: maxMessagesPerPoll ≤ maxConcurrentMessages

Spring Cloud AWS 프레임워크는 애플리케이션 시작 시점에 이 규칙을 강제합니다. 이는 한 번에 가져온 메시지(카트에 담아온 물건)의 수보다 처리할 수 있는 스레드(작업자)의 수가 적어서 메시지가 애플리케이션 내부 버퍼에 쌓이는 것을 방지하기 위함입니다.

상황별 최적화 전략

전략maxMessagesPerPollmaxConcurrentMessages주요 사용 사례
균형 잡힌 표준 구성1010가장 일반적이고 안정적인 구성. 대부분의 경우 이 설정으로 시작하는 것을 권장합니다.
I/O-Bound 작업1020 (또는 그 이상)DB 조회, 외부 API 호출 등 대기 시간이 긴 작업을 처리할 때, 시스템 리소스가 허용하는 한도 내에서 처리량을 극대화합니다.
CPU-Bound 작업22 (CPU 코어 수와 유사하게)복잡한 연산, 데이터 변환 등 CPU를 많이 사용하는 작업을 처리할 때, 과도한 컨텍스트 스위칭을 방지하고 시스템 과부하를 막습니다.

I/O-Bound 작업은 처리 시간의 대부분을 데이터베이스, 외부 API, 파일 시스템 등 외부 자원의 응답을 기다리는 데 사용합니다. 이 시간 동안 CPU는 유휴 상태(idle)가 되므로, CPU 코어 수보다 많은 스레드를 두면 한 스레드가 대기하는 동안 다른 스레드가 CPU를 사용하여 시스템 전체의 효율을 높일 수 있습니다.

반면 CPU-Bound 작업은 암호화, 압축, 복잡한 계산처럼 처리 시간 내내 CPU를 집중적으로 사용합니다. 이 경우 CPU 코어 수보다 많은 스레드는 잦은 컨텍스트 스위칭(Context Switching)을 유발하여 오히려 성능을 저하시킵니다. 따라서 스레드 수를 CPU 코어 수와 비슷하게 유지하는 것이 가장 효율적입니다.

💡 FIFO SQS 순서 보장을 위한 최적화 Tip

@SqsListener의 기본 설정(maxConcurrentMessages=10)은 FIFO 큐의 순서를 보장하지 않습니다. 엄격한 순서 보장이 필요하다면, 처리 스레드를 하나로 제한하는 maxConcurrentMessages="1" 설정이 필수입니다.

⏳ 3. 장기 실행 작업을 위한 동적 가시성 제한 시간(VT) 관리

문제점: 고정된 VT와 메시지 중복 처리

SQS 메시지는 소비자가 가져가면 가시성 제한 시간(Visibility Timeout, VT) 동안 다른 소비자에게 보이지 않게 됩니다. 만약 작업이 이 시간 안에 끝나지 않으면, SQS는 작업이 실패했다고 간주하고 메시지를 다시 큐에 노출시킵니다. 이때 다른 소비자가 이 메시지를 가져가 처리하면 의도치 않은 중복 실행이 발생합니다.

큐의 VT를 무작정 길게 설정하는 것은 해결책이 아닙니다. 소비자가 정말로 장애가 발생했을 때, 해당 메시지가 너무 오랫동안 큐에 묶여있게 되어 전체 시스템의 장애 복구 시간(Recovery Time)이 길어지기 때문입니다.

해결책: 소비자 주도의 동적 VT 연장

가장 이상적인 해결책은, 큐에는 비교적 짧은 VT(예: 5분)를 설정하고, 소비자가 자신의 작업 진행 상황에 맞춰 필요할 때마다 VT를 동적으로 연장하는 것입니다.

🛡️ 4. 주기적인 가시성 연장을 통한 안정적인 메시지 처리

소비자 주도의 동적 VT 연장을 구현하는 가장 표준적인 방법은 처리 시간 동안 주기적으로 가시성 제한 시간을 갱신해주는 것입니다.

주기적인 가시성 연장이란?

이 방식은 소비자가 메시지를 처리하는 동안, "아직 작업이 끝나지 않았으니 이 메시지에 대한 처리 권한을 연장해달라"는 신호를 SQS에 주기적으로 보내는 것을 의미합니다. 별도의 스레드(또는 코루틴)가 ChangeMessageVisibility API를 주기적으로 호출하여, 메시지가 처리 도중에 다른 소비자에게 넘어가는 것을 방지합니다.

다른 메시징 시스템(예: Azure Service Bus)에서는 이와 유사한 개념을 "잠금 갱신(Lock Renewal)" 또는 "임대 연장(Lease Extension)"이라고 부르기도 합니다.

동작 원리 및 기대 효과

  1. 메인 작업과 가시성 연장 로직 동시 시작: 소비자는 메시지 처리를 시작함과 동시에, 가시성 제한 시간을 연장하는 로직을 비동기적으로 실행합니다.

  2. 주기적인 VT 연장: 이 로직은 VT가 만료되기 전에 ChangeMessageVisibility API를 호출하여 VT를 계속해서 갱신해 줍니다.

  3. 안전한 종료: 메인 작업이 성공적으로 완료되면 가시성 연장 로직을 즉시 중단하고 메시지를 삭제합니다. 작업 중 예외가 발생하더라도 이 로직은 반드시 중지되어야 합니다.

기대 효과: 이 패턴을 통해 처리 시간을 예측할 수 없는 긴 작업도 중복 실행의 위험 없이 안정적으로 처리할 수 있습니다.

안전장치: 최대 연장 횟수 설정

만약 작업 로직에 버그가 있어 무한 루프에 빠질 경우, 가시성 연장 로직이 영원히 VT를 갱신할 수 있습니다. 이를 방지하기 위해 최대 연장 횟수를 설정하고, 이 횟수를 초과하면 스스로 중단하고 경고를 발생시키는 안전장치를 반드시 마련해야 합니다.

🔍 5. 가시성 연장 로직의 상세 분석 및 튜닝 포인트

주기적인 가시성 연장 로직을 실제로 구현하기 전에, 동작을 제어하는 핵심 파라미터들을 이해하고 어떻게 튜닝할지 결정해야 합니다.

initialVT (기본값: 30초)

리스너가 처음 메시지를 받았을 때 적용되는 VT입니다.

extendBy (기본값: 30초)

가시성을 연장할 때마다 새롭게 갱신할 VT 시간입니다. 예를 들어 이 값이 30이면, ChangeMessageVisibility API 호출 시 VT를 30초로 재설정합니다.

튜닝 포인트: 이 값을 짧게(예: 60초) 설정하면, 소비자 장애 시 메시지가 큐에 다시 나타나는 시간이 짧아져 빠른 장애 복구에 유리합니다. 길게(예: 300초) 설정하면 SQS API 호출 횟수가 줄어들어 비용을 절감할 수 있습니다.

heartbeatRatio (기본값: 0.6)

VT가 만료되기까지 남은 시간의 비율을 의미합니다. 이 비율에 도달하면 가시성 연장을 실행합니다.

예시: initialVT가 100초이고 heartbeatRatio가 0.7이면, 첫 가시성 연장은 100 * 0.7 = 70초가 지났을 때 실행됩니다. 네트워크 지연 등을 고려하여 0.5 ~ 0.8 사이의 값을 사용하는 것이 안전합니다.

maxExtensions (기본값: 120회)

최종 안전장치입니다. 작업이 비정상적으로 길어지는 것을 방지하기 위한 최대 연장 횟수입니다.

튜닝 포인트: extendBy maxExtensions는 해당 작업이 최대로 실행될 수 있는 시간을 의미합니다. (기본값: 30초 120회 = 3600초 = 1시간). 비즈니스 요구사항에 맞춰 "이 작업은 절대 이 시간 이상 실행되어서는 안 된다"는 상한선을 설정해야 합니다.

▶️ 6. Spring Boot Quick-Start 가이드

이 섹션에서는 Spring Boot와 Kotlin 환경에서 SQS 리스너와 코루틴을 사용하기 위한 최소한의 설정과 코드를 안내합니다.

6.1. 의존성 추가 (build.gradle.kts)

가장 먼저 build.gradle.kts 파일에 Spring Cloud AWS와 Kotlin Coroutines 관련 의존성을 추가합니다.

// Spring Cloud AWS BOM(Bill of Materials)을 통해 호환되는 버전 관리
dependencyManagement {
    imports {
        mavenBom("io.awspring.cloud:spring-cloud-aws-dependencies:3.1.1")
    }
}

dependencies {
    // Spring Cloud AWS SQS Starter
    implementation("io.awspring.cloud:spring-cloud-aws-starter-sqs")

    // Kotlin Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") // Spring WebFlux 환경 등에서 사용 시
}

6.2. 설정 추가 (application.yml)

src/main/resources/application.yml 파일에 AWS 자격 증명과 SQS 큐 정보를 설정합니다.

# SQS Listener에서 사용할 큐 이름
sqs:
  name: "your-queue-name"

# Spring Cloud AWS 설정
spring:
  cloud:
    aws:
      region:
        static: "ap-northeast-2" # AWS 리전
      credentials:
        access-key: "YOUR_AWS_ACCESS_KEY"
        secret-key: "YOUR_AWS_SECRET_KEY"
      # EC2, ECS 등 IAM Role을 사용하는 환경에서는 access-key, secret-key 없이 자동 감지

6.3. AWS SQS 클라이언트 설정

@SqsListener가 사용할 SqsClient 빈을 등록합니다. Spring Cloud AWS 3.x부터는 SDK v2를 사용하므로 SqsAsyncClient를 등록하는 것이 권장되지만, 동기 방식의 SqsClient도 사용 가능합니다. 예제에서는 가시성 연장 관리자가 동기 클라이언트를 사용하므로 SqsClient를 등록합니다.

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient

@Configuration
class AwsConfig {

    @Bean
    fun sqsClient(): SqsClient {
        return SqsClient.builder()
            .region(Region.AP_NORTHEAST_2)
            // application.yml 설정을 자동으로 읽어오므로 별도 credentialsProvider 설정 불필요
            .build()
    }
}

6.4. 간단한 리스너 구현

이제 모든 준비가 끝났습니다. 아래와 같이 간단한 리스너를 작성하여 코루틴과 함께 동작하는지 확인할 수 있습니다.

import io.awspring.cloud.sqs.annotation.SqsListener
import io.awspring.cloud.sqs.support.SqsHeaders
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KLogging
import org.springframework.messaging.handler.annotation.Header
import org.springframework.stereotype.Component

@Component
class MySqsListener {

    companion object: KLogging()

    @SqsListener(value = ["\${sqs.name}"])
    fun listen(
        payload: String,
        @Header(SqsHeaders.SQS_MESSAGE_ID_HEADER) messageId: String
    ) {
        // SQS 리스너 메서드는 기본적으로 별도의 스레드에서 동작합니다.
        // 이 스레드를 블로킹하고 내부에서 코루틴을 실행하려면 runBlocking을 사용합니다.
        runBlocking {
            logger.info { "[$messageId] 메시지 수신, 5초간 처리 시작..." }
            
            // Coroutine의 비동기 지연 함수 사용
            delay(5000L) 

            logger.info { "[$messageId] 5초 작업 완료." }
        }
    }
}

✅ 7. 최종 구현: 주기적인 가시성 연장 적용 코드

이제 앞서 설명한 모든 개념을 종합하여, Kotlin Coroutine을 사용한 최종 구현 코드를 살펴봅니다.

SqsVisibilityExtender.kt: 동적 VT 연장 관리자

package com.franchise.note.api.common.sqs

import kotlinx.coroutines.*
import mu.KLogging
import org.springframework.stereotype.Component
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest
import kotlin.coroutines.cancellation.CancellationException

@Component
class SqsVisibilityExtender(
    private val sqs: SqsClient,
) {
    companion object : KLogging() {
        private const val DEFAULT_INITIAL_VT = 30
        private const val DEFAULT_EXTEND_BY = 30
        private const val DEFAULT_HEARTBEAT_RATIO = 0.6
        private const val DEFAULT_MAX_EXTENSIONS = 120
    }

    suspend fun <T> withVisibilityExtension(
        queueUrl: String,
        receiptHandle: String,
        taskName: String,
        initialVT: Int = DEFAULT_INITIAL_VT,
        extendBy: Int = DEFAULT_EXTEND_BY,
        heartbeatRatio: Double = DEFAULT_HEARTBEAT_RATIO,
        maxExtensions: Int = DEFAULT_MAX_EXTENSIONS,
        action: suspend () -> T,
    ): T =
        coroutineScope {
            val periodMs = (initialVT * heartbeatRatio * 1000).toLong().coerceAtLeast(1000L)
            val extensionJob = startExtension(queueUrl, receiptHandle, extendBy, periodMs, taskName, maxExtensions)

            try {
                val result = action()
                deleteMessage(queueUrl, receiptHandle, taskName)
                result
            } finally {
                stopExtension(extensionJob, taskName)
            }
        }

    private fun CoroutineScope.startExtension(
        queueUrl: String,
        receiptHandle: String,
        extendBy: Int,
        periodMs: Long,
        taskName: String,
        maxExtensions: Int,
    ): Job {
        return launch(Dispatchers.IO) {
            var extensionCount = 0
            while (isActive) {
                delay(periodMs)

                if (extensionCount >= maxExtensions) {
                    logger.warn("최대 가시성 연장 횟수($maxExtensions) 초과. Task: $taskName")
                    this.cancel(CancellationException("Max extensions reached"))
                    continue
                }

                runCatching {
                    extendVisibility(queueUrl, receiptHandle, extendBy)
                }.onSuccess {
                    extensionCount++
                    logger.info("가시성 연장 성공 #$extensionCount/$maxExtensions. Task: $taskName")
                }.onFailure { e ->
                    logger.warn("가시성 연장 실패. Task: $taskName, err=${e.message}")
                }
            }
        }
    }

    private suspend fun stopExtension(extensionJob: Job, taskName: String) {
        if (extensionJob.isActive) {
            extensionJob.cancelAndJoin()
            logger.info("가시성 연장 종료. Task: $taskName")
        }
    }

    private fun extendVisibility(queueUrl: String, receiptHandle: String, extendBy: Int) {
        val request = ChangeMessageVisibilityRequest.builder()
            .queueUrl(queueUrl)
            .receiptHandle(receiptHandle)
            .visibilityTimeout(extendBy)
            .build()
        sqs.changeMessageVisibility(request)
    }

    private fun deleteMessage(queueUrl: String, receiptHandle: String, taskName: String) {
        sqs.deleteMessage(
            DeleteMessageRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(receiptHandle)
                .build(),
        )
        logger.info("메시지 삭제 완료. Task: $taskName")
    }
}

SqsListener: 가시성 연장 적용 리스너

import io.awspring.cloud.sqs.annotation.SqsListener
import io.awspring.cloud.sqs.support.SqsHeaders
import kotlinx.coroutines.runBlocking
import mu.KLogging
import org.springframework.messaging.handler.annotation.Header
import org.springframework.stereotype.Component

@Component
class MyAdvancedSqsListener(
    private val sqsVisibilityExtender: SqsVisibilityExtender
) {

    companion object : KLogging()

    @SqsListener(
        value = ["\${sqs.name}"],
        maxMessagesPerPoll = "10",
        maxConcurrentMessages = "20"
    )
    fun handleLongRunningTask(
        payload: String,
        @Header(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER) receiptHandle: String,
        @Header(SqsHeaders.SQS_QUEUE_URL_HEADER) queueUrl: String,
    ) = runBlocking {
        if (payload.isBlank()) {
            logger.error("payload가 비어있습니다.")
            return@runBlocking
        }

        sqsVisibilityExtender.withVisibilityExtension(
            queueUrl = queueUrl,
            receiptHandle = receiptHandle,
            taskName = "my-long-running-job",
            extendBy = 60, // 60초마다 VT 연장
            maxExtensions = 60 // 최대 60회 연장 (총 1시간)
        ) {
            logger.info { "장기 실행 작업 시작..." }
            // 실제 비즈니스 로직 호출 (예: 2분 소요)
            runLongRunningJob(payload)
            logger.info { "장기 실행 작업 완료." }
        }
    }

    private suspend fun runLongRunningJob(data: String) {
        // 이 함수는 2분이 걸린다고 가정
        kotlinx.coroutines.delay(120_000L)
    }
}

🎉 8. 마무리하며

지금까지 SQS Consumer의 기본적인 처리량 조절부터 장기 실행 작업을 안정적으로 처리하기 위한 고급 패턴까지 살펴보았습니다. maxConcurrentMessages와 같은 기본 설정을 통해 시스템의 부하를 조절하고, 주기적인 가시성 연장 패턴을 통해 작업의 안정성을 확보하는 것은 확장 가능한 비동기 시스템을 구축하는 데 매우 중요합니다. 이 문서에서 다룬 개념과 코드 예제를 바탕으로 여러분의 시스템에 맞는 최적의 SQS Consumer를 구현하시길 바랍니다.

profile
일하며 겪은 문제를 나눠요

0개의 댓글