위 상황에서 emit된 데이터들은 Subscriber가 data 1을 처리하기 전까지 대기하게 된다. 이때 data 1의 처리 속도가 느리고 Publisher의 emit 속도가 빠르다면 오버플로우가 발생하게 된다. 이러한 문제를 해결하기 위한 수단이 Backpressure!
Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher 에게 요청
Flux.range(1, 5)
.doOnRequest(data -> log.info("# doOnRequest: {}", data))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// onSubscribe() 대신 구독 시점에 request() 메서드 호출해 최초 데이터 오청 개수 제어
request(1);
}
@SneakyThrows
@Override
protected void hookOnNext(Integer value) {
//onNext() 대신해 publisher가 emit한 데이터를 전달받아 처리한 후 Publisher에게 또다시 데이터 요청
Thread.sleep(2000L); //subscriber의 처리 속도가 느리도록 지연설정
log.info("# hookOnNext: {}", value);
request(1);
}
});
Reactor에서 지원하는 Backpressure 전략을 사용하는 것, onBackpressureXXX() Operator 를 사용해 적용할 수 있다.
Ignore
Error
Drop
LATEST
-> 현재 emit된 데이터를 나라고 가정한다면 DROP전략은 나 자신을 폐기하는 것이고, LATEST 전략은 내 앞을 폐기하는 것
BUFFER