Reactive Prgram은 Observer, Iterator, Functional Programming을 결합한 비동기 프로그래밍이다.
일반적인 명령형 프로그램이 아닌 데이터의 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관되는 연산자를 통해 처리되는 방식을 Reaxtive Programming이라고 한다.
즉 필요에 의해 데이터를 요청하여 가공하는 것(절차형)이 아닌, 데이터를 관리주체하는 Observable에 데이터 변경시 요청을 받을 수 있도록 subscribe하여 변경사항을 전달하는 방식이다.
데이터의 흐름, 스트림을 만드는 Observable
subscribeOn흐름, 스트림에서 데이터를 처리하는 Subscribe
observeOnObserable은 데이터를 제공하는 생산자의 역할을 하여 3가지의 행동을 한다.
implementation ("io.reactivex.rxjava3:rxjava:3.0.9");
implementation ("io.reactivex.rxjava3:rxkotlin:3.0.1");
just를 사용하면 데이터를 1개만을 입력할 수 있다Observable
.just("a", "b", "c", "d", "e") //Observable 내부에 값을 순서대로 전달
.subscribe { //5번의 onNext로 전달됨
println(it) //Observable, 스트림으로 부터 데이터를 subsribe하여 출력
}
just는 매개변수 순서에 맟춰서 데이터를 발행한다.
subscribe는 발행된 데이터를 처리하는 로직이 작성된다.
subscribe는 다음과 같이 오버로딩되어 있다
public final void subscribe(@NonNull Observer<? super T> observer)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete)
Action은Runnable과 같이 매개변수가 없고 반환타입도 없다.실행 메서드도
run()인 함수형 인터페이스이다.
subscribe 메서드를 이용하여 발생된 데이터를 구독할 수 있다.
구독하는 방법에는 두 가지가 존재한다.
Observer 객체를 생성
객체가 아닌 사용자가 필요한 함수형 인터페이스만을 지정
val observer :Observer<String> = object : Observer<String> {
override fun onSubscribe(d: Disposable?) {
println("onSubscribe")
}
override fun onNext(t: String?) {
println("onNext = ${t}")
}
override fun onError(e: Throwable?) {
println("onError = ${e}")
}
override fun onComplete() {
println("onComplete")
}
}
Observable.just("a", "b", "c", "d", "e").subscribe(observer)
//결과
//onSubscribe
//onNext = a
//onNext = b
//onNext = c
//onNext = d
//onNext = e
//onComplete
public final void subscribe(@NonNull Observer<? super T> observer) //Observer 객체를 생성해서 구독하는 방법
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete)
Observable.just("a", "b", "c", "d", "e")
.subscribe ({
println("onNext")
},{
println("onError")
},{
println("onComplete")
})
구독해지에는 몇가지 방법이 존재한다.
구독시 observer객체를 넣어 해당 객체에 Disposable과 관련된 로직을 정의한다.
Disposable를 dispose()를 이용하면 구독이 해지된다.
Disposable타입 객체를 선언하고end데이터가 삽입되면 구독해지 되도록 설정하였다.val observer: Observer<String> = object : Observer<String> {
lateinit var disposable : Disposable
override fun onSubscribe(d: Disposable?) {
println("onSubscribe")
if (d != null) {
disposable = d
}
}
override fun onNext(t: String?) {
if(t == "end"){
disposable.dispose()
println("subscribe dispose")
}
println("onNext = ${t}")
}
override fun onError(e: Throwable?) { }
override fun onComplete() { }
}
Observable.just("a", "b", "c", "d", "end", "e")
.subscribe(observer)
//결과
//onSubscribe
//onNext = a
//onNext = b
//onNext = c
//onNext = d
//subscribe dispose
//e가 출력되지 않음
subscribe한 후 disposable타입의 객체를 dispose해준다val disposable = Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe {
println("onNext = ${it}")
}
Thread.sleep(2000)
disposable.dispose()
println("main end")
//결과
//...
//onNext = 17
//onNext = 18
//main end
val compositeDisposable= CompositeDisposable()
val disposable = Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe {
println("onNext = ${it}")
}
compositeDisposable.add(disposable)
Thread.sleep(2000)
compositeDisposable.clear()
Thread.sleep(2000)
println("main end")
//결과
//...
//onNext = 17
//onNext = 18
//main end

