Backpressure: WebFlux 예제

김기현·2025년 8월 1일

Spring WebFlux

목록 보기
10/28

1. 기본 백프레셔 동작 확인 (take(N) 사용)

가장 간단한 형태로 WebFlux가 데이터를 어떻게 제어하는지 보여준다.
Flux.interval은 기본적으로 요청하는 대로 데이터를 발행한다.

@Slf4j
@RestController
public class BackpressureController {

    @GetMapping("/default-backpressure")
    public Flux<String> defaultBackpressure() {
        return Flux.interval(Duration.ofMillis(100))    // 100mx마다 데이터 생성 (매루 빠른 속도임)
                .map(i -> String.format("Item %s at %d", i, System.currentTimeMillis()))
                .doOnRequest(n -> log.info("생산자: {}개 요청을 받음", n))
                .doOnNext(item -> log.info("생산자: {} 발행", item)) // 소비자가 요청한 개수
                .take(10)   // 10개만 발행하도록 제한(이것이 백프레셔에 영향을 준다)
                .doOnComplete(() -> log.info("스트림 완료"));
    }
}

설명

  • Flux.interval(Duration.ofMillis(100))은 100밀리초마다 매우 빠르게 데이터를 생산한다.
  • doOnRequest(n -> log.info("생산자: {}개 요청 받음", n)) 훅은 소비자가 생산자에게 request(n) 신호를 보낼 때마다 그 개수를 로깅한다.
  • .take(10)은 스트림의 요소를 10개까지만 받겠다는 의미이다. 이것이 바로 소비자 측에서의 간접적인 백프레셔 요청 역할을 합니다. WebFlux는 클라이언트가 10개를 요청했음을 인지하고, 생산자에게 10개만 요청합니다.

2. limitRate()를 이용한 백프레셔 명시적 조절

@Slf4j
@RestController
public class BackpressureController {

    @GetMapping("/limit-rate-backpressure")
    public Flux<String> limitRateBackpressure() {
        return Flux.interval(Duration.ofMillis(100))
                .map(i -> String.format("Limited Item %s at %d", i, System.currentTimeMillis()))
                .doOnRequest(n -> log.info("생산자: {}개 요청 받음", n))
                .limitRate(5) // 중요: 5개씩 요청 (버퍼 5 / 프리패리 3.75)
                .doOnNext(item -> log.info("소비자: {} 처리 중", item))
                .take(20) // 총 20개만 받음
                .doOnComplete(() -> log.info("스트림 완료"));
    }
}

limitRate(N) 설명

limitRate(N)은 내부적으로 버퍼링과 프리패치(Prefetch)를 사용하여 백프레셔를 구현한다.

  • N: 소비자가 한 번에 최대로 요청할 수 있는 데이터의 개수.
  • N이 지정되면 Reactor는 기본적으로 N x 0.75개의 데이터가 처리될 때마다 N / 4개만큼의 추가 요청을 보낸다.
    • 이렇게 미리 데이터를 가져오는 것을 프리패치라고 한다.

3. onBackpressureDrop(): 데이터 손실 허용

어떤 시나리오에서는 모든 데이터를 처리할 필요가 없고 최신 데이터만 중요하거나 시스템의 안정성이 데이터 보존보다 우선될 수 있다.
이럴 때는 데이터를 드롭(Drop)하는 전략을 사용한다.

@Slf4j
@RestController
public class BackpressureController {
    
    @GetMapping("/backpressure-drop")
    public Flux<String> backpressureDrop() {
        return Flux.interval(Duration.ofMillis(10)) // 10ms마다 데이터 생성 (매우매우 빠름)
                .map(i -> "Dropped Item " + i + " at " + System.currentTimeMillis())
                .doOnRequest(n -> log.info("생산자: {}개 요청 받음", n))
                .onBackpressureDrop(droppedItem -> log.warn("백프레셔: {} 데이터 드롭됨!", droppedItem)) // 백프레셔 발생 시 드롭
                .doOnNext(item -> {
                    // 실제 처리 로직을 시뮬레이션 (느리게 처리)
                    try {
                        Thread.sleep(500); // 500ms 처리 시간
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    log.info("소비자: {} 처리 완료", item);
                })
                .take(5) // 5개만 받음 (테스트를 위해)
                .doOnComplete(() -> log.info("스트림 완료"));
    }
}

설명

  • Flux.interval(Duration.ofMillis(10))은 데이터를 10밀리초마다 생성하여 생산 속도가 매우 빠르다.
  • doOnNext 내부의 Thread.sleep(500)은 소비자가 하나의 데이터를 처리하는 데 500밀리초가 걸린다고 가정한다. 이는 소비자의 처리 속도가 생산자보다 훨씬 느리다는 것을 의미한다.
  • onBackpressureDrop() 연산자는 소비자가 처리할 수 있는 속도보다 데이터가 빠르게 들어올 때, 새로운 데이터를 버리는 전략을 사용한다. 버려진 데이터는 람다 표현식으로 로그를 남길 수 있다.
profile
백엔드 개발자를 목표로 공부하는 대학생

0개의 댓글