리액티브 시스템(Reactive System)은 클라이언트의 요청에 빠르게 반응하는 시스템입니다.
빠른 반응을 하려면 요청 스레드가 차단되지 않게 함으로써(Non-Blocking) 클라이언트에게 즉각적으로 반응하도록 구성해야 합니다.
리액티브 시스템에서는 시스템 간 통신을 비동기적으로 처리합니다. 비동기적이란 작업이 순차적으로 진행되지 않고 여러 작업이 동시에 실행되는 방식을 의미합니다.
이를 통해 시스템은 실시간으로 변화에 대응하고 빠른 응답을 제공할 수 있습니다.
리액티브 시스템은 독립적으로 동작하는 구성 요소들이 상호 작용할 수 있도록 설계됩니다. 각 시스템은 사전에 정의된 메시지 형식과 의미를 통해 통신합니다. 이를 통해 시스템은 서로간의 사전 지식 없이 통신하며 변경에 유연하게 대응할 수 있는 느슨한 결합을 유지합니다.
비동기 메시지 패싱을 통해 작업을 분산하고 병렬화함으로써 시스템은 메시지를 보내고 응답을 기다리는 동안 다른 작업을 수행할 수 있습니다. 이를 통해 시스템은 효율적으로 작업을 처리하며 응답이 도착하면 적절한 처리를 수행합니다.
리액티브 프로그래밍(Reactive Programming)이란 리액티브 시스템에서 사용되는 Non-Blocking 통신을 위한 프로그래밍 모델입니다.
리액티브 프로그래밍은 반응성, 탄력성, 유연성과 같은 리액티브 시스템의 핵심 가치를 구현하는데 활용되며, 비동기적인 처리와 데이터 흐름의 관리를 통해 더욱 반응적이고 효율적인 애플리케이션을 개발할 수 있도록 지원합니다.
리액티브 프로그래밍에서는 데이터, 시간, 사용자 동작, 네트워크 요청들을 전부 이벤트나 시간의 흐름에 따라 발생하는 연속적인 데이터 흐름으로 간주하여 흘려보내고(emit), 스트림을 선언적으로 조작하고 변형할 수 있는 다양한 연산자(operators)들을 제공하여 가공합니다. observer(구독자)는 스트림에서 값이 나오기를 기다렸다가 나오는대로 최종작업을 하여 발행물에 반응합니다.
개발자는 요청을 스트림으로 해석해서 옵저버블로 구현하고, 연산자로 데이터를 정제하여 최종값에 어떻게 반응할지를 코딩하면 됩니다.
스프링은 리액티브 프로그래밍을 지원하기 위해 Spring WebFlux 모듈을 제공하고 있습니다.
리액티브 확장(Reactive Extension) 라이브러리는 대표적으로 RxJava가 있으며, 이외에도 RxJS, RxAndroid, RxKotlin, RxPython, RxScala 등이 있습니다.
스프링 웹플럭스는 멀티 쓰레드를 사용하고, 비동기적으로 동작하는 특징을 가지고 있습니다.
명령형 프로그래밍에서는 변수를 사용하여 코드를 작성하므로 여러 쓰레드에서 변수에 접근할 경우 찾기힘든 오류가 발생할 가능성이 있지만 리액티브는 선언형인 람다 표현식으로 작성되므로 내부에서는 변수를 사용하겠지만 코드로 드러나지 않아 개발자의 실수로 인한 오류가능성도 줄어들고, 동작 수행은 Operation(연산) 메서드 체인에 위임하고 있으므로 다중쓰레드를 활용할 수 있습니다.
// 예제코드
import reactor.core.publisher.Flux;
import java.util.List;
public class ReactiveGlossaryExample {
public static void main(String[] args) {
Flux //퍼블리셔
.fromIterable(List.of(1, 3, 6, 7, 8, 11)) //연산자
.filter(number -> number > 4 && (number % 2 == 0))
.reduce((n1, n2) -> n1 + n2)
.subscribe(System.out::println); //구독자
}
}
Reactor는 자바 기반의 리액티브 프로그래밍을 위한 라이브러리로, Spring 프레임워크와 함께 사용되어 비동기 및 이벤트 기반 애플리케이션을 개발할 수 있도록 지원합니다. Reactor는 리액티브 스트림(Reactive Streams) 스펙을 구현하고 있으며, 리액티브 프로그래밍의 핵심 개념과 기능을 제공합니다.
리액티브 스트림즈 표준 사양을 구현한 구현체이기 때문에 리액티브 프로그래밍의 특징인 Non-Blocking 통신을 지원하고, 요청쓰레드가 차단되지 않습니다.
Reactor는 Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입을 제공합니다.
- Mono[0|1] : 0건 또는 1건의 데이터를 emit 할 수 있음
- Flux[N] : 여러 건의 데이터를 emit할 수 있음
Scheduler는 여러 스레드의 작업을 스케줄링하고 실행할 스레드를 관리하는 관리자의 역할을 담당합니다.
작업의 우선순위, 동시성 제어, 작업 종속성 등을 고려하여 효율적으로 작업을 실행할 수 있도록 설계하고 작업의 실행 순서, 상호 배타적인 접근, 데드락 등을 방지하고 조율합니다.
Dead Lock 등을 방지합니다.
Backpressure
Publisher에서 끊임없이 들어오는 데이터를 emit하는 것과 달리 Subscriber의 처리 속도가 느리면 처리되지 않고 대기하는 데이터가 지속적으로 쌓이다가 오버플로우가 발생하게 되고 급기야는 시스템이 다운될 수 있습니다.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class HelloReactorExample {
public static void main(String[] args) throws InterruptedException {
Flux // 여러건의 데이터 emit 가능
.just("Hello", "Reactor") // Publisher
.map(message -> message.toUpperCase()) // Operator
.publishOn(Schedulers.parallel()) // Scheduler를 지정하는 Operator
.subscribe(System.out::println, // Publisher가 emit한 데이터를 처리
error -> System.out.println(error.getMessage()), //에러발생시 처리
() -> System.out.println("# onComplete")); //종료 후 처리
Thread.sleep(100L);
}
}
리액티브 스트림즈의 구현체인 Reactor 역시 다양한 종류의 Operator를 지원합니다.
하지만 Operator의 종류가 많아서 모두 기억하는 것은 어렵습니다.
따라서 자주 사용되는 Operator를 유형별로 묶어서 정리하면 도움이 됩니다.
- 새로운 Sequence를 생성(Creating)하고자 할 경우
just()
⭐ fromStream()
⭐ fromIterable()
fromArray()
range()
interval()
empty()
never()
defer()
using()
generate()
⭐ create()
- 기존 Sequence에서 변환 작업(Transforming)이 필요한 경우
⭐ map()
⭐ flatMap()
⭐ concat()
collectList()
collectMap()
merge()
⭐ zip()
then()
switchIfEmpty()
and()
when()
- Sequence 내부의 동작을 확인(Peeking)하고자 할 경우
doOnSubscribe
⭐doOnNext()
doOnError()
doOnCancel()
doFirst()
doOnRequest()
doOnTerminate()
doAfterTerminate()
doOnEach()
doFinally()
⭐log()
- Sequence에서 데이터 필터링(Filtering)이 필요한 경우
⭐filter()
ignoreElements()
distinct()
⭐take()
next()
skip()
sample()
single()
- 에러를 처리(Handling errors)하고자 할 경우
⭐error()
⭐timeout()
onErrorReturn()
onErrorResume()
onErrorMap()
doFinally()
⭐retry()