RxJava, RxAndroid

dongbin is free·2023년 4월 13일
0

Android

목록 보기
5/6

부족했던 안드로이드 지식을 늘리기 위해 여러 강의나 교재, 블로그 등을 참고하며 공부를 하고 있다.

교재 내용 중 RxJava를 적용하는 부분이 나오는데, 프로젝트를 하나 만들어 고쳐나가는 과정을 담는 교재 내용만으로는 명확히 알 수 없었기에 따로 레퍼런스 자료들과 기술 블로그들을 찾아 공부한 내용들을 정리해보고자 한다. (RxJava2 기준으로 작성되어 있어 RxJava3로 마이그레이션 하느라 애먹었다.)


ReactiveX란?

  • ReactiveX - An API for asynchronous programming with observable streams
    공식 홈페이지에 나온 문구로는 관찰 가능한 스트림을 사용하는 비동기 프로그래밍 API 이다.

  • Reactive Programming is a declarative programming paradigm concerned with data streams and the propagation of change 변경 전파와 데이터 흐름에 관련된 선언전 프로그래밍 패러다임이다.

변경 전파 및 데이터 흐름은 데이터가 변경될 때 마다 이벤트를 발생시켜 데이터를 계속하여 전달하는 것을 의미한다고 볼 수 있다.

Reactive Programming을 정리해보자면 주변 환경과의 끊임없는 상호작용을 하는 프로그래밍이라고 할 수 있다. 그렇다면 기존의 프로그래밍 방식과 어떤 차이점이 있을지 궁금할 수 있다.

기존의 명령형 프로그래밍과 반응형 프로그래밍의 차이를 정리해보자

  • 명령형 프로그래밍 (Imperative programming)은 작성된 코드가 정해진 순서대로 실행되는 방식의 프로그래밍 기법이다.
fun imperative() {
    println("call imperative()")
    // 1. 리스트를 생성
    val items = mutableListOf<Int>()
    // 2. 리스트에 1~4까지 아이템 추가
    items.add(1)
    items.add(2)
    items.add(3)
    items.add(4)
    // 3. for문으로 짝수 출력
    for (item in items) {
        if(item % 2 == 0) println(item)
    }
    // 4. 리스트에 5,6 아이템 추가
    items.add(5)
    items.add(6)
}
// println() 이후 리스트에 추가한 행위가 결과에 영향을 주지 않음
2
4
  • 반응형 프로그래밍 (Reactive programming)은 시간 순으로 들어오는 모든 데이터의 흐름을 스트림으로 처리하여, 하나의 데이터 흐름이 다른 데이터 흐름으로 변형되거나, 여러 데이터 흐름이 하나의 데이터 흐름으로 변경될 수 있다.
fun reactive() {
    println("call reactive()")
    // 1. 데이터 스트림을 만듬
    val items = PublishSubject.create<Int>()
    // 2. 데이터 스트림에 1~4 아이템 추가
    items.onNext(1)
    items.onNext(2)
    items.onNext(3)
    items.onNext(4)
    // 3. 짝수만 출력하는 데이터 스트림으로 변형한 뒤 구독
    items.filter { it % 2 == 0 }.subscribe { println(it) }
    // 4. 데이터 스트림에 5,6 아이템 추가
    items.onNext(5)
    items.onNext(6)
}
// 구독 시점 이후 데이터만 observer에 전달하는 PublishSubject로 6만 출력됨
// ReplaySubject를 통해 구독 시점 이전의 데이터도 출력할 수 있음
6

어느정도 감만 잡은 뒤 RxJava로 넘어가보자.


RxJava, RxAndroid

RxJava는 Reactive Extensions 라이브러리의 JVM 구현체로 Reactive Extensions 라이브러리에서 제공하는 기능들을 모든 자바 기반 플랫폼에 사용할 수 있도록 하고, RxAndroid는 RxJava에 안드로이드용 스케줄러 등 몇가지 클래스를 추가하여 개발을 쉽게 할 수 있도록 하는 라이브러리이다.

우선 해당 라이브러리들을 사용하기 위해 앱 수준의 build.gradle 에 의존성을 아래와 같이 추가한다. 최신버전은 맨 아래 각 라이브러리의 깃허브 리드미를 통해 확인할 수 있다. 또는 여기서 검색해보면 확인할 수 있다.

dependencies {
    implementation 'io.reactivex.rxjava3:rxjava:3.1.6'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
}

이제 공홈 도큐먼트 순서에 따라 하나씩 알아보자.


Observable

공식 문서에 따르면 In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. 라고 정의되어 있다.

  • Observable은 아이템을 만들어내는(emit) 주체로, 스트림을 통해 만들어낸 아이템을 내보낸다. Observable이 만드는 스트림은 특정 조건을 만족하고 종료되어 더 이상 아이템을 만들어낼 수도, 종료되지 않고 유지되어 계속 아이템을 만들 수도 있다.
  • Observer은 Observable에서 만들어진 이벤트에 반응하여, 이벤트를 받았을 때 수행할 작업을 정의한다. Observer가 Observable에서 발생하는 이벤트를 관찰해야 하는데, 이를 구독(subscribe)라고 표현한다.

Observable은 다음 3가지 이벤트를 사용하여 동작한다.

  • onNext() - Observable이 새로운 아이템을 발행할 때 마다 메서드 호출
  • onComplete() - 오류가 발생하지 않은 경우, 마지막 onNext()를 호출한 후 해당 메서드 호출
  • onError() - 기대하는 아이템이 생성되지 않았거나, 다른 이유로 오류 발생 시 호출

위 이벤트들은 Emitter라는 interface에 의해 선언되며, 아이템이이나 오류 내용을 발행 시 null을 발행할 수 없다. 이러한 observable은 subscribe() 메서드를 통해 구독할 수 있다.

