데이터 흐름에 맞게 알림(Based on Observer Pattern)을 보내 Subscriber가 데이터를 처리할 수 있도록 한다
Observable은 옵저버 패턴을 구현한다. 객체의 상태 변화를 관찰하는 관찰자 목록을 객체에 등록하고,
상태 변화가 있을 때마다 메서드를 호출 -> 관찰자에게 변화를 통지
Observed가 관찰을 통해서 얻은 결과를 의미한다면
Observable은 현재는 관찰되지 않았지만 이론을 통해서 앞으로 관찰할 가능성을 의미한다.
- The Function of Reason(이성의 기능)
Observable은 객체를 생성하지 않고 Factory 패턴으로 생성
public static <T> Observable<T> just(T item1, ..., T item10) {
ObjectHelper.requireNonNull(item1, "item1 is null");
...
ObjectHelper.requireNonNull(item10, "item10 is null");
return fromArray(item1, item2, item3, item4, item5, item6, item7, item8, item9, item10);
}
(에반데) 위와 같이 순서대로 데이터들을 받아오고 Observable Timeline(마블 다이어그램 위쪽 선)으로 발행
val taskObservableCreate = Observable
.create { emitter: ObservableEmitter<Task> ->
emitter.onNext(Task(description = "", isComplete = false, priority = 0))
// 끝내려면 onComplete 호출
emitter.onComplete()
}
.subscribe { Log.d(TAG, "$it") }
내가 코드로 작성하는 것이 어떻게 작동되는 지를 명세하는 것이 아니라,
무엇인지 명세해주는 방식, 이 객체가 어떤 객체인지 정의함
Observable은 Factory로 객체를 정의하고 subscribe 함수로 데이터를 발행시킨다
Disposable disposable() -> onError 이벤트 발생 시 Exception throw, 디버깅할 때 사용
Disposable subscribe(
Consumer <? super T> onNext,
Consumer <? super java.lang.Throwable> onError,
Action onComplete
) -> 이벤트 처리
onComplete 이벤트가 발생되었다면 dispose는 호출할 필요가 없는데...
-> 만약 처리가 안되면 메모리 릭 발생이 되니 CompositeDisposable Class 활용해서 객체가 destroy될 때 관계 해제
Observable은 데이터 무한 발행 쌉가능,
But, Single
은 오로지 하나!
// 1. Observable -> Single
Single.fromObservable(source)
// Single<T>
.subscrbe(...)
// 2. single()
Observable.just("Hello! World!")
.single("Default value")
// Single<T>
.subscribe(...)
// 3. first()
Observable.fromArray(source)
.first("default")
// Single<T>
.subscribe(...)
// 4. empty() -> single()
Observable.empty()
.single("default")
// Single<T>
.subscribe(...)
// 5. take(1) -> single()
Observable.fromArray(source)
.take(1)
.single("default")
// Single<T>
.subscribe(...)
Maybe 클래스는 데이터를 하나만 발행하지만 0개도 발행할 수 있음!
즉, onComplete 메서드를 하나 더 추가해서 구현하는 형식
Subject 클래스는 구독자와 Cold Observable의 특성이 모두 공존한다
데이터를 발행할 수도 있고, 발행된 데이터를 바로 처리할 수도 있음
AsyncySubject
BehaviorSubject
PublishSubject
ReplaySubject
완료되기 전 마지막 데이터에만 관심, 이전 데이터는 무시
// AsyncSubject 객체 생성
val subject = AsyncSubject.create<String>()
// Subject의 구독자 설정
subject.subscribe { Log.d(TAG, it) }
// Subject 발행(이건 무시)
subject.onNext("Hi")
subject.onNext("Hello")
// Subject의 구독자 설정
subject.subscribe { Log.d(TAG, "${it + " Second}") }
// Subject 발행(이건 무시)
subject.onNext("HyunWoo")
subject.onComplete()
subject.onNext("Fake")
subject.subscribe { Log.d(TAG, "${it + " Third}") }
// 최종 결과, onComplete가 호출되기 직전의 마지막 결과만 처리, 이후 onNext는 Fake
// onComplete가 호출된 이후의 subscriber는 최종값만 가져옴
TAG: HyunWoo
TAG: HyunWoo Second
TAG: HyunWoo Third
구독을 하면 가장 최근 값 혹은 기본 값을 넘겨 받음
// BehaviorSubject 생성
val subject = BehaviorSubject.createDefault<String>("DEFAULT")
// 디폴트 값 출력됨(처음에는)
subject.subscribe { Log.d(TAG, "First Subscriber -> $it") }
// 발행
subject.onNext("1")
subject.onNext("3")
// 구독자 하나 더 추가
subject.subscribe { Log.d(TAG, "Second Subscriber -> $it") }
// 발행
subject.onNext("5")
subject.onComplete()
// 결과
TAG: First Subscriber -> 6
TAG: First Subscriber -> 1
TAG: First Subscriber -> 3
TAG: Second Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5
가장 일반적인 Subject 클래스, subscribe()
함수를 호출하면 값을 발행하기 시작한다.
이후에 구독을 해도 최근의 값을 받아오지 않음(BehaviorSubject와 다른 점)
val subject = PublisherSubject.create<String>()
// 구독 시작
subject.subscribe { Log.d(TAG, "First Subscriber -> $it") }
// 발행
subject.onNext("1")
subject.onNext("3")
// 구독자 하나 더 추가
subject.subscribe { Log.d(TAG, "Second Subscriber -> $it") }
// 발행
subject.onNext("5")
subject.onComplete()
// 결과
TAG: First Subscriber -> 1
TAG: First Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5
데이터의 처음부터 끝가지 발행하는 것을 보장해줌
모든 데이터 저장할 때 메모리 릭이 나는 가능서옫 염두해야 됨
val subject = ReplaySubject.create<String>()
// 구독 시작
subject.subscribe { Log.d(TAG, "First Subscriber -> $it") }
// 발행
subject.onNext("1")
subject.onNext("3")
// 구독자 하나 더 추가
subject.subscribe { Log.d(TAG, "Second Subscriber -> $it") }
// 발행
subject.onNext("5")
subject.onComplete()
// 결과
TAG: First Subscriber -> 1
TAG: First Subscriber -> 3
TAG: Second Subscriber -> 1
TAG: Second Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5
Cold Observable이긴 한데 여러명의 구독자들에게 쏴주고 싶을 때!
val dataList = listOf("1", "3", "5")
// Cold Observable 만들기
val observableData = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map { dataList[it] }
.take(dataList.length)
// ConnectableObservable 만들기
val dataSource = observableData.publish()
// subscribe(ConnectableObservable, 얘에다가 연결만)
dataSource.subscribe { Log.d(TAG, "First Subscriber -> $it") }
dataSource.subscribe { Log.d(TAG, "Second Subscriber -> $it") }
// ConnectableObservable과 연결
dataSource.connect()
// Hot Observable의 특성 -> 일정 시간이 지나면 그 이후로 발행된 데이터만 받아올 수 있음
Thread.sleep(250L)
dataSource.subscribe { Log.d(TAG, "Third Subscriber -> $it") }
// 결과
TAG: First Subscriber -> 1
TAG: Second Subscriber -> 1
TAG: First Subscriber -> 3
TAG: Second Subscriber -> 3
TAG: First Subscriber -> 5
TAG: Second Subscriber -> 5
TAG: Third Subscriber -> 5