배압은 Reactor의 중요한 부분이다. 각 구독자는 구독 객체를 사용하여 처리한 이벤트를 요청한다.
발행자는 이벤트 요청의 수와 같거나 더 적은 이벤트를 발행해야한다.
각 구독자는 받은 이벤트를 처리하며, Subscription Handle 을 통하여 추가적으로 이벤트를 요청할 수 있다.
@Test
void testBackPressure() throws InterruptedException {
Flux<Integer> numberGenerator = Flux.create(x -> {
System.out.println("Requested Event: " + x.requestedFromDownStream());
int number = 1;
while(number < 100) {
x.next(number);
number++;
}
x.complete();
});
CountDownLatch latch = new CountDownLatch(1);
numberGenerator.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
}
@Override
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
protected void hookOnComplete() {
latch.countDown();
}
});
}
위에서 구독자는 1개의 이벤트만 요청하지만 발행자는 100개의 이벤트를 발행한다. Reactor는 이벤트를 큐에 보관하기 위한 몇 가지 작업을 한다. 여기서 이벤트 발행이 빠른 발행자에 대응하기 위한 몇 가지 OVERFLOW 전략을 제공한다.
OverFlowException
예외를 발생시킨다.@Test
void tastBackPressureError() throws InterruptedException {
Flux<Integer> numberGenerator = Flux.create(x -> {
System.out.println("Requested Events: " + x.requestFromDownStream());
int number = 1;
while(number < 100) {
x.next(number);
number++;
}
x.complete();
}, FluxSink.OverFlowStategy.ERROR);
CountDownLatch latch = new CountDownLatch(1);
numberGenerator.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
}
@Override
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
protected void bookOnComplete() {
latch.countDown();
}
});
latch.await(1L, TimeUnit.SECONDS);
}
Requested Events : 1
1
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Reactor는 발행자에게 설정할 OVERFLOW 전략을 위한 연산자도 제공한다.
@Test
void onBackPreesureDrop() throws InterruptedException {
Flux<Inegeter> numberGenerator = Flux.create(x -> {
System.out.println("Request Events: " + x.requestedFromDownstream());
int number = 1;
while(number < 100) {
x.next(number);
number++;
}
x.complete();
});
CountDownLatch latch = new CountDownLatch(1);
numberGenerator
.onBackpressureDrop(x -> System.out.println("Dropped : " + x))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void bookOnNext(Integer value) {
System.out.println(value);
}
@Override
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
protected void hookOnComplete() {
latch.countDown();
}
});
latch.await(1L, TimeUnit.SECONDS);
}
Requested Events : 9223372036854775807
1
Dropped : 2
Dropped : 3
Dropped : 4
Dropped : 5
...
Dropped : 96
Dropped : 97
Dropped : 98
Dropped : 99
추가적으로 onBackpressureLatest(), onBackpressureError(), onBackpressureBuffer() 연산자가 있다.
여기서, onBackpressureBuffer() 메서드는 버퍼의 사이즈를 명시할 수 있으며 안래의 전략을 선택하여 OVERFLOW 전략을 선택할 수 있다.
OverFlowException
예외를 일으킨다.@Test
void onBackpressureBuffer() {
CountDownLatch latch = new CountDownLatch(1);
numberGenerator
.onBockpressureBuffer(
2,
x -> System.out.println("Dropped : " + x),
BufferOverflowStategy.DROP_LATEST)
.subscribe(new BaseSubscribe<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
}
@Override
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
protected void hookOnComplete() {
latch.countDown();
}
]);
latch.awiat(1L, TimeUnit.SECONDS);
}