
기존 몇천만 건 데이터를 조회하고
대략 30개의 worker로 동시에 db read / write 작업을 하는 be 프로세스가
rdb 와 mongodb 에 많은 부하를 주게되서 개선하는 작업을 실무에서 진행하게 되었다.
db 인스턴스 하향 조정을 통해 비용 감소를 목표로 하는 작업으로 일정이 여유롭지 않아서 아쉬웠다.
golang 에 대해 완벽한 학습이 되지 않은 상태에서 맡은 2번째 업무라서 많이 뻘 짓을 했지만,
그래도 worker pool + rate limiter 개념을 제대로 알게된 기회였다.
be 프로세스를 개선하는 작업을 진행할 때, 많은 도움이 되었던 worker pool , rate limiter 개념을 정리해본다.
worker pool + rate limiter
https://levelup.gitconnected.com/rate-limit-plus-worker-pool-in-golang-8df5b8cab378

go-concurrency-worker-pool-pattern 참고
https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0
Go의 Worker Pool에서 graceful shutdown을 구현하는 방법
주로 Context와 WaitGroup을 사용하여 작업 고루틴들이 안전하게 종료될 수 있도록 한다.
root context에서 cancellation signal을 생성하고,
각 worker 고루틴에 cancellable context를 전달 작업이 완료되면
WaitGroup을 사용하여 모든 worker들이 종료될 때까지 대기한다.
구체적인 구현 방법
Context 기반 취소
context.WithCancel을 사용하여 root context에서 cancellable context를 생성
생성된 cancellable context와 취소 함수를 worker에 전달
WaitGroup 사용
sync.WaitGroup을 사용하여 worker 고루틴들의 개수를 추적
각 worker 고루틴이 시작될 때 wg.Add(1)을 호출하고, 작업 완료 후 wg.Done()을 호출
Producer 측 채널 닫기
작업 요청 채널(예: jobs chan Job)을 사용하는 경우, 모든 작업이 처리된 후 채널을 닫음.
취소 신호 전파
cancel() 함수를 호출하여 worker 고루틴들에게 종료를 알림.
WaitGroup 대기
wg.Wait()를 호출하여 모든 worker 고루틴들이 종료될 때까지 대기
import (
"context"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
}
func worker(ctx context.Context, id int, jobs <-chan Job, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
fmt.Printf("Worker %d: shutting down\n", id)
return
}
fmt.Printf("Worker %d: processing job %d\n", id, job.ID)
// 작업 처리
time.Sleep(time.Second)
fmt.Printf("Worker %d: job %d done\n", id, job.ID)
case <-ctx.Done():
fmt.Printf("Worker %d: received shutdown signal\n", id)
return
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
numWorkers := 3
jobs := make(chan Job, 10)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go worker(ctx, i, jobs, &wg)
}
// 작업 추가
for i := 0; i < 5; i++ {
jobs <- Job{ID: i}
}
close(jobs) // 모든 작업이 추가되면 채널을 닫습니다.
// Graceful shutdown
time.Sleep(2 * time.Second) // 작업이 모두 처리될 시간을 줍니다.
cancel() // 취소 신호를 보냅니다.
wg.Wait() // 모든 worker가 종료될 때까지 대기합니다.
fmt.Println("Main: all workers finished")
}
추가 고려 사항
타이머 사용
작업 처리 시간이 오래 걸리는 경우, 작업 자체에 대한 타임아웃을 설정하여 무한정 대기하는 것을 방지
에러 처리
worker 고루틴 내부에서 발생하는 오류를 처리하고,
필요한 경우 상위로 전달하여 graceful shutdown 과정에서 적절히 대응할 수 있도록 해야함.
다양한 작업 유형 처리
각기 다른 작업 유형에 따라 서로 다른 취소 전략을 적용해야 할 수도 있음.
예를 들어, 즉시 종료해야 하는 작업과 완료될 때까지 기다려야 하는 작업이 혼재됨.
Graceful shutdown을 사용해야 하는 이유
데이터 손실 방지 : 처리 중인 요청이 중단되는 것을 방지하여 데이터 손실을 최소화
사용자 경험 향상 : 갑작스러운 종료로 인한 오류 발생을 줄여 사용자 경험을 개선
시스템 안정성 확보 : 시스템이 정상적으로 종료되도록 하여 시스템 오류 발생 가능성을 줄임.
worker pool 은 java 의 thread pool 과 같다고 생각할 수 있다.
(나만..?)