Reactor Core - Mono, Flux, JavaWebFlux 공부 일기

오영선·2024년 3월 26일

실습

목록 보기
7/12

<모르는 것은 바로바로 정리하기> 시리즈

Reactive Streams에는 다양한 구현체가 존재합니다.
RxJava, Reactor Core : 순수하게 스트림 연산 처리
Armeria, Spring WebFlux : 웹프로그래밍과 연결된 Reactive Streams

Mono는 Project Reactor라는 리액티브 라이브러리에서 제공하는 클래스로, 리액티브 스트림 프로그래밍에서 단일 결과나 이벤트를 표현하는 데 사용됩니다.

리액티브 스트림은 비동기 및 이벤트 기반 애플리케이션을 만들기 위한 패러다임으로, Java에서는 Reactor, RxJava 등의 라이브러리를 통해 구현됩니다.

  • 리액티브 프로그래밍은 Observerble에서 시작한다.
  • subscribe()는 Observerble를 구독하고, Observerble는 subscribe()함수를 호출해 변화한 데이터를 구독자에게 발행한다.

Mono 클래스는 하나의 결과[0,1]만을 발행하는 Publisher입니다.
Publisher는

  • Subscriber에게 데이터를 전달할 때는 onNext 시그널
  • 데이터를 모두 전달했다고 알릴 때는 onComplete
  • 에러를 전달할 때는 onError 시그널을 트리거합니다.

그런 다음 subscribe() 메서드를 호출하여 Mono 스트림을 구독합니다. 구독을 통해 비동기 작업의 결과를 처리할 수 있습니다. subscribe() 메서드는 리액티브 스트림에서 발행된 이벤트를 처리하는데 사용됩니다. 이 때, Mono가 포함하는 값이나 오류 등을 처리할 수 있습니다.

모노 객체 만들기

Mono.just()

객체의 결과 확인하기 - Subscribe

Mono.just().subscribe()

subscribe는 모노를 구독하고 트리거(실행시켜) 결과를 받아온다.

구독 해제하기 - Disposable

Disposable은 Reactor 프로젝트의 일부로서, 리액티브 스트림에서 구독을 해지하는 데 사용되는 인터페이스입니다.

리액티브 스트림에서 Publisher와 Subscriber 간의 구독은 비동기적으로 이루어지며, 이러한 구독을 해제하려면 Disposable을 사용합니다. 주로 리소스 누수를 방지하고 메모리 관리를 위해 사용됩니다.

스트림을 구독하고 이를 나중에 해지할 수 있는 subscribe() 메서드의 반환 값이 Disposable입니다.

block

router vs controller

Controller 방식은 동기식 요청 처리에 적합하며, Router는 비동기식 요청처리에 적합합니다.
Router Function

  • Handler Function을 호출하는 역할을 합니다.

Handler Function

  • 단일 입력과 단일 출력을 가지며, Spring WebFlux Framework가 HTTP 요청을 처리하기 위해 호출하는 메서드입니다.

Mono

  • Reactor 라이브러리에서 제공하는 Reactive Streams의 Publisher 중 하나로 오직 ‘0개 또는 하나의 데이터항목 생성’하고 이 결과가 생성되고 나면 스트림이 종료되면 결과 생성을 종료합니다.

Flux

  • Reactor 라이브러리에서 제공하는 Reactive Streams의 Publisher 중 하나로 Mono와 달리 ‘여러 개의 데이터 항목’를 생성하고 스트림이 종료되면 결과 생성을 종료합니다.

예외처리

RxJava는 에러가 발생하면 onError 이벤트가 발생해 모든 데이터 흐름이 중단됩니다. 이때 기존의 try-catch문을 사용할 수 없기 때문에 RxJava에서 제공하는 다양한 예외처리 함수를 활용해야 합니다.

  • 함수형 프로그래밍은 함수의 부수효과를 없도록 하는 것이 원칙이지만 doOnXXX() 계열 함수는 부수효과를 일으켜 내가 작성하는 코드가 문제는 없는지 확인하도록 돕습니다.

doOnError() :

  • Observable의 알림 이벤트
    어떤 데이터를 발행하는 중간 에러가 발생할때 해당 이벤트가 발생합니다.

source.doOnNext(...) //어떤 데이터를 발행할때(시작하기 전에)
	.doOnComplete(...) //모든 데이터를 발행하면
    .doOnError(...)  //중간에 에러 발생시
	.subscribe();

onErrorReturn():

  • RxJava에서는 에러 역시 특정한 데이터로 보게 됩니다. 따라서 예외가 발생했을때 개발자는
  1. 에러를 잡아 에러를 의미하는 다른 데이터로 대체할 수 있습니다.
    OnError()는 데이터 흐름을 중단시키기 때문에 중대한 에러가 발생했을때만 사용합니다.

onErrorReturn 은 인자로 받은 기본값을 대신 발행하고 OnComplete이벤트가 발생한다. (함수 끝나는 시점으로 분기)

onErrorResume() :

Mono에서는 onErrorReturn과 유사한 onErrorResume() 메서드가 있습니다.

WebFlux 환경에서 Filter 적용하기

WebFlux를 사용하는 환경에서는 WebFilter 또는 HandlerFilterFunction를 상속받아 필터를 구현할 수 있습니다.

WebFilter

  • Controller 엔드포인트를 포함하여 모든 엔드포인트에 대해 적용가능

filter 메서드는 ServerWebExchange exchange, WebFilterChain chain를 인자로 받아 Mono<Void>를 반환합니다. ( return webFilterChain.filter(serverWebExchange)를 통해 다음 필터를 호출하게 됨 )

HandlerFilterFunction

  • WebFilter와 달리 라우터 기반 함수에서만 사용 가능. Request, Response를 명확하게 명시하기 때문에 개인적으로 사용이 쉬웠습니다.

filter메서드는 ServerRequest request, HandlerFunction<ServerResponse> next를 인자로 받아 ServerResponse를 반환합니다.

filter를 구현하는 방법에는 여러가지가 있지만 아래에는 필터 빈을 등록하는 방법을 사용했습니다.


@Component
public class ___Filter implements HandlerFilterFunction<ServerResponse, ServerResponse> {
    
    public AuthenticationFilter() {
    }

    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
        boolean isTokenValid = //토큰 검사 로직;
        if (isTokenValid) {
            // 토큰이 유효하면 다음 필터로 이동
            return next.handle(request);
        } else {
            // 토큰이 유효하지 않으면 401 반환 후 필터 종료
            return  ServerResponse.status(HttpStatus.UNAUTHORIZED)
                      .contentType(MediaType.APPLICATION_JSON)
                      .body(responseBody.toMono(), ResponseFormat.class);
        }
    }
}

router를 buil하기 전 필터를 추가해줌으로써 필터를 적용할 수 있습니다.

    @Bean
    RouterFunction<ServerResponse> routes()

    {
        var route = RouterFunctions.route();

        route.path("/", builder -> {
            builder.route(POST("").and(accept(APPLICATION_JSON)),
                (call 함수 지정));
            ...
        }).filter(authenticationFilter); // 필터 추가
        return route.build();
    }

참고한 책 및 인터넷 자료

  • RxJava 프로그래밍

0개의 댓글