Java_Reactor

Minki CHO·2023년 1월 29일
0

CodeStates

목록 보기
32/43

Project Reactor(줄여서 Reactor)

Reactor

? Reactor란?
:리액티브 스트림즈 표준 사양을 구현한 구현체 중 하나

Reactor 특징

(1) reactive library, based on the Reactive Streams
:리액티브 스트림즈 기반의 리액티브 라이브러리
:리액티브 스트림즈를 구현한 리액티브 라이브러리

(2) building non-blocking applications
:non-blocking :리액티브 프로그래밍의 핵심
:요청 쓰레드가 차단이 되지 않는다!

(3) TYPED [0|1|N] SEQUENCES
:Publisher 타입으로 Mono[0|1]과 Flux[N]이라는 두가지 타입을 제공
:Mono[0|1] :0건 또는 1건의 데이터를 emit 할 수 있음
:Flux[N] :여러 건의 데이터를 emit 할 수 있음

(4) Well-suited for microservices
:서비스들 간의 통신이 잦은 MSA(Microservice Architecture) 기반 애플리케이션들은 요청 쓰레드가 차단되는 Blocking 통신을 사용하기에는 무리가 있음
:따라서 기본적으로 Non-Blocking 통신을 완벽하게 지원하는 Reactor는 MSA 구조에 적합한 라이브러리라고 볼 수 있음

(5) backpressure-ready network
:

:위 그림은 리액티브 프로그래밍의 핵심 컴포넌트인 Publisher와 Subscriber의 인터랙션을 단순하게 표현함
:리액티브 프로그래밍에서는 끊임없이 들어오는 데이터를 적절하게 처리할 수 있어야 함
? 그림과 같이 Publisher에서 끊임없이 들어오는 데이터를 emit하는 것과 달리 Subscriber의 처리 속도가 느리다면?
:아직 처리되지 않고 대기하는 데이터가 지속적으로 쌓이는 것을 방치하게 되면 오버플로우 발생 및 시스템 다운 발생
:Backpressure :그림과 같이 Subscriber의 처리 속도가 Publisher의 emit 속도를 따라가지 못할 때 적절하게 제어하는 전략
:리액티브 프로그래밍에서 데이터를 적절하게 제어하기 위해 반드시 필요한 전략

Reactor 구성 요소

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class HelloReactorExample {
    public static void main(String[] args) throws InterruptedException {
        Flux    // 1)
            .just("Hello", "Reactor")               // 2)
            .map(message -> message.toUpperCase())  // 3)
            .publishOn(Schedulers.parallel())       // 4)
            .subscribe(System.out::println,         // 5)                    error -> System.out.println(error.getMessage()),  // 6)
                    () -> System.out.println("# onComplete"));   // 7)

        Thread.sleep(100L);
    }
}

1)Flux
:Reactor Sequence의 시작점
:ReactorSequence가 여러 건의 데이터를 처리함을 의미

2).just("Hello","Reactor")
:just() Operator
:원본 데이터 소스(Origianl Data Source)로부터 데이터를 emit하는 Publisher 역할을 함

3).map(message -> message.toUpperCase())
:map() Operator
:Publisher로부터 전달받은 데이터를 가공하는 Operator
:just() Operator에서 emit된 영문 문자열을 대문자로 변환

(Reactor에서는 map() Operator처럼 데이터를 가공 처리할 수 있는 수많은 Operator 지원)

4).publishOn(Schedulers.paraller())
:publishOn() Operator
:Reactor Sequence에서 쓰레드 관리자 역할을 하는 Scheduler를 지정하는 Operator
:해당 Operator에 Scheduler 지정 시, publishOn() Operator 기준으로 Downstream의 쓰레드가 Scheduler에서 지정한 유형의 쓰레드로 변경됨
:즉, 위 코드에서는 Reactor Sequence 상에서 두 개의 쓰레드가 실행됨

5).subscribe(System.out::println,
6) error -> System.out.println(error.getMessage()),
7) () -> System.out.println("# onComplete"));
:subscribe()는 파라미터로 총 세개의 람다 표현식을 가짐
:첫번째 파라미터인 5)System.out::println :Publisher가 emit한 데이터를 전달받아서 처리하는 역할을 함
:두번째 파라미터인 6)error -> System.out.println(error.getMessage()) :Reactor Sequence 상에서 에러 발생 시, 해당 에러를 전달받아 처리하는 역할을 함
:세번째 파라미터인 7)() -> System.out.println("# onComplete")) :Reactor Sequence 종료 후 어떤 후처리를 하는 역할을 함

결과

HELLO
REACTOR
# onComplete

:두 문자열이 map() 거쳐 대문자 변환 -> subscribe() 첫번째 파라미터/람다표현식으로 전달되어 출력
:# onComplete 출력 :Reactor Sequence 정상적으로 종료됨을 의미
:즉 subscribe()의 세번째 파라미터인 람다표현식은 Reactor Sequence 정상 종료되면 동작을 수행함

? 마지막 라인에 있는 Thread.sleep(100L)이란?
:Reactor Sequence에 Scheduler 지정시 main 쓰레드 외 별도 쓰레드 생성됨
:Reactor에서 Scheduler로 지정한 쓰레드는 모두 데몬 쓰레드이므로 주 쓰레드인 main 쓰레드 종료시
동시에 종료됨
:따라서 main 쓰레드를 Thread.sleep(100L) 적용하면, 0.1초 정도 동작 지연되어
0.1초 사이에 데몬 쓰레드를 통해 Reactor Sequence 정상 동작함

profile
Developer

0개의 댓글