[Project Reactor] 8. Reactor에서의 Backpressure: 과도한 데이터 흐름을 제어하는 방법

y001·2025년 5월 1일

Reactive Programming

목록 보기
8/30
post-thumbnail

리액티브 프로그래밍의 세계에서는 데이터가 끊임없이 비동기적으로 흐르며 소비자에게 전달된다. 이 과정에서 생산자(Publisher)가 소비자(Subscriber)의 처리 속도보다 너무 빠르게 데이터를 발행하면 어떻게 될까? 메모리 누수나 과부하, 시스템 다운 등의 문제가 발생할 수 있다.

이런 상황을 제어하기 위한 핵심 개념이 바로 Backpressure(역압)이다.


Backpressure란?

Backpressure는 비동기 데이터 스트림에서 Subscriber의 처리 능력에 맞춰 Publisher가 데이터를 조절하도록 요구하는 흐름 제어 메커니즘이다. 즉, Subscriber가 아직 데이터를 처리하지 못했는데도 Publisher가 계속 데이터를 emit한다면, 버퍼가 차오르고 결국 시스템 장애로 이어질 수 있다.

Backpressure는 이러한 위험을 방지하고, 데이터 처리 흐름에 유연한 제어권을 부여하기 위해 등장했다.


Reactor에서 Backpressure는 어떻게 처리되는가?

Project Reactor는 Reactive Streams 사양을 구현하고 있으므로, 기본적으로 Backpressure를 지원한다. 특히, 다음과 같은 두 가지 방식으로 역압 상황에 대응할 수 있다.


1. 데이터 개수 요청 방식: request(n) 사용

가장 직접적인 방식은 Subscriber가 처리 가능한 데이터의 개수를 명시적으로 요청하는 것이다. 이를 통해 소비자는 자신이 처리할 수 있는 만큼만 데이터를 받아볼 수 있으며, 생산자는 그 요청 수치에 맞춰 데이터를 emit하게 된다.

Reactor에서는 BaseSubscriber 클래스를 활용하여 이 과정을 세밀하게 제어할 수 있다.

Flux.range(1, 5)
    .doOnRequest(data -> log.info("doOnRequest : {}", data))
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            log.info("hookOnSubscribe");
            request(1); // 최초 요청은 한 개
        }

        @Override
        protected void hookOnNext(Integer value) {
            log.info("hookOnNext : {}", value);
            request(1); // 하나 처리할 때마다 다음 한 개 요청
        }
    });

이 예제에서는 Subscriber가 하나의 값을 처리한 후에만 다음 값을 요청하도록 구성되어 있어, 명확하게 흐름을 제어할 수 있다. 이를 통해 소비자의 부담을 최소화하고 시스템 안정성을 확보할 수 있다.


2. Backpressure 전략 설정: onBackpressureXXX() 연산자 사용

모든 상황에서 수동 제어가 가능한 것은 아니다. 특히 외부에서 들어오는 데이터가 너무 빠르거나 예측할 수 없는 경우에는 자동 대응 전략이 필요하다. Reactor는 이러한 상황을 위해 여러 종류의 onBackpressureXXX() 연산자를 제공한다.

전략 이름설명Reactor 연산자
IGNORE역압을 무시하고 데이터를 그대로 밀어넣는다. Subscriber가 감당하지 못하더라도 전송을 멈추지 않는다.사용하지 않음 또는 publish() 등으로 emit
ERRORSubscriber가 처리를 따라가지 못하면 즉시 예외를 발생시키고 스트림 종료onBackpressureError()
DROP버퍼가 꽉 차면 새로 들어오는 데이터를 무시한다. 기존 버퍼는 유지됨onBackpressureDrop()
LATEST최신 데이터만 유지하고, 이전 emit된 데이터는 모두 폐기한다onBackpressureLatest()
BUFFER버퍼를 지정한 크기만큼 유지하고, 그 이상 초과하면 다음 전략에 따라 데이터 제거onBackpressureBuffer() (옵션: DROP_OLDEST, DROP_LATEST)

이러한 전략들은 데이터 손실을 허용할 것인지, 예외를 발생시킬 것인지, 혹은 최신 데이터만 유지할 것인지 등 각자의 상황에 맞는 선택지를 제공한다.


예: onBackpressureDrop() 사용 예시

Flux.interval(Duration.ofMillis(1)) // 매우 빠르게 emit
    .onBackpressureDrop()
    .subscribe(data -> {
        try {
            Thread.sleep(100); // 느리게 처리
            System.out.println(data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
  • 이 예제에서는 매우 빠른 속도로 데이터를 emit하지만, 처리 속도가 느려 역압이 발생한다.
  • onBackpressureDrop() 덕분에 처리 속도를 따라가지 못하는 데이터는 무시되고, 시스템은 안정적으로 동작한다.

0개의 댓글