지금까지 Observable, Single, Flowable 등의 데이터 스트림에 대해 알아보았다. 이번 포스팅에서 알아볼 녀석은 Subject
라는 녀석인데, 이는 관찰 가능한 데이터 스트림과 관찰자(구독자)의 성격을 모두 갖고 있는 특이한 녀석이다. 즉 Observable 과 Observer 를 모두 구현한 추상 타입으로, 하나의 소스로부터 다중의 구독자에게 멀티 캐스팅이 가능하다는 특징을 갖고 있다.
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
public Subject() {
}
public abstract boolean hasObservers();
public abstract boolean hasThrowable();
public abstract boolean hasComplete();
@Nullable
public abstract Throwable getThrowable();
@NonNull
public final Subject<T> toSerialized() {
return (Subject)(this instanceof SerializedSubject ? this : new SerializedSubject(this));
}
}
Observer 를 구현한다는 특징때문에, onNext()
, onError()
, onComplete()
등의 이벤트를 수동으로 발생하여 구독자들에게 전달해줄 수 있다.
그럼, 다양한 Subject 의 종류 중 몇 가지에 대해 하나씩 알아보도록 하자.
PublishSubject
는 가장 단순한 Subject
구현체중 하나이다. 구독자들에게 이벤트를 널리 전달할 수 있으며, Hot Observable 특성을 갖고 있어 구독한 시점부터 발생하는 데이터를 전달한다. 따라서 데이터가 모두 발행되고 난 뒤 구독을 하면 아무 데이터도 받아볼 수 없게 된다.
아래 코드를 보면 여러 구독자를 붙여줄 수 있고, 직접 Emitter
이벤트를 발생시킬 수 있는 점을 확인해볼 수 있다.
fun main() {
val src = PublishSubject.create<Int>()
src.subscribe {
println("A : $it")
}
src.subscribe {
println("B : $it")
}
src.onNext(10)
src.onNext(20)
src.onNext(30)
}
A : 10
B : 10
A : 20
B : 20
A : 30
B : 30
그리고 다른 데이터 스트림의 구독자로서의 행세도 할 수 있기 때문에, 구독자로서 전달받은 데이터를 발행하는 식의 동작도 구현할 수 있다. 다른 Observable
로부터 전달받은 데이터를 자신의 구독자에게 전달해주는 것이다.
fun main() {
val src1 = Observable.interval(1, TimeUnit.SECONDS)
val src2 = Observable.interval(500, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<String>()
src1.map { "A : $it" }.subscribe(subject)
src2.map { "B : $it" }.subscribe(subject)
subject.subscribe(System.out::println)
Thread.sleep(3000)
}
B : 0
B : 1
A : 0
B : 2
B : 3
A : 1
B : 4
A : 2
B : 5
두 Observable
로 부터 데이터를 전달받는다는 점에서 Observable
의 merge
연산자와 비슷한 동작을 수행하는 것을 확인해볼 수 있다.
BehaviorSubject
는 특이한 성질을 갖고 있다. PublishSubject
와 동일하게 동작하지만, 새로운 구독자가 들어온 경우 해당 구독자에게 구독 시점에 가장 마지막 데이터를 발행한다는 점이 특징이다. 따라서 가장 최신값을 가져오는 등의 동작을 구현할 때 유용하게 사용될 수 있다.
fun main() {
val src = BehaviorSubject.create<Int>()
src.subscribe { println("첫번째 $it") }
src.onNext(1)
src.subscribe { println("****두번째 $it") }
src.onNext(2)
src.onNext(3)
src.subscribe { println("********세번째 $it") }
src.onNext(4)
src.onComplete()
}
첫번째 1
****두번째 1
첫번째 2
****두번째 2
첫번째 3
****두번째 3
********세번째 3
첫번째 4
****두번째 4
********세번째 4
구독을 한 이후에는 PublishSubject
와 동일하게 모두 수신할 수 있다.
PublishSubject
에 cache
연산자를 적용한 것과 유사한 동작을 수행한다. 새로운 구독자가 생겼을 경우 이전에 발행했던 데이터들을 모두 해당 구독자에게 전달해주는 특징을 갖고 있다.
fun main() {
val src = ReplaySubject.create<Int>()
src.onNext(1)
src.onNext(2)
src.onNext(3)
src.subscribe(System.out::println)
src.onNext(4)
}
1
2
3
4
데이터 '1, 2, 3' 이 발행되고난 뒤 새로운 구독자가 들어왔을 때, 이미 발행했던 '1, 2, 3' 이 다시금 반복되는 것이다. 그러고나서는 PublishSubject
과 다름없이 '4'가 정상적으로 발행되는 것을 확인할 수 있다.
ReplaySubject 를 사용할때는 수많은 혹은 무한한 데이터를 발행하는 소스에 대해 적용하면 자칫 OOM (OutOfMemoryException) 이 발생할 가능성이 높다. 조심히 사용하도록 하자!
AsyncSubject 는 onComplete()
호출 직전에 발행된 아이템만 구독자들에게 전달한다. 즉, onComplete()
가 발생할 때까지는 아무 데이터도 전달하지 않다가 onComplete()
가 발생하면 가장 마지막에 발행된 데이터를 전달하는 것이다.
fun main() {
val src = AsyncSubject.create<Int>()
src.subscribe {
println("A : $it")
}
src.onNext(10)
src.subscribe {
println("B : $it")
}
src.onNext(20)
src.subscribe {
println("C : $it")
}
src.onNext(30)
src.onComplete()
}
A : 30
B : 30
C : 30
구독자는 연달아 3번에 거쳐 들어오게 되고, 그 사이 데이터는 3회 발행됐다. 끝으로 onComplete()
가 호출되자 3마리 구독자에게 가장 마지막으로 발행된 '30'이라는 데이터가 전달됐다.
다른 Subject
들과 비슷하지만, 핵심적인 차이점을 갖고있다. 어떤 구독자가 UnicastSubject 를 구독하기 전까지는 발행하는 데이터들을 계속 버퍼에 저장해뒀다가 구독을 시작할 때 버퍼의 데이터들을 싹 발행하고 버퍼를 깨끗이 비워낸다.
그렇다면 처음으로 들어온 구독자가 모든 데이터들을 소비할 것이고, 두 번째로 들어온 구독자들은 아무 데이터도 받아볼 수 없을 것이다. 따라서 구독자를 딱 하나만 둘 수 있고, 때문에 'Unicast' 라는 용어가 제격인 것이다. (만약 두 개 이상 구독자가들어올 시 IllegalStateException
을 발생시킨다)
fun main() {
val src = UnicastSubject.create<Long>()
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(src)
Thread.sleep(3000)
src.subscribe {
println("A : $it")
}
Thread.sleep(2000)
}
A : 0
A : 1
A : 2
A : 3
A : 4
3초가 흘러갈 동안 Observable 은 '0, 1, 2' 이렇게 3개의 데이터를 발행했다. UnicastSubject
는 이들을 버퍼에 쌓아두고, 첫번째 구독자가 들어왔을 때 한 번에 발행하여 '0, 1, 2' 가 동시에 출력되는 것을 확인할 수 있다. 이후 발행되는 데이터들은 그대로 전달받게 된다.
https://duzi077.tistory.com/178
https://chanhyeok.tistory.com/15
도서 '아키텍처를 알아야 앱 개발이 보인다' - 옥수환 저
bedava arkadaslik sitesi olan https://www.hayatarkadasim.net ve https://kafiyebul.com web sitemiz ile online arkadaş bulabilirsiniz. https://www.bizimmekanlar.com yeni açılan sohbet odalarımıza istediğiniz zaman katılabilirsiniz.
Thank you, many people might find this article very useful. At first, I thought it was written about talking to a person on the internet in general. Actually, I can give people some advice - there are tons of different dating apps, and you never know whether this person is real or fake, so you can use https://www.bizimmekan.xyz/ or https://www.esohbet.net/ to video chat with girls.