Observable.create<Int>{ //it:ObservableEmitter<Int!>!
it.onNext(10) //값을 발행
it.onNext(20)
it.onNext(30)
it.onComplete() //발행 종료
}.subscribe({
println(it)
},{
println(it)
})

Observable.just("a", 1, "string", 10.5)
.subscribe({
println(it)
},{
println(it)
})

Observable
.range(5, 10)
.subscribe({
println(it)
},{
println(it)
})




Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5) //5개 까지만 발행
.subscribe({
println(it)
}, {
println(it)
})
//출력은 0, 1, 2, 3, 4
Cold Observable이라 한다.Hot Observable이다.Hot Observable 중 하나로 connect()메서드를 호출하면 발행을 시작하는 Observable이다.publish()메서드로 기존 Cold Observable이 Hot Observable이 된다.val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish()
connectableObservable.subscribe { println("1 Observer = ${it}") }
connectableObservable.connect()
Thread.sleep(200)
connectableObservable.subscribe { println("2 Observer = ${it}") }
Thread.sleep(200)
//결과
//1 Observer = 0
//1 Observer = 1
//2 Observer = 1
//1 Observer = 2
//2 Observer = 2
connect() 메서드 호출전에 미리 구독한 1 observer은 0부터 데이터를 발행받는다.2 observer는 1부터 데이터를 받기 시작한다.

val observable = Observable.range(1, 10) //1부터 10번 카운트하여 정수값을 발행
val asyncSubject = AsyncSubject.create<Int>()
observable.subscribe(asyncSubject) //Subject는 Observer를 구현한 추상메서드라서 Observer타입의 매개변수가 될 수 있다.
asyncSubject.subscribe { println("1 observer = ${it}") }
Thread.sleep(1000)
asyncSubject.subscribe { println("2 observer = ${it}") }
//결과
//1 observer = 10
//2 observer = 10

1번째 observer가 구독할 시점에는 발행된 데이터가 없어서 기본 데이터 발행
2번째 observer가 구독할 시점에 최신 데이터는 녹색이므로 녹색 데이터가 발행됨

val behaviorSubject = BehaviorSubject.createDefault(0)
behaviorSubject.subscribe { println("1 observer = ${it}") }
behaviorSubject.onNext(100)
behaviorSubject.onNext(200)
behaviorSubject.subscribe { println("2 observer = ${it}") }
behaviorSubject.onNext(300)
behaviorSubject.onNext(400)
//결과
//1 observer = 0 //구독시점에서 발행된 데이터가 없어서 기본값을 받음
//1 observer = 100
//1 observer = 200
//2 observer = 200
//1 observer = 300
//2 observer = 300
//1 observer = 400
//2 observer = 400
Create메서드를 이용하여 명시적인 Cold Observable로 생성해야 한다.
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val publishSubject = PublishSubject.create<Long>()
observable.subscribe(publishSubject)
publishSubject.subscribe { println("1 observer = ${it}") }
Thread.sleep(200)
publishSubject.subscribe { println("2 observer = ${it}") }
Thread.sleep(200)
//결과
//1 observer = 0
//1 observer = 1
//1 observer = 2
//2 observer = 2
//1 observer = 3
//2 observer = 3
[Observable 계약](ReactiveX - The Observable Contract)

