Reactor는 비동기 및 반응형 프로그래밍을 위한 Java 기반 라이브러리로, 효율적인 데이터 처리와 이벤트 처리를 지원하는 프로젝트이다. Reactor는 리액티브 스트림스(Reactive Streams) 인터페이스를 구현하며, 비동기 데이터 흐름을 쉽게 관리할 수 있도록 도와준다. 이를 통해 대규모 트래픽을 효율적으로 처리할 수 있으며 리소스 사용을 최적화한다.
Flux는 Reactor의 두 가지 주요 타입 중 하나로, 0개 이상의 데이터를 비동기 스트림으로 방출하는 구현체(Publisher)이다. 여러 개의 이벤트를 처리할 때 사용되며 데이터를 계속해서 내보내는 스트림의 형태이다. 예를 들면, 실시간 데이터 피드나 연속적인 이벤트 스트림에 적합하다.
public Flux<Integer> startFlux(){
return Flux.range(1,10).log();
}
public Flux<String> startFlux2(){
return Flux.fromIterable(List.of("a","b","c","d")).log();
}
Mono는 Reactor의 또 다른 주요 타입으로 0또는 1개의 데이터를 비동기적으로 방출하는 구현체(Publisher)이다. 단일 값이나 빈 값을 다루는 데 적합하며 비동기 작업의 결과가 하나의 데이터만 필요한 경우 주로 사용된다. 예를 들어 데이터베이스 조회나 단일 HTTP 요청의 응답을 처리할 때 유용하다.
public Mono<Integer>startMono(){
return Mono.just(1).log();
}
public Mono<?>startMono2(){
return Mono.empty().log();
}
public Mono<?>startMono3(){
return Mono.error(new Exception("hello reactor"));
}
실습 Gradle 추가
plugins {
id 'java'
id "io.spring.dependency-management" version "1.0.7.RELEASE"
}
dependencyManagement {
imports {
mavenBom "io.projectreactor:reactor-bom:2023.0.11"
}
}
dependencies {
implementation 'io.projectreactor:reactor-core'
testImplementation 'io.projectreactor:reactor-test'
}
map 연산자는 스트림의 각 인스턴스에 주어진 함수를 적용하여 값을 변환할때 사용된다.(ex.객체 -> 다른 객체 주입,필드값 변경)
//1부터 5개의 요소를 *=2 한 값을 출력한다.
public Flux<Integer> fluxMap() {
return Flux.range(1, 5)
.map(i -> i * 2)
.log();
}
주어진 조건(predicate)에 따라 스트림의 요소를 걸러낸다. 조건을 만족하는 요소만 스트림에 남게 된다.
//5보다 큰 수만 걸러낸다.
public Flux<Integer> fluxFilter() {
return Flux.range(1, 10)
.filter(i -> i > 5)
.log();
}
스트림의 처음 n개의 요소만 방출하고 이후에는 스트림을 종료한다.
//5보다 큰 수중 3개의 요소만 방출 == 6 7 8
public Flux<Integer> fluxFilterTake() {
return Flux.range(1, 10)
.filter(i -> i > 5)
.take(3)
.log();
}
각 요소에 대해 비동기 작업을 수행하거나 Publisher를 반환하고, 그 결과를 단일 스트림으로 평탄화(flatten)하여 합친다.
//1 부터 10까지 각각의 i에서 10을 곱해 10개의 요소 안에서 0.2초 딜레이 간격으로
//비동기적 i 출력(ex.10,20,30 ~ 11,21,31)
public Flux<Integer> fluxFlatMap() {
return Flux.range(1, 10)
.flatMap(i -> Flux.range(i * 10, 10)
.delayElements(Duration.ofMillis(200)))
.log();
}
입력 요소에 대해 지정된 함수로 변환된 Publisher를 순서대로 병합하여 실행한다. 각 Publisher는 이전 Publisher가 완료된 후에 실행된다.
// 1부터 10까지 i에서 10을 곱한 수부터 10개를 각 i마다 순차적으로 실행
public Flux<Integer> fluxConcatMap() {
return Flux.range(1, 10)
.concatMap(i -> Flux.range(i * 10, 10)
.delayElements(Duration.ofMillis(100),single()))
.log();
}
delayElements는 기본적으로 parallel 스케줄러를 사용하기 때문에 single로 지정하면 순차적 처리를 확인할수 있다.
Mono로부터 여러 요소를 방출하는 Flux로 변환할 때 사용된다. Mono의 결과를 Flux로 변환하여 여러 요소를 방출할 수 있게 한다.
// 10을 출력하는 Mono에서 1개 부터 10까지의 Flux로 변환
public Flux<Integer> monoFlatMapMany() {
return Mono.just(10)
.flatMapMany(i -> Flux.range(1, i))
.log();
}
원본 Flux나 Mono가 빈 경우 다른 Publisher로 대체한다. 대체할 데이터 흐름이 필요할 때 사용된다.
//100보다 큰값이 없기 때문에 기본값인 30 출력
public Mono<Integer> defaultIfEmpty1() {
return Mono.just(100)
.filter(i -> i > 100)
.defaultIfEmpty(30);
}
// 100보다 큰 값이 없기 때문에 30의 값을 가지는
// Publisher Mono로 대체하여 60출력
public Mono<Integer> switchIfEmpty1() {
return Mono.just(100)
.filter(i -> i > 100)
.switchIfEmpty(Mono.just(30).map(i -> i * 2));
}
여러 Publisher의 데이터를 병합하여 동시에 방출할 수 있다. 각 Publisher에서 데이터가 준비되는 대로 방출되므로 비동기적인 병합이 가능하다.
Merge
//1,2,3 리스트를 가지는 Publisher와 4를 가지는 Publisher Merge
public Flux<String> fluxMerge() {
return Flux.merge(Flux.fromIterable(List.of("1","2","3")), Flux.just("4"))
.log();
}
// just 값을 가지는 Publisher들의 mergeWith를 사용한 Merge
public Flux<String> monoMerge() {
return Mono.just("1").mergeWith(Mono.just("2")).mergeWith(Mono.just("3"));
}
Zip
//각 Publusher들의 인덱스에 해당하는 요소를 getT1,getT2로 추출하여 결합
public Flux<String> fluxZip() {
return Flux.zip(Flux.just("a", "b", "c"), Flux.just("d", "e", "f"))
.map(i -> i.getT1() + i.getT2())
.log();
}