reactor 정리

아엘·2024년 3월 4일
0

spring framework

목록 보기
3/5

Mono, Flux

둘다 publisher이며 단건을 publisher할때는 Mono를 다건을 publisher할때는 Flux를 사용합니다.

Mono 예시

fun main(args: Array<String>) {
    // Mono는 publisher라서 subscribe해줘야 do next가 처리됩니다.
    Mono.just(1).doOnNext { logger.info { "from publisher -> ${it}" } }
        .subscribe()

    // log 함수도 제공합니다.
    Mono.just(1).doOnNext { logger.info { "from publisher -> ${it}" } }
        .log().subscribe()

    // 자료형 바꿔보기
    Mono.just(0).map { it + 1}.doOnNext { logger.info { "from publisher -> ${it}" } }
        .log().subscribe()

    /**
     * 23:34:56.012|INFO |main|Main.invoke|from publisher -> 1
     * 23:34:56.016|INFO |main|reactor.Mono.PeekFuseable.1.info|| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
     * 23:34:56.017|INFO |main|reactor.Mono.PeekFuseable.1.info|| request(unbounded)
     * 23:34:56.017|INFO |main|Main.invoke|from publisher -> 1
     * 23:34:56.018|INFO |main|reactor.Mono.PeekFuseable.1.info|| onNext(1)
     * 23:34:56.018|INFO |main|reactor.Mono.PeekFuseable.1.info|| onComplete()
     * 23:34:56.019|INFO |main|reactor.Mono.PeekFuseable.2.info|| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
     * 23:34:56.019|INFO |main|reactor.Mono.PeekFuseable.2.info|| request(unbounded)
     * 23:34:56.020|INFO |main|Main.invoke|from publisher -> 1
     * 23:34:56.020|INFO |main|reactor.Mono.PeekFuseable.2.info|| onNext(1)
     * 23:34:56.020|INFO |main|reactor.Mono.PeekFuseable.2.info|| onComplete()
     */
}

Flux 예시

Flux는 publisher라서 subscribe해줘야 do next가 처리됩니다.
1,2,3은 Stream입니다.


    
        Flux.just(1, 2, 3).doOnNext { logger.info { "from publisher -> ${it}" } }
            .subscribe()

    /**
     * 23:34:01.654|INFO |main|Main.invoke|from publisher -> 1
     * 23:34:01.655|INFO |main|Main.invoke|from publisher -> 2
     * 23:34:01.655|INFO |main|Main.invoke|from publisher -> 3
     */

배열

    Flux.just(arrayOf(1, 2, 3)).doOnNext { logger.info { "from publisher -> ${it}" } }
        .log().subscribe()

    /**
     * 23:36:07.691|INFO |main|Main.invoke|from publisher -> [Ljava.lang.Integer;@9353778
     * 23:36:07.691|INFO |main|reactor.Flux.PeekFuseable.1.info|| onNext([1, 2, 3])
     * 23:36:07.691|INFO |main|reactor.Flux.PeekFuseable.1.info|| onComplete()
     */

map과 flatMap

map은 block 방식, flatMap은 non-block 방식
it*it가 아닌 NIO 처리를 기다려야한다면, block 방식와 non-block 방식의 차이가 나타난다.

	Flux.range(1,10).map { it * it }.log().subscribe()
    /**
     * 23:37:33.444|INFO |main|reactor.Flux.MapFuseable.1.info|| onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
     * 23:37:33.446|INFO |main|reactor.Flux.MapFuseable.1.info|| request(unbounded)
     * 23:37:33.446|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(1)
     * 23:37:33.446|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(4)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(9)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(16)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(25)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(36)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(49)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(64)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(81)
     * 23:37:33.447|INFO |main|reactor.Flux.MapFuseable.1.info|| onNext(100)
     * 23:37:33.448|INFO |main|reactor.Flux.MapFuseable.1.info|| onComplete()
     */
     
     Flux.range(1,10).flatMap { Mono.just(it*it) }.log().subscribe()

    /**
     * 00:32:18.411|INFO |main|reactor.Flux.FlatMap.1.info|onSubscribe(FluxFlatMap.FlatMapMain)
     * 00:32:18.412|INFO |main|reactor.Flux.FlatMap.1.info|request(unbounded)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(1)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(4)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(9)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(16)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(25)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(36)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(49)
     * 00:32:18.440|INFO |main|reactor.Flux.FlatMap.1.info|onNext(64)
     * 00:32:18.441|INFO |main|reactor.Flux.FlatMap.1.info|onNext(81)
     * 00:32:18.441|INFO |main|reactor.Flux.FlatMap.1.info|onNext(100)
     * 00:32:18.441|INFO |main|reactor.Flux.FlatMap.1.info|onComplete()
     */

