mono
.onErrorContinue{ e, _ -> System.err.println(e) }
쓰지말고
mono
.doOnError(System.err::println)
.onErrorResume(e -> Mono.empty())
를 쓰자
어느날, Webflux를 사용해 교내 프로젝트를 개발하던 도중, 로직이 제대로 작동하지 않는것을 발견하였습니다.
private fun parse(message: Message) =
ObjectMapper()
.registerModule(KotlinModule())
.registerModule(JavaTimeModule())
.toMono()
.map{ it.readValue(message.body, type.java) }
.onErrorContinue{ _, _ -> messagePublishService.publishError(Error.WRONG_PAYLOAD, WrongPayloadError(event)) }
[코드 1.1] Saga패턴 구현을 위한 Error발행로직
onErrorContinue의 인자값으로 error발행로직을 넘겨주었는데요. 테스트중, Error가 발생해야할 상황에서 MessageQueue에 아무런 메세지도 발행하지 않는것을 발견하였습니다.
❓ 여기서 잠깐! onErrorContinue가 뭐냐고요?
[그림 1.1] onErrorContinue 마블 다이어그램
마블 다이어그램을 보시면 알 수 있듯이
ReactiveStream 내 특정 데이터를 처리하는 도중, 오류가 발생하면 인자로 받은 handling 로직을 처리하고, 다음 데이터로 건너뛰어(onNext) 처리를 진행합니다. (참고로 Upstream 에게 연산을 전파합니다)
하지만 Mono는 데이터가 0-1개이기 때문에, 데이터가 2개 이상이어야 하는 onErrorContinue가 재대로 동작하지 않은것이라고 생각하였습니다.
위에서 말한 가설을 검증해보기 위해, Mono와 Flux 에서 onErrorContinue를 각각 실행하여 로깅을 하는 test method를 작성했습니다. 또한, Data 개수의 문제인지, Mono인것이 문제인지를 명확히 알아보기 위해, 데이터가 1개밖에 없는 Flux 또한 만들어보았습니다.
fun main() {
Mono.just("")
.flatMap { Mono.error<String>(RuntimeException("Mono에서 오류가 발생하였습니다!")) }
.onErrorContinue{ e, _ -> println(e.message) }
.subscribe()
Flux.just("가", "오류발생", "다", "라")
.flatMap {
if(it.equals("오류발생"))
Mono.error<String>(RuntimeException("Flux에서 오류가 발생하였습니다!"))
else Mono.just(it)
}
.onErrorContinue{ e, _ -> println(e.message) }
.subscribe()
Flux.just("오류발생")
.flatMap {
if(it.equals("오류발생"))
Mono.error<String>(RuntimeException("Flux2에서 오류가 발생하였습니다!"))
else Mono.just(it)
}
.onErrorContinue{ e, _ -> println(e.message) }
.subscribe()
}
[코드 2.1] Mono, Flux별 onErrorContinue 테스트
결과, 예상대로 Flux에서는 정상적으로 로그가 찍혔지만, Mono에서는 error핸들링이 되지 않은 것을 볼 수 있었습니다. 하지만 아직 의문은 남아있었습니다. 왜 Flux에서는 1개의 데이터만 있어도 정상적으로 작동하는걸까요?
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Mono에서 오류가 발생하였습니다!
Caused by: java.lang.RuntimeException: Mono에서 오류가 발생하였습니다!
at com.iplease.server.ip.manage.infra.message.data.dto.WrongPayloadErrorKt.main$lambda-0(WrongPayloadError.kt:16)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4385)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515)
at reactor.core.publisher.Mono.subscribe(Mono.java:4232)
at com.iplease.server.ip.manage.infra.message.data.dto.WrongPayloadErrorKt.main(WrongPayloadError.kt:18)
at com.iplease.server.ip.manage.infra.message.data.dto.WrongPayloadErrorKt.main(WrongPayloadError.kt)
Flux에서 오류가 발생하였습니다!
Flux2에서 오류가 발생하였습니다!
[코드 2.2] 테스트 결과 콘솔
오류의 원인은 찾았지만, 아직 깔끔한 해결책을 찾지는 못했습니다. 그러니 조금 더 파고들어보.. 려고 했으나, 블로그에서도, SOF에서도 관련한 인사이트를 얻지 못했습니다 😭
마지막으로, Mono.onErrorContinue 에 대한 javadoc을 파헤쳐 본 결과 원하던 답을 얻을 수 있었습니다
The mode doesn't really make sense on a Mono,
since we're sure there will be no further value to continue with.
onErrorResume(Function) is a more classical fit.
[코드 3.1] Mono.onErrorContinue javadoc 일부발췌
예상한대로, Mono에서는 다음 값이 존재하지 않아, onErrorContinue를 사용할 수 없다고 합니다.
또한, onErrorResume이 더 적합하다고 추천하며, 예시코드까지 알려주었죠. 역시 공식문서
mono
.flatMap(id -> repository.retrieveById(id)
.doOnError(System.err::println)
.onErrorResume(e -> Mono.empty()))
[코드 3.2] onErrorResume BestPractice
doOnError를 통해 handling로직을 처리하고, onErrorResume으로 처리된 오류가 Downstream으로 전파되는것을 막는 방식이더라구요.
private fun parse(message: Message) =
ObjectMapper()
.registerModule(KotlinModule())
.registerModule(JavaTimeModule())
.toMono()
.map{ it.readValue(message.body, type.java) }
.doOnError { messagePublishService.publishError(Error.WRONG_PAYLOAD, WrongPayloadError(event)) }
.onErrorResume{ Mono.empty() }
[코드 4.2] 리펙토링된 코드
원인과 해결책을 알게 된 후, 공식문서에 나온 BestPractice를 참고하여 코드를 리팩토링 하였습니다.
우선, onErrorContinue의 인자로 전달하던 BiConsumer를 doOnError 에게 Consumer형태로 전달해주도록 하였으며, onErrorResume을 통해, handling된 오류가 DownStream으로 전파되는것을 방지하였습니다.
Mono에서 onErrorContinue를 지원하지 않는다면, 이 메서드는 도대체 왜 존재하는걸까요?
심지어 구현체까지 떡하니 만들어져있고요. 마지막으로 이것도 한번 알아봅시다!
우선, javadoc의 나머지 부분을 세세히 살펴보았는데요.
This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux.
[코드 5.1] Mono.onErrorContinue() javadoc 일부 발췌(2)
그 결과, 위와같은 문구를 찾을 수 있었습니다.
또한 조금 더 자세히 알아보기위해, 해당 문구를 기반으로 구글링을 해본 결과, 관련한 설명이 있는 해외 문서를 발견했죠.
⚠️ 이 다음부터는 순전히 저자의 추측에 의거한 내용입니다.
즉, 공신력이 있지도 않을 뿐더러 잘못된 내용일수도 있습니다. 이점 참고바랍니다 😀
upstream이 Flux일 경우, 해당 Flux에게 Mono의 configuration을 전파하기 위해서 사용한다. 라고 해석하였는데요. upstream 이 Flux라는건 무슨뜻일까요? configuration은 뭐죠?
Upstream과 Downstream 은 Publisher/Subscriber간의 연결로 이루어진 ReactiveStream의 연산흐름에서, 연산에 대한 상대적인 위치를 뜻합니다. 예시를 볼까요?
Flux.just("안녕", "강녕", "코딩", "벨로그 원고작성")
.map{ "${it}하세요" }
.filter { it.length < 6 }
.flatMap { Mono.just(it) }
.subscribe { println(it) }
[코드 5.2] ReactiveStreams 연산 예시
다음과 같은 ReactiveStream 에 대한 연산을 수행한다고 가정해볼까요?
이처럼, Reactor에서 데이터처리는 이전 Publisher에 대한 구독과 처리된 결과의 발행으로 이루어지는데요.
이때, 이러한 연산의 흐름에서 특정한 연산(map, flatMap등)을 기준으로,
즉, map은 filter, flatmap, subscribe의 Upstream이고, flatmap은 map, filter의 Downstream 인것이죠.
이부분만큼은 인터넷을 아무리 찾아보아도, 알 수 없었습니다.
javadoc에서도 해당 문구이외에 configuration이 사용된 부분들을 찾아보았지만, 유의미한 정보는 얻지 못했죠. 따라서 onErrorContinue 메서드의 body부분을 추적해보기로 하였습니다.
public final Mono<T> onErrorContinue(BiConsumer<Throwable, Object> errorConsumer) {
BiConsumer<Throwable, Object> genericConsumer = errorConsumer;
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resume(genericConsumer)
));
}
[코드 5.3] Mono.onErrorContinue() 메서드
보시면 subscriberContext라는 메서드에 Context라는 인터페이스를 넘기는것을 볼 수 있는데요.
subscriberContext 라는 메서드에 대한 설명을 찾아본 결과 configuration이 무엇인지 유추할 수 있었습니다.
Enrich a potentially empty downstream Context by adding all values from the given Context, producing a new Context that is propagated upstream.
//지정된 컨텍스트의 모든 값을 추가하여 잠재적으로 비어 있는 다운스트림 컨텍스트를 풍부하게 하여 업스트림으로 전파되는 새 컨텍스트를 생성합니다. (Papago)
//potentially empty downstream에 Enrich하다. givenContext로 부터 불러온 값들을 통해서, 새로은Context를 produce하고 그것은 upstream으로 전파될 것 이다. (자체해석)
[코드 5.4] Mono.subscriberContext() javadoc 일부 발췌
현재 연산의 DownStream에 인자로 받은 Context를 통해 값을 추가하고, 이를 Upstream으로 전파한다는 뜻으로 해석하였는데요. Upstream 으로 전파… 뭔가 익숙하지 않나요?
이전에, Mono.onErrorContinue는 Upstream에 존재하는 Flux에게 configuration을 전파하기 위함이라고 하였죠. 그렇다면, Configuration이 Context 라고 보아도 무방하겠네요!
정리해보자면, Mono의 onErrorContinue는 Flux가 모종의 연산으로Mono로 변환되기 전, Flux에서 발생한 오류에 대해 onErrorContinue연산을 실행하기 위함 이라고 생각할 수 있을 것 같습니다.
드디어 궁금증이 풀렸군요! 😁
항상 마무리 멘트가 가장 어렵더라구요
오늘은 Mono에서 onErrorContinue가 제대로 동작하지 않는 이유부터, 동작하지 않는 onErrorContinue가 존재하는 이유까지 여러 부분들을 깊게 파보았습니다.
역시 ReactiveStream은 어렵네요..
Node JS에서의 사가패턴 예시도 있을까요?