RxJava에서 연산자(Operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있다. 그 외에도 연산자를 통해 Observable을 변환하거나, 필터링, 결합, 예외처리 등을 할 수 있다.

생성 연산자

Create 연산자

Observable.create() 로 Emitter를 이용해 직접 아이템을 발행하고, onComplete(), onError의 알림을 직접 설정할 수 있다.

val observable = Observable.create<String> { emitter ->
    emitter.onNext("Hello")
    emitter.onNext("Bye")
//    emitter.onError(Throwable())
    emitter.onComplete()
}
observable.subscribe { data -> println(data) }
Hello
Bye

Defer 연산자

Observer가 구독할 때까지 Observable의 아이템 발행을 지연시키는 역할을 한다. subscribe() 를 호출할 때 Observable이 아이템을 발행한다.

val format = SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.KOREA)
val justObservable = Observable.just(System.currentTimeMillis())
val deferObservable = Observable.defer { Observable.just(System.currentTimeMillis()) }

println("현재 시각 ${format.format(System.currentTimeMillis())}")
Thread.sleep(3000)
println("현재 시각 ${format.format(System.currentTimeMillis())}")
justObservable.subscribe { data -> println("just = ${format.format(data)}") }
deferObservable.subscribe { data -> println("defer = ${format.format(data)}") }
현재 시각 2023-04-16 17:32:31
현재 시각 2023-04-16 17:32:34
just = 2023-04-16 17:32:31
defer = 2023-04-16 17:32:34

Empty 연산자

아이템을 발행하지 않는 Observable을 생성한다. 오직 onComplete() 를 호출한다.
즉. 아이템을 발행하지 않고 스트림을 종료시킨다.

Never 연산자

empty() 와 달리 onComplete() 를 호출하지 않는다. 즉, 아이템을 발행하지 않고 스트림도 종료시키지 않는다.

둘의 비교를 예시를 통해 알아보자.

Observable.empty<Any>().doOnComplete { println("empty::onComplete()") }
    .doOnTerminate { println("empty::종료") }.subscribe()
Observable.never<Any>().doOnComplete { println("never::onComplete()") }
    .doOnTerminate { println("never::종료") }.subscribe()
empty::onComplete()
empty::종료

From 연산자

fromArray, fromCallable, fromFuture, fromIterable, fromPublisher 처럼 다양한 오브젝트들을 from으로 시작하는 연산자를 통해 Observable로 변환할 수 있다.

  • fromArray() - 배열의 아이템을 Observable로 바꾸어 아이템을 순차적으로 발행한다.
val array = arrayOf("Morning", "taxi", "is", "good")
// fromArray() 사용 시 *을 통해 데이터를 읽어올 수 있도록 지정해야 함
// 대체로 array.toObservable()를 하면 내부에서 자체적으로 fromArray(*this)를 호출
Observable.fromArray(*array).subscribe { data -> print("$data ") }
Morning taxi is good 

  • fromCallable() - Callable을 Observable로 변환하고 비동기적으로 아이템을 발행한다.
val callable = Callable { "Five from operator done\n" }
Observable.fromCallable(callable).subscribe { data -> println(data) }
Five from operator done

  • fromFuture() - Future 인터페이스를 지원하는 모든 객체를 ObservableSource로 변환하고 Future.get() 호출한 값을 반환한다. (호출 시 Future의 작업이 끝나기 전까지 스레드는 블로킹 -> 비동기적인 작업의 결과를 구할 때 사용, RxJava에서는 Executor를 직접 다루기보다 스케줄러 사용을 권장)
val future = Executors.newSingleThreadExecutor().submit<String> { "I think bus is perfect" }
Observable.fromFuture(future).subscribe { data -> println(data) }
I think bus is perfect

  • fromPublisher() - Publisher (잠재적인 데이터 발행을 제공하는 생산자로, Subscriber로부터 요청을 받아야 발행) 를 Observable로 변환한다.
val publisher = Publisher { subscriber ->
    subscriber.onNext("Metro, too")
    subscriber.onComplete()
}
Observable.fromPublisher(publisher).subscribe { data -> println(data) }
Metro, too

  • fromIterable() - HashSet, ArrayList와 같은 Iterable 자료 구조 클래스를 Observable로 변환한다.
val iterable = arrayListOf("Evening", "taxi", "is", "expensive")
Observable.fromIterable(iterable).subscribe { data -> print("$data ") }
Evening taxi is expensive 

Interval 연산자

주어진 시간 간격으로 0부터 순서대로 정수를 발행하는 Observable을 생성한다. 구독을 해지하기 전까지 아이템을 계속해서 발행함으로 불필요할 때 dispose() 해야 한다.

val intervalObservable = Observable.interval(1, TimeUnit.SECONDS).subscribe { println(it) }
Thread.sleep(3000)
intervalObservable.dispose()
0
1
2

Just 연산자

Observable.just()는 해당 아이템을 그대로 발행하는 Observable을 생성한다. 인자로 넣은 아이템을 차례로 발행하며, 한개의 아이템 또는 여러개의 아이템을 넣을 수 있다.

val observable = Observable.just("item", 1, true, "four")
observable.subscribe { data -> print("$data ") }
item 1 true four 

Range 연산자

특정 범위의 정수를 순서대로 발행하는 Observable을 생성한다. 특정 범위를 정의하기에 발행이 완료되면 스트림을 종료시킨다.

Observable.range(1, 5).subscribe { print("$it ") }
1 2 3 4 5

Timer 연산자

특정 시간동안 지연시킨 뒤 Long타입 0L을 발행하는 Observable을 생성한다. 발행 후 스트림을 종료시킨다.

runBlocking {
    val timerObservable = Observable.timer(3, TimeUnit.SECONDS)
    timerObservable.subscribe { println(it) }
    delay(4000)
}
0

