Backpressure 실습

알파로그·2023년 11월 10일
0

Spring WebFlux

목록 보기
9/13

✏️ 예제 1

  • sub 쪽에서 buffer 를 1로 제한해 backpressure 를 구현
// Subscriber 가 처리 가능한 만큼 request 개수를 조절하는 Backpressure 예시
Flux.range(1, 5)
        // publisher 가 공급한 data 출력
        .doOnNext(logger::pubLog)
        // subscriber 가 요청한 data 수 출력
        .doOnRequest(logger::reqLog)
        // 요청 data 의 수를 조절하는 BaseSubscriber 생성
        .subscribe(new BaseSubscriber<Integer>() {

            // pub 으로 보낼 request
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // 데이터 요청 갯수를 1로 설정
                request(1);
            }

            // sub 에서 data 를 받으면 실행되는 method
            @Override
            protected void hookOnNext(Integer value) {
                // 2초의 data 처리 후 다시 1개로 설정후 pub 으로 요청
                sleep(2000L);
                System.out.println("처리한 데이터 : " + value);
                request(1);
            }

            private void sleep(Long ms) {
                try {
                    Thread.sleep(ms);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
  • 주석 X
Flux.range(1, 5)
        .doOnNext(logger::pubLog)
        .doOnRequest(logger::reqLog)
        .subscribe(new BaseSubscriber<Integer>() {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);
            }

            @Override
            protected void hookOnNext(Integer value) {
                sleep(2000L);
                System.out.println("처리한 데이터 : " + value);
                request(1);
            }
        });
  • 결과
요청한 데이터 수 : 1
공급한 데이터 : 1
처리한 데이터 : 1
요청한 데이터 수 : 1
공급한 데이터 : 2
처리한 데이터 : 2
요청한 데이터 수 : 1
공급한 데이터 : 3
처리한 데이터 : 3
요청한 데이터 수 : 1
공급한 데이터 : 4
처리한 데이터 : 4
요청한 데이터 수 : 1
공급한 데이터 : 5
처리한 데이터 : 5
요청한 데이터 수 : 1

✏️ 예제 2

  • sub 에서 데이터를 2개씩 요청함
public static int count = 0;

public static void main(String[] args) {

    Flux<Integer> flux = Flux.range(1, 5)
            .doOnNext(BackpressureExample2::pubLog)
            .doOnRequest(BackpressureExample2::reqLog);

    flux.subscribe(new BaseSubscriber<Integer>() {

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(2);
        }

        @Override
        protected void hookOnNext(Integer value) {
            count++;
            System.out.println("처리한 데이터 : " + value);

            if (count == 2) {
                sleep(2000L);
                request(2);
                count = 0;
            }
        }
}
  • 결과
요청한 데이터 수 : 2
공급한 데이터 : 1
처리한 데이터 : 1
공급한 데이터 : 2
처리한 데이터 : 2
요청한 데이터 수 : 2
공급한 데이터 : 3
처리한 데이터 : 3
공급한 데이터 : 4
처리한 데이터 : 4
요청한 데이터 수 : 2
공급한 데이터 : 5
처리한 데이터 : 5
profile
잘못된 내용 PR 환영

0개의 댓글