리액티브 프로그래밍 시작하기 - Reactor편

ssongkim·2023년 8월 12일
0

Overview

이전 게시글을 통해 리액티브 개념에 대해 정리해보았습니다. (리액티브, 리액티브 시스템, 리액티브 프로그래밍, 리액티브 스트림즈......?!)

이번에는 앞서 나열했던 여러 리액티브 스트림즈 구현체 중 하나인 Reactor에 대해 알아보는 시간을 가져보겠습니다.

Reactor란

Reactor는 스프링 프레임워크 팀의 주도하에 개발된 리액티브 프로그래밍을 위한 라이브러리로, 리액티브 스트림즈 구현체 중 하나입니다.

Spring Framework 5버전 부터 리액티브 스택에 포함되어 Spring Webflux는 기본적으로 Reactor를 지원합니다.

특징

Reactor의 특징은 공식문서에서 소개하고 있습니다. https://projectreactor.io/

요약하면 Reactor는 리액티브 스트림즈 사양을 구현하였으며 JVM 위에서 실행되는 논블로킹 애플리케이션을 개발하는데 필요한 핵심 기술입니다.
자바의 함수형 프로그래밍 API를 통해 PublisherSubscriber의 상호작용이 이루어지며, 백프레셔를 지원합니다.

Marble Diagram

리액티브 스트림즈를 이해하려면 마블 다이어그램에 대한 개념을 먼저 알아야합니다.

Reactive Flow는 마블 다이어그램 ( marble diagram ) 으로 나타냅니다.

마블(Marble)은 구슬이라는 뜻입니다.

구슬 모양의 알록달록한 동그라미는 하나의 데이터를 의미하며, 다이어그램 상에서 시간의 흐름에 따라 변화하는 데이터의 흐름을 표현합니다.

마블 다이어그램의 제일 위에는 FluxMono를 통해 전달되는(방출되는) 데이터의 타임라인을 나타내고, 중앙에는 오퍼레이션, 제일 밑에는 결과로 생성되는 FluxMono의 타임라인을 나타냅니다.

우리는 마블 다이어그램을 통해 해당 오퍼레이션이 어떤 Reactive Flow를 가지고 스트림이 어떻게 처리되어 아웃풋이 나오는지 확인할 수 있습니다.

특정 메서드에 대한 마블 다이어그램을 보고싶다면 구글링을 하는 방법도 있고, 인텔리제이를 사용한다면 해당 오퍼레이션에 마우스를 갖다 대는 방법도 있습니다.

예를 들어 filter 오퍼레이션의 마블 다이어그램을 보겠습니다.

맨 위에는 방출되는 element를 표현하고, 오퍼레이션 수행 결과로 30, 22, 60 element가 표현되는 것을 볼 수 있습니다.

각 요소는 방출됐던 순서에 따라 배열되어있는 것으로 보아 이를 통해 방출된 element는 오퍼레이션을 수행하고 순서보장이 됨을 확인할 수 있습니다.

리액티브 스트림즈 컴포넌트

리액티브 스트림즈는 4개의 인터페이스인 Publisher(발행자), Subscriber(구독자), Subscription(구독), Processor(프로세서)가 존재합니다.

PublisherSubscriber의 동작 과정입니다.

그림 상으로는 PublisherSubscriber가 마치 같은 스레드에서 동기적으로 상호작용하는 것처럼 보이지만 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용하는 경우가 대부분입니다.

이때 만약 Publisher가 통지하는 속도가 Subscriber의 처리 속도보다 빠르면 처리를 기다리는 데이터는 쌓이게 되고, 결과적으로 시스템 부하가 커지게 됩니다.

이러한 문제를 방지하기 위해 Subscriber가 Subscription.request를 통해 데이터의 요청 개수를 지정하여 데이터 개수를 제어할 수 있습니다.

이는 백프레셔의 구현부분에 해당하며 개념에 관한 자세한 설명은 이전 게시글의 백프레셔 파트를 확인해주세요.

Reactor는 리액티브 스트림즈 표준을 따르는 구현체이므로 리액티브 스트림즈 인터페이스를 구현한 구현체들이 존재합니다.

Publisher

Publisher는 무한한 data를 제공합니다. 제공되어진 data는 Subscriber가 구독하는 형식으로 처리합니다.

