옵저버 패턴은 객체(subject)의 상태 변화를 관찰하는 관찰자들, 즉 옵저버(observer)들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 각 옵저버에게 변화를 알리는(notify) 디자인 패턴입니다.
옵저버 패턴에서 객체(subject)는 옵저버에 대한 정보가 없습니다. 오직 옵저버가 특정 인터페이스(Interface)를 구현한다는 것 만 알고 있습니다. 즉, 옵저버가 무슨 동작을 하는지 모른다는 것입니다. 게다가 옵저버는 언제든지 새로 추가되거나 제거될 수 있으며 새로운 형식의 옵저버를 추가할 때에도 주제에 전혀 영향을 주지 않습니다. 이러한 관계를 느슨한 결합(Loose coupling)
이라고 합니다.
typealias Observer<T> = (event: T) -> Unit
class EventSource<T> {
// observer 객체들을 담는 리스트
private val observers = mutableListOf<Observer<T>>()
// observer 리스트에 observer 추가
fun addObserver(observer: Observer<T>) {
observers.add(observer)
}
// observer 들에게 이벤트 알림
fun notify(event: T) {
observers.forEach {
it(event)
}
}
}
fun main() {
val eventSource = EventSource<String>()
eventSource.addObserver { println("첫번째 옵저버: $it") }
eventSource.addObserver { println("두번째 옵저버: $it") }
eventSource.notify("Hello")
eventSource.notify("Hi")
}
👉 결과
첫번째 옵저버: Hello
두번째 옵저버: Hello
첫번째 옵저버: Hi
두번째 옵저버: Hi
Rxjava에는 크게 3가지의 역할이 있습니다.
Rxjava는 소비자(Subscriber/Observer)가 생산자(Observable/Flowable/Completable/Maybe/Single)를 소비(subscribe)하는 형태로 만들어져 있습니다.
간단히 말하면, 생산자는 데이터를 생산하고 통지하는 클래스이며, 소비자는 통지된 데이터를 전달받아 이 데이터를 처리합니다.
fun main() {
Observable // 생산자
.just(0,1,2,3) // 생산 연산자
.map { it * 2 } // 변경 연산자
.subscribe { println(it) } // 소비자
}
observer interface 를 구현한 객체를 subscribe 해서 소비자를 추가합니다.
return type 은 Unit 입니다.
val observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable?) {
// Observable이 데이터 전달할 준비가 되었을 때,
// 작업 취소를 위한 Disposable를 여기서 받음
}
override fun onNext(t: Int?) {
// Observable이 데이터를 전달할 때 호출
}
override fun onError(e: Throwable?) {
// Observable이 에러를 전달할 때 호출. Error시 Complete없이 종료 된다.
}
override fun onComplete() {
// Observable이 완료된 경우 호출
}
}
Observable.just(1, 2, 3, 4).subscribe(observer)
각각의 Consumer를 subscribe해서 소비자를 추가합니다.
Consumer는 메소드 한개짜리 자바 인터페이스이므로 SAM을 통해 람다로 표현할 수 있습니다.
subscribe의 return type은 Disposable 입니다.
val disposable = Observable.just(1, 2, 3, 4).subscribe(
{ println("onNext: $it") }, //onNext
{ println("onError: $it") }, //onError
{ println("onComplete") } //onComplete
)
유한한 아이템을 발행하는 observable 의 경우 onComplete() 호출로 안전하게 종료되지만, 무한하게 아이템을 발행하거나 오랫동안 실행되는 observable 의 경우에는 명시적인 폐기(dispose)가 필요합니다. Disposable.dispose()
메소드를 호출하면 아이템 발행을 중단할 수 있습니다.
val disposable = Observable.just(1, 2, 3, 4).subscribe(
{ println("onNext: $it") }, //onNext
{ println("onError: $it") }, //onError
{ println("onComplete") } //onComplete
)
disposable.dispose()
CompositeDisposable 은 여러 disposable 들을 저장하는 container 입니다. 여러 disposable 들을 한꺼번에 dispose 할 때 사용합니다.
add
, addAll
함수로 disposable 을 추가하고, clear
, dispose
함수로 dispose 처리합니다.
clear 함수는 CompositeDisposable을 재사용 가능한 반면, dispose 함수는 CompositeDisposable 를 재사용할 수 없습니다.
val compositeDisposable = CompositeDisposable()
val disposable1 = Observable.just(1, 2).subscribe { println(it) }
compositeDisposable.add(disposable1)
compositeDisposable.clear()
val disposable2 = Observable.just(3,4).delay(1, TimeUnit.SECONDS).subscribe { println(it) }
compositeDisposable.add(disposable2)
Thread.sleep(2000L)
👉결과
1
2
3
4
val compositeDisposable = CompositeDisposable()
val disposable1 = Observable.just(1, 2).subscribe { println(it) }
compositeDisposable.add(disposable1)
compositeDisposable.dispose()
val disposable2 = Observable.just(3,4).delay(1, TimeUnit.SECONDS).subscribe { println(it) }
compositeDisposable.add(disposable2)
Thread.sleep(2000L)
👉결과
1
2
옥수환, 아키텍처를 알아야 앱 개발이 보인다
[LuckyGg][Design Pattern] 옵저버 패턴(Observer Pattern) 이야기 #1 (예제 포함)
https://velog.io/@p4stel-dev/RxJava-정리-1-Rxjava란-
https://velog.io/@cmplxn/RxJava-입문하기-Kotlin