Mono and Flux are both publishers: use Mono to publish a single item (zero or one) and Flux to publish multiple items.
fun main(args: Array<String>) {
    // Mono is a publisher, so doOnNext only runs once you subscribe.
    Mono.just(1).doOnNext { logger.info { "from publisher -> ${it}" } }
        .subscribe()
    // A log() operator is also provided.
    Mono.just(1).doOnNext { logger.info { "from publisher -> ${it}" } }
        .log().subscribe()
    // Transform the value with map.
    Mono.just(0).map { it + 1 }.doOnNext { logger.info { "from publisher -> ${it}" } }
        .log().subscribe()
/**
* 23:34:56.012|INFO |main|Main.invoke|from publisher -> 1
* 23:34:56.016|INFO |main|reactor.Mono.PeekFuseable.1.info|| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
* 23:34:56.017|INFO |main|reactor.Mono.PeekFuseable.1.info|| request(unbounded)
* 23:34:56.017|INFO |main|Main.invoke|from publisher -> 1
* 23:34:56.018|INFO |main|reactor.Mono.PeekFuseable.1.info|| onNext(1)
* 23:34:56.018|INFO |main|reactor.Mono.PeekFuseable.1.info|| onComplete()
* 23:34:56.019|INFO |main|reactor.Mono.PeekFuseable.2.info|| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
* 23:34:56.019|INFO |main|reactor.Mono.PeekFuseable.2.info|| request(unbounded)
* 23:34:56.020|INFO |main|Main.invoke|from publisher -> 1
* 23:34:56.020|INFO |main|reactor.Mono.PeekFuseable.2.info|| onNext(1)
* 23:34:56.020|INFO |main|reactor.Mono.PeekFuseable.2.info|| onComplete()
*/
}
Flux is also a publisher, so doOnNext only runs once you subscribe.
The values 1, 2, 3 are emitted as a stream of three elements.
Flux.just(1, 2, 3).doOnNext { logger.info { "from publisher -> ${it}" } }
    .subscribe()
/**
* 23:34:01.654|INFO |main|Main.invoke|from publisher -> 1
* 23:34:01.655|INFO |main|Main.invoke|from publisher -> 2
* 23:34:01.655|INFO |main|Main.invoke|from publisher -> 3
*/
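Array
Passing an array to Flux.just emits the array itself as a single element, as the onNext([1, 2, 3]) line in the log shows.
Flux.just(arrayOf(1, 2, 3)).doOnNext { logger.info { "from publisher -> ${it}" } }
    .log().subscribe()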
/**
* 23:36:07.691|INFO |main|Main.invoke|from publisher -> [Ljava.lang.Integer;@9353778
* 23:36:07.691|INFO |main|reactor.Flux.PeekFuseable.1.info|| onNext([1, 2, 3])
* 23:36:07.691|INFO |main|reactor.Flux.PeekFuseable.1.info|| onComplete()
*/
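If the intent is to emit each array element separately rather than the array as one item, Flux.fromArray (or Flux.fromIterable for a collection) can be used instead. The following is a minimal sketch; the helper names countWithJust and elementsFromArray are made up for illustration and are not part of the original code.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: how many elements does Flux.just emit for an array argument?
fun countWithJust(): Long? =
    Flux.just(arrayOf(1, 2, 3)).count().block()

// Hypothetical helper: Flux.fromArray emits each array element on its own.
fun elementsFromArray(): List<Int>? =
    Flux.fromArray(arrayOf(1, 2, 3)).collectList().block()

fun main() {
    println(countWithJust())      // the array counts as a single element
    println(elementsFromArray())  // the three elements, in order
}
```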
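map and flatMap
map applies a synchronous function to each element, while flatMap maps each element to a Publisher and merges the results, so it can wrap asynchronous (non-blocking) work.
With a simple computation such as it * it the two produce the same result; the difference between the synchronous and asynchronous styles shows up when each element has to wait for non-blocking I/O.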
Flux.range(1,10).map { it * it }.log().subscribe()
/**
* 23:37:33.444|INFO |main|reactor.Flux.MapFuseable.1.info|| onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
* 23:37:33.446|INFO |main|reactor.Flux.MapFuseable.1.info|| request(unbounded)
* 23:37:33.446|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(1)
* 23:37:33.446|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(4)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(9)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(16)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(25)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(36)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(49)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(64)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(81)
* 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(100)
* 23:37:33.448|INFO |main|reactor.Flux.MapFuseable.1.info|| onComplete()
*/
Flux.range(1,10).flatMap { Mono.just(it*it) }.log().subscribe()
/**
* 00:32:18.411|INFO |main|reactor.Flux.FlatMap.1.info|onSubscribe(FluxFlatMap.FlatMapMain)
* 00:32:18.412|INFO |main|reactor.Flux.FlatMap.1.info|request(unbounded)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(1)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(4)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(9)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(16)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(25)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(36)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(49)
* 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(64)
* 00:32:18.441|INFO |main|reactor.Flux.FlatMap.1.info|onNext(81)
* 00:32:18.441|INFO |main|reactor.Flux.FlatMap.1.info|onNext(100)
* 00:32:18.441|INFO |main|reactor.Flux.FlatMap.1.info|onComplete()
*/
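The two logs above look the same because it * it completes immediately. To make the difference visible, the sketch below replaces the computation with a delayed Mono. The function slowSquare is a hypothetical stand-in for a non-blocking I/O call, and concatMap is included only as a sequential point of comparison; neither appears in the original code.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for a non-blocking I/O call: the result arrives after a delay.
fun slowSquare(n: Int): Mono<Int> =
    Mono.just(n * n).delayElement(Duration.ofMillis(100))

// flatMap subscribes to the inner Monos eagerly, so the delays overlap.
fun squaresWithFlatMap(): List<Int> =
    Flux.range(1, 5).flatMap { slowSquare(it) }.collectList().block() ?: emptyList()

// concatMap subscribes to one inner Mono at a time, so the delays add up.
fun squaresWithConcatMap(): List<Int> =
    Flux.range(1, 5).concatMap { slowSquare(it) }.collectList().block() ?: emptyList()

fun main() {
    var start = System.currentTimeMillis()
    val merged = squaresWithFlatMap()
    println("flatMap: ${merged.sorted()} in ${System.currentTimeMillis() - start} ms")

    start = System.currentTimeMillis()
    val ordered = squaresWithConcatMap()
    println("concatMap: $ordered in ${System.currentTimeMillis() - start} ms")
}
```

On a typical run the flatMap version should finish in roughly one delay, while the concatMap version takes about five. flatMap does not guarantee the order of the merged results, which is why the sketch sorts them before printing.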
Let's compare different ways of calling functions in Reactor.
import mu.KotlinLogging
import reactor.core.publisher.Mono
private val logger = KotlinLogging.logger {}
fun getRequest(): Mono<Int> {
    return Mono.just(1)
}
fun addA(request: Mono<Int>): Mono<Int> {
    return request.map { it + 1 }
}
fun addB(request: Mono<Int>): Mono<Int> {
    return request.map { it + 2 }
}
fun nonPublisherAddA(request: Int): Mono<Int> {
    return Mono.fromCallable { request + 1 }
}
fun nonPublisherAddB(request: Int): Mono<Int> {
    return Mono.fromCallable { request + 2 }
}
...
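Because Mono and Flux are publishers, you have to call subscribe() (or block()) to run them. Logging the objects without subscribing only prints the operator class names, not the values.
fun main() {
    val request = getRequest()
    logger.debug { "request: $request" }
    val addA = addA(request)
    logger.debug { "request: $addA" }
    val addB = addB(request)
    logger.debug { "request: $addB" }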
/**
* 23:45:12.550|DEBUG|main|Function.main|request: MonoJust
* 23:45:12.550|DEBUG|main|Function.main|request: MonoMapFuseable
* 23:45:12.551|DEBUG|main|Function.main|request: MonoMapFuseable
*/
}
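The same point can be checked with a side effect instead of a log line. The sketch below is only an illustration: the helper sideEffectsBeforeAndAfterSubscribe is a made-up name, and it records what doOnNext observed before and after subscribe() is called.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper: records what doOnNext saw before and after subscribing.
fun sideEffectsBeforeAndAfterSubscribe(): Pair<List<Int>, List<Int>> {
    val seen = mutableListOf<Int>()
    val mono = Mono.just(1).map { it + 1 }.doOnNext { seen.add(it) }

    val before = seen.toList()   // nothing has subscribed yet, so nothing ran
    mono.subscribe()             // Mono.just emits synchronously on the calling thread
    val after = seen.toList()
    return before to after
}

fun main() {
    val (before, after) = sideEffectsBeforeAndAfterSubscribe()
    println("before subscribe: $before, after subscribe: $after")
}
```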
block
fun main() {
    val request = getRequest()
    logger.debug { "request: ${request.block()}" }
    val addA = addA(request)
    logger.debug { "addA: ${addA.block()}" }
    val addB = addB(addA)
    logger.debug { "addB: ${addB.block()}" }
/**
* 23:49:46.129|DEBUG|main|Function.main|request: 1
* 23:49:46.138|DEBUG|main|Function.main|addA: 2
* 23:49:46.138|DEBUG|main|Function.main|addB: 4
*/
}
subscribe
val request = getRequest().doOnNext { logger.debug { "request: $it" } }
val addA = addA(request).doOnNext { logger.debug { "addA: $it" } }
val addB = addB(addA).doOnNext { logger.debug { "addB: $it" } }
addB.subscribe()
/**
* 23:50:11.857|DEBUG|main|Function.invoke|request: 1
* 23:50:11.857|DEBUG|main|Function.invoke|addA: 2
* 23:50:11.857|DEBUG|main|Function.invoke|addB: 4
*/
chaining
fun main() {
    getRequest()
        .doOnNext { logger.debug { "request: $it" } }
        .flatMap { addA(Mono.just(it)) }
        .doOnNext { logger.debug { "addA: $it" } }
        .flatMap { addB(Mono.just(it)) }
        .doOnNext { logger.debug { "addB: $it" } }
        .subscribe()
/**
* 23:51:36.904|DEBUG|main|Function.invoke|request: 1
* 23:51:36.905|DEBUG|main|Function.invoke|addA: 2
* 23:51:36.906|DEBUG|main|Function.invoke|addB: 4
*/
}
nonPublisher functions
The logic inside flatMap becomes more concise.
In Reactor it is generally better for a function to take a plain value as input and return a publisher.
fun main() {
    getRequest()
        .doOnNext { logger.debug { "request: $it" } }
        .flatMap { nonPublisherAddA(it) }
        .doOnNext { logger.debug { "addA: $it" } }
        .flatMap { nonPublisherAddB(it) }
        .doOnNext { logger.debug { "addB: $it" } }
        .subscribe()
/**
* 23:54:43.800|DEBUG|main|Function.invoke|request: 1
* 23:54:43.802|DEBUG|main|Function.invoke|addA: 2
* 23:54:43.803|DEBUG|main|Function.invoke|addB: 4
*/
}
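The nonPublisher helpers wrap their work in Mono.fromCallable rather than Mono.just. One practical difference is when the work runs: Mono.just captures a value that has already been computed, while Mono.fromCallable defers the computation until something subscribes. The sketch below counts invocations to show this; compute, calls, and callCounts are hypothetical names introduced only for this illustration.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times compute() has been invoked.
val calls = AtomicInteger(0)

// Hypothetical computation whose invocations we count.
fun compute(request: Int): Int {
    calls.incrementAndGet()
    return request + 1
}

// Returns the invocation count observed after each step.
fun callCounts(): List<Int> {
    calls.set(0)
    val counts = mutableListOf<Int>()
    val eager = Mono.just(compute(1))            // compute runs here, while building the Mono
    counts.add(calls.get())
    val lazy = Mono.fromCallable { compute(1) }  // compute does not run yet
    counts.add(calls.get())
    lazy.block()                                 // compute runs on subscription
    counts.add(calls.get())
    eager.block()                                // the captured value is emitted, no new call
    counts.add(calls.get())
    return counts
}

fun main() {
    println(callCounts())
}
```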
Running on the main thread
private val logger = KotlinLogging.logger {}
fun main() {
    Flux.range(1, 10)
        .doOnNext { logger.info { "no filter -> ${it}" } }
        .filter { it % 2 == 0 }
        .doOnNext { logger.info { "after filter -> ${it}" } }
        .subscribe()
/**
* 23:59:46.664|INFO |main|ReactorPubsub.invoke|no filter -> 1
* 23:59:46.664|INFO |main|ReactorPubsub.invoke|no filter -> 2
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 2
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 3
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 4
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 4
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 5
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 6
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 6
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 7
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 8
* 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 8
* 23:59:46.666|INFO |main|ReactorPubsub.invoke|no filter -> 9
* 23:59:46.666|INFO |main|ReactorPubsub.invoke|no filter -> 10
* 23:59:46.666|INFO |main|ReactorPubsub.invoke|after filter -> 10
*/
}
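Processing on parallel threads
Schedulers.parallel()
A scheduler that creates as many workers as there are CPU cores and supports parallel execution.
Work handed to it is processed on threads other than main, and combined with operators such as parallel().runOn(...) a Flux can be split so that each part is processed on a separate thread. In the example below, delayElements uses Schedulers.parallel() by default, which is why the log switches from main to parallel-1 and parallel-2.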
private val logger = KotlinLogging.logger {}
private val singleThread = Schedulers.newSingle("single-thread")
fun main() {
/**
* Delay each of this [Flux] elements ([Subscriber.onNext] signals)
* by a given [Duration]. Signals are delayed and continue on the
* [parallel][Schedulers.parallel] default Scheduler, but empty sequences or
* immediate error signals are not delayed.
*
* @param delay duration by which to delay each [Subscriber.onNext] signal
* @return a delayed [Flux]
* @see .delaySubscription
*/
// fun delayElements(delay: Duration?): Flux<T> {
// return delayElements(delay, Schedulers.parallel())
// }
    Flux.range(1, 12)
        .doOnNext { logger.info { "no filter -> ${it}" } }
        .filter { it % 2 == 0 }
        .doOnNext { logger.info { "after 2 filter -> ${it}" } }
        .filter { it % 3 == 0 }
        .delayElements(Duration.ofMillis(10))
        .doOnNext { logger.info { "after 3 filter -> ${it}" } }
        .filter { it % 4 == 0 }
        .doOnNext { logger.info { "after 4 filter -> ${it}" } }
        .blockLast()
/**
* 00:03:03.778|INFO |main|ReactorPubsub.invoke|no filter -> 1
* 00:03:03.782|INFO |main|ReactorPubsub.invoke|no filter -> 2
* 00:03:03.783|INFO |main|ReactorPubsub.invoke|after 2 filter -> 2
* 00:03:03.783|INFO |main|ReactorPubsub.invoke|no filter -> 3
* 00:03:03.784|INFO |main|ReactorPubsub.invoke|no filter -> 4
* 00:03:03.785|INFO |main|ReactorPubsub.invoke|after 2 filter -> 4
* 00:03:03.786|INFO |main|ReactorPubsub.invoke|no filter -> 5
* 00:03:03.787|INFO |main|ReactorPubsub.invoke|no filter -> 6
* 00:03:03.789|INFO |main|ReactorPubsub.invoke|after 2 filter -> 6
* 00:03:03.808|INFO |parallel-1|ReactorPubsub.invoke|after 3 filter -> 6
* 00:03:03.808|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 7
* 00:03:03.808|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 8
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 8
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 9
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 10
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 10
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 11
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 12
* 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 12
* 00:03:03.821|INFO |parallel-2|ReactorPubsub.invoke|after 3 filter -> 12
* 00:03:03.822|INFO |parallel-2|ReactorPubsub.invoke|after 4 filter -> 12
*
*/
}
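The example above only moves the stream onto a parallel worker through delayElements. To actually split a Flux across several workers, Reactor provides parallel() together with runOn(). The following is a minimal sketch under that assumption; the helper name squareInParallel and the choice of two rails are illustrative and not taken from the original code.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: squares 1..8 on two rails and records the worker thread names.
fun squareInParallel(): Pair<List<Int>, Set<String>> {
    val threads = ConcurrentHashMap.newKeySet<String>()
    val squares = Flux.range(1, 8)
        .parallel(2)                      // split the Flux into 2 rails
        .runOn(Schedulers.parallel())     // run each rail on a parallel worker
        .map {
            threads.add(Thread.currentThread().name)
            it * it
        }
        .sequential()                     // merge the rails back into one Flux
        .collectList()
        .block() ?: emptyList()
    // The merged order is not guaranteed, so sort before returning.
    return squares.sorted() to threads
}

fun main() {
    val (squares, threads) = squareInParallel()
    println("squares=$squares threads=$threads")
}
```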
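Processing on a specific thread
Switching schedulers with subscribeOn
This example uses a single-threaded scheduler, which is useful when tasks must run in order and no two tasks may run at the same time.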
private val logger = KotlinLogging.logger {}
private val singleThread = Schedulers.newSingle("single-thread")
fun main() {
    Flux.range(1, 12)
        .doOnNext { logger.info { "no filter -> ${it}" } }
        .filter { it % 2 == 0 }
        .doOnNext { logger.info { "after 2 filter -> ${it}" } }
        .filter { it % 3 == 0 }
        .doOnNext { logger.info { "after 3 filter -> ${it}" } }
        .filter { it % 4 == 0 }
        .doOnNext { logger.info { "after 4 filter -> ${it}" } }
        .subscribeOn(singleThread)
        .blockLast()
/**
* 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 1
* 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 2
* 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 2
* 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 3
* 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 4
* 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 4
* 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 5
* 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 6
* 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 6
* 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|after 3 filter -> 6
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 7
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 8
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 8
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 9
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 10
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 10
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 11
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 12
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 12
* 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 3 filter -> 12
* 00:23:54.107|INFO |single-thread-1|ReactorPubsub.invoke|after 4 filter -> 12
*
*/
}
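Using both parallel and single
subscribeOn runs the chain on the given scheduler, except for the parts whose thread is switched by publishOn (or by an operator such as delayElements that uses its own scheduler).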
private val logger = KotlinLogging.logger {}
private val singleThread = Schedulers.newSingle("single-thread")
fun main() {
    Flux.range(1, 12)
        .doOnNext { logger.info { "no filter -> ${it}" } }
        .filter { it % 2 == 0 }
        .doOnNext { logger.info { "after 2 filter -> ${it}" } }
        .filter { it % 3 == 0 }
        .delayElements(Duration.ofMillis(10)) // parallel
        .doOnNext { logger.info { "after 3 filter -> ${it}" } }
        .filter { it % 4 == 0 }
        .doOnNext { logger.info { "after 4 filter -> ${it}" } }
        .subscribeOn(singleThread) // single
        .blockLast()
/**
* 00:48:11.098|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 1
* 00:48:11.099|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 2
* 00:48:11.099|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 2
* 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 3
* 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 4
* 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 4
* 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 5
* 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 6
* 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 6
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|after 3 filter -> 6
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 7
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 8
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 8
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 9
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 10
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 10
* 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 11
* 00:48:11.115|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 12
* 00:48:11.115|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 12
* 00:48:11.127|INFO |parallel-2|ReactorPubsub.invoke|after 3 filter -> 12
* 00:48:11.128|INFO |parallel-2|ReactorPubsub.invoke|after 4 filter -> 12
*/
}
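publishOn and subscribeOn are useful when you need fine-grained control over how a stream is processed under backpressure.
publishOn is typically used when the publisher is fast and the downstream chain (the consumer) is slow,
and subscribeOn is typically used when the incoming (publisher) side is slow, for example blocking I/O, and the outgoing (consumer) side is fast.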
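The examples so far switch threads through delayElements rather than an explicit publishOn. The sketch below uses publishOn directly and records which thread each side of it runs on. The helper name threadsAroundPublishOn is hypothetical, and the scheduler name "single-thread" simply mirrors the one used in the earlier examples.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical helper: records the thread names seen before and after publishOn.
fun threadsAroundPublishOn(): Pair<List<String>, List<String>> {
    val single = Schedulers.newSingle("single-thread")
    val upstream = ConcurrentLinkedQueue<String>()
    val downstream = ConcurrentLinkedQueue<String>()
    try {
        Flux.range(1, 3)
            .doOnNext { upstream.add(Thread.currentThread().name) }    // source side
            .publishOn(Schedulers.parallel())                          // switch for everything below
            .doOnNext { downstream.add(Thread.currentThread().name) }  // consumer side
            .subscribeOn(single)                                       // where the source is subscribed
            .blockLast()
    } finally {
        single.dispose()  // release the dedicated thread
    }
    return upstream.toList() to downstream.toList()
}

fun main() {
    val (upstream, downstream) = threadsAroundPublishOn()
    println("upstream threads: $upstream")
    println("downstream threads: $downstream")
}
```

The position of subscribeOn in the chain does not matter for where the source is subscribed, whereas publishOn only affects the operators that come after it.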
Sources:
Publisher implementations: https://brunch.co.kr/@springboot/154
delayElements: https://hackernoon.com/ko/Spring-Webflux-%EC%8A%A4%EB%A0%88%EB%94%A9-%EB%AA%A8%EB%8D%B8-%EC%86%8C%EA%B0%9C
subscribeOn, parallel threads: https://tech.kakao.com/2018/05/29/reactor-programming/
More detailed backpressure handling