Spring Webflux 리퀘스트바디 캐싱하기

이종완·2023년 6월 15일
1

개발이야기

목록 보기
11/12

개요

리액터 스트림 기반의 웹플럭스는 한번 방출된 리퀘스트 데이터를 재사용하지 못한다.
당연하게도, 태생자체가 논블로킹 비동기이며, 물이 흘러가듯 이미 흘러지나간 물을 다시 가져올 수 없기 때문이다.
그러나, 리퀘스트 로깅 등의 이유로 리퀘스트 바디를 여러번 사용해야하는 경우가 존재하기 때문에 해당 상황에 맞게 웹플럭스에서 리퀘스트 바디를 다회 이용할 수 있도록 설정하는 법을 알아본다.

방법

필터 구현

리퀘스트가 라우터를 거쳐 실제 리퀘스트 바디를 소모하는 핸들러까지 도달하기 전,
사전에 리퀘스트 바디에 대한 캐싱을 수행하는 필터를 걸어준다.

WebFilter vs HandlerFilterFunction

웹플럭스에서 필터를 구현하는 방법은 2가지가 있다.
WebFilter 또는 HandlerFilterFunction 인터페이스를 구현하는 것

WebFilter

기존 MVC와 동일한 Controller 엔드포인트를 포함하여 모든 엔드포인트에 대해 적용가능한 필터를 구현할때 사용하는 인터페이스.
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain)
메소드를 반드시 오버라이드해야 한다.

해당 메소드 구현 시 인자로 주어진 WebFilterChain의 filter() 메소드에 exchange 혹은 exchange의 뮤테이트를 인자로 넣어주고 실행하는 것으로 필터 체인이 완성된다.
이때, WebFilterChain.filter(T extends ServerWebExchange)는 Void에대한 퍼블리셔 Mono<Void>를 리턴하므로 자연스럽게 오버라이드 해야하는 WebFilter.filter()메소드의 마지막에 위치할 수 있다.

예시: 뿡 필터


// 리퀘스트가 들어올때마다 3초 뒤에 "뿡"을 출력하는 필터
@Component
public class BbungFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        return Mono
        .just(exchange)
        .doOnNext(
            $ -> Mono.fromRunnable(
                () -> {
  
                    try { Thread.sleep(3000); System.out.println("뿡"); }
                    catch (Exception e) {}
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe())
        .flatMap(chain::filter);
    }
}

HandlerFilterFunction

WebFilter와 달리 라우터 함수에서만 사용이 가능하며, 라우터에 대해 필터로써 걸어주는 방식으로 동작한다.

예시: 랜덤 안녕 필터


// 리스폰스 응답 시, 50% 확률로 "안녕"을 응답하는 필터
public class HelloFilter implements HandlerFilterFunction<ServerResponse, ServerResponse> {

    private static final Random RANDOM = new Random();

    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {

        return Mono
        .just(RANDOM.nextBoolean())
        .filter(Boolean::booleanValue)
        .flatMap($ -> Server.ok().bodyValue("안녕"))
        .switchIfEmpty(Mono.defer(()->next.handle(request)));
    }
}

@Bean
public RouterFunction<ServerResponse> handle(SomeHandler handler) {

    return route(GET(properties.getApiPath()), handler::handle)
    .filter(new HelloFilter()); // 필터 장착
}

RequestBodyCacher by WebFilter

리퀘스트 바디의 경우, handler에서 리퀘스트 바디를 소모하기 전에 미리 캐싱 될 필요가 있다.
그러므로, 모든 엔드포인트에 대해 전처리를 해주는 WebFilter를 사용하여 캐싱을 구현해준다.

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class RequestBodyCacher implements WebFilter {

    private static final byte[] EMPTY_BYTES = new byte[0];

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        return DataBufferUtils
        .join(exchange.getRequest().getBody())
        .map(databuffer -> {

            final byte[] bytes = new byte[databuffer.readableByteCount()];

            DataBufferUtils.release(databuffer.read(bytes));

            return bytes; // 한번 읽기가 가능한 리퀘스트 바디 데이터를 미리 가져옴
        })
        .defaultIfEmpty(EMPTY_BYTES)
        .flatMap(bytes -> {

            // 읽어온 데이터를 커스텀 리퀘스트 데코레이더를 통해 캐싱
            final RequestBodyDecorator decorator = new RequestBodyDecorator(exchange, bytes);

            // 캐싱된 리퀘스트 데이터를 가지고 있는 리퀘스트 데코레이터를 뮤테이션으로 등록
            return chain.filter(exchange.mutate().request(decorator).build());
        });
    }
}