변형 연산자

Buffer 연산자

Observable이 발행하는 아이템을 묶어 컬렉션으로 변환한다. 오류가 발생한 경우 이미 발행된 아이템들이 버퍼에 포함되더라도 발행하지 않고 즉시 오류를 전달한다. 매개변수로 몇 개씩 담아 발행할지 지정한다.

Observable.range(0, 8).buffer(4).subscribe {
    print("emit::")
    for (item in it) print("$item ")
    println()
}
emit::0 1 2 3 
emit::4 5 6 7 

FlatMap 연산자

Observable이 발행한 아이템을 다른 Observable으로 변환한 후, 이를 단일 Observable으로서 발행한다.

val observable = Observable.just("a", "b", "c")
observable.flatMap { 
    item -> Observable.just(item + "1", item + "2") }.subscribe {
	println(it)
}
a1
a2
b1
b2
c1
c2

Map 연산자

Observable이 발행한 아이템을 각각 함수를 적용시킨다.

val observable = Observable.just("a", "b", "c")
observable.map { it + it }.subscribe {
	println(it)
}
aa
bb
cc

Scan 연산자

Observable이 발행한 아이템을 다음 발행되는 아이템의 첫번째 인자로 전달한다.

val observable = Observable.just("a", "b", "c")
observable.scan { prev, item -> prev + item }.subscribe {
	println(it)
}
a
ab
abc

필터 연산자

Debounce 연산자

특정 시간동안 다른 아이템이 발행되지 않을 때에만 아이템을 발행하도록 한다. 반복적으로 빠르게 발행된 아이템을 필터링할 때 유용하다.

val observable = Observable.create<String> { emitter ->
	emitter.onNext("1")
	Thread.sleep(100)
	emitter.onNext("2")
	emitter.onNext("3")
	emitter.onNext("4")
	Thread.sleep(100)
	emitter.onNext("5")
}.debounce(10, TimeUnit.MILLISECONDS).subscribe {
	println(it)
}
1
4

Distinct 연산자

이미 발행한 아이템을 중복 발행하지 않도록 필터링한다.

val observable = Observable.just(1, 2, 1, 4, 2)
observable.distinct().subscribe {
	println(it)
}
1
2
4

ElementAt 연산자

발행되는 아이템 시퀸스에서 특정 인덱스를 필터링한다.

val observable = Observable.just(1, 2, 3,4, 5)
   observable.elementAt(3).subscribe { println(it) }
4

Filter 연산자

조건식이 true인 경우에만 아이템을 발행한다.

val observable = Observable.just(1, 2, 3, 4, 5, 6)
observable.filter { item -> item % 2 == 0 }.subscribe { println(it) }
2
4
6

Sample 연산자

일정 시간 간격으로 Observable이 최근에 발행한 아이템을 발행한다.

val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
Thread.sleep(1000)
observable.sample(300, TimeUnit.MILLISECONDS).subscribe { println(it) }
Thread.sleep(1000)
2
5
8

Skip 연산자

Observable이 발행하는 n개의 아이템을 무시한 후 이후의 아이템을 발행하는 연산자이다.

val observable = Observable.just(1, 2, 3, 4, 5, 6)
observable.skip(2).subscribe { println(it) }
3
4
5
6

Take 연산자

Skip과 반대로 처음 발행하는 n개의 아이템만 발행하는 연산자이다. 이후 발행되는 아이템은 무시한다.

val observable = Observable.just(1, 2, 3, 4, 5, 6)
observable.take(3).subscribe { println(it) }
1
2
3

All 연산자

발행되는 모든 아이템이 특정 조건을 만족해야 true를 반환한다. 그 외에는 모두 false를 반환한다.

val observable = Observable.just(1, 2, 3, 4)
observable.all { item -> item > 0 }.subscribe(System.out::println)
true

결합 연산자

CombineLatest 연산자

두 개의 Observable 중 하나의 Observable에서 아이템이 발행될 때, 두 Observable에서 가장 최근에 발행한 아이템을 결합하여 하나로 발행한다.

val intObservable = Observable.create<Int> { emitter ->
	thread {
		for (i in 1 .. 10) {
			emitter.onNext(i)
			try {
				Thread.sleep(500)
			} catch (_: Exception) {}
		}
	}
}

val strObservable = Observable.create { emitter ->
	thread {
		for(i in 1 .. 10) {
			emitter.onNext((i+64).toChar())
			try {
				Thread.sleep((i * 100).toLong())
			} catch (_: Exception) {}
		}
	}
}

Observable.combineLatest(intObservable, strObservable) {
	num, str -> num.toString() + str 
}.subscribe { println(it) }
1A
1B
1C
2C
2D
3D
3E
4E
4F
5F
5G
6G
6H
7H
8H
8I
9I
10I
10J

Zip 연산자

여러 Observable을 하나로 결합하고, 지정된 함수를 통해 하나의 아이템으로 발행한다. zip은 1:1 순서를 지켜 아이템을 발행한다.

val intObservable = Observable.create<Int> { emitter ->
	thread {
		for (i in 1 .. 10) {
			emitter.onNext(i)
			try {
				Thread.sleep(500)
			} catch (_: Exception) {}
		}
	}
}

val strObservable = Observable.create { emitter ->
	thread {
		for(i in 1 .. 10) {
			emitter.onNext((i+64).toChar())
			try {
				Thread.sleep((i * 100).toLong())
			} catch (_: Exception) {}
		}
	}
}

Observable.zip(intObservable, strObservable) {
	num, str -> num.toString() + str
}.subscribe { println(it) }
1A
2B
3C
4D
5E
6F
7G
8H
9I
10J

Merge 연산자

