Golang에서의 발행-구독 모델

simp7·2024년 6월 19일
0
post-thumbnail
post-custom-banner

계기

일을 하던 도중 각 기기의 상태를 수시로 체크해서 내보내는 기능을 구현해야 했다. 여러 기기의 상태를 체크해서 변화가 있으면 여러 소스에 내보내야 했고 이러한 이벤트가 비동기적으로 발생되는 만큼 데이터 역시 비동기적으로 받아서 표현해야 했다. 이를 위해 검색하던 도중 발행-구독 모델이 적절하다고 생각했고 해당 개념에 대해 자세히 공부하게 되었다.

발행-구독 모델

발행-구독 모델(Publish-subscribe pattern)은 특정 데이터를 발행할 때 기존에 구독했던 모든 구독자에게 내용을 전달하게끔 하는 모델이다.

구조

특정한 이벤트가 발생하여 브로커(Broker)의 발행(Publish) 함수를 데이터를 건네면서 호출하면 미리 구독(Subscribe) 함수를 호출한 객체들이 데이터를 받아 처리하는 구조이다.
발행-구독 모델

브로커

이벤트와 구독자 사이에는 브로커 객체가 존재한다. 옵저버 패턴으로 따지면 발행자의 역할도 담당하며 이벤트와 구독자 사이의 징검다리 역할을 담당한다. 이러한 구조는 이벤트의 소스와 구독자가 서로 몰라도 브로커를 통해 원하는 동작을 하도록 만든다.

장점

  • 발행자(이벤트)와 구독자를 서로 독립적으로 구현할 수 있다.
  • 구독자는 여러 이벤트 소스로부터 데이터를 받을 수 있다.
  • 비동기적으로 동작하기 때문에 비동기적으로 작동하도록 설계된 구조에 적합하다.
  • 받을 데이터만 안다면 구현에 독립적이므로(내부 코드에 접근할 필요가 없으므로) 외부 서비스와의 연동이 가능하다.

단점

  • 비동기적으로 동작하는 만큼 언어에 따라 구현이 어려울 수 있다.
  • 구독자의 수가 많을 수록, 이벤트가 빈번하게 발생할 수록 처리량이 늘어나서 전체적인 성능에 영향이 갈 수 있다.
  • 외부 서비스와의 연동을 위해 구독 기능을 노출시킬 경우 별도의 조치가 없으면 구독자를 무한정 늘리거나 의도하지 않은 사용자가 구독하여 데이터를 탈취하는 등의 보안상의 허점이 생길 수 있다.

VS 옵저버 패턴

두 패턴 모두 특정 대상의 이벤트를 통해 다른 대상들이 특정한 작업을 진행하도록 하는 디자인 패턴이다. 즉 목적과 행동 자체는 비슷하다. 다만 구조 및 구현 상 몇가지 차이점이 존재한다.

  • 옵저버 패턴에서는 관측자(구독자)가 상태를 관측하기 위해서는 관측 대상의 관측(구독) 함수를 직접 호출해야 한다. 반면 발행-구독 모델에서는 이러한 작업을 모두 브로커에게 위임하기 때문에 발행자는 브로커만 알고 있으면 되고 구독 대상은 브로커의 발행 함수만 호출하면 된다.
  • 옵저버 패턴은 1:N의 형태이지만 발행-구독 모델은 N:M의 형태로 구현할 수 있다. 즉, 동일한 broker라면 이벤트가 발생하는 어디에서든 발행할 수 있다.
  • 옵저버 패턴은 동기적이지만 발행-구독 모델은 비동기적이다. 이는 구현 형태에 기인하는데 옵저버 패턴은 직접 함수를 호출하는 방식인 반면 발행-구독 모델은 큐에 데이터를 전달하는 방식이기 때문이다. 이러한 특징으로 인해 언어에 따라서 발행-구독 패턴 구현이 어려울 수 있다.
  • 옵저버 패턴은 외부 서비스와의 연동이 어려운 반면 발행-구독 모델은 비교적 쉽다.

구현

Golang은 설계적으로 채널을 이용한 비동기 통신에 특화되어 있는 언어이기 때문에 발행-구독 모델을 어렵지 않게 구현할 수 있다.
우선 참고자료에서 구현된 코드이다.

type Broker[T any] struct {
    stopCh    chan struct{}
    publishCh chan T
    subCh     chan chan T
    unsubCh   chan chan T
}