// ServerWebExchange mutation을 위한 커스텀 리퀘스트 데코레이터;
// -> 실질적인 리퀘스트 바디 데이터를 바이트 형태로 필드로써 가지고 있으며,
// 리퀘스트 바디 필요시 호출하는 getBody 메소드를 오버라이드하여
// 캐싱된 바이트로부터 데이터를 매번 읽어오는 Flux 퍼블리셔를 리턴하도록 한다.
class RequestBodyDecorator extends ServerHttpRequestDecorator {

    private final byte[] bytes;
    private final ServerWebExchange exchange;

    public RequestBodyDecorator(ServerWebExchange exchange, byte[] bytes) {

        super(exchange.getRequest());
        this.bytes = bytes;
        this.exchange = exchange;
    }

    @Override
    public Flux<DataBuffer> getBody() {

        return bytes==null||bytes.length==0?
        Flux.empty(): Flux.just(exchange.getResponse().bufferFactory().wrap(bytes));
    }
}

응용: RequestLogger

실시간으로 시스템 로그에 리퀘스트 정보를 입력하는 로거를 구현한다

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Component
public class RequestLogger implements WebFilter {

    private static final byte[] EMPTY_BYTES = new byte[0];

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        return DataBufferUtils
        .join(exchange.getRequest().getBody())
        .map(databuffer -> {

            final byte[] bytes = new byte[databuffer.readableByteCount()];

            DataBufferUtils.release(databuffer.read(bytes));

            return bytes;
        })
        .defaultIfEmpty(EMPTY_BYTES)
        .doOnNext(
            bytes -> Mono.fromRunnable(
                ()->{

                    // 리퀘스트 헤더와 바디를 출력
                    System.out.println(exchange.getRequest().getHeaders());
                    System.out.println(new String(bytes));
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe())
        .flatMap(
            bytes -> chain.filter(
                exchange
                .mutate()
                .request(new RequestBodyDecorator(exchange, bytes))
                .build()));
    }
}

class RequestBodyDecorator extends ServerHttpRequestDecorator {

    private final byte[] bytes;
    private final ServerWebExchange exchange;

    public RequestBodyDecorator(ServerWebExchange exchange, byte[] bytes) {

        super(exchange.getRequest());
        this.bytes = bytes;
        this.exchange = exchange;
    }

    @Override
    public Flux<DataBuffer> getBody() {

        return bytes==null||bytes.length==0?
        Flux.empty(): Flux.just(exchange.getResponse().bufferFactory().wrap(bytes));
    }
}

Test

위의 리퀘스트 로거를 테스트하기 위해 간단한 핸들러, 라우터를 설정한다.

handler

리퀘스트 바디를 읽은 후(리퀘스트 바디 소모), 그대로 다시 응답으로 반환해주는 핸들러

@Component
public class TestHandler {

    public Mono<ServerResponse> handle(ServerRequest serverRequest) {

        return serverRequest
        .bodyToMono(String.class)
        .flatMap(ServerResponse.ok()::bodyValue);
    }
}

router

    @Bean
    public RouterFunction<ServerResponse> test(TestHandler handler) {

        return route(GET("/test"), handler::handle);
    }

실행결과

포스트맨에서 임의로 보낸 바디에 대해 정상 반환이 왔으며,
스프링 로그에도 리퀘스트헤더, 바디가 로깅된 것을 볼 수 있다.

소스

바로가기

profile
안녕하세요...

0개의 댓글