5. Processor과 Subject

안석주·2022년 1월 2일
0

RxJava

목록 보기
9/10

서론

Processor과 Subject로 넘어왔습니다. Processor와 Subject는 각각 생산자, 소비자의 기능이 함께 들어있는 인터페이스입니다. Processor는 Flowable과 Subscriber의 기능을 동시에 쓸 수 있고, Subject는 Observable과 Observer의 기능을 동시에 가지고 있습니다. 이번 장에서는 Subject에 대해서만 다루도록 하겠습니다. (배압 기능을 제외하곤 기능이 동일합니다)

5.1 Subject란

5.1.1

Subject는 Observable, Observer를 둘다 상속합니다(Observer는 인터페이스입니다).
Subject가 Observer를 구독하면 Observer가 통지하는 데이터를 받는 소비자가 되며, 동시에 이 Observer에 데이터를 통지하는 생산자가 됩니다. 즉 생산자와 소비자의 역할을 한번에 수행할 수 있게됩니다.

따라서 Observable의 연산자를 사용할 수 있고 통지할 때 데이터를 변환할 수 있습니다. 하지만 이런 경우 연산자의 반환값 타입은 Observable이 돼야 하므로 주의가 필요합니다. 또한 Subject는 Observer 인터페이스를 구현해야 하지만, Subject를 상속받은 클래스들(아래 나오지만 PublishSubject, BehaviorSubject등...)에서 구현해줍니다.

5.1.2

Subject의 종류에 대해 알아봅니다~!

클래스설명
PublishSubjectSubject가 데이터를 받은 시점에 데이터를 통지한다
BehaviorSubjectSubject가 마지막으로 받은 데이터를 캐시하고 구독 시점에 캐시한 데이터를 바로 통지한다. 그 이후부터는 데이터를 받은 시점에 통지한다
ReplaySubject기본적으로 Subject가 받은 모든 데이터를 캐시하고, 구독 시점에 캐시한 데이터를 바로 통지한다. 그 이후부터는 데이터를 받은 시점에 통지한다
AsyncSubjectSubject가 완료 통지를 받았을 때 마지막으로 받은 데이터만 통지한다
UnicastSubject하나의 소비자만 구독한다

이처럼 용도가 모두 달라 구분해서 사용해야 합니다. 특히 ReplaySubject처럼 기본적으로 데이터를 버퍼링하는 Subject를 사용시에 메모리 관리를 하지 않으면 메모리가 부족해질 수도 있습니다. 또한 Subject는 Hot 생산자의 특성이 있어 캐시한 데이터를 통지해도, 이 데이터가 캐시에서 사라지지 않습니다.

그리고 위에서 말했듯이 Subject는 Observable을 상속하며, Observer의 통지 메소드는 상속한 클래스인 각 Suject에 구현되어 있습니다. 또한 그 외에 Subject 자체 메소드도 제공합니다.

반환값 타입메소드설명
booleanhasComplete()완료가 통지되면 true를 반환한다
booleanhasThrowable()에러가 통지되면 true를 반환한다
ThrowablegetThrowable()에러가 통지되면 해당 에러 객체를 반환하고, 에러가 통지되지 않으면 null을 반환한다.
booleanhasObservers()구독 중인 Observer가 있으면 true, 완료나 에러 통지 후에는 false를 반환한다

또한 RxJava의 Subject에서는 데이터를 받아 통지할 때 안전성을 높이고자 다음에 설명하는 toSerialize() 메소드를 제공합니다.

toSerialized() 메소드

하나의 Subject는 서로 다른 쓰레드에서 동시에 통지하는 것을 허용하지 않습니다. 여러 쓰레드에서 동시에 통지 메소드를 호출시에 구독하는 소비자에게 통지가 순차적으로 가지 않을 수 있어 RxJava의 안전성을 위협합니다.

그래서 RxJava에서는 이런 안전하지 않은 Subject를 쓰레드 안전으로 만들기 위해 SerializedSubject라는 클래스를 제공합니다. 이 클래스를 사용하면, 여러 쓰레드에서 동시에 통지 메소드를 호출해도 쓰레드 안전한 처리를 할 수 있습니다. 하지만 이 클래스는 패키지 프라이빗한 클래스이므로 직접 접근할 수 없습니다. 따라서 SerializedSubject를 생성하려면 Subject의 toSerializedSubject를 생성하려면 Subject의 toSerialized() 메소드를 호출해야합니다.
FlowableProcessor<T> serialized = processor.toSerialized();

이 SerializedSubject는 내부적으로 원래 Subject를 래핑해 통지 처리를 동기화합니다. 하지만 동기 처리는 비용이 많이 드는 작업이라서 통지량이 많은 상황이면 성능에 영향을 줄 수 있기 때문에, SerializedSubject로 변환시에 적적한 곳에서만 사용해야 합니다.

