RxGo 옵저버 여러개 붙이는 법

박재훈·2022년 10월 3일
1

GO

목록 보기
1/19

RxGo 사용법은 기본적으로는 아래와 같다.

// observable 생성
observable := rxgo.Just("Hello World!")()

// 데이터를 공유할 채널 생성
ch := observable.Observe()

// 채널로부터 데이터 한 번만 수신
item := <-ch
message := item.V.(string)
fmt.Println(message)

// 채널에 데이터가 들어올 때마다 수신
for item := range ch {
	message := item.V.(string)
	fmt.Println(message)
}

이러면 단발성으로밖에 못쓰인다.

그래서 직접 채널을 생성한 뒤 RxGo에 연결시킬 수도 있다.
채널을 생성하고 연결시킬 때는 무조건 rxgo.Item 타입의 채널로 해야 한다.
rxgo.Item은 내부적으로는 아래처럼 되어 있다.

type Item struct {
	V interface{}
	E error
}

값은 interface{} 타입의 V에 저장되고, 에러는 error 타입의 E에 저장된다.
rxgo.Item 타입으로 만들어진 채널로부터 rxgo.FromChannel 함수를 통해 Observable을 만들 수 있다.

ch := make(chan rxgo.Item)

// 채널에 데이터를 넣어주는 과정의 예시
go func() {
	// dataChannel로부터 지속적으로 데이터 받아오기
	for data := range dataChannel {
    	data := <-dataChannel // data의 타입은 뭐든지 될 수 있다.
    	ch <-rxgo.Of(data)    // data를 rxgo.Item 타입으로 감싼 뒤 채널에 넣는다.
    }
}()

// 채널로부터 observable 객체 생성
observable := rxgo.FromChannel(ch)

// 첫번째 옵저버
go func() {
	for item := observable.Observe() {
    	fmt.Println("first", item.V)
    }
}()

// 두번째 옵저버
go func() {
	for item := observable.Observe() {
    	fmt.Println("second", item.V)
    }
}()

// 이러한 방식으로도 할 수 있다.
go func() {
	observable.ForEach(
		func(i interface{}) {
			fmt.Println(idx, i)
		}, func(err error) {
			fmt.Println(idx, err)
		}, func() {
			fmt.Println("done")
		},
	)
}()

위 코드에는 ch 채널로부터 만들어진 observable을 구독하는 2개의 옵저버가 있다.

통상적으로 생각하면 ch에 데이터가 들어올 때마다 각 옵저버들에서 구독을 해야 할 것 같지만 그렇지 않고 한 번에 하나의 옵저버만 데이터를 받는다. 이는 채널을 이용하는 것과 똑같은 원리로써 동작하기에 그런 것으로, 여러 채널이 데이터를 받게끔 하고 싶으면 다른 방법을 써야 한다.

일단, 위처럼 외부 채널로부터 데이터를 받아오는 방식을 hot 방식이라고 한다. 반대로 내부로부터 데이터를 만들어주는 방식을 cold 방식이라고 한다.

cold 방식을 통해 데이터를 만들어주게 되면 구독한 모든 옵저버가 데이터를 한꺼번에 받을 수 있다.
이에 대한 비교를 코드로 구현해보자면

externalChannel := make(chan rxgo.Item)
go func() {
	// observable 외부에서 데이터 생성 (hot)
	for i := 0; ; i++ {
    	externalChannel <- rxgo.Of(i)
        time.Sleep(time.Second)
    }
}()

hotObservable := rxgo.FromChannel(externalChannel)
go SubscribeAndPrint(&hotObservable)  // hot observable의 첫번째 옵저버
go SubscribeAndPrint(&hotObservable)  // hot observable의 두번째 옵저버

coldObservable := rxgo.Defer([]rxgo.Producer{
	func(_ context.Context, next chan<- rxgo.Item) {
    	// observable 내부에서 데이터 생성 (cold)
    	for i := 0; ; i++ {
        	next <- rxgo.Of(i)
            time.Sleep(time.Second)
        }
    },
})