function

reactor에서 함수호출 방식을 비교해보자.

import mu.KotlinLogging
import reactor.core.publisher.Mono

private val logger = KotlinLogging.logger {}

fun getRequest(): Mono<Int> {
    return Mono.just(1)
}

fun addA(request: Mono<Int>): Mono<Int> {
    return request.map { it + 1 }
}

fun addB(request: Mono<Int>): Mono<Int> {
    return request.map { it + 2 }
}

fun nonPublisherAddA(request: Int): Mono<Int> {
    return Mono.fromCallable { request + 1 }
}

fun nonPublisherAddB(request: Int): Mono<Int> {
    return Mono.fromCallable { request + 2 }
}

...

Mono, Flux는 publisher이기 때문에 subscibe() 처리해야합니다.

fun main() {
    val request = getRequest()
    logger.debug { "request: $request" }

    val addA = addA(request)
    logger.debug { "request: $addA" }

    val addB = addB(request)
    logger.debug { "request: $addB" }

    /**
     * 23:45:12.550|DEBUG|main|Function.main|request: MonoJust
     * 23:45:12.550|DEBUG|main|Function.main|request: MonoMapFuseable
     * 23:45:12.551|DEBUG|main|Function.main|request: MonoMapFuseable
     */
}

block

fun main() {

    val request = getRequest()
    logger.debug { "request: ${request.block()}" }

    val addA = addA(request)
    logger.debug { "addA: ${addA.block()}" }

    val addB = addB(addA)
    logger.debug { "addB: ${addB.block()}" }

    /**
     * 23:49:46.129|DEBUG|main|Function.main|request: 1
     * 23:49:46.138|DEBUG|main|Function.main|addA: 2
     * 23:49:46.138|DEBUG|main|Function.main|addB: 4
     */
}

subscribe

	val request = getRequest().doOnNext { logger.debug { "request: $it" } }
    val addA = addA(request).doOnNext { logger.debug { "addA: $it" } }
    val addB = addB(addA).doOnNext { logger.debug { "addB: $it" } }

    addB.subscribe()

    /**
     * 23:50:11.857|DEBUG|main|Function.invoke|request: 1
     * 23:50:11.857|DEBUG|main|Function.invoke|addA: 2
     * 23:50:11.857|DEBUG|main|Function.invoke|addB: 4
     */

chaining

fun main() {

    getRequest()
        .doOnNext { logger.debug { "request: $it" } }
        .flatMap { addA(Mono.just(it)) }
        .doOnNext { logger.debug { "addA: $it" } }
        .flatMap { addB(Mono.just(it)) }
        .doOnNext { logger.debug { "addB: $it" } }
        .subscribe()
        
    /**
     * 23:51:36.904|DEBUG|main|Function.invoke|request: 1
     * 23:51:36.905|DEBUG|main|Function.invoke|addA: 2
     * 23:51:36.906|DEBUG|main|Function.invoke|addB: 4
     */
}

nonPublisher함수
flatMap 안에서 로직이 간결해짐
reactor에서는 input은 기본형 타입으로 받아서 반환은 publisher로 해주는 것이 좋다.

fun main() {

    getRequest()
        .doOnNext { logger.debug { "request: $it" } }
        .flatMap { nonPublisherAddA(it) }
        .doOnNext { logger.debug { "addA: $it" } }
        .flatMap { nonPublisherAddB(it) }
        .doOnNext { logger.debug { "addB: $it" } }
        .subscribe()

    /**
     * 23:54:43.800|DEBUG|main|Function.invoke|request: 1
     * 23:54:43.802|DEBUG|main|Function.invoke|addA: 2
     * 23:54:43.803|DEBUG|main|Function.invoke|addB: 4
     */
}

