이번에 WebFlux 기반으로 하나의 프로젝트를 진행하려고 하여서 Reactive 프로그래밍 방식에 대해서 공부해 보았습니다.
라이브러리 및 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘이며, 이 메커니즘을 편리하게 사용할 수 있는 인터페이스 제공
논블로킹(Non-blocking), 백프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준
중요하게 봐야하는 점은 Publisher 가 Subscriber 에서 데이터를 Push 하는 방식이 아니라,
Subscriber 가 Publisher 에게 데이터를 요청하는 Pull 방식이라는 점입니다. 이런 방식을 백프레셔라고 합니다. (한마디로 요청에 의해서만 데이터를 제공하는 방식 입니다.)
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
reactive-streams.org 에서, Publisher 가 생성하고 Subscriber 가 소비하는 모든 신호는 non-blocking 이어야한다고 정의했습니다.
따라서, Publisher - Subscriber 를 구현할 때, block이 되지않도록 해야함
Reactive Streams는 위의 인터페이스로 데이터를 통지하는 구조를 제공하는데, 이 구조가 정상적으로 작동하려면 Reactive Streams의 규칙을 준수해야 함
구독 시작 통지(onSubscribe)는 해당 구독에서 한 번만 발생
통지는 순차적으로 이루어짐, 여러 통지를 동시에 수행할 수 없음
데이터 및 에러 통지 시 NULL을 통지하지 않음
Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해 종료
데이터 개수 요청에 Long.MAX_VALUE를 설정하면 데이터 개수에 의한 통지 제한은 없어짐
Subscription의 메서드는 동기화된 상태로 호출되어야 함, 메서드를 동시에 호출해서는 안됨
Mono는 Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체입니다.
Flux 와의 차이점
예제
// java
Mono.fromCallable(System::currentTimeMillis)
.flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
.timeout(Duration.ofSeconds(3), errorHandler::fallback)
.doOnSuccess(r -> serviceM.incrementSuccess())
.subscribe(System.out::println);
// kotlin
val mono: Mono<String> = Mono.justOrEmpty("Hello Reactive World")
mono.subscribe(::println)
주로 체인형태로 많이 쓰게 되어서 Java랑 Kotlin이랑 형태가 유사합니다.
Flux는 Reactive Streams 에서 정의한 Publisher 의 구현체로서, 0-N 개의 데이터를 전달할 수 있습니다. 하나의 데이터를 전달할 때마다 onNext 이벤트를 발생합니다.
Flux 내의 모든 데이터의 전달 처리가 완료 되면 onComplete
이벤트가 발생하며, 데이터를 전달하는 과정에서 오류가 발생하면 onError
이벤트가 발생합니다.
Flux.fromIterable(getSomeLongList())
.mergeWith(Flux.interval(100))
.doOnNext(serviceA::someObserver)
.map(d -> d * 2)
.take(3)
.onErrorResume(errorHandler::fallback)
.doAfterTerminate(serviceM::incrementTerminate)
.subscribe(System.out::println);
val intFlux = Flux.range(1, 5)
intFlux.subscribe { num ->
println("item: $num")
}
Flux 는 코루틴의 Flow 찰떡궁합으로 쓰일 수가 있는데 코루틴의 Flow는 나중에 자세히 공부하여 글을 올리겠습니다.
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
Reactive Streams 에 대해서 공부해 보았는데
Spring WebFlux에서 잘 적용시키면 될 것 같습니다.
이 Reactive Streams 사실 옛날에 개념을 알고 있긴 했는데
그때 당시에는 머리아프고 복잡하다고 생각을 하였는데
지금보니까 되게 재미있고 좋은 공부였던것같아요
개인적으로 자바스크립트의 Promis, Async Await 사용하는 느낌이랑 비슷하긴 헀습니다.
이런 내용을 알기 전에 사용을 해봤는데
왜 데이터가 안들어오는지, Publisher가 대체 뭔지, 체인을 걸때 흐름이 어떻게 흘러가는지에 대해서 이해가 안되 적용하기가 쉽지 않았는데
해당 내용들에 대해서 공부하니까 코드로 적용하기가 매우 편했습니다.
또한 Reactive Stream는 비동기와 관련이 많기 때문에 비동기 관련된 내용들도 공부를 하면 좋을 것 같습니다.