5.2 PublishSubject

PublishSubject는 이미 통지한 데이터를 캐시하지 않고 구독한 뒤로 받은 데이터만 통지하는 Subject입니다. 소비자(Observer)는 구독한 후에 통지된 데이터는 받을 수 있지만, 구독 전에 통지된 데이터는 받을 수 없습니다. 즉, 소비자는 Subject를 구독한 이후에 통지되는 데이터만 받습니다.

다만 Subject가 이미 처리를 완료한 후에 구독하면 완료나 에러 통지를 소비자가 받을 수 있습니다. 이때 데이터는 받지 않습니다. 아래 예제는 PublishSubject를 생성하고, 몇 가지 다른 통지 시점에 Subscriber가 구독하게 합니다. 먼저 데이터를 통지하기 전에 첫 번째 Subscriber를 추가하고, 다음으로 전체 데이터 중 몇 개의 데이터만을 통지한 후에 두 번째 Subscriber를, 마지막으로 완료 통지 후에 세 번째 Subscriber를 추가합니다.

val subject = PublishSubject.create<Int>()

println("구독 1")
subject.subscribe(DebugObserver("no1"))
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

println("구독 2")
subject.subscribe(DebugObserver("--- no2"))
subject.onNext(4)
subject.onNext(5)

subject.onComplete()

println("구독 3")
subject.subscribe(DebugObserver("no3"))

=> 구독 1
main : no1: 1
main : no1: 2
main : no1: 3
구독 2
main : no1: 4
main : --- no2: 4
main : no1: 5
main : --- no2: 5
main : no1 완료
main : --- no2 완료
구독 3
main : no3 완료

결과를 보면, PublishSubject가 onNext() 메소드에 데이터를 전달하면, 구독하는 Observer에 데이터를 통지합니다. 또한, 통지 도중에 새로운 Observer를 추가하면 해당 Observer는 구독한 후에 통지된 데이터를 받습니다. 하지만 완료 후에는 구독한 Observer는 구독 전에 Observer의 처리가 완료돼도 완료 통지를 받는 것을 볼 수 있습니다. (no3 의경우 완료 후 구독을 했지만, 완료 통지를 받았다!)

보통 안드로이드에서 debounce를 이용해 EditText를 이용할 때, 클릭 버튼을 이용해 throttleFirst를 이용할 때 PublishSubject를 이용할 수 있습니다!

5.3 BehaviorSubject

BehaviorSubject는 마지막으로 통지한 데이터를 캐시하고, 구독 시 캐시된 데이터를 소비자에게 통지하는 Subject입니다. 소비자(Observer)는 구독할 때 캐시된 데이터를 먼저 받고 그 이후에는 통지 시점에 데이터를 받습니다. 하지만 이미 처리가 끝난 뒤에 Subject를 구독하면 완료나 에러만 소비자에게 통지합니다.(처리가 끝난 뒤는 위의 PublishSubject와 동일합니다)

val behaviorSubject = BehaviorSubject.create<Int>()
behaviorSubject.subscribe(DebugObserver("No.1"))

behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.onNext(3)

println("Subscribe no.2 추가")
behaviorSubject.subscribe(DebugObserver("--- No.2"))
behaviorSubject.onNext(4)
behaviorSubject.onNext(5)

behaviorSubject.onComplete()

println("Subscribe no.3 추가")
behaviorSubject.subscribe(DebugObserver("No.3"))

=> main : No.1: 1
main : No.1: 2
main : No.1: 3
Subscribe no.2 추가
main : --- No.2: 3
main : No.1: 4
main : --- No.2: 4
main : No.1: 5
main : --- No.2: 5
main : No.1 완료
main : --- No.2 완료
Subscribe no.3 추가
main : No.3 완료

결과를 보면, 메인에 3이 캐싱됐다가 no.2가 추가됐을 때 마지막 데이터인 3을 통지해줍니다. 즉 통지 도중에 새로운 Observer를 추가하면 해당 Observer는 구독하기 전에 마지막으로 통지된 데이터를 받습니다.

안드로이드에서 back키를 두번 눌렀을 때 앱을 종료하게 할 때 사용합니다!

5.4 ReplaySubject

ReplaySubject는 통지한 데이터를 모두 또는 지정한 범위까지 캐시하고, 구독 시점에 캐시된 데이터를 소비자에게 모두 통지하는 Subject입니다. 소비자는 구독할 때 캐시된 데이터를 먼저 받고 그 이후에는 통지되는 시점에 데이터를 받습니다. 또한 다른 Subject와는 달리 이미 완료한 후 구독하면 캐시된 모든 데이터와 완료나 에러를 소비자에게 통지합니다. 다만 모든 데이터를 캐시하면 통지할 데이터가 계속해서 쌓여 메모리 사용량이 커지니 범위를 설정하는 것이 좋습니다.

