worker pool pattern은 실행이 오래 걸리는 task를 워커로 처리하는 패턴입니다. 몇개의 worker를 생성하고 task를 할당하여 작업하는 방식입니다. 대량의 worker를 생성하여 작업하면 좋겠지만, 리소스의 한계가 있기에 적당한 수의 worker를 생성하는것이 좋습니다.
아래 그림과 같이 worker의 수 만큼 병렬로 처리를 합니다.

예제에서는 통장에 특정금액이 있고 그것을 transfer, withdraw, deposit하는 기능을 만들어 보도록 하겠습니다.
아래와 같이 directory 구조를 만들고 go파일을 생성합니다.

work.go 파일내에는 worker가 무슨일(transfer, withdraw, deposit)을 할지를 정의합니다.
package pool
import "errors"
type op string
const (
Transfer op = "transfer"
Withdraw = "withdraw"
Deposit = "deposit"
)
type WorkRequest struct {
Op op
From string
To string
Amount int
}
type WorkResponse struct {
Wr WorkRequest
Account *BankAccount
Err error
}
func Process(wr WorkRequest) WorkResponse {
switch wr.Op {
case Transfer:
return transfer(wr)
case Withdraw:
return withdraw(wr)
case Deposit:
return deposit(wr)
default:
return WorkResponse{Err: errors.New("unsupported operation")}
}
}
worker.go에서는 worker pool을 생성하고 channel을 연결합니다, 초기에는 대기상태이고 task가 들어오게 되면 waiting 상태인 worker중 하나의 worker가 task를 받아서 작업을 하기 시작합니다.
package pool
import (
"context"
"fmt"
)
func Dispatch(numWorker int) (context.CancelFunc, chan WorkRequest, chan WorkResponse) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
in := make(chan WorkRequest, 10)
out := make(chan WorkResponse, 10)
for i := 0; i < numWorker; i++ {
go Worker(ctx, i, in, out)
}
return cancel, in, out
}
// Worker loops forever and is part of the worker pool
func Worker(ctx context.Context, id int, in chan WorkRequest, out chan WorkResponse) {
for {
select {
case <-ctx.Done():
return
case wr := <-in:
fmt.Printf("worker id: %d, performing %s work\n", id, wr.Op)
out <- Process(wr)
}
}
}
repository.go에는 저장되어(메모리) 있는 데이터를 조작합니다.
package pool
import "errors"
type BankAccount struct {
Account string
Name string
Amount int
}
var bankAccounts = map[string]*BankAccount{
"a-230914-1": {Account: "a-230914-1", Name: "홍길동", Amount: 1000000},
"a-230914-2": {Account: "a-230914-2", Name: "임환희", Amount: 1000000},
"a-230914-3": {Account: "a-230914-3", Name: "서은교", Amount: 1000000},
}
var (
ErrNoAccount = errors.New("no account")
ErrInsufficient = errors.New("insufficient amount")
)
func transfer(wr WorkRequest) WorkResponse {
sender, exist := bankAccounts[wr.From]
if !exist {
return WorkResponse{
Wr: wr,
Err: ErrNoAccount,
}
}
taker, exist := bankAccounts[wr.To]
if !exist {
return WorkResponse{
Wr: wr,
Err: ErrNoAccount,
}
}
if sender.Amount-wr.Amount < 0 {
return WorkResponse{
Wr: wr,
Err: ErrInsufficient,
} // 충분하지 않은 잔고
}
sender.Amount -= wr.Amount
taker.Amount += wr.Amount
return WorkResponse{
Wr: wr,
Err: nil,
Account: sender,
}
}
func withdraw(wr WorkRequest) WorkResponse {
owner, exist := bankAccounts[wr.From]
if !exist {
return WorkResponse{
Wr: wr,
Err: ErrNoAccount,
}
}
if owner.Amount-wr.Amount < 0 {
return WorkResponse{
Wr: wr,
Err: ErrInsufficient,
} // 충분하지 않은 잔고
}
owner.Amount -= wr.Amount
return WorkResponse{
Wr: wr,
Err: nil,
Account: owner,
}
}
func deposit(wr WorkRequest) WorkResponse {
owner, exist := bankAccounts[wr.To]
if !exist {
return WorkResponse{
Wr: wr,
Err: ErrNoAccount,
}
}
owner.Amount += wr.Amount
return WorkResponse{
Wr: wr,
Err: nil,
Account: owner,
}
}
main.go에서는 실제로 원하는작업을 생성합니다, 예제에서는 1.임환희 -> 서은교 1000 원씩 30번 transfer 수행, 2. 임환희 계좌에 다시 1000을 충전 30회를 수행하였습니다.
package main
import (
"fmt"
"playGround/workerpool/pool"
)
const NumLoop = 30
func main() {
cancel, in, out := pool.Dispatch(10)
defer cancel()
// 임환희 -> 서은교 1000 원씩 10번 transfer
for i := 0; i < NumLoop; i++ {
in <- pool.WorkRequest{Op: pool.Transfer, From: "a-230914-2", To: "a-230914-3", Amount: 1000}
}
// 임환희 계좌에 다시 1000을 충전
for i := 0; i < NumLoop; i++ {
res := <-out
account := res.Account
in <- pool.WorkRequest{Op: pool.Deposit, To: account.Account, Amount: 1000}
}
for i := 0; i < NumLoop; i++ {
res := <-out
if res.Err != nil {
fmt.Printf("err : %v \n", res.Err)
}
account := res.Account
fmt.Printf("account : %v, name : v% , amount : %v \n", account.Account, account.Name, account.Amount)
}
}
실행 결과를 보면 아래와 같이 어떤 worker가 작업을 수행할지에 대해서는 알 수 없습니다. 그렇기에 위 예제는 실제로 은행계좌 입출금 시스템에는 여러 worker가 데이터를 조작함으로 적합하지 않습니다. 예를들면 영수가, 철수가 잔액이 1000원인 C 통장에서 동시에 1000원의 출금 요청 시 둘중 한명은 출금 실패가 되어야 합니다. 만약 하나의 worker를 사용한다면, 정상동작 합니다.
