실제 현업에서 Spring WebFlux를 통해 개발 시 서비스 로직에 대한
트랜잭션이 정상적으로 처리되지 않아 골머리를 앓는 도중
계속해서 디버깅해본 결과 트랜잭션의 문제가 아니라
Reactor에서 제공하는 onErrorResume 오퍼레이터를 사용하여 예외 발생 시 별도의 Publisher를 발행하는 코드가 있어 예외가 발생하였음에도 상위 레벨로 예외가 전파되지 않아 애초에 트랜잭션을 통한 롤백이 실행되지 않은 사례가 있었다.
따라서, 이번 기회에 Reactor에서 제공하는 Error 관련 오퍼레이터를
간략하게 정리하고자 한다.
stream에 에러를 명시적으로 발생시킨다.
특정 조건에서 에러를 발생시키고자 할 때 유용하다.
(Java에서 throw 키워드를 통해 예외를 의도적으로 던지는 것과 유사)
return Mono.error(throwable);
에러가 발생했을 경우, 특정 로직을 실행 시킬 경우 사용
@Test
public void doOnError() {
Mono.just(1)
.map(num -> {
throw new RuntimeException("Num is " + num);
})
.doOnError(throwable -> System.out.println("err : " + throwable.getMessage()))
.log()
.subscribe();
}
| onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
| request(unbounded)
err : Num is 1
| onError(java.lang.RuntimeException: Num is 1)
java.lang.RuntimeException: Num is 1
에러가 발생했을 경우, 상위로 에러를 전파하지 않고 대신 실행할 새로운 Publisher을 리턴한다.
복잡한 에러 복구 로직을 구현하거나, 다른 백업 stream으로 전환하고 싶을 때 유용하다. (Java try~catch 문에서 에러 catch 후 대체 되는 함수를 실행하는 것과 유사)
*주의 할 점: onErrorResume 사용 시 error 오퍼레이터를 통해 에러를 발생시키지 않으면 상위로 전파되지 않아 에러가 발생해도 호출 메서드에서 인식하지 못한다.
실제로 서버에서 에러가 발생하지 않는다!
@Test
void onErrorResume() {
Mono.just(1)
.map(num -> {
throw new RuntimeException("Num is " + num);
})
.onErrorResume(throwable -> {
System.err.println("err : "+ throwable.getMessage());
// return Mono.error(throwable);
return Mono.empty();
})
.log()
.subscribe();
}
onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
err : Num is 1
onComplete()
에러가 발생함
@Test
void onErrorResume() {
Mono.just(1)
.map(num -> {
throw new RuntimeException("Num is " + num);
})
.onErrorResume(throwable -> {
System.out.println("err : "+ throwable.getMessage());
return Mono.error(throwable);
// return Mono.empty();
})
.log()
.subscribe();
}
onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
err : Num is 1
onError(java.lang.RuntimeException: Num is 1)
에러가 발생하였을 경우, 에러를 다른 에러로 대체
void onErrorMap() {
Mono.just(1)
.map(num -> {
throw new RuntimeException("Num is " + num);
})
.onErrorMap(throwable -> {
throw new IllegalStateException("new exception");
})
.log()
.subscribe();
}
onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
onError(java.lang.IllegalStateException: new exception)
에러가 발생하였을 경우, 사전 정의된 정적 값(fallbackValue)를 리턴하고 stream을 정상 종료한다.
에러를 로깅하거나 안전한 값으로 대체하여 애플리케이션의 중단 없이 계속 진행하고자 할 때 사용된다.
*주의 할 점: onErrorResume과 마찬가지로 에러가 상위로 전파되지 않으며 에러가 발생한 시점 이후의 데이터는 emit되지 않고 fallbackValue를 리턴하고 종료된다.
@Test
void onErrorReturn() {
Flux.range(1, 10)
.flatMap((v) -> {
if (v == 5) {
return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
}
return Mono.just(v);
})
.onErrorReturn(100000)
.log()
.subscribe();
}
onSubscribe(FluxOnErrorReturn.ReturnSubscriber)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onNext(100000)
onComplete()
// 5에서 에러가 발생된 후 fallbackValue인 100000 리턴 후 종료
에러 발생 시 논리 표현식 (Predicate) 결과에 따라 complete 이벤트를 보낼 지 error 이벤트를 보낼지 선택할 수 있도록 지원하는 연산자
@Test
void onErrorComplete() {
Flux.range(1, 10)
.flatMap((v) -> {
if (v == 5) {
return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
}
return Mono.just(v);
})
.onErrorComplete(throwable -> true)
.log()
.subscribe();
}
onSubscribe(FluxOnErrorReturn.ReturnSubscriber)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onComplete()
// 에러 발생 시 Compelete 이벤트를 발생시킨다.
@Test
void onErrorComplete() {
Flux.range(1, 10)
.flatMap((v) -> {
if (v == 5) {
return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
}
return Mono.just(v);
})
.onErrorComplete(throwable -> false)
.log()
.subscribe();
}
onSubscribe(FluxOnErrorReturn.ReturnSubscriber)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onError(java.lang.RuntimeException: v가 5이면 에러를 발생시킵니다.)
// 에러 발생 시 Error 이벤트를 발생시킨다.
에러가 발생하였을 경우 stream이 종료되지 않고 에러 이후의 데이터를
계속해서 emit 해주는 오퍼레이터
void onErrorComplete() {
Flux.range(1, 10)
.flatMap((v) -> {
if (v == 5) {
return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
}
return Mono.just(v);
})
.onErrorContinue((error, v) -> {
System.out.println("error : " + error);
System.out.println("에러 발생 값 :" + v);
})
.log()
.subscribe();
}
onSubscribe([Fuseable]
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
error : java.lang.RuntimeException: v가 5이면 에러를 발생시킵니다.
에러 발생 값 :5
onNext(6)
onNext(7)
onNext(8)
onNext(9)
onNext(10)
onComplete()
에러가 발생했을 때 stream을 즉시 중단하고 나머지 이벤트가 무시되어 에러를
발생시킨다. 특정 조건에서 에러가 발생하면 down stream에서 에러 처리 로직이 있어도 더 이상의 처리를 하지 않고 즉시 종료하고자 할 때 유용하다.
(굳이 사용하지 않아도 동일하게 결과가 작동할 경우가 있지만, 명시적으로 작성함으로써의 의미도 있다.)
@Test
void onErrorStop() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Error at i=3");
}
return i;
})
.onErrorStop()
.log();
flux.subscribe(
value -> System.out.println("Received: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
}
onSubscribe([Fuseable]
request(unbounded)
onNext(1)
Received: 1
onNext(2)
Received: 2
onError(java.lang.RuntimeException: Error at i=3)
Reactor에서 제공하는 Error 오퍼레이터도 이처럼 다양한 에러 핸들링을 제어하기 위한 메서드들이 제공되며 실제 개발 시에도 무턱대고 특정 Error 오퍼레이터 메서드만 사용하는 것이 아니라 서비스 로직에 대한 명확한 이해를 바탕으로 보다 면밀한 예외 처리 로직이 구현되어야 함을 다시 한번 느끼게 되었다.