Reactor에서는 publisher 인터페이스 구현체로 FluxMono가 있습니다.

    @Test
    @DisplayName("Publisher")
    void publisher()
    {
        Flux<String> flux = Flux.just("A", "B");
        Mono<String> mono = Mono.just("A");

        Assertions.assertInstanceOf(Publisher.class, flux);
        Assertions.assertInstanceOf(Publisher.class, mono);
    }

테스트코드로 FluxMonoPublisher.class의 일부임을 알 수 있습니다.

이러한 publisher 인터페이스는 subscribe 라는 추상 메서드를 가집니다. Publisher.subscribe(Subscriber)의 형식으로 data 제공자와 구독자가 연결을 맺습니다.

Flux

Flux는 여러 요소(0 ~ N)를 생성할 수 있는 Publisher입니다.
publisher 는 구독자가 없으면 가만히 있기 때문에 메모리 부족을 야기하지 않고도 무한대의 리액티브 스트림을 만들 수 있습니다.

subscribe를 통해 구독자를 추가하면 데이터가 전달됩니다.

	@Test
    void fluxTest()
    {

        Flux<Integer> flux = Flux.range(0, 5).repeat();

        // 다만, 이것을 수집하려고 시도하면 OOM이 날 것이다.
        // List<Integer> list = flux.collectList().block();
    }

Mono

Mono는 최대 하나의 요소(0 ~ 1)를 생성할 수 있는 Publisher입니다.

    @Test
    void monoTest()
    {
        Mono<Integer> mono = Mono.just(1);
    }

Mono는 0 아니면 1개의 데이터의 흐름만 관리하기에 Mono의 연산자들은 버퍼 중복, 값비싼 동기화 작업 등이 생략되어 Flux보다 가볍습니다.

Subscriber

Subscriber가 구독을 신청하게 되면 Publisher로부터 이벤트를 수신받을 수 있습니다.

이 이벤트들은 Subscriber 인터페이스의 메서드를 통해 전송됩니다.

  • onSubscribe, 구독시 최초에 한번만 호출
  • onNext, 구독자가 요구하는 데이터의 수 만큼 호출 (최대 java.lang.Long.MAX_VALUE)
  • onError, 에러 발생 또는 더이상 처리할 수 없는 경우
  • onComplete, Publisher가 데이터 통지를 완료했음을 알릴 때
    @Test
    @DisplayName("Subscriber")
    void subscriber()
    {
        Flux<String> flux = Flux.just("A", "B");

        MyCustomSubscriber myCustomSubscriber = new MyCustomSubscriber(); // 내가 만든 subscriber 구현체
        flux.subscribe(myCustomSubscriber); // onSubscribe 출력 확인
    }

Subscriber가 수신할 첫 번째 이벤트는 onSubscribe 메서드의 호출을 통해 이루어집니다.
Publisher가 onSubscribe 메서드를 호출할 때 이 메서드의 인자로 Subscription 객체를 Subscriber에 전달합니다.

Subscriber 구현체(표준을 지키지 않음)

public class MyCustomSubscriber implements Subscriber<String>
{
    private volatile Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("onSubscribe, subscription " + subscription.hashCode());
        this.subscription = subscription;
        this.subscription.request(1); // 구독 후, 최초 요청
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext = " + s);
        this.subscription.request(1); // 데이터 수신 후, 추가 요청
    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

리액티브 스트림 표준 Subscriber 인터페이스를 직접 구현하는 것은 쉬운 일이 아닙니다.

제시하는 표준 스펙을 모두 준수해야하고, TCK(Technology Compatibility Kit) 테스트코드를 통과해야합니다.

위 코드는 동작은 하지만, 표준을 지키지 않은 잘못된 구현입니다.
왜냐하면 this.subscription.request(n) 에서 숫자가 0 이하인지, 구독(subscription) 상태는 정상인지 등을 확인하지 않고, 1개씩 계속 요청했기 때문입니다.

안전한 구현은 MyCustomSubscriberSafety 에서 확인해보겠습니다.

Subscriber 구현체(안전한 구현)

public class MyCustomSubscriberSafety extends BaseSubscriber<String>
{
    @Override
    public void hookOnSubscribe(Subscription subscription) {
        System.out.println("onSubscribe, subscription " + subscription.hashCode());
        request(1); // 구독 후, 최초 요청
    }

    @Override
    public void hookOnNext(String s) {
        System.out.println("onNext = {}" + s);
        request(1); // 데이터 수신 후, 추가 요청
    }