여러 Observable을 결합해 하나의 Observable에서 발행하는 것처럼 사용할 수 있다.

val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(250, TimeUnit.MILLISECONDS)
Observable.merge(observable1, observable2).subscribe { println(it) }
Thread.sleep(700)
0
1
0
2
3
1
4
5

subscribe()

실제로 아이템을 발행하기 위해 just, create 등으로 데이터 스트림을 정한 후 subscribe()를 호출해야 실제로 아이템이 발행된다.

subscribe()는 Disposable 인터페이스 객체를 리턴하며, Disposable 인터페이스에는 dispose(), isDispose() 두 가지 메서드가 있고, dispose() 를 통해 Observable가 더 이상 아이템을 발행하지 않도록 구독을 해지할 수 있다.

Observable이 onComplete() 를 호출할 경우 자동으로 dispose() 를 호출하여 Observable와 Observer의 관계를 끊는다. 즉 정상적으로 종료 시 dispose를 호출할 필요가 없다.


그 외 다양한 Observable

Observable 외에 Single, Maybe, Completable와 같이 특별한 스트림이 존재한다.

Single

Single은 단 하나의 아이템만 발행할 수 있다. create()를 사용할 경우 Emitter를 통해 발행한다. 단 한번만 발행하기 때문에 onNext()+onComplete() 대신 onSuccess()를 통해 아이템 발행이 완료됨을 알려준다.

오류처리는 onError()를 통해 Observer에게 전달한다.

val single = Single.create { emitter ->
    emitter.onSuccess("혼자 왔어?")
}
single.subscribe { data -> println(data) }

Maybe

Single과 비슷하지만, 아이템을 발행하지 않을 수도 있다는 차이가 존재한다. 아이템을 발행한 경우 onSuccess() 를 호출, 발행하지 않을 때는 onComplete() 를 호출한다.

val maybe = Maybe.create { emitter ->
    emitter.onSuccess("어 싱글이야")
}
maybe.subscribe { data -> println(data) }

Completable

아이템을 발행하지 않고, 정상적으로 실행이 종료되었는지 확인할 때 사용한다. 발행을 하지 않기 때문에 onComplete()onError() 만 사용한다.

val completable = Completable.create { emitter ->
    emitter.onComplete()
}
completable.subscribe { println("조용해라") }

Hot and Cold Observables

When does an Observable begin emitting its sequence of items? It depends on the Observable.
Observable은 연속된 아이템을 언제 발행하기 시작할까? 이는 Observable에 따라 다르다.

  • Hot Observable : 아이템이 생성되자마자 발행되기 시작한다. ex. 라이브방송
  • Cold Observable : Observer가 구독할 때까지 아이템을 발행하지 않는다. ex. 유튜브영상

Cold Observable은 구독자가 Observable이 발행하는 아이템 전체를 구독할 수 있도록 보장받음

뜨거워서 바로 흘려보내야 하고, 얼어있어서 해빙(구독)해야 흘려보낼 수 있다고 생각하자.

Hot Observable

아이템 발행이 시작된 이후 모든 Observer에게 동시에 같은 아이템을 발행하기에, 늦게 구독한 Observer은 구독 이전에 발행된 아이템을 놓칠 수 있다.
이를 정리해보자면,

  • 구독자의 존재 여부와 상관없이 아이템을 발행하는 Observable
  • 마우스 이벤트, 키보드 이벤트, 시스템 이벤트 등이 주로 사용됨
  • 멀티 캐스팅도 포함됨
  • 구독 시점으로부터 발행하는 값을 받는 것을 기본으로 함

Cold Observable

구독을 요청하면 아이템을 발행하기 시작하기에 아이템이 처음부터 끝까지 발행되며, 임의 종료가 아닌 이상 여러 요청에도 아이템 전부를 받는 것을 보장한다.
이를 정리해보자면,

  • 일반적인 Observable의 형태
  • 누가 구독하기 전에는 아이템을 발행하지 않음
  • 일반적인 웹 요청(HTTP GET), DB쿼리 등이 사용되며 내가 요청하면 결과를 받는 과정을 거침
  • 처음부터 발행하는 것을 기본으로 함

Hot Observable vs Cold Observable

Cold Observable은 스트림을 분기시키는 성질을 갖고있지 않는다. 그로 인해 Cold Observable을 여러번 구독하는 경우, 각각 별도의 스트림이 생성되고 할당됨(different instances of emitted items)

반면, Hot Observable은 스트림을 분기시키는 성질이 있어 스트림의 분기가 필요한 경우 Hot Observable을 사용한다. 즉 하나의 스트림을 여러번 구독해야 하는 경우에 사용하게 된다.

일반적인 Observable(Cold Observable)의 예시를 코드를 통해 알아보자. 아래는 interval 연산자를 통해 1초마다 아이템을 발행하는 Cold Observable 예시이다.

val observable = Observable.interval(1, TimeUnit.SECONDS)
observable.subscribe { data -> println("first subscribe : $data") }
Thread.sleep(2000)
observable.subscribe { data -> println("second subscribe : $data") }
Thread.sleep(2000)
first subscribe : 0
first subscribe : 1
first subscribe : 2
second subscribe : 0
first subscribe : 3
second subscribe : 1

첫 번째 구독 이후 2초 뒤 새로 구독을 하였는데, 두 Observer 모두 아이템을 처음부터 받았음을 확인할 수 있다. 같은 Observable 인스턴스를 구독했음에도 처음부터 시작하는 이유는 각각의 Observer마다 다른 소스 Observable이 생성되기 때문이다.

그렇다면 어떻게 일반적인 Observable을 Hot Observable로 변환할까?
이는 ConnectableObservableSubject 클래스를 통해 변환할 수 있다.

ConnectableObservable