val replaySubject = ReplaySubject.interval(200, TimeUnit.MILLISECONDS)
replaySubject.subscribe { println("1 observer = ${it}") }
Thread.sleep(500)
replaySubject.subscribe { println("2 observer = ${it}") }
Thread.sleep(500)
//결과
//1 observer = 0
//1 observer = 1
//1 observer = 2
//2 observer = 0
//1 observer = 3
//2 observer = 1
create, empty, interval, just, range등이 존재한다.
Observable.just(1,2,3,4,5,6,7,8,9)
.filter { it -> it % 2 == 0 } //짝수 값만을 필터링
.subscribe {
println("onNext = ${it}")
}
//결과
//onNext = 2
//onNext = 4
//onNext = 6
//onNext = 8
filter는 Predicate 함수형 인터페이스로 입력값 하나, boolean값을 리턴한다

Observable.just(1,2,3,1,2,3,4)
.distinct()
.subscribe(System.out::println)
//결과
//1
//2
//3
//4
Single이 된다.
Observable.just(1,2,3,4,5,6,7,8,9)
.filter { it -> it % 2 == 0 }
.first(0)
.subscribe({
println("onSuccess = ${it}"
},{
println("onError = ${it}")
})
//결과
//onSuccess = 2
해당 subscribe는 public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError)로 정의되어 있다.
public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess)
public final Disposable subscribe(@NonNull BiConsumer<? super T, ? super Throwable> onCallback)

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.filter { it % 2 == 0 }
.take(2)
.subscribe { it
println("onNext = ${it}")
}
//결과
//onNext = 2
//onNext = 4

Observable.just(1,2, 3, 4, 5)
.map { it -> it * 10}
.subscribe {
println("onNext = ${it}")
}
//결과
//onNext = 10
//onNext = 20
//onNext = 30
//onNext = 40
//onNext = 50

Observable.just("1","2","3","4","5")
.flatMap { value: String ->
Observable.interval(200, TimeUnit.MILLISECONDS)
.map { cnt: Long -> "$value nowCnt = $cnt" }
.take(2)
}
.subscribe {
println("onNext = ${it}")
}
//결과
//onNext = 3 nowCnt = 0
//onNext = 1 nowCnt = 0
//onNext = 2 nowCnt = 0
//onNext = 4 nowCnt = 0
//onNext = 5 nowCnt = 0
//onNext = 1 nowCnt = 1
//onNext = 2 nowCnt = 1
//onNext = 3 nowCnt = 1
//onNext = 4 nowCnt = 1
//onNext = 5 nowCnt = 1
concatMap을 사용하여 순서를 보장할 수 있지만 성능은 flatMap에 비해 떨어진다.concatMap에 여러 Observable이 사용되면 다음과 같이 병합되지 않고 연결된다.
Observable.interval(105, TimeUnit.MILLISECONDS)
.take(5)
.debounce(100, TimeUnit.MILLISECONDS)
.subscribe(System.out::println)
//결과 (매번바뀜)
//1
//3
//4

Observable.just("a","b","c","d","e")
.scan { now, next ->
"${now} -> ${next}"
}
.subscribe(System.out::println)
//결과
//a
//a -> b
//a -> b -> c
//a -> b -> c -> d
//a -> b -> c -> d -> e

Observable.just("a","b","c","d","e")
.reduce { now, next ->
"${now} -> ${next}"
}
.subscribe(System.out::println)
//결과
//a -> b -> c -> d -> e

val observable1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { "observable1 = $it" }
.take(5)
val observable2 = Observable.interval(0, 200, TimeUnit.MILLISECONDS)
.map { "observable2 = $it" }
.take(5)
Observable.concat(observable1, observable2)
.subscribe(System.out::println)
//결과
//observable1 = 0
//observable1 = 1
//observable1 = 2
//observable1 = 3
//observable1 = 4
//observable2 = 0
//observable2 = 1
//observable2 = 2
//observable2 = 3
//observable2 = 4

val observable1 = Observable.just(1,2,3,4,5,6,7)
val observable2 = Observable.just("a","b","c")
Observable.zip(observable1, observable2, BiFunction { obs1, obs2 ->
return@BiFunction "$obs1 + $obs2"
}).subscribe(System.out::println)
//결과
//1 + a
//2 + b
//3 + c