func NewBroker[T any]() *Broker[T] {
    return &Broker[T]{
        stopCh:    make(chan struct{}),
        publishCh: make(chan T, 1),
        subCh:     make(chan chan T, 1),
        unsubCh:   make(chan chan T, 1),
    }
}

func (b *Broker[T]) Start() {
    subs := map[chan T]struct{}{}
    for {
        select {
        case <-b.stopCh:
            return
        case msgCh := <-b.subCh:
            subs[msgCh] = struct{}{}
        case msgCh := <-b.unsubCh:
            delete(subs, msgCh)
        case msg := <-b.publishCh:
            for msgCh := range subs {
                // msgCh is buffered, use non-blocking send to protect the broker:
                select {
                case msgCh <- msg:
                default:
                }
            }
        }
    }
}

func (b *Broker[T]) Stop() {
    close(b.stopCh)
}

func (b *Broker[T]) Subscribe() chan T {
    msgCh := make(chan T, 5)
    b.subCh <- msgCh
    return msgCh
}

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
    b.unsubCh <- msgCh
}

func (b *Broker[T]) Publish(msg T) {
    b.publishCh <- msg
}

제너릭을 이용해 브로커를 구현한 모습이다. 가장 정석적인 형태의 발행-구독 모델이고 대부분의 경우에는 위의 코드를 넣음으로써 해결할 수 있다.

위의 코드에서 눈여겨보아야 할 부분으로는 채널의 채널 타입(chan chan T)과 맵 형태로 관리되는 구독 채널들(run 함수 내 subs), 그리고 전반적인 작동을 다루는 run 함수 내 select 문이 있다.

다만 필자가 해결해야 하는 문제에는 다음과 같은 추가 요구사항이 있었다.

  • 각 데이터는 어디에서 구독하든 똑같은 데이터를 보여줘야 하므로 싱글톤으로 구현되어야 하며 브로커의 생명 주기는 프로그램의 생명 주기와 함께한다.
  • 새로 구독할 경우 가장 최근 데이터를 가져와야 한다.(위의 코드는 새로 온 구독자는 데이터를 발행할 때 까지 어떤 데이터도 받을 수 없다.)
  • 위와 이어지는 문제로 처음에는(어떤 발행도 하지 않았을 경우) 초기값이 존재해야 한다.
  • 성능상 이전 데이터가 동일하다면 재전송하지 않으면 좋다.(상태 체크가 빈번하게 이루어지기 때문에 상태 변화가 없더라도 발행에 리소스가 많이 들어갈 수 있다.)

결과적으로 다음과 같은 코드를 짜게 되었다.

package broker

import (
	"apsb-server/log"
	"reflect"
	"sync"
)

type broker[T any] struct {
	pub   chan T
	sub   chan chan T
	unsub chan chan T
	stop  chan struct{}
	last  T
}

func New[T any](initial T) *broker[T] {
	b := &broker[T]{
		pub:   make(chan T, 10),
		sub:   make(chan chan T, 10),
		unsub: make(chan chan T, 10),
		stop:  make(chan struct{}),
		last:  initial,
	}
	go b.run()
	return b
}

func (b *broker[T]) run() {
	subs := map[chan T]struct{}{}
	for {
		select {
		case <-b.stop:
			for ch := range subs {
				close(ch)
			}
			return
		case ch := <-b.sub:
			subs[ch] = struct{}{}
			ch <- b.last
		case ch := <-b.unsub:
			if _, ok := subs[ch]; ok {
				delete(subs, ch)
				close(ch)
			}
		case message := <-b.pub:
			for ch := range subs {
				select {
				case ch <- message:
				default:
				}
			}
		}
	}
}

func (b *broker[T]) Stop() {
	close(b.stop)
}

func (b *broker[T]) Subscribe() chan T {
	ch := make(chan T, 1)
	b.sub <- ch
	return ch
}

func (b *broker[T]) Unsubscribe(ch chan T) {
	b.unsub <- ch
}

func (b *broker[T]) Publish(message T) {
	if reflect.DeepEqual(b.last, message) {
		return
	}
	b.last = message
	b.pub <- message
}

func (b *broker[T]) Current() T {
	return b.last
}

참고자료

profile
Simple & Clear
post-custom-banner

0개의 댓글