Spring Webflux 리스폰스바디 캐싱하기

이종완·2023년 8월 20일
0

개발이야기

목록 보기
12/12

개요

웹플럭스를 이용하여 개발을 할 때, 로깅 등의 이유로 리퀘스트 바디와 리스폰스 바디는 재활용되어야 할 때가 있다.
리퀘스트바디 캐싱하기와 마찬가지로, 다용도를 위해 리스폰스 바디를 재활용하는 설정을 알아본다.

방법

필터 구현

사실 리스폰스바디는 리퀘스트 바디와는 다르게 핸들러 혹은 서비스에서 데이터를 라우트로 최종 반환하기 전, 얼마든지 자유롭게 재활용하는 것이 쉽다. 그러나 어떤 핸들러나 서비스라도, 리스폰스 바디에 대해서 공통으로 적용되어야 하는 기능들에 대해서는 코드의 재사용이 필요 할 때가 있다. 그런 공통화 작업을 위해 리스폰스 바디를 캐싱해주는 필터를 웹플럭스의 WebFilter 인터페이스를 통해 구현한다.

WebFilter

기존 MVC의 컨트롤러를 포함한 웹플럭스의 라우터 함수로 나타낸 모든 엔드포인트에 대해 적용 가능한 필터를 구현하는 인터페이스.
해당 인터페이스 구현 시 반드시 오버라이드 해야하는 filter() 메소드에 exchange 혹은 exchange객체의 mutate를 인자로 설정하여 실행하는 것으로 필터 체인이 완성된다.

예시: 메롱 필터

// 리퀘스트가 들어올때마다 "메롱"을 출력하고 헤더에 "merong=메롱" 을 추가하는 필터
@Component
public class MerongFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        return Mono
        .just(
            exchange
            .mutate()
            .request(request -> request.header("merong", "메롱"))
            .build())
        .doOnNext($ -> System.out.println("메롱"))
        .flatMap(chain::filter);
    }
}

ResponseBodyCacher by WebFilter

리스폰스 바디의 경우, 결과적으로 프로젝트 리액터에서 다루는 HTTP 리스폰스 객체인 ServerHttpResponse 객체가 writeWith 메소드를 호출하기 전에 리스폰스 데이터를 복사 혹은 가로채서 이용 후 리스폰스 데이터를 writeWith로 다시 넘겨주면 된다.

나는 히스토리 로깅이 주된 목적이었기 때문에, ServerWebExchange 객체의 어트리뷰트로 임시 저장해주었다. (스프링클라우드에서 리퀘스트바디 캐싱할때 사용하는 방식으로 알고 있다.)
특히, 리스폰스 바디의 경우 먼저 클라이언트로 응답을 보내준 후에 후처리를 해주기 위해 필터체인 호출 후에 캐시처리된 리스폰스 바디를 사용할 수 있도록 구성해주었다.

특히 Mono<ServerResponse> 에 대해서만 처리할수 있게 데코레이터를 만들어주었는데 필요하다면 Flux에 대해서도 같은 방식으로 구현하면 될 듯 하다.

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Component
public class ResponseBodyCacher implements WebFilter {

    private static final String RESPONSE = "response";
    private static final byte[] EMPTY_BYTES = {};

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        return chain
        .filter(
            exchange
            .mutate()
            .response(new ResponseBodyDecorator(exchange)) // 리스폰스 데이터 캐싱 데코레이터
            .build())
        .doFinally(
            $-> Mono.fromRunnable(
                ()-> {

                    // 캐싱된 response 데이터 꺼내오기
                    final byte[] bytes = (byte[]) exchange.getAttributes().remove(RESPONSE);

                    // ... 캐시 데이터 사용 ..
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe());
    }

    public static class ResponseBodyDecorator extends ServerHttpResponseDecorator {

        private final ServerWebExchange exchange;

        public ResponseBodyDecorator(ServerWebExchange exchange) {

            super(exchange.getResponse());
            this.exchange = exchange;
        }

        // writeWith 메소드로 리스폰스 응답을 하기 전 캐싱을 위한 커스터마이즈
        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

            // Mono에 대해서만 구현; Flux의 경우 추가 구현 필요
            if (body instanceof Mono) {

                final Mono<? extends DataBuffer> mono = (Mono<? extends DataBuffer>) body;

                return super
                .writeWith(mono.map(dataBuffer -> {

                    // 리스폰스 바이트 추출 후 어트리뷰트로 보관
                    final byte[] bytes = new byte[dataBuffer.readableByteCount()];

                    DataBufferUtils.release(dataBuffer.read(bytes));

                    exchange.getAttributes().put(RESPONSE, bytes);

                    // 캐싱 후 클라이언트로 응답 데이터 반환 실시
                    return exchange.getResponse().bufferFactory().wrap(bytes);
                }))
                .onErrorResume(e -> {

                    e.printStackTrace();
                    return Mono.empty();
                });
            }

            return super.writeWith(body);
        }
    }
}

