[Spring WebFlux] Reactor Error Operator 정리

Donghyun Kim·2024년 3월 4일
0

실제 현업에서 Spring WebFlux를 통해 개발 시 서비스 로직에 대한
트랜잭션이 정상적으로 처리되지 않아 골머리를 앓는 도중
계속해서 디버깅해본 결과 트랜잭션의 문제가 아니라
Reactor에서 제공하는 onErrorResume 오퍼레이터를 사용하여 예외 발생 시 별도의 Publisher를 발행하는 코드가 있어 예외가 발생하였음에도 상위 레벨로 예외가 전파되지 않아 애초에 트랜잭션을 통한 롤백이 실행되지 않은 사례가 있었다.

따라서, 이번 기회에 Reactor에서 제공하는 Error 관련 오퍼레이터를
간략하게 정리하고자 한다.


Reactor Error Operator

1. error

stream에 에러를 명시적으로 발생시킨다.
특정 조건에서 에러를 발생시키고자 할 때 유용하다.
(Java에서 throw 키워드를 통해 예외를 의도적으로 던지는 것과 유사)

  • 소스코드
return Mono.error(throwable);

2. doOnError

에러가 발생했을 경우, 특정 로직을 실행 시킬 경우 사용

  • 소스코드
  @Test
  public void doOnError() {
    Mono.just(1)
            .map(num -> {
                throw new RuntimeException("Num is " + num);
            })
            .doOnError(throwable -> System.out.println("err : " + throwable.getMessage()))
            .log()
            .subscribe();
  }
  • 결과
| onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
| request(unbounded)
err : Num is 1
| onError(java.lang.RuntimeException: Num is 1)
java.lang.RuntimeException: Num is 1

3. onErrorResume

에러가 발생했을 경우, 상위로 에러를 전파하지 않고 대신 실행할 새로운 Publisher을 리턴한다.
복잡한 에러 복구 로직을 구현하거나, 다른 백업 stream으로 전환하고 싶을 때 유용하다. (Java try~catch 문에서 에러 catch 후 대체 되는 함수를 실행하는 것과 유사)

*주의 할 점: onErrorResume 사용 시 error 오퍼레이터를 통해 에러를 발생시키지 않으면 상위로 전파되지 않아 에러가 발생해도 호출 메서드에서 인식하지 못한다.

ex) 에러를 반환하지 않을 경우

실제로 서버에서 에러가 발생하지 않는다!

  • 소스코드
  @Test
  void onErrorResume() {
    Mono.just(1)
            .map(num -> {
              throw new RuntimeException("Num is " + num);
            })
            .onErrorResume(throwable -> {
              System.err.println("err : "+ throwable.getMessage());
//              return Mono.error(throwable);
              return Mono.empty();
            })
            .log()
            .subscribe();
  }
  • 결과
onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
err : Num is 1
onComplete()

ex) 에러를 반환할 경우

에러가 발생함

  • 소스코드
  @Test
  void onErrorResume() {
    Mono.just(1)
            .map(num -> {
              throw new RuntimeException("Num is " + num);
            })
            .onErrorResume(throwable -> {
              System.out.println("err : "+ throwable.getMessage());
              return Mono.error(throwable);
//              return Mono.empty();
            })
            .log()
            .subscribe();
  }
  • 결과
onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
err : Num is 1
onError(java.lang.RuntimeException: Num is 1)

4. onErrorMap

에러가 발생하였을 경우, 에러를 다른 에러로 대체

  • 소스코드
  void onErrorMap() {
    Mono.just(1)
            .map(num -> {
              throw new RuntimeException("Num is " + num);
            })
            .onErrorMap(throwable -> {
              throw new IllegalStateException("new exception");
            })
            .log()
            .subscribe();
  }
  • 결과
onSubscribe(FluxOnErrorResume.ResumeSubscriber)
request(unbounded)
onError(java.lang.IllegalStateException: new exception)

onErrorReturn

에러가 발생하였을 경우, 사전 정의된 정적 값(fallbackValue)를 리턴하고 stream을 정상 종료한다.
에러를 로깅하거나 안전한 값으로 대체하여 애플리케이션의 중단 없이 계속 진행하고자 할 때 사용된다.

