일을 하던 도중 각 기기의 상태를 수시로 체크해서 내보내는 기능을 구현해야 했다. 여러 기기의 상태를 체크해서 변화가 있으면 여러 소스에 내보내야 했고 이러한 이벤트가 비동기적으로 발생되는 만큼 데이터 역시 비동기적으로 받아서 표현해야 했다. 이를 위해 검색하던 도중 발행-구독 모델이 적절하다고 생각했고 해당 개념에 대해 자세히 공부하게 되었다.
발행-구독 모델(Publish-subscribe pattern)은 특정 데이터를 발행할 때 기존에 구독했던 모든 구독자에게 내용을 전달하게끔 하는 모델이다.
특정한 이벤트가 발생하여 브로커(Broker)의 발행(Publish) 함수를 데이터를 건네면서 호출하면 미리 구독(Subscribe) 함수를 호출한 객체들이 데이터를 받아 처리하는 구조이다.
이벤트와 구독자 사이에는 브로커 객체가 존재한다. 옵저버 패턴으로 따지면 발행자의 역할도 담당하며 이벤트와 구독자 사이의 징검다리 역할을 담당한다. 이러한 구조는 이벤트의 소스와 구독자가 서로 몰라도 브로커를 통해 원하는 동작을 하도록 만든다.
두 패턴 모두 특정 대상의 이벤트를 통해 다른 대상들이 특정한 작업을 진행하도록 하는 디자인 패턴이다. 즉 목적과 행동 자체는 비슷하다. 다만 구조 및 구현 상 몇가지 차이점이 존재한다.
Golang은 설계적으로 채널을 이용한 비동기 통신에 특화되어 있는 언어이기 때문에 발행-구독 모델을 어렵지 않게 구현할 수 있다.
우선 참고자료에서 구현된 코드이다.
type Broker[T any] struct {
stopCh chan struct{}
publishCh chan T
subCh chan chan T
unsubCh chan chan T
}
func NewBroker[T any]() *Broker[T] {
return &Broker[T]{
stopCh: make(chan struct{}),
publishCh: make(chan T, 1),
subCh: make(chan chan T, 1),
unsubCh: make(chan chan T, 1),
}
}
func (b *Broker[T]) Start() {
subs := map[chan T]struct{}{}
for {
select {
case <-b.stopCh:
return
case msgCh := <-b.subCh:
subs[msgCh] = struct{}{}
case msgCh := <-b.unsubCh:
delete(subs, msgCh)
case msg := <-b.publishCh:
for msgCh := range subs {
// msgCh is buffered, use non-blocking send to protect the broker:
select {
case msgCh <- msg:
default:
}
}
}
}
}
func (b *Broker[T]) Stop() {
close(b.stopCh)
}
func (b *Broker[T]) Subscribe() chan T {
msgCh := make(chan T, 5)
b.subCh <- msgCh
return msgCh
}
func (b *Broker[T]) Unsubscribe(msgCh chan T) {
b.unsubCh <- msgCh
}
func (b *Broker[T]) Publish(msg T) {
b.publishCh <- msg
}
제너릭을 이용해 브로커를 구현한 모습이다. 가장 정석적인 형태의 발행-구독 모델이고 대부분의 경우에는 위의 코드를 넣음으로써 해결할 수 있다.
위의 코드에서 눈여겨보아야 할 부분으로는 채널의 채널 타입(chan chan T)과 맵 형태로 관리되는 구독 채널들(run 함수 내 subs), 그리고 전반적인 작동을 다루는 run 함수 내 select 문이 있다.
다만 필자가 해결해야 하는 문제에는 다음과 같은 추가 요구사항이 있었다.
결과적으로 다음과 같은 코드를 짜게 되었다.
package broker
import (
"apsb-server/log"
"reflect"
"sync"
)
type broker[T any] struct {
pub chan T
sub chan chan T
unsub chan chan T
stop chan struct{}
last T
}
func New[T any](initial T) *broker[T] {
b := &broker[T]{
pub: make(chan T, 10),
sub: make(chan chan T, 10),
unsub: make(chan chan T, 10),
stop: make(chan struct{}),
last: initial,
}
go b.run()
return b
}
func (b *broker[T]) run() {
subs := map[chan T]struct{}{}
for {
select {
case <-b.stop:
for ch := range subs {
close(ch)
}
return
case ch := <-b.sub:
subs[ch] = struct{}{}
ch <- b.last
case ch := <-b.unsub:
if _, ok := subs[ch]; ok {
delete(subs, ch)
close(ch)
}
case message := <-b.pub:
for ch := range subs {
select {
case ch <- message:
default:
}
}
}
}
}
func (b *broker[T]) Stop() {
close(b.stop)
}
func (b *broker[T]) Subscribe() chan T {
ch := make(chan T, 1)
b.sub <- ch
return ch
}
func (b *broker[T]) Unsubscribe(ch chan T) {
b.unsub <- ch
}
func (b *broker[T]) Publish(message T) {
if reflect.DeepEqual(b.last, message) {
return
}
b.last = message
b.pub <- message
}
func (b *broker[T]) Current() T {
return b.last
}