<모르는 것은 바로바로 정리하기> 시리즈
Reactive Streams에는 다양한 구현체가 존재합니다.
RxJava, Reactor Core : 순수하게 스트림 연산 처리
Armeria, Spring WebFlux : 웹프로그래밍과 연결된 Reactive Streams
Mono는 Project Reactor라는 리액티브 라이브러리에서 제공하는 클래스로, 리액티브 스트림 프로그래밍에서 단일 결과나 이벤트를 표현하는 데 사용됩니다.
리액티브 스트림은 비동기 및 이벤트 기반 애플리케이션을 만들기 위한 패러다임으로, Java에서는 Reactor, RxJava 등의 라이브러리를 통해 구현됩니다.
- 리액티브 프로그래밍은 Observerble에서 시작한다.
- subscribe()는 Observerble를 구독하고, Observerble는 subscribe()함수를 호출해 변화한 데이터를 구독자에게 발행한다.
Mono 클래스는 하나의 결과[0,1]만을 발행하는 Publisher입니다.
Publisher는
그런 다음 subscribe() 메서드를 호출하여 Mono 스트림을 구독합니다. 구독을 통해 비동기 작업의 결과를 처리할 수 있습니다. subscribe() 메서드는 리액티브 스트림에서 발행된 이벤트를 처리하는데 사용됩니다. 이 때, Mono가 포함하는 값이나 오류 등을 처리할 수 있습니다.
Mono.just()
Mono.just().subscribe()
subscribe는 모노를 구독하고 트리거(실행시켜) 결과를 받아온다.
Disposable은 Reactor 프로젝트의 일부로서, 리액티브 스트림에서 구독을 해지하는 데 사용되는 인터페이스입니다.
리액티브 스트림에서 Publisher와 Subscriber 간의 구독은 비동기적으로 이루어지며, 이러한 구독을 해제하려면 Disposable을 사용합니다. 주로 리소스 누수를 방지하고 메모리 관리를 위해 사용됩니다.
스트림을 구독하고 이를 나중에 해지할 수 있는 subscribe() 메서드의 반환 값이 Disposable입니다.
Controller 방식은 동기식 요청 처리에 적합하며, Router는 비동기식 요청처리에 적합합니다.
Router Function
Handler Function
Mono
Flux
RxJava는 에러가 발생하면 onError 이벤트가 발생해 모든 데이터 흐름이 중단됩니다. 이때 기존의 try-catch문을 사용할 수 없기 때문에 RxJava에서 제공하는 다양한 예외처리 함수를 활용해야 합니다.
source.doOnNext(...) //어떤 데이터를 발행할때(시작하기 전에)
.doOnComplete(...) //모든 데이터를 발행하면
.doOnError(...) //중간에 에러 발생시
.subscribe();
OnError()는 데이터 흐름을 중단시키기 때문에 중대한 에러가 발생했을때만 사용합니다.
onErrorReturn 은 인자로 받은 기본값을 대신 발행하고 OnComplete이벤트가 발생한다. (함수 끝나는 시점으로 분기)
Mono에서는 onErrorReturn과 유사한 onErrorResume() 메서드가 있습니다.
WebFlux를 사용하는 환경에서는 WebFilter 또는 HandlerFilterFunction를 상속받아 필터를 구현할 수 있습니다.
filter 메서드는 ServerWebExchange exchange, WebFilterChain chain를 인자로 받아 Mono<Void>를 반환합니다. ( return webFilterChain.filter(serverWebExchange)를 통해 다음 필터를 호출하게 됨 )
filter메서드는 ServerRequest request, HandlerFunction<ServerResponse> next를 인자로 받아 ServerResponse를 반환합니다.
filter를 구현하는 방법에는 여러가지가 있지만 아래에는 필터 빈을 등록하는 방법을 사용했습니다.
@Component
public class ___Filter implements HandlerFilterFunction<ServerResponse, ServerResponse> {
public AuthenticationFilter() {
}
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
boolean isTokenValid = //토큰 검사 로직;
if (isTokenValid) {
// 토큰이 유효하면 다음 필터로 이동
return next.handle(request);
} else {
// 토큰이 유효하지 않으면 401 반환 후 필터 종료
return ServerResponse.status(HttpStatus.UNAUTHORIZED)
.contentType(MediaType.APPLICATION_JSON)
.body(responseBody.toMono(), ResponseFormat.class);
}
}
}
router를 buil하기 전 필터를 추가해줌으로써 필터를 적용할 수 있습니다.
@Bean
RouterFunction<ServerResponse> routes()
{
var route = RouterFunctions.route();
route.path("/", builder -> {
builder.route(POST("").and(accept(APPLICATION_JSON)),
(call 함수 지정));
...
}).filter(authenticationFilter); // 필터 추가
return route.build();
}