
대량의 리뷰 데이터를 LLM으로 분석하는 시스템을 구축하게 되었습니다. 구현하면서 마주친 여러 문제점들과 해결 과정을 공유합니다.
시리즈 구성
1부: 네트워크 지연 환경에서의 효율적 배치 처리 (현재 글)
2부: LangChain을 통한 LLM 연동과 토큰·응답 속도 최적화 (다음 글)
초기 구조는 단순했습니다. 데이터 수집 서버에서 가져온 리뷰 데이터를 배치로 처리해서 LLM API를 호출하고, 분석 결과를 저장하는 것이었습니다.
데이터 수집 서버 → 배치 작업 → LLM API 호출 → 결과 저장
실제로 운영해보니 다음과 같은 문제들이 발생했습니다:
특히 수천 건의 리뷰를 처리할 때 전체 작업이 완료되기까지 예상보다 훨씬 오랜 시간이 걸렸고, 서버 리소스 사용량이 불안정했습니다.
메인 서비스에 영향을 주지 않기 위해 LLM 처리를 별도 서버로 분리했습니다.
데이터 수집 서버 → 서비스 API 서버 → AWS SQS → LLM 전담 서버
분리한 이유는 다음과 같습니다:
장애 격리
LLM API의 지연이나 실패가 메인 서비스 응답성에 영향을 주지 않도록 설계할 수 있습니다.
리소스 최적화
기존 SpringBoot/Kotlin 기반 서비스는 그대로 유지하면서, LLM 처리만 별도로 분리함으로써 FastAPI/LangChain과 같은 Python 생태계의 강점을 활용할 수 있습니다.
확장성
LLM 처리 부하에 따라 해당 서버만 독립적으로 스케일링할 수 있습니다.
서비스 API는 기존에 JPA를 사용하고 있었습니다. 수집된 리뷰 데이터를 DB에 저장하는 과정에서 대량 데이터 처리에 한계가 있었습니다.
JPA만으로는 대량 데이터 처리에 한계가 있었습니다. Hibernate의 배치 설정을 사용할 수도 있지만, 다음과 같은 제약을 고려할 수 있습니다:
이러한 한계를 해결하기 위해 대량 데이터 처리 부분에는 JdbcTemplate을 도입할 수 있습니다.
구현 예시
@Repository
class ReviewBulkRepository(private val namedParameterJdbcTemplate: NamedParameterJdbcTemplate) {
fun bulkInsert(reviews: List<Review>) {
val sql = """
INSERT INTO reviews (id, content, platform, created_at)
VALUES (:id, :content, :platform, :createdAt)
ON DUPLICATE KEY UPDATE
content = VALUES(content),
updated_at = NOW()
""".trimIndent()
val parameterList = reviews.map { review ->
mapOf(
"id" to review.id,
"content" to review.content,
"platform" to review.platform,
"createdAt" to review.createdAt
)
}
namedParameterJdbcTemplate.batchUpdate(sql, parameterList.toTypedArray())
}
}
JdbcTemplate 도입으로 다음과 같은 개선 효과를 얻을 수 있습니다:
ON DUPLICATE KEY UPDATE 기능을 활용한 중복 처리rewriteBatchedStatements=true 설정으로 네트워크 라운드트립 최소화참고: 파라미터 바인딩 시 NamedParameterJdbcTemplate을 사용하면 인덱스 기반 바인딩보다 코드 가독성과 유지보수성을 높일 수 있습니다.
LLM API 응답 시간(평균 3-5초)을 고려하면 순차 처리 방식으로는 대량 데이터 처리에 너무 많은 시간이 소요됩니다.
직렬 처리 vs 병렬 처리 비교 (1,000건 기준)
| 처리 방식 | 건당 처리 시간 | 총 소요 시간 | 시간당 처리량 |
|---|---|---|---|
| 직렬 처리 (1개) | 약 3.5초 | 약 58분 | 1,030건/시간 |
| 병렬 처리 (4개) | 약 3.5초 | 약 15분 | 4,120건/시간 |
건당 처리 시간 구성
- LLM API 응답: 평균 3초
- DB 트랜잭션 및 후처리: 약 0.5초
- 총 3.5초/건
병렬 처리로 약 4배의 처리량 향상을 기대할 수 있습니다. 이제 적정 동시성 수준을 결정해야 했습니다.
서버 스펙
부하 테스트 결과
| 동시 요청 수 | CPU 사용률 | 메모리 사용률 | 처리 성공률 | 비고 |
|---|---|---|---|---|
| 2개 | 60% | 70% | 높음 | 안정적인 수준 |
| 4개 | 85% | 80% | 높음 | 허용 가능한 범위 |
| 6개 | 95% | 90% | 낮음 | 불안정한 상태 |
| 8개 | 100% | 95% | 매우 낮음 | 리소스 부족 |
테스트 결과를 바탕으로 동시 처리 4개를 최적값으로 설정할 수 있었습니다.
1차 구현 컨셉
리소스 제약을 고려해 병렬 처리를 4개로 제한하며 일괄 처리하는 방식으로 접근했습니다. 커서 기반 페이지네이션으로 데이터를 조회하고, 동시 처리 개수(4개)에 맞춰 청크 단위로 나누어 병렬 처리하는 방식으로 설계했습니다. 전체 데이터 처리가 완료될 때까지 재귀적으로 실행되도록 구현할 수 있습니다.
구현 예시
@Service
class ReviewAnalysisService {
companion object {
private const val MAX_CONCURRENT_REQUESTS = 4
private const val BATCH_SIZE = 10
private const val PAGE_LIMIT = MAX_CONCURRENT_REQUESTS * BATCH_SIZE
}
suspend fun processReviews() {
var cursor: String? = null
do {
val reviews = reviewRepository.findByCursor(cursor, PAGE_LIMIT)
if (reviews.isNotEmpty()) {
// 4개 청크로 나누어 병렬 처리
reviews.chunked(BATCH_SIZE).map { chunk ->
async { processChunk(chunk) }
}.awaitAll()
cursor = reviews.last().id
}
} while (reviews.size == PAGE_LIMIT)
}
private suspend fun processChunk(chunk: List<Review>) {
chunk.forEach { review ->
llmAnalysisService.analyzeReview(review)
}
}
}
개선된 부분
남은 한계
이러한 한계를 해결하기 위해 메시지 큐 기반 비동기 처리를 도입할 수 있습니다.
FIFO SQS 선택 이유
메시지 발행 예시
@Component
class ReviewAnalysisPublisher(private val sqsClient: SqsClient) {
fun publishReviewBatch(reviews: List<Review>) {
reviews.chunked(4).forEach { batch ->
val message = ReviewBatchMessage(
batchId = UUID.randomUUID().toString(),
reviews = batch,
timestamp = LocalDateTime.now()
)
sqsClient.sendMessage(
SendMessageRequest.builder()
.queueUrl(QUEUE_URL)
.messageBody(objectMapper.writeValueAsString(message))
.messageGroupId("review-analysis")
.messageDeduplicationId(message.batchId)
.build()
)
}
}
}
메시지 소비 예시
@SqsListener("review-analysis-queue.fifo")
suspend fun processReviewBatch(message: ReviewBatchMessage) {
if (isAlreadyProcessed(message.batchId)) return
message.reviews.map { review ->
async { llmAnalysisService.analyzeReview(review) }
}.awaitAll()
markAsProcessed(message.batchId)
}
AWS SQS는 메시지 크기를 256KB로 제한합니다. 대량의 리뷰 데이터를 배치로 처리할 때 이 제한을 쉽게 초과할 수 있습니다.
현재 배치 크기(4건)로는 문제없지만, 처리량 향상을 위해 배치 크기를 늘리거나 컨텐츠 자체의 크기가 커질 경우 제한에 걸릴 수 있습니다.
메시지 큐에는 참조 키만 담고, 실제 데이터는 Redis와 같은 캐시 저장소에 저장하는 방식으로 해결할 수 있습니다.
메시지 크기 최적화
성능 향상
유연성 확보
Redis 장애 대응
TTL 관리
SQS 사용 시 중요하게 고려할 부분은 Visibility Timeout 설정입니다.
Visibility Timeout의 역할
SQS에서 메시지를 가져온 후 처리가 완료되기 전까지는 다른 컨슈머가 해당 메시지를 볼 수 없습니다. 이 시간이 Visibility Timeout입니다.
타임아웃 설정에는 트레이드오프가 있습니다:
동적 타임아웃 연장 컨셉
LLM 처리 시간이 가변적이므로 고정된 타임아웃으로는 한계가 있습니다. 따라서 메시지 처리 중에 주기적으로 타임아웃을 연장하는 Heartbeat 방식을 구현할 수 있습니다. 처리가 완료되면 메시지를 삭제하고, 실패하면 SQS가 자동으로 재시도하도록 설정할 수 있습니다.
권장 설정값
동적 타임아웃 연장 구현 예시
@SqsListener("review-analysis-queue.fifo")
suspend fun processMessage(
message: ReviewBatchMessage,
@Header("ReceiptHandle") receiptHandle: String
) {
val heartbeatJob = startHeartbeat(receiptHandle)
try {
if (isAlreadyProcessed(message.batchId)) return
processReviewAnalysis(message)
sqsClient.deleteMessage(receiptHandle)
} catch (e: Exception) {
log.error("Message processing failed", e)
throw e
} finally {
heartbeatJob.cancel()
heartbeat.join()
}
}
private fun startHeartbeat(receiptHandle: String): Job {
return CoroutineScope(Dispatchers.IO).launch {
while (isActive) {
delay(30000) // 30초마다 연장
try {
sqsClient.changeMessageVisibility(receiptHandle, 120) // 2분으로 연장
} catch (e: Exception) {
log.warn("Failed to extend visibility timeout", e)
break
}
}
}
}
참고: Heartbeat 주기는 전체 타임아웃의 1/3 ~ 1/4 정도로 설정할 수 있습니다. 타임아웃이 120초라면 → 30초 간격으로 갱신하는 구성을 적용할 수 있습니다.
DLQ 설정
안정적인 운영을 위해서는 Dead Letter Queue 설정을 고려할 수 있습니다. 처리 실패가 반복되는 메시지의 무한 재시도를 방지할 수 있습니다. 적절한 재시도 횟수를 설정하여 DLQ로 이동하도록 구성할 수 있습니다.
서버 자원 안정성
배치 작업 효율성
운영 관리
구현 과정에서 고려할 수 있는 점들입니다:
LangChain을 활용한 LLM 연동 최적화에 대해 다룰 예정입니다:
기술 스택