ReactorPubsub

main 스레드에서 실행


private val logger = KotlinLogging.logger {}

fun main() {
    Flux.range(1, 10)
        .doOnNext{ logger.info { "no filter -> ${it}" } }
        .filter{ it % 2 == 0 }
        .doOnNext{ logger.info { "after filter -> ${it}" } }
        .subscribe()

    /**
     * 23:59:46.664|INFO |main|ReactorPubsub.invoke|no filter -> 1
     * 23:59:46.664|INFO |main|ReactorPubsub.invoke|no filter -> 2
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 2
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 3
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 4
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 4
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 5
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 6
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 6
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 7
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|no filter -> 8
     * 23:59:46.665|INFO |main|ReactorPubsub.invoke|after filter -> 8
     * 23:59:46.666|INFO |main|ReactorPubsub.invoke|no filter -> 9
     * 23:59:46.666|INFO |main|ReactorPubsub.invoke|no filter -> 10
     * 23:59:46.666|INFO |main|ReactorPubsub.invoke|after filter -> 10
     */
}

병렬스레드 처리

Schedulers.parallel()
core 개수 만큼의 워커를 만들어 병렬로 실행을 지원하는 스케줄러
Flux를 분할해서 각각 별도의 스레드에서 처리하도록 한다.


private val logger = KotlinLogging.logger {}

private val singleThread = Schedulers.newSingle("single-thread")

fun main() {

    /**
     * Delay each of this [Flux] elements ([Subscriber.onNext] signals)
     * by a given [Duration]. Signals are delayed and continue on the
     * [parallel][Schedulers.parallel] default Scheduler, but empty sequences or
     * immediate error signals are not delayed.
     *
     *
     *
     * <img class="marble" src="doc-files/marbles/delayElements.svg" alt=""></img>
     *
     * @param delay duration by which to delay each [Subscriber.onNext] signal
     * @return a delayed [Flux]
     * @see .delaySubscription
     */
//    fun delayElements(delay: Duration?): Flux<T> {
//        return delayElements(delay, Schedulers.parallel())
//    }


    Flux.range(1, 12)
        .doOnNext{ logger.info { "no filter -> ${it}" } }
        .filter{ it % 2 == 0 }
        .doOnNext{ logger.info { "after 2 filter -> ${it}" } }
        .filter{ it % 3 == 0 }
        .delayElements(Duration.ofMillis(10))
        .doOnNext{ logger.info { "after 3 filter -> ${it}" } }
        .filter{ it % 4 == 0 }
        .doOnNext{ logger.info { "after 4 filter -> ${it}" } }
        .blockLast()

    /**
     * 00:03:03.778|INFO |main|ReactorPubsub.invoke|no filter -> 1
     * 00:03:03.782|INFO |main|ReactorPubsub.invoke|no filter -> 2
     * 00:03:03.783|INFO |main|ReactorPubsub.invoke|after 2 filter -> 2
     * 00:03:03.783|INFO |main|ReactorPubsub.invoke|no filter -> 3
     * 00:03:03.784|INFO |main|ReactorPubsub.invoke|no filter -> 4
     * 00:03:03.785|INFO |main|ReactorPubsub.invoke|after 2 filter -> 4
     * 00:03:03.786|INFO |main|ReactorPubsub.invoke|no filter -> 5
     * 00:03:03.787|INFO |main|ReactorPubsub.invoke|no filter -> 6
     * 00:03:03.789|INFO |main|ReactorPubsub.invoke|after 2 filter -> 6
     * 00:03:03.808|INFO |parallel-1|ReactorPubsub.invoke|after 3 filter -> 6
     * 00:03:03.808|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 7
     * 00:03:03.808|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 8
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 8
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 9
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 10
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 10
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 11
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 12
     * 00:03:03.809|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 12
     * 00:03:03.821|INFO |parallel-2|ReactorPubsub.invoke|after 3 filter -> 12
     * 00:03:03.822|INFO |parallel-2|ReactorPubsub.invoke|after 4 filter -> 12
     *
     */
}