val subject = ReplaySubject.create<Int>()
subject.subscribe(DebugObserver("No.1"))

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

println("Subscribe No.2 추가")
subject.subscribe(DebugObserver("--- No.2"))
subject.onNext(4)
subject.onNext(5)

subject.onComplete()

println("Subscribe No.3 추가")
subject.subscribe(DebugObserver("------- No.3"))

=> main : No.1: 1
main : No.1: 2
main : No.1: 3
Subscribe No.2 추가
main : --- No.2: 1
main : --- No.2: 2
main : --- No.2: 3
main : No.1: 4
main : --- No.2: 4
main : No.1: 5
main : --- No.2: 5
main : No.1 완료
main : --- No.2 완료
Subscribe No.3 추가
main : ------- No.3: 1
main : ------- No.3: 2
main : ------- No.3: 3
main : ------- No.3: 4
main : ------- No.3: 5
main : ------- No.3 완료

결과를 보면 1,2,3이 먼저 통지되고, No.2가 추가됐을 때 1,2,3이 추가로 통지됩니다. 후에 완료가 되고 나서 No.3를 구독하면, 이전에 받았던 모든 데이터를 통지해준다(1,2,3,4,5). 그리고 완료 통지를 합니다.

5.5 AsyncSubject

AsyncSubject는 완료할 때까지 아무것도 통지하지 않다가 완료했을 때 마지막으로 통지한 데이터와 완료만 통지합니다! 또한 완료 후 통지한 소비자는 마지막 데이터와 함께 완료통지를 합니다.

val subject = AsyncSubject.create<Int>()
subject.subscribe(DebugObserver("No.1"))

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

println("Subscribe No.2 추가")
subject.subscribe(DebugObserver("--- No.2"))
subject.onNext(4)
subject.onNext(5)

subject.onComplete()

println("Subscribe No.3 추가")
subject.subscribe(DebugObserver("------- No.3"))

=> Subscribe No.2 추가
main : No.1: 5
main : No.1 완료
main : --- No.2: 5
main : --- No.2 완료
Subscribe No.3 추가
main : ------- No.3: 5
main : ------- No.3 완료

결과를 보면 onComplete() 이전의 데이터 5와 함께 No.1, No.2가 완료 통지를 하며 No.3이 완료 후에 구독을 했을 때, 5와 함께 완료 통지를 합니다!

5.6 UnicastSubject

UnicastSubject는 1개의 소비자(Observer)만 구독할 수 잇는 Subject입니다. 다른 소비자가 구독한다면 IllegalStateException 에러가 소비자에게 통지됩니다.
이 Subject가 통지한 데이터는 캐시되며, 소비자가 굳고한 시점에 캐시된 데이터가 통지됩니다. 이는 완료 후에도 마찬가지로 완료 후에 처음으로 소비자가 구독하면 모든 데이터와 함께 완료를 통지합니다. 에러 통지 또한 에러통지 전까지의 모든 데이터를 통지한 뒤 에러통지를 합니다.

 val subject = UnicastSubject.create<Int>()

subject.onNext(1)
subject.onNext(2)

println("Subscriber no.1 추가")
subject.subscribe(DebugObserver("No.1"))

println("Subscriber no.2 추가")
subject.subscribe(DebugObserver("No.2"))

subject.onNext(3)

subject.onComplete()

=> Subscriber no.1 추가
main : No.1: 1
main : No.1: 2
Subscriber no.2 추가
main : kotlin.Unit
main : No.1: 3
main : No.1 완료
java.lang.IllegalStateException: Only a single observer allowed.
	at io.reactivex.rxjava3.subjects.UnicastSubject.subscribeActual(UnicastSubject.java:294)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13099)
	at RxJava5Example.UnicastSubjectKt.main(UnicastSubject.kt:16)
	at RxJava5Example.UnicastSubjectKt.main(UnicastSubject.kt)

결과를 보면 통지한 데이터를 캐시하고 처음으로 구독될 때 캐시된 데이터를 통지합니다. 또한, 도중에 Observer가 추가되면 해당 Observer에는 에러를 통지합니다. 하지만 먼저 통지된 Observer에는 에러를 통지하지 않고 끝까지 처리 후 완료 통지를 합니다.

정리

이번 장에서는 Subject에 대해서 알아봤습니다. 안드로이드에서도 자주 쓰이는 기능이니 복습과 기능에 대한 생각을 정리해보는 것이 좋을 것 같습니다... 왜 PublishSubject가 중복 클릭 방지나 debounce와 함께 edittext에 쓰이는지, 왜 BehaviorSubject가 뒤로가기 버튼에서 쓰이는지에 대해서...

profile
뜻을 알고 코딩하기

0개의 댓글