    @Override
    public void hookOnComplete() {
        System.out.println("onComplete");
    }
}

리액터 프레임워크에서 미리 만들어둔, BaseSubscriber 추상클래스를 상속해서 구현합니다.

이는 Subscriber의 onSubscribe, onNext, onError, onComplete 는 모두 직접 재정의할 수 없게 final 로 선언되어 있습니다.

대신 우리는 제공하는 hook 을 통해서 상황에 따라 얼마든지 조정이 가능합니다.

사용하기

일반적인 Reactive Streams 에서의 Publisher 는 subscribe 를 실행할 때 subscriber 를 등록해줍니다.

    @Test
    void subscriber()
    {
        Flux<String> flux = Flux.just("A", "B");

        flux.subscribe(new MyCustomSubscriber()); // 커스텀으로 구현한 subscriber 사용
    }

하지만 Flux 에서 제공하는 팩토리 메서드는 위와 같이 subscriber 를 등록해주는 메서드도 있는 반면에 subscriber 를 매개변수로 등록하지 않는 함수도 존재합니다.

subscriber 가 필요없어서가 아니라, 내부 로직에서 자동으로 subscriber 를 만들어 주기 때문입니다..(new LambdaSubscriber<>(...) 를 통해 Subscriber를 생성)

이때 기본적으로 s.request는 Long.MAX_VALUE를 할당 받습니다.

    @Test
    void nonSubscriber()
    {
        List<String> list = new ArrayList<>();

        Flux<String> flux = Flux.just("A", "B");
        flux.subscribe(list::add); // 내부적으로 subscriber를 구현해줌

        Assertions.assertEquals(2, list.size());
    }

Subscription

Subscriber는 Subscription 객체를 통해서 구독을 관리할 수 있습니다.
Subscription 인터페이스는 Subscriber가 구독한 데이터의 개수를 요청하거나 데이터 요청의 취소, 즉 구독을 해지하는 역할을 합니다.

Subscription 인터페이스는 request 메서드와 cancel 메서드를 가지고 있습니다.

Publisher에서 Subscriber의 onSubscribe 메서드를 호출하며 매개변수로 subscription을 전달합니다.

Subscriber는 Subscription의 request()를 호출하여 데이터를 요청하거나, cancel()을 호출하여 데이터를 더 이상 수신하지 않거나 구독을 취소할 수 있습니다.

request()를 호출할 때 Subscriber는 받고자 하는 데이터 항목 수를 나타내는 long 타입 값을 인자로 전달하는데 이것이 백프레셔이며, Subscriber가 처리할 수 있는 것보다 더 많은 데이터를 전송하는 것을 막아줍니다.

Subscriber의 데이터 요청이 완료되면 데이터가 스트림을 통해 전달되기 시작합니다. 이 때 onNext() 메서드가 호출되어 Publisher가 전송하는 데이터가 Subscriber에게 전달되며, 에러 발생시 onError()가 호출됩니다.

그리고 Publisher에서 전송할 데이터가 없고 더 이상 데이터를 생성하지 않는다면 Publisher가 onComplete()를 호출하여 작업이 끝났다고 Subscriber에게 알려줍니다.

전반적인 코드는 위의 MyCustomSubscriber 클래스를 확인합니다.

Processor

프로세서 인터페이스는 Subscriber, Publisher 인터페이스를 결합한 것이라고 이해하면 됩니다.

Cold Sequence / Hot Sequence

Cold와 Hot의 차이는 간단하게 Cold는 새로 시작한다, Hot은 새로 시작하지 않는다로 이해할 수 있습니다.

우리가 월간 잡지를 보내주는 서비스를 5월에 구독한다고 했을 때, 1월부터 5월까지 모든 잡지를 배달받고 이후로도 월마다 잡지를 배달받는 구조를 Cold, 5월 이후의 잡지만 배달받는 형태를 Hot이라고 보면 됩니다.

Cold Sequence란

Subscriber가 구독할 때마다(각 Subscriber는 구독시점이 달라도) Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름을 Cold Sequence라고 합니다.

이미지를 보면 1, 2번 구독자는 구독시점이 달라도 모두 4가지 색깔을 모두 받습니다.

Cold Sequnece 흐름으로 동작하는 PublisherCold Publisher라고 합니다.

Hot Sequence란

Cold Sequence는 구독이 발생할 때마다 Sequence의 타임라인이 처음부터 새로 시작했습니다. Hot Sequence는 구독이 아무리 많이 발생해도 Publisher가 데이터를 처음부터 emit하지 않습니다.

즉 Subsriber는 자신이 구독한 이후 emit된 데이터를 전달받을 수 있습니다.

이미지를 보면 1번 구독자는 4가지 색을 모두 받습니다. 앞선 색깔 2개를 emit한 후 구독한 2번째 구독자는 이후 emit된 2가지 색깔을 받습니다.

이처럼 Publisher가 데이터를 emit하는 과정이 한 번만 일어나도 Subscriber가 각각의 구독시점 이후에 emit된 데이터만 전달받는 데이터의 흐름을 Hot Sequence라고 합니다.
이러한 PublisherHot Publisher라고 합니다.

share()

share() 오퍼레이터는 원본 Flux를 멀티캐스트하는 새로운 Flux를 리턴하는 오퍼레이터입니다.

즉 여러 Subscriber가 하나의 Flux를 공유하도록 합니다. 하나의 원본 Flux를 공유하여 다 같이 사용하기 때문에 어떤 Subscriber가 이 원본 Flux를 먼저 구독해버리면 데이터 emit을 시작하게 되고, 이후에 다른 Subscriber가 구독하는 시점에는 이미 emit된 데이터는 전달받을 수 없게됩니다.

우리는 share() 오퍼레이터를 통해 Cold SequenceHot Sequence로 동작하게 할 수 있습니다.