ConnectableObservable은 Cold Observable을 Hot Observable으로 변환할 수 있는 observable이다. 서로 다른 Observer에 대한 단일 소스 Observable 이다.

아이템 발행 시 subscribe() 연산자를 호출하기 전, publish(), connect() 연산자를 호출하여 Hot Observable로 변환하여 사용할 수 있다는 점이다.

publish() 연산자를 통해 Observable을 Hot Observable로 변환한다. 이 때 publish 만으로는 아이템 replay가 활성화되지 않고, connect() 연산자를 호출할 때 아이템을 발행하기 시작한다.

val hotObservable1 = ConnectableObservable.interval(1, TimeUnit.SECONDS).publish()
hotObservable1.connect()
hotObservable1.subscribe { data -> println("first subscribe : $data") }
Thread.sleep(2000)
hotObservable1.subscribe { data -> println("second subscribe : $data") }
Thread.sleep(2000)
first subscribe : 0
first subscribe : 1
first subscribe : 2
second subscribe : 2
first subscribe : 3
second subscribe : 3

첫 번째 구독 이후 2초 뒤 새로 구독을 하였는데 새로 구독한 Observer은 이전에 발행된 아이템을 받지 못하고 2부터 받은 것을 확인할 수 있다.

이 때 connect() 연산자가 아닌 autoconnect() 연산자를 사용할 수도 있는데, 이는 해당 연산자의 매개변수의 값만큼의 Observer가 붙어야만 아이템을 발행하기 시작한다.


Subject

Subject는 Cold Observable을 Hot Observable로 변환시켜주며, Observable의 속성과 Observer의 속성을 모두 갖고 있다.

공식 문서에 따르면 Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

즉, Observer이기에 하나 이상의 Observable을 구독할 수 있으며, 동시에 Observable이기에 아이템들을 하나 하나 거치며 재발행하고 관찰하며, 또한 새로운 아이템들을 발행할 수도 있다.

이는 Subject 클래스가 Observable 클래스를 상속(확장)하는 동시에 Observer 인터페이스를 구현하기 때문에 가능하다.

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    @CheckReturnValue
    public abstract boolean hasObservers();

Subject에는 총 4가지 종류가 있다. 각각의 Subject는 특정 상황에 맞춰 설계되어 있다고 한다. 하나씩 살펴보도록 하자.

AsyncSubject

Observable으로부터 발행된 마지막 값(만)을 얻어 전달할 수 있는 Subject이고, 오직 Observable이 완료된 이후에 동작한다. 즉 onComplete() 되기 전 onNext() 의 값만 처리한다.

또한 맨 마지막 값을 뒤 이어 오는 Observer에게 전달하는데, 만약 Observable이 오류로 인해 종료될 경우, AsyncSubject는 아무 아이템도 발행하지 않고 발생한 오류를 그대로 전달한다.

코드를 통해 알아보자.

우선 Observable로서의 AsyncSubject 예시 코드이다.

val asyncSubject = AsyncSubject.create<Long>()
asyncSubject.subscribe { data -> println("first subscribe : $data") }
asyncSubject.onNext(1)
asyncSubject.onNext(2)
asyncSubject.subscribe { data -> println("second subscribe : $data") }
asyncSubject.onNext(3)
asyncSubject.onComplete()
first subscribe : 3		// 마지막 값 전달
second subscribe : 3	// 마지막 값 전달


Observer로서의 AsyncSubject 예시 코드이다.

val observable = Observable.just(1, 2, 3, 4)
val observerAsyncSubject = AsyncSubject.create<Int>()
observerAsyncSubject.subscribe { data -> println("subscribe : $data") }
observable.subscribe(observerAsyncSubject)
subscribe : 4

BehaviorSubject

Observer가 구독을 하면 Observable이 가장 최근에 발행한 아이템 혹은 기본 값을 전달하고, 이후 소스 Observable에 의해 발행된 항목들을 계속 전달한다.

만약 소스 Observable이 오류로 인해 종료될 경우, BehaviorSubject는 아무 아이템도 발행하지 않고 발생한 오류를 그대로 전달한다.

val behaviorSubject = BehaviorSubject.createDefault(777)
behaviorSubject.subscribe { data -> println("first subscribe : $data") }
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.subscribe { data -> println("second subscribe : $data") }
behaviorSubject.onNext(3)
behaviorSubject.onComplete()
first subscribe : 777	// 발행 전 기본 값이 전달됨
first subscribe : 1
first subscribe : 2
second subscribe : 2	// 구독 직전 값이 전달됨
first subscribe : 3
second subscribe : 3

PublishSubject

가장 평범한 형태의 Subject 클래스이다. 구독 이후에 Observable이 발행한 아이템들만 Observer에게 전달한다. 주의할 점은 생성 시점에서 즉시 아이템을 발행하기에 Subject가 생성되는 시점과 Observer가 구독하는 시점 사이에 발행된 아이템들을 잃어버릴 수 있다.

발행된 모든 아이템을 보장받기 위해서는 create() 를 통해 명시적으로 Cold Observable을 구성(발행 전 모든 Observer가 구독하였는지 체크)하거나, 마지막으로 소개할 ReplaySubject을 사용해야 한다.

만약 소스 Observable이 오류로 인해 종료될 경우, PublishSubject는 아무 아이템도 발행하지 않고 발생한 오류를 그대로 전달한다.

이에 대한 예시 코드는 해당 글 맨위에서 확인할 수 있다.

ReplaySubject

Observer가 구독한 시점과 관계없이 Observable이 발행한 모든 아이템들을 모든 Observer에게 전달한다. 즉, Cold Observable 처럼 동작한다. 이로 인해 모든 데이터 내용을 저장해두는 과정에서 Memory Leak 이 발생할 수 있다는 가능성을 염두해야 한다.

