Reactive Prgram은 Observer, Iterator, Functional Programming을 결합한 비동기 프로그래밍이다.
일반적인 명령형 프로그램이 아닌 데이터의 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관되는 연산자를 통해 처리되는 방식을 Reaxtive Programming이라고 한다.
즉 필요에 의해 데이터를 요청하여 가공하는 것(절차형)이 아닌, 데이터를 관리주체하는 Observable
에 데이터 변경시 요청을 받을 수 있도록 subscribe하여 변경사항을 전달하는 방식이다.
데이터의 흐름, 스트림을 만드는 Observable
subscribeOn
흐름, 스트림에서 데이터를 처리하는 Subscribe
observeOn
Obserable은 데이터를 제공하는 생산자의 역할을 하여 3가지의 행동을 한다.
implementation ("io.reactivex.rxjava3:rxjava:3.0.9");
implementation ("io.reactivex.rxjava3:rxkotlin:3.0.1");
just
를 사용하면 데이터를 1개만을 입력할 수 있다Observable
.just("a", "b", "c", "d", "e") //Observable 내부에 값을 순서대로 전달
.subscribe { //5번의 onNext로 전달됨
println(it) //Observable, 스트림으로 부터 데이터를 subsribe하여 출력
}
just
는 매개변수 순서에 맟춰서 데이터를 발행한다.
subscribe는 발행된 데이터를 처리하는 로직이 작성된다.
subscribe는 다음과 같이 오버로딩되어 있다
public final void subscribe(@NonNull Observer<? super T> observer)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete)
Action
은Runnable
과 같이 매개변수가 없고 반환타입도 없다.실행 메서드도
run()
인 함수형 인터페이스이다.
subscribe
메서드를 이용하여 발생된 데이터를 구독할 수 있다.
구독하는 방법에는 두 가지가 존재한다.
Observer 객체를 생성
객체가 아닌 사용자가 필요한 함수형 인터페이스만을 지정
val observer :Observer<String> = object : Observer<String> {
override fun onSubscribe(d: Disposable?) {
println("onSubscribe")
}
override fun onNext(t: String?) {
println("onNext = ${t}")
}
override fun onError(e: Throwable?) {
println("onError = ${e}")
}
override fun onComplete() {
println("onComplete")
}
}
Observable.just("a", "b", "c", "d", "e").subscribe(observer)
//결과
//onSubscribe
//onNext = a
//onNext = b
//onNext = c
//onNext = d
//onNext = e
//onComplete
public final void subscribe(@NonNull Observer<? super T> observer) //Observer 객체를 생성해서 구독하는 방법
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete)
Observable.just("a", "b", "c", "d", "e")
.subscribe ({
println("onNext")
},{
println("onError")
},{
println("onComplete")
})
구독해지에는 몇가지 방법이 존재한다.
구독시 observer객체를 넣어 해당 객체에 Disposable
과 관련된 로직을 정의한다.
Disposable
를 dispose()
를 이용하면 구독이 해지된다.
Disposable
타입 객체를 선언하고end
데이터가 삽입되면 구독해지 되도록 설정하였다.val observer: Observer<String> = object : Observer<String> {
lateinit var disposable : Disposable
override fun onSubscribe(d: Disposable?) {
println("onSubscribe")
if (d != null) {
disposable = d
}
}
override fun onNext(t: String?) {
if(t == "end"){
disposable.dispose()
println("subscribe dispose")
}
println("onNext = ${t}")
}
override fun onError(e: Throwable?) { }
override fun onComplete() { }
}
Observable.just("a", "b", "c", "d", "end", "e")
.subscribe(observer)
//결과
//onSubscribe
//onNext = a
//onNext = b
//onNext = c
//onNext = d
//subscribe dispose
//e가 출력되지 않음
subscribe
한 후 disposable
타입의 객체를 dispose
해준다val disposable = Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe {
println("onNext = ${it}")
}
Thread.sleep(2000)
disposable.dispose()
println("main end")
//결과
//...
//onNext = 17
//onNext = 18
//main end
val compositeDisposable= CompositeDisposable()
val disposable = Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe {
println("onNext = ${it}")
}
compositeDisposable.add(disposable)
Thread.sleep(2000)
compositeDisposable.clear()
Thread.sleep(2000)
println("main end")
//결과
//...
//onNext = 17
//onNext = 18
//main end
Observable.create<Int>{ //it:ObservableEmitter<Int!>!
it.onNext(10) //값을 발행
it.onNext(20)
it.onNext(30)
it.onComplete() //발행 종료
}.subscribe({
println(it)
},{
println(it)
})
Observable.just("a", 1, "string", 10.5)
.subscribe({
println(it)
},{
println(it)
})
Observable
.range(5, 10)
.subscribe({
println(it)
},{
println(it)
})
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5) //5개 까지만 발행
.subscribe({
println(it)
}, {
println(it)
})
//출력은 0, 1, 2, 3, 4
Cold Observable
이라 한다.Hot Observable
이다.Hot Observable
중 하나로 connect()
메서드를 호출하면 발행을 시작하는 Observable이다.publish()
메서드로 기존 Cold Observable
이 Hot Observable
이 된다.val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish()
connectableObservable.subscribe { println("1 Observer = ${it}") }
connectableObservable.connect()
Thread.sleep(200)
connectableObservable.subscribe { println("2 Observer = ${it}") }
Thread.sleep(200)
//결과
//1 Observer = 0
//1 Observer = 1
//2 Observer = 1
//1 Observer = 2
//2 Observer = 2
connect()
메서드 호출전에 미리 구독한 1 observer
은 0부터 데이터를 발행받는다.2 observer
는 1부터 데이터를 받기 시작한다.val observable = Observable.range(1, 10) //1부터 10번 카운트하여 정수값을 발행
val asyncSubject = AsyncSubject.create<Int>()
observable.subscribe(asyncSubject) //Subject는 Observer를 구현한 추상메서드라서 Observer타입의 매개변수가 될 수 있다.
asyncSubject.subscribe { println("1 observer = ${it}") }
Thread.sleep(1000)
asyncSubject.subscribe { println("2 observer = ${it}") }
//결과
//1 observer = 10
//2 observer = 10
1번째 observer가 구독할 시점에는 발행된 데이터가 없어서 기본 데이터 발행
2번째 observer가 구독할 시점에 최신 데이터는 녹색이므로 녹색 데이터가 발행됨
val behaviorSubject = BehaviorSubject.createDefault(0)
behaviorSubject.subscribe { println("1 observer = ${it}") }
behaviorSubject.onNext(100)
behaviorSubject.onNext(200)
behaviorSubject.subscribe { println("2 observer = ${it}") }
behaviorSubject.onNext(300)
behaviorSubject.onNext(400)
//결과
//1 observer = 0 //구독시점에서 발행된 데이터가 없어서 기본값을 받음
//1 observer = 100
//1 observer = 200
//2 observer = 200
//1 observer = 300
//2 observer = 300
//1 observer = 400
//2 observer = 400
Create
메서드를 이용하여 명시적인 Cold Observable
로 생성해야 한다.val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val publishSubject = PublishSubject.create<Long>()
observable.subscribe(publishSubject)
publishSubject.subscribe { println("1 observer = ${it}") }
Thread.sleep(200)
publishSubject.subscribe { println("2 observer = ${it}") }
Thread.sleep(200)
//결과
//1 observer = 0
//1 observer = 1
//1 observer = 2
//2 observer = 2
//1 observer = 3
//2 observer = 3
[Observable 계약](ReactiveX - The Observable Contract)
val replaySubject = ReplaySubject.interval(200, TimeUnit.MILLISECONDS)
replaySubject.subscribe { println("1 observer = ${it}") }
Thread.sleep(500)
replaySubject.subscribe { println("2 observer = ${it}") }
Thread.sleep(500)
//결과
//1 observer = 0
//1 observer = 1
//1 observer = 2
//2 observer = 0
//1 observer = 3
//2 observer = 1
create
, empty
, interval
, just
, range
등이 존재한다.Observable.just(1,2,3,4,5,6,7,8,9)
.filter { it -> it % 2 == 0 } //짝수 값만을 필터링
.subscribe {
println("onNext = ${it}")
}
//결과
//onNext = 2
//onNext = 4
//onNext = 6
//onNext = 8
filter는 Predicate 함수형 인터페이스로 입력값 하나, boolean값을 리턴한다
Observable.just(1,2,3,1,2,3,4)
.distinct()
.subscribe(System.out::println)
//결과
//1
//2
//3
//4
Single
이 된다.Observable.just(1,2,3,4,5,6,7,8,9)
.filter { it -> it % 2 == 0 }
.first(0)
.subscribe({
println("onSuccess = ${it}"
},{
println("onError = ${it}")
})
//결과
//onSuccess = 2
해당 subscribe는 public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError)로 정의되어 있다.
public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess)
public final Disposable subscribe(@NonNull BiConsumer<? super T, ? super Throwable> onCallback)
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.filter { it % 2 == 0 }
.take(2)
.subscribe { it
println("onNext = ${it}")
}
//결과
//onNext = 2
//onNext = 4
Observable.just(1,2, 3, 4, 5)
.map { it -> it * 10}
.subscribe {
println("onNext = ${it}")
}
//결과
//onNext = 10
//onNext = 20
//onNext = 30
//onNext = 40
//onNext = 50
Observable.just("1","2","3","4","5")
.flatMap { value: String ->
Observable.interval(200, TimeUnit.MILLISECONDS)
.map { cnt: Long -> "$value nowCnt = $cnt" }
.take(2)
}
.subscribe {
println("onNext = ${it}")
}
//결과
//onNext = 3 nowCnt = 0
//onNext = 1 nowCnt = 0
//onNext = 2 nowCnt = 0
//onNext = 4 nowCnt = 0
//onNext = 5 nowCnt = 0
//onNext = 1 nowCnt = 1
//onNext = 2 nowCnt = 1
//onNext = 3 nowCnt = 1
//onNext = 4 nowCnt = 1
//onNext = 5 nowCnt = 1
concatMap
을 사용하여 순서를 보장할 수 있지만 성능은 flatMap에 비해 떨어진다.concatMap
에 여러 Observable이 사용되면 다음과 같이 병합되지 않고 연결된다.Observable.interval(105, TimeUnit.MILLISECONDS)
.take(5)
.debounce(100, TimeUnit.MILLISECONDS)
.subscribe(System.out::println)
//결과 (매번바뀜)
//1
//3
//4
Observable.just("a","b","c","d","e")
.scan { now, next ->
"${now} -> ${next}"
}
.subscribe(System.out::println)
//결과
//a
//a -> b
//a -> b -> c
//a -> b -> c -> d
//a -> b -> c -> d -> e
Observable.just("a","b","c","d","e")
.reduce { now, next ->
"${now} -> ${next}"
}
.subscribe(System.out::println)
//결과
//a -> b -> c -> d -> e
val observable1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { "observable1 = $it" }
.take(5)
val observable2 = Observable.interval(0, 200, TimeUnit.MILLISECONDS)
.map { "observable2 = $it" }
.take(5)
Observable.concat(observable1, observable2)
.subscribe(System.out::println)
//결과
//observable1 = 0
//observable1 = 1
//observable1 = 2
//observable1 = 3
//observable1 = 4
//observable2 = 0
//observable2 = 1
//observable2 = 2
//observable2 = 3
//observable2 = 4
val observable1 = Observable.just(1,2,3,4,5,6,7)
val observable2 = Observable.just("a","b","c")
Observable.zip(observable1, observable2, BiFunction { obs1, obs2 ->
return@BiFunction "$obs1 + $obs2"
}).subscribe(System.out::println)
//결과
//1 + a
//2 + b
//3 + c
val list1 = mutableListOf(1,2,3,4,5,6,7,8,9)
val list2 = mutableListOf("dog","cat","elephant", "human","thanos","deer")
val observable1 = Observable.fromIterable(list1)
.zipWith(Observable.interval(200, TimeUnit.MILLISECONDS), BiFunction { int, time ->
return@BiFunction int
})
val observable2 = Observable.fromIterable(list2)
.zipWith(Observable.interval(150, 200, TimeUnit.MILLISECONDS), BiFunction { str, time ->
return@BiFunction str
})
Observable.combineLatest(observable1, observable2, BiFunction { obs1, obs2 ->
return@BiFunction "$obs1 + $obs2"
}).subscribe(System.out::println)
//결과
//1 + dog
//1 + cat
//2 + cat
//2 + elephant
//3 + elephant
//3 + human
//4 + human
//4 + thanos
//5 + thanos
//5 + deer
//6 + deer
//7 + deer
//8 + deer
//9 + deer
val observable1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { "observable1 = $it" }
.take(5)
val observable2 = Observable.interval(0, 200, TimeUnit.MILLISECONDS)
.map { "observable2 = $it" }
.take(5)
Observable.merge(observable1, observable2)
.subscribe(System.out::println)
//결과
//observable1 = 0
//observable2 = 0
//observable1 = 1
//observable1 = 2
//observable2 = 1
//observable1 = 3
//observable2 = 2
//observable1 = 4
//observable2 = 3
//observable2 = 4
Observable.just("a", "b", "c", "d", "e", 5)
.all { it is String }
.subscribe({
if (it == true) println("all is String type")
else println("there are another type")
}, {
println("error")
})
//결과
//there are another type
Predicate
이므로 결과가 boolean
타입으로 반환되고 이는 1개의 값만을 발행하는 것을 보장하기 때문에 Single
을 반환한다.val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS).take(3).map { "observable1 = $it" }
val observable2 = Observable.interval(50, TimeUnit.MILLISECONDS).take(3).map { "observable2 = $it" }
val observable3 = Observable.interval(0,100, TimeUnit.MILLISECONDS).take(3).map { "observable3 = $it" }
val list = listOf(observable1, observable2, observable3)
Observable.amb(list).subscribe(System.out::println)
//결과
//observable3 = 0
//observable3 = 1
//observable3 = 2
Observable.interval(100, TimeUnit.MILLISECONDS)
.map { "observable = $it" }
.takeUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
.subscribe(System.out::println)
//결과
//observable = 0
//observable = 1
//observable = 2
//observable = 3
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.skipUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
.subscribe(System.out::println)
//결과
//5
//6
//7
//8
//9
interval
, timer
로 생성된 Observable은 계산 Scheduler인 Computation을 사용함subscribeOn
로 지정observeOn
로 지정val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.io())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.io())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//2. RxCachedThreadScheduler-2 is working, value is 1
//1. RxCachedThreadScheduler-1 is working, value is 1
//1. RxCachedThreadScheduler-1 is working, value is 2
//2. RxCachedThreadScheduler-2 is working, value is 2
//1. RxCachedThreadScheduler-1 is working, value is 3
//2. RxCachedThreadScheduler-2 is working, value is 3
val observable = Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
observable
.subscribeOn(Schedulers.io())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. RxComputationThreadPool-1 is working, value is 0
//1. RxComputationThreadPool-1 is working, value is 1
//1. RxComputationThreadPool-1 is working, value is 2
//1. RxComputationThreadPool-1 is working, value is 3
//1. RxComputationThreadPool-1 is working, value is 4
interval
, timer
로 Obversable를 생성하면 scheduler가 자동적으로 computation으로 바뀐다.val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.newThread())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.newThread())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. RxNewThreadScheduler-1 is working, value is 1
//1. RxNewThreadScheduler-1 is working, value is 2
//1. RxNewThreadScheduler-1 is working, value is 3
//2. RxNewThreadScheduler-2 is working, value is 1
//2. RxNewThreadScheduler-2 is working, value is 2
//2. RxNewThreadScheduler-2 is working, value is 3
val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.single())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.single())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. RxSingleScheduler-1 is working, value is 1
//1. RxSingleScheduler-1 is working, value is 2
//1. RxSingleScheduler-1 is working, value is 3
//2. RxSingleScheduler-1 is working, value is 1
//2. RxSingleScheduler-1 is working, value is 2
//2. RxSingleScheduler-1 is working, value is 3
val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.trampoline())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.trampoline())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. main is working, value is 1
//1. main is working, value is 2
//1. main is working, value is 3
//2. main is working, value is 1
//2. main is working, value is 2
//2. main is working, value is 3
기본적으로 Observable과 사용자가 지정한 연산자 체인의 작업을 수행하고 subscribe 메서드에 대해 관찰자에게 알린다.
SubscribeOn연산자는 Observable이 작동해야 하는 다른 Scheduler를 지정하여 작동 스레드를 변경한다.
observeOn연산자는 Observable가 observe에게 변경사항을 보내는데 다른 scheduler를사용하게끔 만들것이다.
다음 이미지를 통해 subscribeOn과 observeOn의 우선순위에 대해 알수 있다.
주황색 Scheduler
를 사용되고 마지막에 보라색 Scheduler
로 변형되었다.subscribeOn 연산자의 경우 여러번 호출가능하지만 가장 먼저 선언된 Scheduler를 사용하게 된다.
Observable.just(0, 1, 2)
.doOnNext { println("doOnNext = $it") }
.doOnComplete { println("doOnComplete") }
.doOnError { println("doOnError") }
.subscribe({
println("onNext = $it")
}, {
println("onError = $it")
}, {
println("onComplete")
})
//결과
//doOnNext = 0
//onNext = 0
//doOnNext = 1
//onNext = 1
//doOnNext = 2
//onNext = 2
//doOnComplete
//onComplete
Observable.just(0, 1, 2)
.map { it % 0 }
.doOnNext { println("doOnNext = $it") }
.doOnComplete { println("doOnComplete") }
.doOnError { println("doOnError") }
.subscribe({
println("onNext = $it")
}, {
println("onError = $it")
}, {
println("onComplete")
})
//결과
//doOnError
//onError = java.lang.ArithmeticException: / by zero
val dispose = Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnSubscribe { println("doOnSubscribe") }
.doOnDispose { println("doOnDispose") }
.subscribe({
println("onNext = $it")
}, {
println("onError = $it")
}, {
println("onComplete")
})
Thread.sleep(500)
dispose.dispose()
//결과
//doOnSubscribe
//onNext = 0
//onNext = 1
//onNext = 2
//onNext = 3
//onNext = 4
//doOnDispose
val list = mutableListOf("0", "1", "2","T3", "4")
Observable.fromIterable(list)
.map{Integer.parseInt(it)}
.onErrorReturn {
println("error => $it")
return@onErrorReturn -1
}
.subscribe {
println("onNext = $it")
}
//결과
//onNext = 0
//onNext = 1
//onNext = 2
//error => java.lang.NumberFormatException: For input string: "T3"
//onNext = -1
val observable = Observable.range(0, 10)
.map {
if (it == 3) throw Exception("error")
else it
}
observable.retry(2).subscribe({
println("onNext = $it")
},{
println("onError = $it")
})
//결과
//onNext = 0
//onNext = 1
//onNext = 2
//onNext = 0
//onNext = 1
//onNext = 2
//onNext = 0
//onNext = 1
//onNext = 2
//onError = java.lang.Exception: error
참고 :