val list1 = mutableListOf(1,2,3,4,5,6,7,8,9)
val list2 = mutableListOf("dog","cat","elephant", "human","thanos","deer")
val observable1 = Observable.fromIterable(list1)
.zipWith(Observable.interval(200, TimeUnit.MILLISECONDS), BiFunction { int, time ->
return@BiFunction int
})
val observable2 = Observable.fromIterable(list2)
.zipWith(Observable.interval(150, 200, TimeUnit.MILLISECONDS), BiFunction { str, time ->
return@BiFunction str
})
Observable.combineLatest(observable1, observable2, BiFunction { obs1, obs2 ->
return@BiFunction "$obs1 + $obs2"
}).subscribe(System.out::println)
//결과
//1 + dog
//1 + cat
//2 + cat
//2 + elephant
//3 + elephant
//3 + human
//4 + human
//4 + thanos
//5 + thanos
//5 + deer
//6 + deer
//7 + deer
//8 + deer
//9 + deer

val observable1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { "observable1 = $it" }
.take(5)
val observable2 = Observable.interval(0, 200, TimeUnit.MILLISECONDS)
.map { "observable2 = $it" }
.take(5)
Observable.merge(observable1, observable2)
.subscribe(System.out::println)
//결과
//observable1 = 0
//observable2 = 0
//observable1 = 1
//observable1 = 2
//observable2 = 1
//observable1 = 3
//observable2 = 2
//observable1 = 4
//observable2 = 3
//observable2 = 4

Observable.just("a", "b", "c", "d", "e", 5)
.all { it is String }
.subscribe({
if (it == true) println("all is String type")
else println("there are another type")
}, {
println("error")
})
//결과
//there are another type
Predicate이므로 결과가 boolean타입으로 반환되고 이는 1개의 값만을 발행하는 것을 보장하기 때문에 Single을 반환한다.
val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS).take(3).map { "observable1 = $it" }
val observable2 = Observable.interval(50, TimeUnit.MILLISECONDS).take(3).map { "observable2 = $it" }
val observable3 = Observable.interval(0,100, TimeUnit.MILLISECONDS).take(3).map { "observable3 = $it" }
val list = listOf(observable1, observable2, observable3)
Observable.amb(list).subscribe(System.out::println)
//결과
//observable3 = 0
//observable3 = 1
//observable3 = 2

Observable.interval(100, TimeUnit.MILLISECONDS)
.map { "observable = $it" }
.takeUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
.subscribe(System.out::println)
//결과
//observable = 0
//observable = 1
//observable = 2
//observable = 3

Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.skipUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
.subscribe(System.out::println)
//결과
//5
//6
//7
//8
//9
interval, timer로 생성된 Observable은 계산 Scheduler인 Computation을 사용함subscribeOn로 지정observeOn로 지정val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.io())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.io())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//2. RxCachedThreadScheduler-2 is working, value is 1
//1. RxCachedThreadScheduler-1 is working, value is 1
//1. RxCachedThreadScheduler-1 is working, value is 2
//2. RxCachedThreadScheduler-2 is working, value is 2
//1. RxCachedThreadScheduler-1 is working, value is 3
//2. RxCachedThreadScheduler-2 is working, value is 3
val observable = Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
observable
.subscribeOn(Schedulers.io())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. RxComputationThreadPool-1 is working, value is 0
//1. RxComputationThreadPool-1 is working, value is 1
//1. RxComputationThreadPool-1 is working, value is 2
//1. RxComputationThreadPool-1 is working, value is 3
//1. RxComputationThreadPool-1 is working, value is 4
interval, timer로 Obversable를 생성하면 scheduler가 자동적으로 computation으로 바뀐다.val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.newThread())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.newThread())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. RxNewThreadScheduler-1 is working, value is 1
//1. RxNewThreadScheduler-1 is working, value is 2
//1. RxNewThreadScheduler-1 is working, value is 3
//2. RxNewThreadScheduler-2 is working, value is 1
//2. RxNewThreadScheduler-2 is working, value is 2
//2. RxNewThreadScheduler-2 is working, value is 3
val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.single())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.single())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. RxSingleScheduler-1 is working, value is 1
//1. RxSingleScheduler-1 is working, value is 2
//1. RxSingleScheduler-1 is working, value is 3
//2. RxSingleScheduler-1 is working, value is 1
//2. RxSingleScheduler-1 is working, value is 2
//2. RxSingleScheduler-1 is working, value is 3
val observable = Observable.just(1,2,3)
observable
.subscribeOn(Schedulers.trampoline())
.subscribe {
println("1. ${Thread.currentThread().name} is working, value is ${it}")
}
observable
.subscribeOn(Schedulers.trampoline())
.subscribe {
println("2. ${Thread.currentThread().name} is working, value is ${it}")
}
//결과
//1. main is working, value is 1
//1. main is working, value is 2
//1. main is working, value is 3
//2. main is working, value is 1
//2. main is working, value is 2
//2. main is working, value is 3