ReplaySubject는 이를 위해 몇 가지 생성자 오버로드 createWith***() 를 제공하여, replay buffer 의 크기가 특정 이상으로 증가할 경우, 또는 처음 발행 후 지정한 시간이 경과할 경우 오래된 항목들을 제거한다.

val replaySubject = ReplaySubject.create<Int>()
replaySubject.subscribe { data -> println("first subscribe : $data") }
replaySubject.onNext(1)
replaySubject.onNext(2)
replaySubject.subscribe { data -> println("second subscribe : $data") }
replaySubject.onNext(3)
replaySubject.onComplete()
first subscribe : 1
first subscribe : 2
second subscribe : 1
second subscribe : 2
first subscribe : 3
second subscribe : 3

Disposable

dispose()

앞전에 설명했듯 Observable 객체에서 발행할 아이템을 정의하고 subscribe() 를 통해 스트림을 생성하고 발행한다. 이를 호출한 후에는 Disposable 객체가 반환된다.

// Observable.java
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

Observable이 발행하는 아이템의 개수가 유한하다면, 모두 발행된 후 onComplete() 가 호출되고 안전하게 종료될 것이다. 하지만 아이템을 무한히 발행하거나 오랫동안 실행되는 Observable의 경우 제대로 종료되지 않는다면 Memory Leak 이 발생할 수 있다.

Observable에 더 이상 구독이 필요하지 않을 경우 Disposable.dispose() 를 호출하여 아이템의 발행을 중단할 수 있다. onComplete() 호출 시에는 dispose() 가 자동으로 호출된다.

val observable = Observable.interval(1, TimeUnit.SECONDS)
val disposable = observable.subscribe { data -> println(data) }

thread {
    Thread.sleep(4000)
    disposable.dispose()
}
0
1
2
3

CompositeDisposable

Observable에 대해 Observer가 여러 개가 있다면, 이를 폐기할 때마다 각 Disposable 객체의 dispose()를 호출해야 한다. 이런 번거로운 작업을 피하기 위해 CompositeDisposable 에 각 Disposable을 추가한 후 한번에 폐기할 수 있다.

val observable = Observable.interval(1, TimeUnit.SECONDS)
val disposable1 = observable.subscribe { data -> println(data) }
val disposable2 = observable.subscribe { data -> println(data) }
val compositeDisposable =
    CompositeDisposable().apply { addAll(disposable1, disposable2) }

thread {
    Thread.sleep(4000)
    compositeDisposable.dispose()
}

Scheduler

Observable이 동작하는 Thread

Observable은 기본적으로 subscribe하는 Thread에서 동작한다.
interval, timer로 Observable을 생성하거나, delay 연산자를 사용하는 경우 외에는 subscribe가 완료될 때까지 thread가 block된다.

이는 곧 subscribe { } 블럭에서 모든 데이터를 발행하고 처리해야만 해당 블럭을 벗어날 수 있다는 것이다.

코드로 subscribe하는 thread에 따라 currentThread()의 이름이 어떻게 나오는지 확인해보자.
아래 코드는 Observable에서 map 처리시에 thread 이름을 찍고 subscribe에서도 thread 이름을 찍는다.
이후 새 Thread를 생성하여 같은 Observable을 subscribe 한다.

runBlocking {
	val observable = Observable.just(1, 2, 3).map {
		println("map : $it - ${Thread.currentThread().name}")
	}
	observable.subscribe { println("First : $it - ${Thread.currentThread().name}") }
	println("--------------------------")
	thread { observable.subscribe { println("Second : $it - ${Thread.currentThread().name}") } }
	delay(1000)
}
map : 1 - main
First : kotlin.Unit - main
map : 2 - main
First : kotlin.Unit - main
map : 3 - main
First : kotlin.Unit - main
--------------------------
map : 1 - Thread-0
Second : kotlin.Unit - Thread-0
map : 2 - Thread-0
Second : kotlin.Unit - Thread-0
map : 3 - Thread-0
Second : kotlin.Unit - Thread-0

이를 통해 Observable과 Subscribe는 subscribe를 수행한 thread에서 실행된다.
또한 subscribe { } 블럭이 끝나야만 다음 라인이 실행된다.

하지만, 필요에 따라 subscribe를 호출하는 thread가 아닌 다른 thread에서 작업을 수행하기 위해, RxJava에서는 여러 Scheduler를 제공하고 있다.

Scheduler의 종류

  • Scheduler.io

    • 파일 / 네트워크 IO 작업을 할 때 용도로 사용한다.
    • 내부적으로 cachedPool을 사용하기 때문에 thread가 동시에 계속 늘어나면서 생성 가능하며, 유휴 thread가 있을 경우 재활용된다.

  • Scheduler.computation()

    • CPU 의존적인 계산을 수행하기 위한 thread pool을 사용한다.
    • 코어개수만큼 thread pool을 만들어 사용한다.

  • Scheduler.newThread()

    • new Thread(), kotlin의 thread 처럼 새로운 Thread를 하나 만들어 사용한다.

  • Scheduler.single()

    • singleThreadPool을 사용하여, 해당 스케줄러로 여러 작업 수행 시 Queuing되어 순서가 보장된다.

  • Scheduler.trampoline()

    • 호출을 수행한 thread를 이용하여 수행한다.
    • 호출한 thread 또한 단일 thread로 여러 작업 수행 시 Queuing되어 순서가 보장된다.
    • 호출한 thread를 사용하기에, Queuing된 모든 작업이 끝나야 다음 라인이 수행될 수 있다.

  • Scheduler.from()

    • Executor를 전달하여 새로운 Scheduler를 생성할 수 있다.

  • AndroidSchedulers.mainThread()

    • RxAndroid 사용 시 mainThread()에서 수행하기 위한 스케줄러