    @Test
    @DisplayName("share() Operator를 이용해 Cold Sequence를 Hot Sequence로 동작하게 할 수 있다")
    void testHotSequence() throws InterruptedException {
        String[] singers = {"로이킴", "뉴진스", "트와이스", "테일러 스위프트"};

        // 4명의 가수는 2초 간격으로 무대에 나와 노래를 부를 예정입니다.
        Flux<String> concertFlux = Flux.fromArray(singers)
                .delayElements(Duration.ofSeconds(2))
                .share();

		// 첫 관객 관람(구독)을 하여 공연이 시작됐습니다.
        concertFlux.subscribe(singer -> System.out.printf("[%s] 관객 A가 %s의 무대를 보았습니다.%n", Thread.currentThread(), singer));

        Thread.sleep(2500); //공연 시작 2.5초 뒤.. 관객 B가 관람(구독) 시작
        concertFlux.subscribe(singer -> System.out.printf("[%s] 관객 B가 %s의 무대를 보았습니다.%n", Thread.currentThread(), singer));

        Thread.sleep(6000);
    }
[Thread[parallel-1,5,main]] 관객 A가 로이킴의 무대를 보았습니다.
[Thread[parallel-2,5,main]] 관객 A가 뉴진스의 무대를 보았습니다.
[Thread[parallel-2,5,main]] 관객 B가 뉴진스의 무대를 보았습니다.
[Thread[parallel-3,5,main]] 관객 A가 트와이스의 무대를 보았습니다.
[Thread[parallel-3,5,main]] 관객 B가 트와이스의 무대를 보았습니다.
[Thread[parallel-4,5,main]] 관객 A가 테일러 스위프트의 무대를 보았습니다.
[Thread[parallel-4,5,main]] 관객 B가 테일러 스위프트의 무대를 보았습니다.

다음 테스트코드를 통해 관객 B는 뉴진스의 공연부터 보는걸 볼 수 있습니다.

cache()

cache() 오퍼레이터는 Cold Sequence로 동작하는 Mono를 Hot Sequence로 변경해주고, emit된 데이터를 캐시한 뒤 구독이 발생할 때마다 캐시된 데이터를 전달해줍니다.

@Test
    void testCacheHotSequence() throws InterruptedException {
        var mono = Mono.fromCallable(() -> {
                    System.out.println("Go!");
                    return 5;
                })
                .map(i -> {
                    System.out.println("Double!");
                    return i * 2;
                });

        var cached = mono.cache();

        System.out.println("Using cached"); // Hot Sequence로 동작한다.
        System.out.println("1. " + cached.block()); // 최초 한 번 연산
        System.out.println("2. " + cached.block()); // 캐싱 반환
        System.out.println("3. " + cached.block()); // 캐싱 반환

        System.out.println("Using NOT cached"); // Cold Sequence로 동작한다.
        System.out.println("1. " + mono.block()); // 처음부터 다시 동작
        System.out.println("2. " + mono.block()); // 처음부터 다시 동작
        System.out.println("3. " + mono.block()); // 처음부터 다시 동작
    }
Using cached
Go!
Double!
1. 10
2. 10
3. 10
Using NOT cached
Go!
Double!
1. 10
Go!
Double!
2. 10
Go!
Double!
3. 10

테스트 결과를 보면 cache()사용 시 Hot Sequence로 동작하여 호출마다 처음부터 돌지 않고, 2번째 호출부터는 캐시된 값을 반환하는 것을 볼 수 있습니다.

Reactor에서 백프레셔를 사용하는 방법

request()로 요청 데이터 개수 제어

첫 번째 방법은 request()로 요청 데이터 개수 제어하는 것입니다.
Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청할 수 있습니다.

다음 링크로 예제코드를 확인합니다. https://github.com/suhongkim98/java-reactor-example/blob/main/src/main/java/org/example/subscriber/MyCustomSubscriberSafety.java

Backpressure 전략 사용

두 번째 방법은 Reactor에서 제공하는 Backpressure 전략을 사용하는 것입니다. Reactor에서는 다양한 전략을 제공합니다.

IGNORE 전략

IGNORE 전략은 말 그대로 백프레셔를 사용하지 않는 전략입니다.

ERROR 전략

 AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .onBackpressureError()
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();

에러 전략은 Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateExcetpion을 발생시킵니다. 이 경우 Publisher는 에러 시그널을 Subscriber에게 전송하고 삭제한 데이터는 폐기합니다.

DROP 전략

 AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .onBackpressureDrop(dropped -> log.info("dropped {}", dropped))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();

DROP전략은 Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득찬 경우, 버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit됐던 데이터부터 Drop시키는 전략입니다. Drop된 데이터는 폐기됩니다.

LATEST 전략


Pulisher가 Downstream으로 전달할 데이터가 버퍼에 가득찰 경우, 버퍼 밖에서 대기 중인 데이터는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터는 폐기하고, 버퍼가 빌 때 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략입니다.

DROP과 차이점이 있다면 버퍼가 가득 찰 경우 DROP은 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 폐기한다면, LATEST는 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨두고 나머지 데이터를 폐기합니다.

BUFFER 전략

버퍼가 가득차면 버퍼 내의 데이터를 폐기하거나 버퍼가 가득차면 에러를 발생시키는 전략도 지원합니다.

DROP과 LATEST가 버퍼 밖의 데이터를 폐기한다면 BUFFER전략에서의 데이터 폐기는 버퍼가 가득찼을 때 버퍼 안의 데이터를 폐기하는 것을 의미합니다. DROP_LATEST와 DROP_OLDEST 두가지가 있습니다.

DROP_LATEST 전략

Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우 이후에 emit된 데이터는 Drop하는 전략입니다.

  • buffer-size=5라고 하고 버퍼에 다음과 같이 쌓여있다 하겠습니다. [5,4,3,2,1]
  • 6을 emit합니다. [6,5,4,3,2,1]
  • 이는 버퍼 사이즈를 초과합니다. 6을 드랍합니다. [5,4,3,2,1]
  • 7을 emit합니다. [7,5,4,3,2,1]
  • 이는 버퍼 사이즈를 초과합니다. 7을 드랍합니다. [5,4,3,2,1]

DROP_OLDEST 전략

DROP_LATEST와 반대로 Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찬 상태에서 데이터가 emit될 경우, 버퍼 안에 가장 오래된 데이터부터 Drop하는 전략입니다.

  • buffer-size=5라고 하고 버퍼에 다음과 같이 쌓여있다 하겠습니다. [5,4,3,2,1]
  • 6을 emit합니다. [6,5,4,3,2,1]
  • 이는 버퍼 사이즈를 초과합니다. 1을 드랍합니다. [6,5,4,3,2]
  • 7을 emit합니다. [7,6,5,4,3,2]
  • 이는 버퍼 사이즈를 초과합니다. 2을 드랍합니다. [7,6,5,4,3]

Sinks

Processor의 기능을 개선한 Sinks가 Reactor 3.4.0 버전부터 지원되기 시작했습니다. Processor와 관련된 API는 Reactor 3.5.0부터 완전히 제거될 예정입니다.

Sinks 등장 이전에 Reactor에서 프로그래밍 방식으로 signal을 전송하는 가장 일반적인 방법은 generate() Operator나 create() Operator 등을 사용하는 것인데, 이는 싱글스레드 기반에서 signal을 전송합니다. 반면, Sinks는 멀티스레드 방식으로 signal을 전송해도 스레드 안전성을 보장하기 때문에 예기치 않은 동작으로 이어지는 것을 방지해 줍니다.

Sinks는 Sinks.Many 또는 Sinks.One interface를 사용해서 Thread-Safe하게 signal을 발생시킵니다.

Sinks.One

Sinks.One은 한 건의 데이터를 프로그래밍 방식으로 emit합니다.

emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달합니다. 나머지 데이터는 Drop 됩니다.

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Sinks.One<String> sinkOne = Sinks.one();
        Mono<String> mono = sinkOne.asMono();

