웹플럭스를 이용하여 개발을 할 때, 로깅 등의 이유로 리퀘스트 바디와 리스폰스 바디는 재활용되어야 할 때가 있다.
리퀘스트바디 캐싱하기와 마찬가지로, 다용도를 위해 리스폰스 바디를 재활용하는 설정을 알아본다.
사실 리스폰스바디는 리퀘스트 바디와는 다르게 핸들러 혹은 서비스에서 데이터를 라우트로 최종 반환하기 전, 얼마든지 자유롭게 재활용하는 것이 쉽다. 그러나 어떤 핸들러나 서비스라도, 리스폰스 바디에 대해서 공통으로 적용되어야 하는 기능들에 대해서는 코드의 재사용이 필요 할 때가 있다. 그런 공통화 작업을 위해 리스폰스 바디를 캐싱해주는 필터를 웹플럭스의 WebFilter 인터페이스를 통해 구현한다.
기존 MVC의 컨트롤러를 포함한 웹플럭스의 라우터 함수로 나타낸 모든 엔드포인트에 대해 적용 가능한 필터를 구현하는 인터페이스.
해당 인터페이스 구현 시 반드시 오버라이드 해야하는 filter()
메소드에 exchange 혹은 exchange객체의 mutate를 인자로 설정하여 실행하는 것으로 필터 체인이 완성된다.
예시: 메롱 필터
// 리퀘스트가 들어올때마다 "메롱"을 출력하고 헤더에 "merong=메롱" 을 추가하는 필터
@Component
public class MerongFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return Mono
.just(
exchange
.mutate()
.request(request -> request.header("merong", "메롱"))
.build())
.doOnNext($ -> System.out.println("메롱"))
.flatMap(chain::filter);
}
}
리스폰스 바디의 경우, 결과적으로 프로젝트 리액터에서 다루는 HTTP 리스폰스 객체인 ServerHttpResponse 객체가 writeWith 메소드를 호출하기 전에 리스폰스 데이터를 복사 혹은 가로채서 이용 후 리스폰스 데이터를 writeWith로 다시 넘겨주면 된다.
나는 히스토리 로깅이 주된 목적이었기 때문에, ServerWebExchange 객체의 어트리뷰트로 임시 저장해주었다. (스프링클라우드에서 리퀘스트바디 캐싱할때 사용하는 방식으로 알고 있다.)
특히, 리스폰스 바디의 경우 먼저 클라이언트로 응답을 보내준 후에 후처리를 해주기 위해 필터체인 호출 후에 캐시처리된 리스폰스 바디를 사용할 수 있도록 구성해주었다.
특히 Mono<ServerResponse>
에 대해서만 처리할수 있게 데코레이터를 만들어주었는데 필요하다면 Flux에 대해서도 같은 방식으로 구현하면 될 듯 하다.
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Component
public class ResponseBodyCacher implements WebFilter {
private static final String RESPONSE = "response";
private static final byte[] EMPTY_BYTES = {};
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain
.filter(
exchange
.mutate()
.response(new ResponseBodyDecorator(exchange)) // 리스폰스 데이터 캐싱 데코레이터
.build())
.doFinally(
$-> Mono.fromRunnable(
()-> {
// 캐싱된 response 데이터 꺼내오기
final byte[] bytes = (byte[]) exchange.getAttributes().remove(RESPONSE);
// ... 캐시 데이터 사용 ..
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe());
}
public static class ResponseBodyDecorator extends ServerHttpResponseDecorator {
private final ServerWebExchange exchange;
public ResponseBodyDecorator(ServerWebExchange exchange) {
super(exchange.getResponse());
this.exchange = exchange;
}
// writeWith 메소드로 리스폰스 응답을 하기 전 캐싱을 위한 커스터마이즈
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
// Mono에 대해서만 구현; Flux의 경우 추가 구현 필요
if (body instanceof Mono) {
final Mono<? extends DataBuffer> mono = (Mono<? extends DataBuffer>) body;
return super
.writeWith(mono.map(dataBuffer -> {
// 리스폰스 바이트 추출 후 어트리뷰트로 보관
final byte[] bytes = new byte[dataBuffer.readableByteCount()];
DataBufferUtils.release(dataBuffer.read(bytes));
exchange.getAttributes().put(RESPONSE, bytes);
// 캐싱 후 클라이언트로 응답 데이터 반환 실시
return exchange.getResponse().bufferFactory().wrap(bytes);
}))
.onErrorResume(e -> {
e.printStackTrace();
return Mono.empty();
});
}
return super.writeWith(body);
}
}
}
위의 방식으로 리스폰스 데이터 캐싱을 진행할 경우, 스프링이 예외에 대해서 디폴트로 에러 리스폰스는 캐싱하지 못한다.
이를 개선하기 위해서, 디폴트 에러 반환하는 AbstractErrorWebExceptionHandler 를 추가로 커스터마이징 해준다.
위의 ResponseBodyCacher 개발과 마찬가지로, ServerWebExchange 의 어트리뷰트로 임시저장 해준다.
import java.util.Map;
import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.error.ErrorAttributeOptions.Include;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component @Order(-2)
public class CustomWebExceptionHandler extends AbstractErrorWebExceptionHandler {
private static final String RESPONSE = "response";
// ResourcePropeties가 스프링부트 2.6.x부터 제거되었기에
// 생성자에서 ResourceProperties 추가 설정해줌 (deprecated since 2.4)
public CustomWebExceptionHandler(
ErrorAttributes errorAttributes, WebProperties webProperties,
ApplicationContext applicationContext, ServerCodecConfigurer configurer) {
super(errorAttributes, webProperties.getResources(), applicationContext);
this.setMessageWriters(configurer.getWriters());
}
// 디폴트 에러 리스폰스 핸들링 라우팅 함수 오버라이드
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
// 커스터마이징 관련 구현 메소드
private Mono<ServerResponse> renderErrorResponse(final ServerRequest request) {
// 리스폰스하게 될 에러 어트리뷰트 데이터를 로드
final Map<String, Object> errorAttributes =
getErrorAttributes(request, ErrorAttributeOptions.of(Include.MESSAGE));
// ... 에러 어트리뷰트 가공 작업
// 에러 어트리뷰트(에러 리스폰스)의 바이트 데이터를 exchange 어트리뷰트에 보관
final byte[] bytes = errorAttributes.toString().getBytes();
request.attributes().put(RESPONSE, bytes);
return ServerResponse.ok().bodyValue(errorAttributes);
}
}
실제 리스폰스 데이터 캐싱 후 활용할 수 있는지 테스트 해본다.
리스폰스 바디 캐싱 수행 및 서버 응답 약 3초 후에 리스폰스 데이터를 서버 로그에 출력하는 테스트 필터
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Slf4j
@Component
@RequiredArgsConstructor
public class TestFilter implements WebFilter {
private static final String RESPONSE = "response";
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain
.filter(
exchange
.mutate()
.response(new ResponseBodyDecorator(exchange))
.build())
.doFinally(
$-> Mono.fromRunnable(
()-> {
log.info("응답 완료");
// 3초 슬립 (리스폰스 시간에는 영향을 주지 않는다)
try { Thread.sleep(3000); }
catch (InterruptedException e) {}
final byte[] bytes = (byte[]) exchange.getAttributes().remove(RESPONSE);
log.info("서버 리스폰스 후에 캐시에서 데이터 꺼내오기: {}", new String(bytes));
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe());
}
public static class ResponseBodyDecorator extends ServerHttpResponseDecorator {
private final ServerWebExchange exchange;
public ResponseBodyDecorator(ServerWebExchange exchange) {
super(exchange.getResponse());
this.exchange = exchange;
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Mono) {
final Mono<? extends DataBuffer> mono = (Mono<? extends DataBuffer>) body;
return super
.writeWith(mono.map(dataBuffer -> {
final byte[] bytes = new byte[dataBuffer.readableByteCount()];
DataBufferUtils.release(dataBuffer.read(bytes));
exchange.getAttributes().put(RESPONSE, bytes);
return exchange.getResponse().bufferFactory().wrap(bytes);
}))
.onErrorResume(e -> {
e.printStackTrace();
return Mono.empty();
});
}
return super.writeWith(body);
}
}
}
테스트 응답 데이터입니다
를 응답하는 핸들러와 라우터
TestHandler
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class TestHandler {
public Mono<ServerResponse> handle(ServerRequest serverRequest) {
log.info("응답 시작");
return ServerResponse.ok().bodyValue("테스트 응답 데이터입니다");
}
}
TestRouterConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.shb.nittetsu.test.handler.TestHandler;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
@Configuration
public class TestRouterConfig {
@Bean
public RouterFunction<ServerResponse> test(TestHandler testHandler) {
return route(path("/test"), testHandler::handle);
}
}
실행결과
테스트결과 6ms로 빠르게 서버응답이 왔으며 약 3초후에 리스폰스 캐시를 이용한 로직이 동작한 것을 확인할 수가 있다