*주의 할 점: onErrorResume과 마찬가지로 에러가 상위로 전파되지 않으며 에러가 발생한 시점 이후의 데이터는 emit되지 않고 fallbackValue를 리턴하고 종료된다.

  • 소스코드
  @Test
  void onErrorReturn() {
    Flux.range(1, 10)
            .flatMap((v) -> {
              if (v == 5) {
                return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
              }
              return Mono.just(v);
            })
            .onErrorReturn(100000)
            .log()
            .subscribe();
  }
  • 결과
onSubscribe(FluxOnErrorReturn.ReturnSubscriber)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onNext(100000)
onComplete()
// 5에서 에러가 발생된 후 fallbackValue인 100000 리턴 후 종료

5. onErrorComplete

에러 발생 시 논리 표현식 (Predicate) 결과에 따라 complete 이벤트를 보낼 지 error 이벤트를 보낼지 선택할 수 있도록 지원하는 연산자

ex) true 리턴 시

  • 소스코드
  @Test
  void onErrorComplete() {
    Flux.range(1, 10)
            .flatMap((v) -> {
              if (v == 5) {
                return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
              }
              return Mono.just(v);
            })
            .onErrorComplete(throwable -> true)
            .log()
            .subscribe();
  }
  • 결과
onSubscribe(FluxOnErrorReturn.ReturnSubscriber)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onComplete()
// 에러 발생 시 Compelete 이벤트를 발생시킨다.

ex) false 리턴 시

  • 소스코드
  @Test
  void onErrorComplete() {
    Flux.range(1, 10)
            .flatMap((v) -> {
              if (v == 5) {
                return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
              }
              return Mono.just(v);
            })
            .onErrorComplete(throwable -> false)
            .log()
            .subscribe();
  }
  • 결과
onSubscribe(FluxOnErrorReturn.ReturnSubscriber)
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onError(java.lang.RuntimeException: v가 5이면 에러를 발생시킵니다.)
// 에러 발생 시 Error 이벤트를 발생시킨다.

6. onErrorContinue

에러가 발생하였을 경우 stream이 종료되지 않고 에러 이후의 데이터를
계속해서 emit 해주는 오퍼레이터

  • 소스코드
  void onErrorComplete() {
    Flux.range(1, 10)
            .flatMap((v) -> {
              if (v == 5) {
                return Flux.error(new RuntimeException("v가 5이면 에러를 발생시킵니다."));
              }
              return Mono.just(v);
            })
            .onErrorContinue((error, v) -> {
              System.out.println("error : " + error);
              System.out.println("에러 발생 값 :" + v);
            })
            .log()
            .subscribe();
  }
  • 결과
onSubscribe([Fuseable] 
request(unbounded)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
error : java.lang.RuntimeException: v가 5이면 에러를 발생시킵니다.
에러 발생 값 :5
onNext(6)
onNext(7)
onNext(8)
onNext(9)
onNext(10)
onComplete()

7. onErrorStop

에러가 발생했을 때 stream을 즉시 중단하고 나머지 이벤트가 무시되어 에러를
발생시킨다. 특정 조건에서 에러가 발생하면 down stream에서 에러 처리 로직이 있어도 더 이상의 처리를 하지 않고 즉시 종료하고자 할 때 유용하다.
(굳이 사용하지 않아도 동일하게 결과가 작동할 경우가 있지만, 명시적으로 작성함으로써의 의미도 있다.)

  • 소스코드
  @Test
  void onErrorStop() {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4)
            .map(i -> {
              if (i == 3) {
                throw new RuntimeException("Error at i=3");
              }
              return i;
            })
            .onErrorStop()
            .log();

    flux.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed")
    );
  }
  • 결과
onSubscribe([Fuseable] 
request(unbounded)
onNext(1)
Received: 1
onNext(2)
Received: 2
onError(java.lang.RuntimeException: Error at i=3)

정리

Reactor에서 제공하는 Error 오퍼레이터도 이처럼 다양한 에러 핸들링을 제어하기 위한 메서드들이 제공되며 실제 개발 시에도 무턱대고 특정 Error 오퍼레이터 메서드만 사용하는 것이 아니라 서비스 로직에 대한 명확한 이해를 바탕으로 보다 면밀한 예외 처리 로직이 구현되어야 함을 다시 한번 느끼게 되었다.

참고자료

profile
"Hello World"

0개의 댓글