Backpressure is a data-processing scheme that keeps processing from being overloaded by appropriately throttling the endless stream of data a Publisher emits.
Controlling the number of requested items: the Subscriber requests from the Publisher only as many items as it can appropriately process.
- BaseSubscriber object
- hookOnSubscribe: invoked when the subscription is established; request the initial number of items here
- hookOnNext: invoked for each emitted item; process it, then request the next item
Flux.range(1, 5)
    .doOnRequest(data -> log.info("# doOnRequest: {}", data))
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);            // request only one item up front
        }

        @SneakyThrows
        @Override
        protected void hookOnNext(Integer value) {
            Thread.sleep(2000L);   // simulate slow processing
            log.info("# hookOnNext: {}", value);
            request(1);            // then ask for the next item
        }
    });
// 02:12:48.282 [main] INFO - # doOnRequest: 1
// 02:12:50.287 [main] INFO - # hookOnNext: 1
// 02:12:50.288 [main] INFO - # doOnRequest: 1
// 02:12:52.293 [main] INFO - # hookOnNext: 2
// 02:12:52.294 [main] INFO - # doOnRequest: 1
// 02:12:54.301 [main] INFO - # hookOnNext: 3
// 02:12:54.303 [main] INFO - # doOnRequest: 1
// 02:12:56.309 [main] INFO - # hookOnNext: 4
// 02:12:56.311 [main] INFO - # doOnRequest: 1
// 02:12:58.312 [main] INFO - # hookOnNext: 5
// 02:12:58.313 [main] INFO - # doOnRequest: 1
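The same kind of bounded demand can also be expressed without a custom BaseSubscriber by using the limitRate() operator, which caps every request propagated upstream at the given size. A minimal sketch in the same style as the example above (the same Lombok-style log field is assumed; the numbers are illustrative):

Flux.range(1, 10)
    .doOnRequest(n -> log.info("# doOnRequest: {}", n))   // logs the requests made by limitRate()
    .limitRate(3)                                          // upstream never sees a request larger than 3
    .subscribe(data -> log.info("# onNext: {}", data));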
Strategy | Description |
---|---|
IGNORE strategy | Does not apply backpressure at all. |
ERROR strategy | When the buffer of data to be passed downstream is full, raises an exception. |
DROP strategy | When the buffer of data to be passed downstream is full, drops the data waiting outside the buffer, starting with the data emitted first. |
LATEST strategy | When the buffer of data to be passed downstream is full, keeps only the most recently emitted data waiting outside the buffer and fills the buffer with it. |
BUFFER strategy | When the buffer of data to be passed downstream is full, drops data that is already inside the buffer. |
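For reference, these same strategy names also exist as FluxSink.OverflowStrategy values, which can be passed to Flux.create() when building a programmatic source; a minimal sketch (the loop and its bound are purely illustrative):

Flux<Integer> source = Flux.<Integer>create(sink -> {
    // emitted regardless of downstream demand; the chosen strategy decides
    // what happens when the downstream cannot keep up
    for (int i = 0; i < 1_000; i++) {
        sink.next(i);
    }
    sink.complete();
}, FluxSink.OverflowStrategy.DROP);   // or IGNORE, ERROR, LATEST, BUFFER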
- IGNORE strategy: downstream backpressure requests are simply ignored, so an IllegalStateException may occur.
- ERROR strategy: when the downstream cannot keep up with the upstream emit rate, it raises an IllegalStateException. The onBackpressureError() Operator below applies this strategy.

Flux
    .interval(Duration.ofMillis(1L))
    .onBackpressureError()
    .doOnNext(data -> log.info("# doOnNext: {}", data))
    .publishOn(Schedulers.parallel())
    .subscribe(data -> {
            try {
                Thread.sleep(5L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));

Thread.sleep(2000L);
/*
13:38:32.009 [parallel-2] INFO - # doOnNext: 0
13:38:32.011 [parallel-2] INFO - # doOnNext: 1
13:38:32.011 [parallel-2] INFO - # doOnNext: 2
...
13:38:32.017 [parallel-1] INFO - # onNext: 0
...
13:38:33.683 [parallel-1] INFO - # onNext: 255
13:38:33.687 [parallel-1] ERROR- # onError
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220)
at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:6739)
at reactor.core.publisher.FluxOnBackpressureDrop$DropSubscriber.onNext(FluxOnBackpressureDrop.java:135)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
*/
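Because the ERROR strategy surfaces the overflow as an error signal (the IllegalStateException described above), it can be handled downstream like any other error. A minimal sketch that swaps the error consumer for an onErrorResume() fallback (the empty fallback is only an illustration):

Flux
    .interval(Duration.ofMillis(1L))
    .onBackpressureError()
    .publishOn(Schedulers.parallel())
    .onErrorResume(IllegalStateException.class,
        ex -> {
            log.info("# overflow, falling back to an empty sequence: {}", ex.toString());
            return Flux.<Long>empty();
        })
    .subscribe(data -> {
            try {
                Thread.sleep(5L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        });

Thread.sleep(2000L);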
In the ERROR example above:
- interval() Operator: emits data every 1 ms, much faster than the downstream can consume it.
- onBackpressureError() Operator: applies the ERROR strategy.
- doOnNext() Operator: logs the data emitted by the original Flux.
- publishOn() Operator: switches downstream processing to a separate thread; its bounded queue is what the upstream overruns (the log above stops at onNext: 255, i.e. 256 elements, before the OverflowException).

The next example applies the DROP strategy instead:

Flux
    .interval(Duration.ofMillis(1L))
    .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
    .publishOn(Schedulers.parallel())
    .subscribe(data -> {
            try {
                Thread.sleep(5L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));

Thread.sleep(2000L);
/*
14:31:14.551 [parallel-1] INFO - # onNext: 0
14:31:14.558 [parallel-1] INFO - # onNext: 1
14:31:14.565 [parallel-1] INFO - # onNext: 2
14:31:14.570 [parallel-1] INFO - # onNext: 3
// ...
14:31:14.788 [parallel-1] INFO - # onNext: 38
14:31:14.794 [parallel-1] INFO - # onNext: 39
14:31:14.801 [parallel-1] INFO - # onNext: 40
14:31:14.801 [parallel-2] INFO - # dropped: 256
14:31:14.802 [parallel-2] INFO - # dropped: 257
// ...
14:31:14.806 [parallel-2] INFO - # dropped: 261
14:31:14.806 [parallel-1] INFO - # onNext: 41
*/
The onBackpressureDrop() Operator applies the DROP strategy; each dropped item is passed to its callback, where it can be logged or otherwise handled.

The next example applies the LATEST strategy:

Flux
    .interval(Duration.ofMillis(1L))
    .onBackpressureLatest()
    .publishOn(Schedulers.parallel())
    .subscribe(data -> {
            try {
                Thread.sleep(5L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));

Thread.sleep(2000L);
/*
14:41:14.230 [parallel-1] INFO - # onNext: 0
14:41:14.238 [parallel-1] INFO - # onNext: 1
14:41:14.245 [parallel-1] INFO - # onNext: 2
14:41:14.251 [parallel-1] INFO - # onNext: 3
// ...
14:41:15.844 [parallel-1] INFO - # onNext: 254
14:41:15.850 [parallel-1] INFO - # onNext: 255
14:41:15.857 [parallel-1] INFO - # onNext: 1224
14:41:15.863 [parallel-1] INFO - # onNext: 1225
*/
With the DROP strategy the newly emitted data discards itself, whereas with the LATEST strategy it discards whatever was waiting ahead of it (only the most recently emitted item outside the buffer survives).

The next example applies the BUFFER DROP_LATEST strategy:

Flux
    .interval(Duration.ofMillis(300L))
    .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
    .onBackpressureBuffer(2,
            dropped -> log.info("** Overflow & Dropped: {} **", dropped),
            BufferOverflowStrategy.DROP_LATEST)
    .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
    .publishOn(Schedulers.parallel(), false, 1)
    .subscribe(data -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));

Thread.sleep(2500L);
/*
15:02:44.771 [parallel-2] INFO - # emitted by original Flux: 0
15:02:44.778 [parallel-2] INFO - [ # emitted by Buffer: 0 ]
15:02:45.074 [parallel-2] INFO - # emitted by original Flux: 1
15:02:45.370 [parallel-2] INFO - # emitted by original Flux: 2
15:02:45.671 [parallel-2] INFO - # emitted by original Flux: 3
15:02:45.674 [parallel-2] INFO - ** Overflow & Dropped: 3 **
15:02:45.784 [parallel-1] INFO - # onNext: 0
15:02:45.784 [parallel-1] INFO - [ # emitted by Buffer: 1 ]
15:02:45.969 [parallel-2] INFO - # emitted by original Flux: 4
15:02:46.269 [parallel-2] INFO - # emitted by original Flux: 5
15:02:46.269 [parallel-2] INFO - ** Overflow & Dropped: 5 **
15:02:46.573 [parallel-2] INFO - # emitted by original Flux: 6
15:02:46.574 [parallel-2] INFO - ** Overflow & Dropped: 6 **
15:02:46.785 [parallel-1] INFO - # onNext: 1
15:02:46.787 [parallel-1] INFO - [ # emitted by Buffer: 2 ]
15:02:46.872 [parallel-2] INFO - # emitted by original Flux: 7
*/
The first doOnNext() Operator shows the original Flux data generated by the interval() Operator being emitted; the second doOnNext() Operator shows the data emitted from the buffer to the Downstream.

The BUFFER DROP_OLDEST strategy is the exact opposite of the BUFFER DROP_LATEST strategy: when the buffer is full, the oldest data inside the buffer is dropped to make room for the newly emitted data.

Flux
    .interval(Duration.ofMillis(300L))
    .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
    .onBackpressureBuffer(2,
            dropped -> log.info("** Overflow & Dropped: {} **", dropped),
            BufferOverflowStrategy.DROP_OLDEST)
    .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
    .publishOn(Schedulers.parallel(), false, 1)
    .subscribe(data -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));

Thread.sleep(2500L);
/*
15:14:53.788 [parallel-2] INFO - # emitted by original Flux: 0
15:14:53.795 [parallel-2] INFO - [ # emitted by Buffer: 0 ]
15:14:54.089 [parallel-2] INFO - # emitted by original Flux: 1
15:14:54.383 [parallel-2] INFO - # emitted by original Flux: 2
15:14:54.683 [parallel-2] INFO - # emitted by original Flux: 3
15:14:54.686 [parallel-2] INFO - ** Overflow & Dropped: 1 **
15:14:54.797 [parallel-1] INFO - # onNext: 0
15:14:54.798 [parallel-1] INFO - [ # emitted by Buffer: 2 ]
15:14:54.985 [parallel-2] INFO - # emitted by original Flux: 4
15:14:55.282 [parallel-2] INFO - # emitted by original Flux: 5
15:14:55.284 [parallel-2] INFO - ** Overflow & Dropped: 3 **
15:14:55.586 [parallel-2] INFO - # emitted by original Flux: 6
15:14:55.586 [parallel-2] INFO - ** Overflow & Dropped: 4 **
15:14:55.804 [parallel-1] INFO - # onNext: 2
15:14:55.804 [parallel-1] INFO - [ # emitted by Buffer: 5 ]
15:14:55.881 [parallel-2] INFO - # emitted by original Flux: 7
*/
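Besides DROP_LATEST and DROP_OLDEST, BufferOverflowStrategy also defines ERROR, which terminates the sequence with an overflow error instead of dropping anything once the buffer is full. A minimal sketch in the same style as the two examples above (buffer size and timings are illustrative):

Flux
    .interval(Duration.ofMillis(300L))
    .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
    .onBackpressureBuffer(2, BufferOverflowStrategy.ERROR)
    .publishOn(Schedulers.parallel(), false, 1)
    .subscribe(data -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {}
            log.info("# onNext: {}", data);
        },
        error -> log.error("# onError", error));   // the buffer overflow is signaled here

Thread.sleep(2500L);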