        sinkOne.emitValue("Hello Reactor", FAIL_FAST);
//        sinkOne.emitValue("Hi Reactor", FAIL_FAST);
//        sinkOne.emitValue(null, FAIL_FAST);

        mono.subscribe(data -> log.info("# Subscriber1 {}", data));
        mono.subscribe(data -> log.info("# Subscriber2 {}", data));
    }
}

emitValue(): 메서드의 두 번째 파라미터는 emit 도중 에러가 발생할 경우 어떻게 처리할 것인지에 대한 핸들러를 나타냅니다.

FAIL_FAST: 빠르게 실패 처리한다는 의미는 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 한다는 의미입니다. 주석 처리된 코드를 주석 해제하고 다시 실행하면 출력 결과는 동일하나 Drop 되었다는 디버그 로그를 확인할 수 있습니다. 즉, Sinks.One으로 아무리 많은 수의 데이터를 emit한다 하더라도 처음 emit한 데이터는 정상적으로 emit되지만 나머지 데이터들은 Drop된다는 것을 알 수 있습니다.

asMono(): emit한 데이터를 구독하여 전달받기 위해 Mono 객체로 변환합니다.

Sinks.Many

Sinks.Many는 여러 건의 데이터를 프로그래밍 방식으로 emit합니다.

Sinks.One은 한 건의 데이터를 emit하는 한 가지 기능만 가지기 때문에 Default Spec(SinksSpecs.DEFAULT_ROOT_SPEC)을 사용합니다. 반면, Sinks.Many의 경우 데이터 emit을 위한 여러 가지 기능이 정의된 ManySpec을 리턴합니다.

public final class Sinks {
  ...
  ...
    public interface ManySpec {
      UnicastSpec unicast();
      MulticastSpec multicast();
      MulticastReplaySpec replay();
    }
}

UnicastSpec

UnicastSpec은 내부적으로 UnicastProcessor을 사용하며 단 하나의 Subscriber에게만 데이터를 emit을 허용합니다. Subscriber가 여러 개일 경우 예외를 발생시킵니다.

MulticastSpec

MulticastSpec의 기능은 하나 이상의 Subscriber에게 데이터를 emit 합니다.

Sinks가 Publisher의 역할을 할 경우 기본적으로 Hot Publisher로 동작합니다.

MulticastReplaySpec

MulticastReplaySpec에는 emit된 데이터를 다시 Replay해서 구독 전에 이미 emit된 데이터라도 Subscriber가 전달받을 수 있게 하는 다양한 메서드들이 정의되어 있습니다.

all(): 구독 전에 이미 emit된 데이터가 있더라도 처음 emit된 데이터부터 모든 데이터들이 Subscriber에게 전달됩니다.

limit(): emit된 데이터 중 파라미터로 입력한 개수만큼 뒤에서부터 되돌려 Subscriber에게 전달됩니다.

@Slf4j
public class Main {
    public static void main(String[] args) {
        Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
        Flux<Integer> fluxView = replaySink.asFlux();

        replaySink.emitNext(1, FAIL_FAST);
        replaySink.emitNext(2, FAIL_FAST);
        replaySink.emitNext(3, FAIL_FAST);

        fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); // 2, 3, 4

        replaySink.emitNext(4, FAIL_FAST);

        fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); // 3, 4
    }
}

Subs1은 1,2,3 중 뒤에서 2칸 되돌려 2부터 재생해 2, 3, 4를 받습니다.
Subs2는 1,2,3,4 중에서 뒤에서 2칸 되돌려 3부터 재생해 3, 4를 받습니다.

Scheduler