스프링 디폴트 에러 리스폰스에 대한 캐싱 추가

위의 방식으로 리스폰스 데이터 캐싱을 진행할 경우, 스프링이 예외에 대해서 디폴트로 에러 리스폰스는 캐싱하지 못한다.
이를 개선하기 위해서, 디폴트 에러 반환하는 AbstractErrorWebExceptionHandler 를 추가로 커스터마이징 해준다.
위의 ResponseBodyCacher 개발과 마찬가지로, ServerWebExchange 의 어트리뷰트로 임시저장 해준다.

import java.util.Map;

import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.error.ErrorAttributeOptions.Include;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;

@Component @Order(-2)
public class CustomWebExceptionHandler extends AbstractErrorWebExceptionHandler {

    private static final String RESPONSE = "response";

    // ResourcePropeties가 스프링부트 2.6.x부터 제거되었기에
    // 생성자에서 ResourceProperties 추가 설정해줌 (deprecated since 2.4)
    public CustomWebExceptionHandler(
        ErrorAttributes errorAttributes, WebProperties webProperties,
        ApplicationContext applicationContext, ServerCodecConfigurer configurer) {

        super(errorAttributes, webProperties.getResources(), applicationContext);
        this.setMessageWriters(configurer.getWriters());
    }

    // 디폴트 에러 리스폰스 핸들링 라우팅 함수 오버라이드
    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {

        return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
    }

    // 커스터마이징 관련 구현 메소드
    private Mono<ServerResponse> renderErrorResponse(final ServerRequest request) {

        // 리스폰스하게 될 에러 어트리뷰트 데이터를 로드
        final Map<String, Object> errorAttributes =
        getErrorAttributes(request, ErrorAttributeOptions.of(Include.MESSAGE));

        // ... 에러 어트리뷰트 가공 작업

        // 에러 어트리뷰트(에러 리스폰스)의 바이트 데이터를 exchange 어트리뷰트에 보관
        final byte[] bytes = errorAttributes.toString().getBytes();

        request.attributes().put(RESPONSE, bytes);

        return ServerResponse.ok().bodyValue(errorAttributes);
    }
}

Test

실제 리스폰스 데이터 캐싱 후 활용할 수 있는지 테스트 해본다.

TestFilter

리스폰스 바디 캐싱 수행 및 서버 응답 약 3초 후에 리스폰스 데이터를 서버 로그에 출력하는 테스트 필터

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Slf4j
@Component
@RequiredArgsConstructor
public class TestFilter implements WebFilter {

    private static final String RESPONSE = "response";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        return chain
        .filter(
                exchange
                .mutate()
                .response(new ResponseBodyDecorator(exchange))
                .build())
        .doFinally(
            $-> Mono.fromRunnable(
                ()-> {

                    log.info("응답 완료");

                    // 3초 슬립 (리스폰스 시간에는 영향을 주지 않는다)
                    try { Thread.sleep(3000); }
                    catch (InterruptedException e) {}

                    final byte[] bytes = (byte[]) exchange.getAttributes().remove(RESPONSE);

                    log.info("서버 리스폰스 후에 캐시에서 데이터 꺼내오기: {}", new String(bytes));
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe());
    }

    public static class ResponseBodyDecorator extends ServerHttpResponseDecorator {

        private final ServerWebExchange exchange;

        public ResponseBodyDecorator(ServerWebExchange exchange) {

            super(exchange.getResponse());
            this.exchange = exchange;
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

            if (body instanceof Mono) {

                final Mono<? extends DataBuffer> mono = (Mono<? extends DataBuffer>) body;

                return super
                .writeWith(mono.map(dataBuffer -> {

                    final byte[] bytes = new byte[dataBuffer.readableByteCount()];

                    DataBufferUtils.release(dataBuffer.read(bytes));

                    exchange.getAttributes().put(RESPONSE, bytes);

                    return exchange.getResponse().bufferFactory().wrap(bytes);
                }))
                .onErrorResume(e -> {

                    e.printStackTrace();
                    return Mono.empty();
                });
            }

            return super.writeWith(body);
        }
    }
}

TestHandler & TestRouterConfig

테스트 응답 데이터입니다를 응답하는 핸들러와 라우터
TestHandler

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@Component
public class TestHandler {

    public Mono<ServerResponse> handle(ServerRequest serverRequest) {

        log.info("응답 시작");
        return ServerResponse.ok().bodyValue("테스트 응답 데이터입니다");
    }
}

TestRouterConfig

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.shb.nittetsu.test.handler.TestHandler;

import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;

@Configuration
public class TestRouterConfig {

    @Bean
    public RouterFunction<ServerResponse> test(TestHandler testHandler) {

        return route(path("/test"), testHandler::handle);
    }
}

실행결과
테스트결과 6ms로 빠르게 서버응답이 왔으며 약 3초후에 리스폰스 캐시를 이용한 로직이 동작한 것을 확인할 수가 있다

소스

바로가기

profile
안녕하세요...

0개의 댓글