Reactor: BackPressure (배압)

xellos·2022년 5월 15일
0

JAVA-Reactor

목록 보기
6/11

소개

배압은 Reactor의 중요한 부분이다. 각 구독자는 구독 객체를 사용하여 처리한 이벤트를 요청한다.
발행자는 이벤트 요청의 수와 같거나 더 적은 이벤트를 발행해야한다.

  • 이벤드 요청 수를 Lonb.MAX_VALUE로 설정하면 가지고 있는 이벤트를 전부 보낸다.

각 구독자는 받은 이벤트를 처리하며, Subscription Handle 을 통하여 추가적으로 이벤트를 요청할 수 있다.

  • 만약 발행자가 이벤트를 빠른 속도로 발행하면, non-requests 이벤트를 다루는 전략을 고려해야 한다.

1) 기본 예제

  • 아래의 코드에서 구독자는 1개의 이벤트만 요청한다.
  • 그러나 발행자는 100개의 이벤트를 발행한다.
@Test
void testBackPressure() throws InterruptedException {
	Flux<Integer> numberGenerator = Flux.create(x -> {
    	System.out.println("Requested Event: " + x.requestedFromDownStream());
        
        int number = 1;
        while(number < 100) {
        	x.next(number);
            number++;
        }
        x.complete();
    });
    
    CountDownLatch latch = new CountDownLatch(1);
    numberGenerator.subscribe(new BaseSubscriber<Integer>() {
    	@Override
        protected void hookOnSubscribe(Subscription subscription) {
        	request(1);
        }
        
        @Override
        protected void hookOnNext(Integer value) {
        	System.out.println(value);
        }
        
        @Override
        protected void hookOnError(Throwable throwable) {
        	throwable.printStackTrace();
            latch.countDown();
        }
        
        @Override
        protected void hookOnComplete() {
			latch.countDown();
		}
    });
}

위에서 구독자는 1개의 이벤트만 요청하지만 발행자는 100개의 이벤트를 발행한다. Reactor는 이벤트를 큐에 보관하기 위한 몇 가지 작업을 한다. 여기서 이벤트 발행이 빠른 발행자에 대응하기 위한 몇 가지 OVERFLOW 전략을 제공한다.

  • IGNORE: 이벤트를 계속 구독자에게 전달하기 위해서, 구독자가 정한 Limit를 무시한다.
  • BUFFER: 전달되지 않은 이벤트를 버퍼에 보관한다. 구독자가 추가적인 요청을 하면 버퍼의 이벤트가 발행된다. (DEFAULT)
  • DROP: 발행된 이벤트를 버린다. 다음 요청을 할 경우 버려지지 않은 새로운 데이터를 받는다.
  • LATEST: 버퍼에 가능한 최신의 데이터를 저장한다. 다음 요청이 발생하면 최근의 요청을 받는다.
  • ERROR: 요청 이상의 이벤트를 받게되면 구독자가 OverFlowException 예외를 발생시킨다.

2) Error 예제

@Test
void tastBackPressureError() throws InterruptedException {
	Flux<Integer> numberGenerator = Flux.create(x -> {
    	System.out.println("Requested Events: " + x.requestFromDownStream());
        
        int number = 1;
        while(number < 100) {
        	x.next(number);
            number++;
        }
        x.complete();
    }, FluxSink.OverFlowStategy.ERROR);
    
    CountDownLatch latch = new CountDownLatch(1);
    numberGenerator.subscribe(new BaseSubscriber<Integer>() {
		@Override
        protected void hookOnSubscribe(Subscription subscription) {
        	request(1);
        }
        
        @Override
        protected void hookOnNext(Integer value) {
        	System.out.println(value);
        }
        
        @Override
        protected void hookOnError(Throwable throwable) {
        	throwable.printStackTrace();
            latch.countDown();
        }
        
        @Override
        protected void bookOnComplete() {
        	latch.countDown();
        }
	});
    
    latch.await(1L, TimeUnit.SECONDS);
}
  • 결과
Requested Events : 1
1
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

3) OnBackpressure

Reactor는 발행자에게 설정할 OVERFLOW 전략을 위한 연산자도 제공한다.

  • OnBackPressureXXX() 형태의 메서드를 제공
  • 발행자가 IGNORE 전략으로 구성되는 경우도 있다. 이 경우 발행자를 구동하는 구독자의 설정에 따라 배압이 설정된다.

DROP 예제 )

@Test
void onBackPreesureDrop() throws InterruptedException {
	Flux<Inegeter> numberGenerator = Flux.create(x -> {
    	System.out.println("Request Events: " + x.requestedFromDownstream());
        int number = 1;
        while(number < 100) {
        	x.next(number);
            number++;
        }
        x.complete();
    });
    
    CountDownLatch latch = new CountDownLatch(1);
    numberGenerator
    	.onBackpressureDrop(x -> System.out.println("Dropped : " + x))
        .subscribe(new BaseSubscriber<Integer>() {
			@Override
            protected void hookOnSubscribe(Subscription subscription) {
            	request(1);
            }
            
            @Override
            protected void bookOnNext(Integer value) {
            	System.out.println(value);
            }
            
            @Override
            protected void hookOnError(Throwable throwable) {
            	throwable.printStackTrace();
                latch.countDown();
            }
            
            @Override
            protected void hookOnComplete() {
            	latch.countDown();
            }
		});
        
    latch.await(1L, TimeUnit.SECONDS);
}
  • 결과
Requested Events : 9223372036854775807
1
Dropped : 2
Dropped : 3
Dropped : 4
Dropped : 5
...
Dropped : 96
Dropped : 97
Dropped : 98
Dropped : 99

추가적으로 onBackpressureLatest(), onBackpressureError(), onBackpressureBuffer() 연산자가 있다.


BUFFER 예제 )

여기서, onBackpressureBuffer() 메서드는 버퍼의 사이즈를 명시할 수 있으며 안래의 전략을 선택하여 OVERFLOW 전략을 선택할 수 있다.

  • DROP_LATEST: 새로 발행된 이벤트를 버린다. 버퍼에는 오래된 이벤트가 남는다.
  • DROP_OLDEST: 새로 발행된 이벤트를 받고, 오래된 이벤트를 버린다. 버퍼에는 새로운 이벤트가 남는다.
  • ERROR: 버퍼의 용량을 초과하면 OverFlowException 예외를 일으킨다.
@Test
void onBackpressureBuffer() {
	CountDownLatch latch = new CountDownLatch(1);
    numberGenerator
    	.onBockpressureBuffer(
        	2, 
            x -> System.out.println("Dropped : " + x),
            BufferOverflowStategy.DROP_LATEST)
        .subscribe(new BaseSubscribe<Integer>() {
        	@Override
            protected void hookOnSubscribe(Subscription subscription) {
            	request(1);
            }
            
            @Override
            protected void hookOnNext(Integer value) {
            	System.out.println(value);
            }
            
            @Override
            protected void hookOnError(Throwable throwable) {
            	throwable.printStackTrace();
                latch.countDown();
            }
            
            @Override
            protected void hookOnComplete() {
            	latch.countDown();
            }
        ]);
        
    latch.awiat(1L, TimeUnit.SECONDS);
}

0개의 댓글