RxKotlin란 ReactiveX(Reactive Extensions)를 코틀린으로 구현한 라이브러리입니다. ReactiveX는 Microsoft사 주도 아래 옵저버 패턴, 이터레이터 패턴, 함수형 프로그래밍의 장점과 개념을 접목한 반응형 프로그래밍 기법을 의미합니다. RxKotlin은 이벤트 처리 및 비동기 처리의 구성에 최적화된 라이브러리입니다. Observable 추상화 및 관련 상위 함수에 중점을 둔 단일 JAR로 가벼운 라이브러리이며, Kotlin 및 Java 이외에도 C++, Swift 등 여러 언어를 지원합니다.
반응형 프로그래밍은 주변 환경과 끊임없이 상호 작용을 하는 프로그래밍을 의미하며, 프로그램이 주도하는 것이 아닌 환경이 변하면 이벤트를 받아 동작하도록 만드는 프로그래밍 기법을 말합니다. 반응형 프로그램은 외부 요구에 끊임없이 반응하고 처리합니다.
반응형 프로그래밍에 대한 자세한 개념이 궁금하시다면 프로그래밍 패러다임과 반응형 프로그래밍 그리고 Rx를 확인하시면 좋습니다. 프론트엔드와 반응형 프로그래밍을 묶어서 설명하였지만 개념에 대해 이해하기 좋습니다.
ReactiveX 공식 사이트서 볼 수 있는 그림입니다. ReactiveX를 마블 다이어그램과 함께 CREATE
, COMBINE
, LISTEN
이라는 용어로 요약하고 있습니다. 각 용어는 다음과 같은 기능을 설명합니다.
CREATE: ReactiveX는 이벤트 스트림 또는 데이터 스트림을 쉽게 만들 수 있습니다.
COMBINE: ReactiveX는 쿼리 같은 연산자를 사용해 스트림을 구성하고 변환시킬 수 있습니다.
LISTEN: ReactiveX는 관찰 가능한 스트림을 subscribe하여 side effect(부작용)을 수행할 수 있습니다.
즉, 이 세 가지 용어를 요약하여 ReactiveX를 설명하자면 이벤트 스트림 또는 데이터 스트림을 만들고 사용자의 입맛에 맞게 연산자를 통해서 이를 변형시킨 후, 이에 관심있는 사람들이 해당 스트림을 subscribe하여 결과를 받는 것 입니다. 이제 ReactiveX가 무엇인지 알아보았으니 주요 개념(Observable, Observer, Subscribe)에 대해 알아보겠습니다.
Side effect(부작용): Computer Science에서 함수가 결과값 이외에 다른 상태를 변경시킬 때 Side effet가 있다고 말합니다. 예를 들어, 함수가 전역변수나 정적변수를 수정하거나, 인자로 넘어온 것들 중 하나를 변경하거나 화면이나 파일에 데이터를 쓰거나, 다른 부작용이 있는 함수에서 데이터를 읽어오는 경우가 있습니다. 명령형 프로그래밍은 이런 부작용을 사용하여 프로그램이 동작하게 하는 것입니다.
ReactiveX는 위에서 설명하였듯이 옵저버 패턴을 사용하기에 다음과 같은 과정이 이루어집니다. Observer(관찰자)는 Observable(발행자, 생산자)을 구독(Subscribe)하고, Observable이 발행하는 데이터에 반응합니다. 즉, Observable은 하나 혹은 연속된 데이터(아이템)을 발행하는 역할을 가지고 있습니다. 그리고 데이터가 발행되면 그 시점을 감시하는 관찰자를 Observer 안에 두고 관찰자를 통해 Observer는 데이터 발행 알림을 받는 것입니다. Observable은 아래 세 개의 이벤트를 사용하여 Observer에게 알림을 전달합니다.
onNext(item: T): 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행합니다.
onComplet(): 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 더는 onNext() 호출이 발생하지 않음을 나타냅니다.
onError(e: Throwable): 오류가 발생했음을 Observer에게 전달합니다.
이 세 가지 메서드들은 Emitter라는 인터페이스에 선언되어 있고, 메서드를 사용하여 Observable은 이벤트를 통지하고, Observer는 이벤트 알림을 받아서 적절하게 처리하고 사용자에게 이를 보여주는 방식입니다.
데이터를 발행하는 Observable을 생성하는 방법을 알아보도록 하겠습니다. RxKotlin(RxJava)에서는 연산자(Operator)라고 부르는 여러 메서드를 통해서 기존 데이터를 참조하거나 변형하여 Observable을 생성할 수 있습니다. Observable을 생성하는 주요 메서드를 알아보겠습니다.
Observable.create() 를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템 발행의 완료 및 오류의 알림을 직접 설정할 수 있습니다.
Observable을 생성할 때 onError와 onComplete는 한 번씩 호출하는 것이 좋습니다.
주의할 점은 create() 연산자는 개발자가 직접 Emitter를 제어하므로 주의하여 사용해야 합니다. 예를 들어 Observable이 폐기되었을 때 등록된 콜백을 모두 해제하지 않으면 메모리 누수가 발생합니다.
create(source: ObservableOnSubscribe<T>)
fun main() {
// create 메서드를 사용하여 Observable 생성
val observable = Observable.create<String> { emitter ->
emitter.onNext("Hello")
emitter.onNext("World")
emitter.onError(Throwable("오류입니다."))
emitter.onComplete()
}
// Observer 생성
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
println("onSubscribe() - $d")
}
override fun onNext(t: String) {
println("onNext() - $t")
}
override fun onError(e: Throwable) {
println("onError() - ${e.message}")
}
override fun onComplete() {
println("onComplete()")
}
}
// Kotlin 확장 함수를 사용하여 subscribe
observable.subscribeBy(
onNext = { data -> println("Data : $data") },
onError = { error -> println("Error!! - ${error.message}") },
onComplete = { println("Complete!!") }
)
// 메서드를 사용하여 Observer가 Observable 구독
observable.subscribe(observer)
/*
* 결과
* Data : Hello
* Data : World
* Error!! - 오류입니다.
* onSubscribe() - CreateEmitter{null}
* onNext() - Hello
* onNext() - World
* onError() - 오류입니다.
*/
}
just() 연산자는 해당 아이템을 그대로 발행하는 Observable을 생성해 줍니다. 연산자의 인자로 넣은 아이템을 차례로 발행하며, 한 개의 아이템을 넣을 수 있고, 타입이 같은 여러 개의 아이템을 넣을 수도 있습니다.
뒤에서 확인할 from--() 메서드와 비슷해 보이지만 just()는 단일 데이터만을 다룰 수 있고, from--()은 단일 데이터가 아닌 경우에도 사용할 수 있습니다.
just() 연산자에 null을 전달하면 null을 발행합니다. 만약 아무런 아이템을 발행하지 않는 빈 Observable을 만들고 싶다면 Observable.empty() 연산자를 사용해야 합니다.
just(item: T)
val observable = Observable.just("Hello", "World")
observable.subscribe { data ->
println("Data : $data")
}
/*
결과
Data : Hello
Data : World
*/
이미 참조할 수 있는 배열 및 리스트 등의 자료 구조나 Future, Callable 또는 Publisher가 있다면 from으로 시작하는 연산자를 통해서 Observable로 변환할 수 있습니다.
fromxxx과 관련된 메서드
자바에서는 Observable.fromArray(array)를 사용하고, 코틀린에서는 Array.toObservable()을 사용하면 됩니다.
val itemArray: Array<Int> = arrayOf(1, 2, 3)
val arrayObservable = itemArray.toObservable()
arrayObservable.subscribe {
println(it)
}
/*
결과
1
2
3
*/
예제에서는 ArrayList를 사용하였지만 Iterable 인터페이스를 구현하는 모든 클래스는 해당 함수를 통해서 Observable로 변경할 수 있습니다.
val itemList = arrayListOf(1, 2, 3)
val iterableObservable = itemList.toObservable()
// val iterableObservable = Observable.fromIterable(iterableObservable)
iterableObservable.subscribe {
println(it)
}
/*
결과
1
2
3
*/
Callable 인터페이스는 비동기적인 실행 결과를 반환합니다. Runnable 인터페이스와 비슷하지만 Runnable은 실행 결과를 반환하지 않는다는 점에서 차이가 있습니다.
val callable = Callable<String> {
"Hello World"
}
val callableObservable = Observable.fromCallable(callable)
callableObservable.subscribe {
println(it)
}
/*
결과
Hello World
*/
Future 인터페이스는 비동기적인 작업의 결과를 구할 때 사용합니다. Future를 이용하면 멀티쓰레드 환경에서 처리된 어떤 데이터를 다른 쓰레드에 전달할 수 있습니다. Future 내부적으로 Thread-Safe 하도록 구현되었기에, synchronized
를 사용하지 않아도 됩니다. 보통 ExecutorService를 통해 비동기적인 작업을 할 때 사용됩니다.
val future = Executors.newSingleThreadExecutor().submit<String> { "Hello World" }
val futureObservable = Observable.fromFuture(future)
futureObservable.subscribe {
println(it)
}
/*
결과
Hello World
*/
Publisher는 잠재적인 아이템 발행을 제공하는 생산자입니다.
val publisher = Publisher<String> {
it.onNext("Hello World")
it.onComplete()
}
val publisherObservable = Observable.fromPublisher(publisher)
publisherObservable.subscribe {
println(it)
}
/*
결과
Hello World
*/
empty(), never(), error() 연산자 모두 아이템을 발행하지 않지만, onComplete() 또는 onError() 호출 여부에 차이점이 있습니다.
empty 연산자는 아이템을 발행하지는 않지만, 정상적으로 스트림을 종료시킵니다.
never 연산자는 아이템을 발행하지는 않지만, 스트림을 종료시키지도 않습니다.
error 연산자는 아이템을 발행하지는 않지만, 에러를 발생시킵니다.
Observable.empty<String>()
.subscribeBy(
onNext = { println("empty - onNext")},
onError = { println("empty - onError")},
onComplete = { println("empty - onComplete")}
)
Observable.never<String>()
.subscribeBy(
onNext = { println("never - onNext")},
onError = { println("never - onError")},
onComplete = { println("never - onComplete")}
)
Observable.error<String>(Throwable("Error"))
.subscribeBy(
onNext = { println("error - onNext")},
onError = { println("error - onError")},
onComplete = { println("error - onComplete")}
)
/*
결과
empty - onComplete
error - onError
*/
interval 연산자는 주어진 시간 간격으로 순서대로 정수를 발행하는 Observable을 생성합니다.
주의할 점은 구독을 중지하기 전까지 무한히 데이터를 발행하므로 적절한 시점에 폐기하는 것이 중요합니다.
interval(period: Long, unit: TimeUnit)
// interval() 메서드의 첫 번째 매개 변수는 시간 크기, 두 번째 매개 변수는 시간 단위가 옵니다.
val intervalObservable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe {
println(it)
}
Thread.sleep(5000)
intervalObservable.dispose() // 아이템 발행 중단
/*
결과
0
1
2
3
4
*/
특정 범위의 정수를 순서대로 발행하는 Observable을 생성합니다.
interval 연산자와 비슷하지만 특정 범위의 아이템을 발행하고, 발행이 끝나면 스트림을 종료시킨다는 점에서 차이가 있습니다.
range(start: Int, count: Int)
// range 함수의 첫 번째 매개 변수는 시작 값, 두 번째 매개 변수는 생성할 숫자의 개수
Observable.range(0, 5)
.subscribe {
println(it)
}
/*
결과
0
1
2
3
4
*/
특정 시간 동안 지연시킨 뒤 아이템(0L)을 발행하고, 종료합니다.
timer(delay: Long, unit: TimeUnit)
Observable.timer(5, TimeUnit.SECONDS)
.subscribe {
println("Timer 끝!")
}
println("시작!")
Thread.sleep(6000)
/*
결과
시작!
Timer 끝!
*/
val justSrc = Observable.just(
System.currentTimeMillis()
)
val deferSrc = Observable.defer {
Observable.just(System.currentTimeMillis())
}
// 현재 시간 출력
println("#1 now = ${System.currentTimeMillis()}")
try {
Thread.sleep(5000)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
println("#2 now = ${System.currentTimeMillis()}")
// just만을 사용하여 바로 아이템 발행 -> #1 now와 비슷
justSrc.subscribe {
println("#1 time = $it")
}
// defer를 사용하여 구독이 들어왔을 때 아이템 발행 -> #2 now와 비슷
deferSrc.subscribe {
println("#2 time = $it")
}
/*
결과
#1 now = 1663741055883
#2 now = 1663741060887
#1 time = 1663741055816
#2 time = 1663741060900
*/
위에서 Observable 객체에서 발행하는 아이템을 받기 위해서 subscribe() 메서드를 사용하였습니다. subscribe() 메서드는 Observer를 Observable에 연결하는 메서드로, Observable이 발행하는 아이템을 받고, error 또는 complete 알림을 받기 위해서 사용합니다. subscribe() 메서드는 파라미터에 따라서 다양하게 오버로딩되어 있습니다.
subscribe(): Disposable
subscribe(onNext: Consumer): Disposable
subscribe(onNext: Consumer, onError: Consumer)
subscribe(onNext: Consumer, onError: Consumer, onComplete: Action)
subscribe(onNext: Consumer, onError: Consumer, onComplete: Action, container: DisposableContainer): Disposable
subscribe(observer: Observer): Disposable
해당 메서드들의 공통점은 Disposable 객체를 반환하는 것입니다. 유한한 아이템을 발행하는 Observable의 경우 onComplete() 호출로 안전하게 종료됩니다. 하지만 무한하게 아이템을 발행하거나 오랫동안 실행되는 Observable의 경우에는 사용자의 액션 또는 안드로이드 컴포넌트의 생명주기와 관련해서 이미 활성화되어 있다면 구독이 더는 필요하지 않을 수 있습니다. 이럴 때, 메모리 누수 방지를 위해서 Disposable 클래스의 dispose() 메서드를 사용해 아이템 발행을 중단할 수 있습니다.
val observable = Observable.interval(1, TimeUnit.SECONDS)
// 1초에 한 번씩 아이템 발행(무한히)
val disposable = observable.subscribe {
println(it)
}
Thread().apply {
try {
Thread.sleep(3500)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
// 아이템 발행 해제
disposable.dispose()
}.start()
/*
결과
0
1
2
*/
interval 메서드는 무한하게 아이템을 발행하는데, Thread에서 3.5초 이후에 dispose를 호출하여 아이템 발행을 중지시키고 리소스가 폐기되는 코드입니다.
리소스가 이미 폐기되었는지 확인하는 데 Disposable.isDisposed() 메서드를 활용할 수 있습니다.
onComplete()를 명시적으로 호출하거나 호출됨을 보장한다면 dispose()를 호출할 필요가 없습니다.
구독자(Observer)가 여러 곳에 있고, 이들을 폐기하려면 각각의 Disposable 객체에 대해서 dispose()를 호출해야 합니다. 이때, CompositeDisposable을 이용하면 이들을 한꺼번에 폐기할 수 있습니다.
val observable = Observable.interval(1, TimeUnit.SECONDS)
// 1초에 한 번씩 아이템 발행(무한히)
val disposable = observable.subscribe {
println("Disposable1 : $it")
}
val disposable2 = observable.subscribe {
println("Disposable2 : $it")
}
val disposable3 = observable.subscribe {
println("Disposable3 : $it")
}
val compositeDisposable = CompositeDisposable()
compositeDisposable.addAll(disposable, disposable2, disposable3)
Thread().apply {
try {
Thread.sleep(3500)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
// 아이템 발행 해제
compositeDisposable.dispose()
}.start()
/*
결과
Disposable2 : 0
Disposable1 : 0
Disposable3 : 0
Disposable3 : 1
Disposable1 : 1
Disposable2 : 1
Disposable1 : 2
Disposable3 : 2
Disposable2 : 2
*/
Observable은 데이터를 발행하는 시점이 다릅니다. 이러한 시점에 따라서 Hot Observable과 Cold Observable로 구분하는데, 자세한 개념은 다음과 같습니다.
ConnectableObservable은 Hot Observable을 구현할 수 있도록 도와주는 타입으로 아무 Observable 타입이나 publish 연산자를 이용하여 간단히 ConnectableObservable로 변환할 수 있습니다. 다만 ConnectableObservable은 구독을 요청해도 Observable을 발행하지 않고, connect() 연산자를 호출할 때 비로소 아이템을 발행하기 시작합니다.
val connectableObservable = Observable.interval(1, TimeUnit.SECONDS).publish()
// 1번 구독자 등록
connectableObservable.connect()
connectableObservable.subscribe {
println("#1: $it")
}
Thread.sleep(3000)
// 2번 구독자 등록
connectableObservable.subscribe {
println("#2: $it")
}
Thread.sleep(3000)
/*
결과
#1: 0
#1: 1
#1: 2
#1: 3
#2: 3
#1: 4
#2: 4
#1: 5
#2: 5
*/
autoConnect 연산자는 connect 연산자를 호출하지 않더라도, 구독 시에 즉각 아이템을 발행할 수 있도록 도와주는 연산자입니다.
autoConnect 연산자의 매개 변수는 아이템을 발행하는 구독자 수로, 만약 2를 인자로 준다면, 구독자가 2개 이상 붙어야 아이템을 발행합니다.
val observable = Observable.interval(1, TimeUnit.SECONDS)
.publish()
.autoConnect(2)
observable.subscribe {
println("A: $it")
}
Thread.sleep(3000)
observable.subscribe {
println("B: $it")
}
Thread.sleep(3000)
/*
결과
A: 0
B: 0
A: 1
B: 1
A: 2
B: 2
*/
참조 및 참고
틀린 부분은 댓글로 남겨주시면 바로 수정하겠습니다..!!
2022-09-23
에 작성되었습니다.
아키텍처를 알아야 앱 개발이 보인다.
RxJava Docs
Grokking RxJava, Part 1: Tha basics
Reactive 코틀린 #2 - Observable, ConnectableObservable