go SubscribeAndPrint(&coldObservable)  // cold observable의 첫번째 옵저버
go SubscribeAndPrint(&coldObservable)  // cold observable의 두번째 옵저버

위 코드를 실행시키면 위의 hot observable을 구독한 두 옵저버는 한 번에 하나만 동작하고, 아래의 cold observable을 구독한 두 옵저버는 둘 다 잘 동작한다. 이를 이용해서 처음에는 cold로 짜면 해결이 될 줄 알았다.

// 외부채널 ch
ch := make(chan rxgo.Item)
go InputData(ch)

observable := rxgo.Defer([]rxgo.Producer{
	func(_ context.Context, next chan<- rxgo.Item) {
    	// 외부채널 ch로부터 데이터 가져와서 전달
    	for data := range ch {
        	next <- rxgo.Of(data)
        }
    },
})

go SubscribeAndPrint(&observable)
go SubscribeAndPrint(&observable)

그런데 이렇게 했더니 두 옵저버들이 한 번에 한 개씩만 데이터를 받는다. 그래서 이 방법은 먹히지 않고, Connectable Observable을 써야 한다.

// 외부채널
ch := make(chan rxgo.Item)
go InputData(ch)

// connectable observable 생성
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// DoOnNext 메소드를 통해야 다중 연결이 된다.
observable.DoOnNext(func(i interface{}) {
	fmt.Println(i)
}
observable.DoOnNext(func(i interface{}) {
	fmt.Println(i)
}

이렇게 하면 여러 옵저버들에게 데이터를 전달할 수 있다.
그런데 하다보면 굳이 이렇게 해야 하나 싶은 게, 단순히 채널을 분배하고자 하는 목적으로만 하자면 새로 만드는 게 훨씬 낫다.

package distribution

type Subscription[T any] struct {
	channel chan T
	isAlive bool
	id      int
	parent  *ChannelDistributor[T]
}

func (s *Subscription[T]) Close() {
	s.isAlive = false
	delete(s.parent.children, s.id)
	close(s.channel)
}

func (s *Subscription[T]) IsAlive() bool {
	return s.isAlive
}

type ChannelDistributor[T any] struct {
	channel  chan T
	children map[int]Subscription[T]
}

func NewChannelDistributor[T any](channel chan T) *ChannelDistributor[T] {
	distributor := &ChannelDistributor[T]{
		channel:  channel,
		children: make(map[int]Subscription[T]),
	}
	go func() {
		for data := range channel {
			for _, child := range distributor.children {
				child.channel <- data
			}
		}
	}()
	return distributor
}

func (c *ChannelDistributor[T]) In() chan<- T {
	return c.channel
}

func (c *ChannelDistributor[T]) Subscribe(f func(value T)) Subscription[T] {
	id := len(c.children)
	child := Subscription[T]{
		channel: make(chan T),
		id:      id,
		parent:  c,
	}
	c.children[id] = child
	go func() {
		for data := range child.channel {
			f(data)
		}
	}()
	return child
}
package main

import (
	"channel-distributor/distribution"
	"fmt"
	"time"
)

func main() {
	distributor := distribution.NewChannelDistributor(make(chan int))

	go func() {
		for i := 0; ; i++ {
			distributor.In() <- i
			time.Sleep(time.Second)
		}
	}()

	subscription := distributor.Subscribe(func(value int) {
		fmt.Println(1, value)
	})

	go func() {
		time.Sleep(time.Second * 5)
		subscription.Close()
	}()

	distributor.Subscribe(func(value int) {
		fmt.Println(2, value)
	})
}

full code: https://github.com/p9595jh/go-channel-distributor
Go 라이브러리 chdist 만들어서 거기로 옮겼음 (링크)

profile
코딩 좋아합니다

0개의 댓글