[Spring WebFlux] Project Reactor

see1237·2022년 10월 14일
0

Section4

목록 보기
10/13

📌 Reactor란?

리액티브 스트림즈 표준 사양을 구현한 구현체 중 하나

Reactor 특징

  • 완전한 Non-Blocking 통신을 지원한다.
  • Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입을 제공
  • 서비스들 간의 통신이 잦은 MSA(Microservice Architecture) 구조에 적합 (Non-Blocking 통신)
  • Backpressure 전략 지원(Subscriber의 처리 속도가 Publihser의 emit 속도를 따라가지 못할 때 적절하게 제어하는 전략)

📌 스케줄러(Scheduler)

스케줄러(Scheduler)란?

  • 쓰레드를 관리하는 관리자 역할
  • Scheduler는 Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드를 제공해 준다.

💡 스케줄러는 복잡한 멀티쓰레딩 프로세스를 단순하게 해준다.

Scheduler 전용 Operator

  • subscribeOn()
    • 구독 직후 실행되는 Operator 체인의 실행 쓰레드Scheduler로 지정한 쓰레드로 변경한다.
    • 주로 데이터 소스에서 데이터를 emit하는 원본 Publisher의 실행 쓰레드를 지정하는 역할을 한다.
  • publishOn()
    • 전달 받은 데이터를 가공 처리하는 Operator 앞에 추가해서 실행 쓰레드를 별도로 추가하는 역할을 한다.

💡 publishOn()은 Operator 앞에 여러번 추가할 경우 별도의 쓰레드가 추가로 생성되지만 subscribeOn()여러 번 추가해도 하나의 쓰레드만 추가로 생성된다.

📌 상황별로 분류된 Operator 목록

  • 새로운 Sequence를 생성(Creating)하고자 할 경우
    • just()
    • fromStream()
      • Java의 Stream을 입력으로 전달 받아 emit
    • fromIterable()
      • Java의 Iterable을 입력으로 전달 받아 emit (List, Map, Set 등의 컬렉션을 파라미터로 전달할 수 있다.)
    • fromArray()
    • range()
    • interval()
    • empty()
    • never()
    • defer()
    • using()
    • generate()
    • create()
      • 프로그래밍 방식으로 Signal 이벤트를 발생시키는 Operator
      • 한 번에 여러 건의 데이터를 비동기적으로 emit할 수 있다.
      • 파라미터는 FluxSink라는 람다 파라미터를 가지는 람다 표현식
        • FluxSink는 Flux나 Mono에서 just(), fromIterable() 같은 데이터 생성 Operator에 데이터소스를 전달하면 내부에서 알아서 데이터를 emit 하는 등의 Sequence를 진행하는 것이 아니라 프로그래밍 방식으로 직접 Signal 이벤트를 발생 시켜서 Sequence를 진행하도록 해주는 기능을 한다.
      • 코드 예제
        /**
         * create() Operator기본 예제
        */
        @Slf4j
        public class CreateExample {
            private static List<Integer>source= Arrays.asList(1, 3, 5, 7, 9, 11, 13, 15, 17, 19);
        
            public static void main(String[] args) {
                Flux.create((FluxSink<Integer> sink) -> {
                    sink.onRequest(n -> {
                        for (int i = 0; i <source.size(); i++) {
                            sink.next(source.get(i));
                        }
                        sink.complete();
                    });
                    sink.onDispose(() ->log.info("# clean up"));
                }).subscribe(data ->log.info("# onNext: {}", data));
            }
        }
  • 기존 Sequence에서 변환 작업(Transforming)이 필요한 경우
    • map()
    • flatMap()
      • 내부로 들어오는 데이터 한 건당 하나의 Sequence가 생성된다.(Inner Sequence)
      • perator에서 추가 쓰레드를 할당할 경우, 작업의 처리 순서를 보장하지 않는다
    • concat()
      • 논리적으로 하나의 Sequence로 이어 붙인 후, 이어 붙인 Sequecne에서 시간 순서대로 데이터를 차례대로 emit한다.
    • collectList()
    • collectMap()
    • merge()
    • zip()
      • 입력으로 전달되는 여러 개의 Publisher Sequence에서 emit된 데이터를 결합하는 Operator
        • 결합의 의미는 각 Publisher가 emit하는 데이터를 하나씩 전달 받아서 새로운 데이터를 만든 후에 Downstream으로 전달한다는 의미
      • 입력으로 주어진 Sequence의 emit 시점이 매번 다르더라도 동일 index 상의 emit 시점이 늦은 데이터가 emit될 때까지 대기했다가 데이터가 모두 emit되면 해당 데이터를 모두 전달 받는다.
    • then()
    • switchIfEmpty()
    • and()
    • when()
  • Sequence 내부의 동작을 확인(Peeking)하고자 할 경우
    • doOnSubscribe
      • 구독 발생 직후에 트리거 되는 Operator
    • doOnNext()
      • 데이터 emit 시 트리거 되어 부수 효과(side-effect)를 추가할 수 있는 Operator
        • 함수형 프로그래밍 세계에서 부수 효과(side-effect)는 어떤 동작을 실행하되 리턴 값이 없는 것을 의미
    • doOnError()
    • doOnCancel()
    • doFirst()
    • doOnRequest()
    • doOnTerminate()
    • doAfterTerminate()
    • doOnEach()
    • doFinally()
    • log()
      • Publisher에서 발생하는 Signal 이벤트를 로그로 출력해주는 역할
  • Sequence에서 데이터 필터링(Filtering)이 필요한 경우
    • filter()
    • ignoreElements()
    • distinct()
    • take()
    • next()
    • skip()
    • sample()
    • single()
  • 에러를 처리(Handling errors)하고자 할 경우
    • error()
      • 의도적으로 onError Signal 이벤트를 발생시킬 때 사용
    • timeout()
      • 입력으로 주어진 시간동안 emit되는 데이터가 없으면 onError Signal 이벤트를 발생
        시킨다.
    • onErrorReturn()
    • onErrorResume()
    • onErrorMap()
    • doFinally()
    • retry()
      • Sequence 상에서 에러가 발생할 경우, 입력으로 주어진 숫자만큼 재구독해서 Sequence를 다시 시작한다.

0개의 댓글