코드를 통해 사용되는 thread를 확인해보자.

 runBlocking {
    val observable = Observable.just(1)
    // Schedulers.io()
    observable.subscribeOn(Schedulers.io())
        .subscribe { println("io() - ${Thread.currentThread().name}") }
    // Schedulers.computation()
    observable.subscribeOn(Schedulers.computation())
        .subscribe { println("computation() - ${Thread.currentThread().name}") }
    // Schedulers.newThread()
    observable.subscribeOn(Schedulers.newThread())
        .subscribe { println("newThread() - ${Thread.currentThread().name}") }
    // Schedulers.single()
    observable.subscribeOn(Schedulers.single())
        .subscribe { println("single() - ${Thread.currentThread().name}") }
    // Schedulers.trampoline()
    observable.subscribeOn(Schedulers.trampoline())
        .subscribe { println("trampoline() - ${Thread.currentThread().name}") }
    // Schedulers.from()
    val executors = Executors.newFixedThreadPool(2)
    val schedulers = Schedulers.from(executors)
    observable.subscribeOn(schedulers)
        .subscribe { println("from - ${Thread.currentThread().name}") }
    delay(1000)
computation() - RxComputationThreadPool-1
io() - RxCachedThreadScheduler-1
newThread() - RxNewThreadScheduler-1
single() - RxSingleScheduler-1
trampoline() - main
from - pool-2-thread-1

Scheduler간 비교

Schedulers.io() vs Schedulers.computation()

둘 다 비동기로 다른 thread에서 동작하도록 한다.

runBlocking {
    val observable = Observable.just(1)

    println("start Schedulers.io()")
    observable.subscribeOn(Schedulers.io()).subscribe {
        runBlocking { delay(100) }
        println("$it-Schedulers.io() - ${Thread.currentThread().name}")
    }
    println("start Schedulers.computation()")
    observable.subscribeOn(Schedulers.computation()).subscribe {
        runBlocking { delay(100) }
        println("$it-Schedulers.computation() - ${Thread.currentThread().name}")
    }
    println("done")
    delay(100)
}
start Schedulers.io()
start Schedulers.computation()
done
1-Schedulers.computation() - RxComputationThreadPool-1
1-Schedulers.io() - RxCachedThreadScheduler-1

"start Schedulers.io()", "start Schedulers.computation()", "done"이 출력되고 나서 Thread 이름이 출력된다. 이를 통해 두 Scheduler 모두 비동기로 다른 Thread에서 동작하도록 한다는 것을 확인할 수 있다.

Schedulers.single() vs Schedulers.trampoline()

둘 다 single thread를 사용하여 순서룰 보장하지만, single()은 worker thread를 하나 생성하여 해당 thread queue에 작업을 넘겨주는 방식이고, trampoline()은 호출한 thread의 queue에 넘겨준다는 차이가 있다.

둘 다 순서를 보장하지만, single은 single을 사용하는 Observable간의 순서만을 보장하고, trampoline은 trampoline을 사용한 observable을 포함한 다른 코드에까지 영향을 준다.

runBlocking {
    val observable = Observable.just(1, 2, 3)

    println("start Schedulers.single()-#1")
    observable.subscribeOn(Schedulers.single()).subscribe {
        runBlocking { delay(100) }
        println("$it-Schedulers.single()-#1 - ${Thread.currentThread().name}")
    }
    println("start Schedulers.single()-#2")
    observable.subscribeOn(Schedulers.single()).subscribe {
        runBlocking { delay(100) }
        println("$it-Schedulers.single()-#2 - ${Thread.currentThread().name}")
    }
    println("done")
    delay(1000)
}
start Schedulers.single()-#1
start Schedulers.single()-#2
done
1-Schedulers.single()-#1 - RxSingleScheduler-1
2-Schedulers.single()-#1 - RxSingleScheduler-1
3-Schedulers.single()-#1 - RxSingleScheduler-1
1-Schedulers.single()-#2 - RxSingleScheduler-1
2-Schedulers.single()-#2 - RxSingleScheduler-1
3-Schedulers.single()-#2 - RxSingleScheduler-1

"start Schedulers.single()-#N", "done"이 먼저 찍혔으나, Observable간에 순서는 보장되었다. 이는 "start Schedulers.single()-#N", "done"은 main thread에서 출력하였으나, Observable의 동작은 하나의 worker thread에서 실행되었기 때문이다.

Scheduler 부분만 single()에서 trampoline()으로 변경했을 때의 결과는 아래와 같다

start Schedulers.trampoline()-#1
1-Schedulers.trampoline()-#1 - main
2-Schedulers.trampoline()-#1 - main
3-Schedulers.trampoline()-#1 - main
start Schedulers.trampoline()-#2
1-Schedulers.trampoline()-#2 - main
2-Schedulers.trampoline()-#2 - main
3-Schedulers.trampoline()-#2 - main
done

Observable이 코드를 호출한 main thread에서 수행되었기에, 코드 순서대로 진행됨을 볼 수 있다.

이번에는 subscribe 시 block되지 않는 interval 연산자를 이용한 Observable에 trampoline()을 사용해보자.

runBlocking {
    println("start Schedulers.trampoline()")
    val observable = Observable.interval(1, TimeUnit.SECONDS)
    observable.subscribeOn(Schedulers.trampoline())
        .subscribe { println(it) }
    println("done")
}
start Schedulers.trampoline()
done

trampoline()을 쓰더라도 시간값으로 생산하는 Observable에는 영향을 미치지 않는다.

subscribeOn() vs observeOn()

subscribeOn()

subscribeOn()은 어느 위치에서 선언되었든 Observable과 Observer 모두 특정 Scheduler에서 동작하도록 지정한다. 즉 생산자와 소비자를 동일하도록 지정해준다.

runBlocking {
    Observable.range(1, 5).map {
        println("map: $it - ${Thread.currentThread().name}")
    }.subscribeOn(Schedulers.io()).subscribe {
        println("subscribe: $it - ${Thread.currentThread().name}")
    }
    delay(100)
}
map: 1 - RxCachedThreadScheduler-1
subscribe: kotlin.Unit - RxCachedThreadScheduler-1
map: 2 - RxCachedThreadScheduler-1
subscribe: kotlin.Unit - RxCachedThreadScheduler-1
map: 3 - RxCachedThreadScheduler-1
subscribe: kotlin.Unit - RxCachedThreadScheduler-1
map: 4 - RxCachedThreadScheduler-1
subscribe: kotlin.Unit - RxCachedThreadScheduler-1
map: 5 - RxCachedThreadScheduler-1
subscribe: kotlin.Unit - RxCachedThreadScheduler-1

발행과 처리 모두 IO Scheduler에서 수행했음을 확인할 수 있다.

observeOn()

반면 observeOn()은 선언부분 이하의 downstream이 사용할 Scheduler를 지정한다.

runBlocking {
    Observable.range(1, 3).observeOn(Schedulers.io()).map {
        println("map-#1: $it - ${Thread.currentThread().name}")
    }.observeOn(Schedulers.computation()).map {
        println("map-#2: $it - ${Thread.currentThread().name}")
    }.observeOn(Schedulers.single()).subscribe {
        println("subscribe: $it - ${Thread.currentThread().name}")
    }
    delay(100)
}
map-#1: 1 - RxCachedThreadScheduler-1
map-#1: 2 - RxCachedThreadScheduler-1
map-#1: 3 - RxCachedThreadScheduler-1
map-#2: kotlin.Unit - RxComputationThreadPool-1
map-#2: kotlin.Unit - RxComputationThreadPool-1
map-#2: kotlin.Unit - RxComputationThreadPool-1
subscribe: kotlin.Unit - RxSingleScheduler-1
subscribe: kotlin.Unit - RxSingleScheduler-1
subscribe: kotlin.Unit - RxSingleScheduler-1

observeOn()을 통해 각각의 작업이 지정된 스케줄러로 수행되었음을 확인할 수 있다.
이는 observeOn() 연산자를 통해 손쉽게 문맥 교환(Context switching)을 할 수 있음을 의미한다.

Android 에서 사용 시 Background에서 작업을 수행하고, 처리가 완료되면 해당 데이터를 UI에서 보여줄 수 있도록 observeOn(AndroidSchedulers.mainThread())를 통해 subscribe 하도록 한다면 간단하게 비동기 작업을 처리할 수 있다.

subscribeOn(), observeOn() 우선순위

Scheduler를 지정하는 subscribeOn와 observeOn을 섞어서 쓸 때 어떤 스케줄러로 동작하는지 확인해보자.

  • subscribeOn와 observeOn의 혼용
runBlocking {
    println("start")
    val observable = Observable.just(1)

    observable.subscribeOn(Schedulers.io()).map {
        println("processed : ${Thread.currentThread().name}")
    }.observeOn(Schedulers.single()).subscribe {
        println("subscribed : ${Thread.currentThread().name}")
    }
    delay(100)
    println("end")
}
start
processed : RxCachedThreadScheduler-1
subscribed : RxSingleScheduler-1
end

의도한 대로 발행은 io thread에서 구독은 single thread에서 수행되었다.

  • observeOn 아래에 subscribeOn을 사용
runBlocking {
    println("start")
    val observable = Observable.just(1)

    observable.subscribeOn(Schedulers.io()).map {
        println("processed : ${Thread.currentThread().name}")
    }.observeOn(Schedulers.single()).subscribeOn(Schedulers.computation()).subscribe {
        println("subscribed : ${Thread.currentThread().name}")
    }
    delay(100)
    println("end")
}
start
processed : RxCachedThreadScheduler-1
subscribed : RxSingleScheduler-1
end

subscribeOn을 두번 사용했지만 먼저 선언된 Schedulers.io()가 동작한다. (cachedThread가 사용됨)
observeOn 아래에 subscribeOn이 선언되었으나, subscribe 블럭 내 동작은 observeOn의 영향을 받는다.

  • subscribeOn 여러개 사용
runBlocking {
    println("start")
    val observable = Observable.just(1)

    observable.subscribeOn(Schedulers.io()).map {
        println("processed : ${Thread.currentThread().name}")
    }.subscribeOn(Schedulers.computation()).subscribe {
        println("subscribed : ${Thread.currentThread().name}")
    }
    delay(100)
    println("end")
}
start
processed : RxCachedThreadScheduler-1
subscribed : RxCachedThreadScheduler-1
end

먼저 선언된 Scheduler로 사용됨을 다시 한번 확인할 수 있다.

  • subscribeOn의 위치와의 상관관계
runBlocking {
    println("start")
    val observable = Observable.just(1)

    observable.map { println("processed : ${Thread.currentThread().name}") }
        .observeOn(Schedulers.single())
        .subscribeOn(Schedulers.computation())
        .subscribe { println("subscribed : ${Thread.currentThread().name}") }
    delay(100)
    println("end")
}
start
processed : RxComputationThreadPool-1
subscribed : RxSingleScheduler-1
end

observeOn 아래 subscribeOn을 위치시켰으나, Observable은 computation 스케줄러에서 동작하고, subscribe는 single 스케줄러에서 동작함을 확인할 수 있었다.

ReactiveX, RxJava Github, RxAndroid Github
참고 포스트1, 참고 포스트2, 참고 포스트3, 참고 포스트4

profile
배운 것을 적어나가는 그런 공간.. 적다 보면 또 까먹는 그런 사람..

0개의 댓글