기본적으로 Observable과 사용자가 지정한 연산자 체인의 작업을 수행하고 subscribe 메서드에 대해 관찰자에게 알린다.
SubscribeOn연산자는 Observable이 작동해야 하는 다른 Scheduler를 지정하여 작동 스레드를 변경한다.
observeOn연산자는 Observable가 observe에게 변경사항을 보내는데 다른 scheduler를사용하게끔 만들것이다.
다음 이미지를 통해 subscribeOn과 observeOn의 우선순위에 대해 알수 있다.

주황색 Scheduler를 사용되고 마지막에 보라색 Scheduler로 변형되었다.subscribeOn 연산자의 경우 여러번 호출가능하지만 가장 먼저 선언된 Scheduler를 사용하게 된다.

Observable.just(0, 1, 2)
.doOnNext { println("doOnNext = $it") }
.doOnComplete { println("doOnComplete") }
.doOnError { println("doOnError") }
.subscribe({
println("onNext = $it")
}, {
println("onError = $it")
}, {
println("onComplete")
})
//결과
//doOnNext = 0
//onNext = 0
//doOnNext = 1
//onNext = 1
//doOnNext = 2
//onNext = 2
//doOnComplete
//onComplete
Observable.just(0, 1, 2)
.map { it % 0 }
.doOnNext { println("doOnNext = $it") }
.doOnComplete { println("doOnComplete") }
.doOnError { println("doOnError") }
.subscribe({
println("onNext = $it")
}, {
println("onError = $it")
}, {
println("onComplete")
})
//결과
//doOnError
//onError = java.lang.ArithmeticException: / by zero

val dispose = Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnSubscribe { println("doOnSubscribe") }
.doOnDispose { println("doOnDispose") }
.subscribe({
println("onNext = $it")
}, {
println("onError = $it")
}, {
println("onComplete")
})
Thread.sleep(500)
dispose.dispose()
//결과
//doOnSubscribe
//onNext = 0
//onNext = 1
//onNext = 2
//onNext = 3
//onNext = 4
//doOnDispose

val list = mutableListOf("0", "1", "2","T3", "4")
Observable.fromIterable(list)
.map{Integer.parseInt(it)}
.onErrorReturn {
println("error => $it")
return@onErrorReturn -1
}
.subscribe {
println("onNext = $it")
}
//결과
//onNext = 0
//onNext = 1
//onNext = 2
//error => java.lang.NumberFormatException: For input string: "T3"
//onNext = -1
val observable = Observable.range(0, 10)
.map {
if (it == 3) throw Exception("error")
else it
}
observable.retry(2).subscribe({
println("onNext = $it")
},{
println("onError = $it")
})
//결과
//onNext = 0
//onNext = 1
//onNext = 2
//onNext = 0
//onNext = 1
//onNext = 2
//onNext = 0
//onNext = 1
//onNext = 2
//onError = java.lang.Exception: error
참고 :