Reactor의 스케쥴러는 OS 스케쥴러와 의미가 비슷합니다.
리액터에서 스케쥴러는 비동기 프로그래밍을 위해 사용되는 스레드를 관리해주는 역할을 합니다.
우리는 스케쥴러를 이용하여 어떤 스레드에서 무엇을 처리할지 제어합니다. 멀티스레드 환경에서 멀티스레드를 완벽하게 제어하는 일은 어렵지만 리액터 스케쥴러는 스레드 제어를 대신 해주기에 개발자가 직접 스레드를 제어해야하는 부담에서 벗어날 수 있습니다.

    @Test
    void singleThreadTest()
    {
        Flux.range(1, 3)
            .map(i -> {
                System.out.printf("%s, map %d to %d\n", Thread.currentThread(), i, i + 2);
                return i + 2;
            })
            .flatMap(i -> {
                System.out.printf("%s, flatMap %d to Flux.range(%d, %d)", Thread.currentThread(), i, 1, i);
                return Flux.range(1, i);
            })
            .subscribe(i -> System.out.println(Thread.currentThread() + " next " + i));
    }

다음 테스트코드를 실행시켜보면 모두 main 스레드에서 실행되는 것을 확인하여 리액터는 비동기 실행을 강제하지 않는다는 것을 알 수 있습니다.

실행 결과를 보면 map(), flatMap(), subscribe()에 전달한 코드가 모두 main 쓰레드에서 실행된 것을 알 수 있습니다.

즉 map 연산, flatMap 연산뿐만 아니라 subscribe를 이용한 구독까지 모두 main 쓰레드가 실행합니다.

이때 스케줄링을 위한 전용 Operator를 사용하면 구독이나 신호 처리를 별도 쓰레드로 실행할 수 있습니다. 대표적으로 publishOn(), SubscribeOn(), parallel()이 있으며 Operator의 파라미터로 적절한 스케쥴러를 전달하면 해당 스케쥴러의 특성에 맞는 스레드가 리액터 시퀀스에 할당됩니다.

스케쥴러 종류는 다음 공식 문서를 참고합니다.
https://projectreactor.io/docs/core/release/api/

publishOn Operator 이용한 신호 처리 쓰레드 스케줄링

publishOn는 다운스트림으로 시그널을 전송할 때 실행되는 스레드를 제어하는 역할을 하는 오퍼레이터입니다.

    @Test
    void publishOnTest() throws InterruptedException
    {
        CountDownLatch latch = new CountDownLatch(1);

        Flux.range(1, 6)
            .map(i -> {
                System.out.printf("%s, map 1: %d to %d\n", Thread.currentThread(), i, i + 10);
                return i + 10;
            })
            .publishOn(Schedulers.boundedElastic(), 2) // 두 번째 인자인 2는 스케줄러가 신호를 처리하기 전에 미리 가져올 (prefetch) 데이터 개수이다.
            .map(i -> {     // publishOn에서 지정한 PUB 스케줄러가 실행
                System.out.printf("%s, map 2: %d to %d\n", Thread.currentThread(), i, i + 10);
                return i + 10;
            })
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println(Thread.currentThread() + " hookOnSubscribe");
                    requestUnbounded();
                }

                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println(Thread.currentThread() + " hookOnNext: " + value);
                }

                @Override
                protected void hookOnComplete() {
                    System.out.println(Thread.currentThread() + " hookOnComplete");
                    latch.countDown();
                }
            });

        latch.await();
    }

publishOn() 메서드를 이용하면 next, complete, error신호를 별도 쓰레드로 처리할 수 있습니다.
map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리합니다.

subscribeOn Operator 이용한 구독 처리 쓰레드 스케줄링

subscribeOn은 구독이 발생한 직후 실행될 스레드를 지정하는 오퍼레이터입니다.

subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request신호를 별도 스케줄러로 처리합니다.(시퀀스를 실행할 스케줄러를 지정합니다)

subscribeOn()으로 지정한 스케줄러는 시퀀스의 request 요청 처리뿐만 아니라 첫 번째 publishOn() 지정 이전까지의 신호 처리를 실행합니다.

    @Test
    void subscribeOnTest() throws InterruptedException
    {
        CountDownLatch latch = new CountDownLatch(1);

        Flux.range(1, 6)
            .log() // 보다 상세한 로그 출력 위함
            .subscribeOn(Schedulers.boundedElastic())
            .map(i -> {
                System.out.printf("%s, map 1: %d to %d\n", Thread.currentThread(), i, i + 10);
                return i + 10;
            })
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println(Thread.currentThread() + " hookOnSubscribe"); // main thread
                    request(1);
                }

                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println(Thread.currentThread() + " hookOnNext: " + value); // SUB 쓰레드
                    request(1);
                }

                @Override
                protected void hookOnComplete() {
                    System.out.println(Thread.currentThread() + " hookOnComplete"); // SUB 쓰레드
                    latch.countDown();
                }
            });

        latch.await();
    }