특정 스레드에서 처리하기

subscribeOn으로 스케줄러 전환하기
single 유형의 스레드 풀을 사용했고, 작업이 순서대로 실행되고 두 작업이 동시에 실행되지 않도록 해야 할때 유용함

private val logger = KotlinLogging.logger {}

private val singleThread = Schedulers.newSingle("single-thread")

fun main() {

    Flux.range(1, 12)
        .doOnNext{ logger.info { "no filter -> ${it}" } }
        .filter{ it % 2 == 0 }
        .doOnNext{ logger.info { "after 2 filter -> ${it}" } }
        .filter{ it % 3 == 0 }
        .doOnNext{ logger.info { "after 3 filter -> ${it}" } }
        .filter{ it % 4 == 0 }
        .doOnNext{ logger.info { "after 4 filter -> ${it}" } }
        .subscribeOn(singleThread)
        .blockLast()

    /**
     * 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 1
     * 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 2
     * 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 2
     * 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 3
     * 00:23:54.104|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 4
     * 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 4
     * 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 5
     * 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 6
     * 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 6
     * 00:23:54.105|INFO |single-thread-1|ReactorPubsub.invoke|after 3 filter -> 6
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 7
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 8
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 8
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 9
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 10
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 10
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 11
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 12
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 12
     * 00:23:54.106|INFO |single-thread-1|ReactorPubsub.invoke|after 3 filter -> 12
     * 00:23:54.107|INFO |single-thread-1|ReactorPubsub.invoke|after 4 filter -> 12
     *
     */
}

parallel, single 모두 사용
subscribeOn은 publisheOn으로 정의되지 않은 chain에 한해서 특정 스레드에서 처리합니다.

private val logger = KotlinLogging.logger {}

private val singleThread = Schedulers.newSingle("single-thread")

fun main() {

    Flux.range(1, 12)
        .doOnNext{ logger.info { "no filter -> ${it}" } }
        .filter{ it % 2 == 0 }
        .doOnNext{ logger.info { "after 2 filter -> ${it}" } }
        .filter{ it % 3 == 0 }
        .delayElements(Duration.ofMillis(10)) // parallel
        .doOnNext{ logger.info { "after 3 filter -> ${it}" } }
        .filter{ it % 4 == 0 }
        .doOnNext{ logger.info { "after 4 filter -> ${it}" } }
        .subscribeOn(singleThread) // single
        .blockLast()

    /**
     * 00:48:11.098|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 1
     * 00:48:11.099|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 2
     * 00:48:11.099|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 2
     * 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 3
     * 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 4
     * 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 4
     * 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 5
     * 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|no filter -> 6
     * 00:48:11.100|INFO |single-thread-1|ReactorPubsub.invoke|after 2 filter -> 6
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|after 3 filter -> 6
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 7
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 8
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 8
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 9
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 10
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 10
     * 00:48:11.114|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 11
     * 00:48:11.115|INFO |parallel-1|ReactorPubsub.invoke|no filter -> 12
     * 00:48:11.115|INFO |parallel-1|ReactorPubsub.invoke|after 2 filter -> 12
     * 00:48:11.127|INFO |parallel-2|ReactorPubsub.invoke|after 3 filter -> 12
     * 00:48:11.128|INFO |parallel-2|ReactorPubsub.invoke|after 4 filter -> 12
     */
}

publishOn과 subscribeOn은 스트림 배압 상세하게 처리할때 유용하게 쓰입니다.
publishOn은 처리가 느린 chain에 대한 처리를 위해서 사용하고,
subscribeOn은 들어오는 chain에 처리는 느린데, 나가는 chain의 처리가 빠를때 사용합니다.

출처:
publisher 구현체: https://brunch.co.kr/@springboot/154
delayElements: https://hackernoon.com/ko/Spring-Webflux-%EC%8A%A4%EB%A0%88%EB%94%A9-%EB%AA%A8%EB%8D%B8-%EC%86%8C%EA%B0%9C
subscribeOn, 병렬스레드: https://tech.kakao.com/2018/05/29/reactor-programming/

TODO

더 자세한 배압처리

profile
하루 하나씩

0개의 댓글