따라서 위 코드를 실행하면 Flux.range()가 생성한 시퀀스의 신호 발생뿐만 아니라 map() 실행, Subscriber의 next, complete 신호 처리를 "SUB" 스케줄러가 실행합니다.

참고로 시퀀스의 request 요청과 관련된 로그를 보기 위해 log() 메서드를 사용했습니다.

subscribeOn과 publishOn을 적절히 혼합하면 데이터를 emit하는 스레드와 emit된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있습니다.

parallel()

subscribeOn과 publishOn은 동시성을 가지는 논리적 스레드에 해당하지만 parallel()은 병렬성을 가지는 물리적 스레드에 해당합니다.

parallel()은 라운드로빈 방식으로 CPU 논리적 코어(물리적 스레드) 개수 만큼 스레드를 병렬로 실행합니다.

예를 들어 4코어 8스레드 CPU라면 8스레드를 병렬로 실행합니다.

Context

Context란 어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보를 의미합니다.

Reactor에서 Context는 Operator같은 Reactor 구성요소 간에 전파되는 키/밸류 형태의 저장소라고 정의합니다.
여기서 전파는 다운스트림에서 업스트림으로 컨텍스트가 전파되어 Operator 체인상의 각 Operator가 해당 Context의 정보를 동일하게 이용할 수 있음을 의미합니다.

Reactor의 Context는 Subscriber와 매핑됩니다. 즉 구독이 발생할 때마다 해당 구독과 연결된 하나의 컨텍스트가 생긴다라고 볼 수 있습니다.

Context는 Operator 체인의 아래에서부터 위로 전파됩니다. 따라서 Operator 체인 상에서 Context read 메서드가 Context write 메서드 밑에 있을 경우에는 write된 값을 read할 수 없습니다.

그럼 언제 활용하면 좋을까요?
Context는 API 인증정보 같은 직교성(독립성)을 가지는 정보를 전송하는데 적합합니다.

Context에 쓰기

ContextWrite() Operator를 통해 컨텍스트에 데이터를 쓸 수 있습니다.
실제로 데이터를 쓰는 동작은 Context API 중 하나인 put()을 통해 쓸 수 있습니다.

Context에서 읽기

deferContextual() 오퍼레이터를 통해 원본 데이터 소스 레벨에서 읽을 수 있습니다.
deferContextual()은 defer() Operator와 같은 원리로 동작하는데 Context에 저장된 데이터와 원본 데이터 소스의 처리를 지연시키는 역할을 합니다.

Operator 체인의 중간에서 Context의 데이터를 읽기 위해서는 transformDeferredContextual() 오퍼레이터를 사용합니다.

Reactor에서는 Operator 체인 상의 서로 다른 스레드들이 Context의 저장된 데이터에 손쉽게 접근할 수 있습니다.
그래서 Context.put()을 통해 컨텍스트에 데이터를 쓴 후에 매번 불변 객체를 다음 contextWrite() Operator로 전달함으로써 스레드 안전성을 보장합니다.

자주 사용되는 컨텍스트 API

put(k,v)

key/value 형태로 컨텍스트에 쓴다.

of(k1,v1,k2,v2..)

여러 개의 값을 컨텍스트에 쓴다.

putAll(contextView)

현재 컨텍스트와 파라미터로 입력된 컨텍스트뷰를 머지한다.

delete(k)

키에 해당하는 밸류를 제거한다.

리액티브 스트림즈 오퍼레이션

Reactor는 스트림 생성, 조합, 로직(boolean체크), 변환-전환 등을 수행할 수 있는 다양한 오퍼레이션을 제공합니다.

다음 깃허브 링크에서 테스트코드를 통해 다양한 예시를 볼 수 있습니다.
https://github.com/suhongkim98/java-reactor-example/tree/main

참고자료

스프링 인 액션
스프링으로 시작하는 리액티브 프로그래밍

profile
鈍筆勝聰✍️

1개의 댓글

comment-user-thumbnail
2023년 8월 12일

잘 봤습니다. 좋은 글 감사합니다.

답글 달기