Reactor Operator -1

이정원·2024년 11월 10일
post-thumbnail

1.Reactor란

Reactor는 비동기 및 반응형 프로그래밍을 위한 Java 기반 라이브러리로, 효율적인 데이터 처리와 이벤트 처리를 지원하는 프로젝트이다. Reactor는 리액티브 스트림스(Reactive Streams) 인터페이스를 구현하며, 비동기 데이터 흐름을 쉽게 관리할 수 있도록 도와준다. 이를 통해 대규모 트래픽을 효율적으로 처리할 수 있으며 리소스 사용을 최적화한다.

1-1.FLUX

Flux는 Reactor의 두 가지 주요 타입 중 하나로, 0개 이상의 데이터를 비동기 스트림으로 방출하는 구현체(Publisher)이다. 여러 개의 이벤트를 처리할 때 사용되며 데이터를 계속해서 내보내는 스트림의 형태이다. 예를 들면, 실시간 데이터 피드나 연속적인 이벤트 스트림에 적합하다.

FLUX 메서드

  • range(int start, int count):주어진 시작 값부터 시작하여 count만큼의 연속된 정수를 방출하는 Flux를 생성한다.
public Flux<Integer> startFlux(){
        return Flux.range(1,10).log();
    }

  • fromIterable(Iterable<? extends T> it):주어진 Iterable 데이터(리스트, 집합 등)를 사용하여 각 요소를 방출하는 Flux를 생성한다.
public Flux<String> startFlux2(){
        return Flux.fromIterable(List.of("a","b","c","d")).log();
    }

1-2.MONO

Mono는 Reactor의 또 다른 주요 타입으로 0또는 1개의 데이터를 비동기적으로 방출하는 구현체(Publisher)이다. 단일 값이나 빈 값을 다루는 데 적합하며 비동기 작업의 결과가 하나의 데이터만 필요한 경우 주로 사용된다. 예를 들어 데이터베이스 조회나 단일 HTTP 요청의 응답을 처리할 때 유용하다.

MONO 메서드

  • just(T... data):주어진 요소들을 방출하는 Flux를 생성한다.
public Mono<Integer>startMono(){
        return Mono.just(1).log();
    }

  • empty():아무런 데이터를 방출하지 않는 빈 Mono를 생성한다.
public Mono<?>startMono2(){
        return Mono.empty().log();
    }

  • error(Throwable error):에러를 방출하는 Mono를 생성한다.
public Mono<?>startMono3(){
        return Mono.error(new Exception("hello reactor"));
    }

실습 Gradle 추가

plugins {
    id 'java'
    id "io.spring.dependency-management" version "1.0.7.RELEASE"
}
dependencyManagement {
    imports {
        mavenBom "io.projectreactor:reactor-bom:2023.0.11"
    }
}
dependencies {
	implementation 'io.projectreactor:reactor-core'
	testImplementation 'io.projectreactor:reactor-test'
}

2.Reactor Operator

2-1.map

map 연산자는 스트림의 각 인스턴스에 주어진 함수를 적용하여 값을 변환할때 사용된다.(ex.객체 -> 다른 객체 주입,필드값 변경)

//1부터 5개의 요소를 *=2 한 값을 출력한다.
public Flux<Integer> fluxMap() {
        return Flux.range(1, 5)
                .map(i -> i * 2)
                .log();
    }

2-2.filter

주어진 조건(predicate)에 따라 스트림의 요소를 걸러낸다. 조건을 만족하는 요소만 스트림에 남게 된다.

//5보다 큰 수만 걸러낸다.
public Flux<Integer> fluxFilter() {
        return Flux.range(1, 10)
                .filter(i -> i > 5)
                .log();
    }

2-3.take

스트림의 처음 n개의 요소만 방출하고 이후에는 스트림을 종료한다.

//5보다 큰 수중 3개의 요소만 방출 == 6 7 8
 public Flux<Integer> fluxFilterTake() {
        return Flux.range(1, 10)
                .filter(i -> i > 5)
                .take(3)
                .log();
    }

2-4.flatMap

각 요소에 대해 비동기 작업을 수행하거나 Publisher를 반환하고, 그 결과를 단일 스트림으로 평탄화(flatten)하여 합친다.

//1 부터 10까지 각각의 i에서 10을 곱해 10개의 요소 안에서 0.2초 딜레이 간격으로 
//비동기적 i 출력(ex.10,20,30 ~ 11,21,31)
public Flux<Integer> fluxFlatMap() {
        return Flux.range(1, 10)
                .flatMap(i -> Flux.range(i * 10, 10)
                        .delayElements(Duration.ofMillis(200)))
                .log();
    }

2-4.concatMap

입력 요소에 대해 지정된 함수로 변환된 Publisher를 순서대로 병합하여 실행한다. 각 Publisher는 이전 Publisher가 완료된 후에 실행된다.

// 1부터 10까지 i에서 10을 곱한 수부터 10개를 각 i마다 순차적으로 실행
public Flux<Integer> fluxConcatMap() {  
        return Flux.range(1, 10)
                .concatMap(i -> Flux.range(i * 10, 10)
                        .delayElements(Duration.ofMillis(100),single()))
                .log();
    }

delayElements는 기본적으로 parallel 스케줄러를 사용하기 때문에 single로 지정하면 순차적 처리를 확인할수 있다.

2-5.flatMapMany

Mono로부터 여러 요소를 방출하는 Flux로 변환할 때 사용된다. Mono의 결과를 Flux로 변환하여 여러 요소를 방출할 수 있게 한다.

// 10을 출력하는 Mono에서 1개 부터 10까지의 Flux로 변환
 public Flux<Integer> monoFlatMapMany() {
        return Mono.just(10)
                .flatMapMany(i -> Flux.range(1, i))
                .log();
    }

2-6. switchIfEmpty / defaultIfEmpty

원본 Flux나 Mono가 빈 경우 다른 Publisher로 대체한다. 대체할 데이터 흐름이 필요할 때 사용된다.

//100보다 큰값이 없기 때문에 기본값인 30 출력
public Mono<Integer> defaultIfEmpty1() {
        return Mono.just(100)
                .filter(i -> i > 100)
                .defaultIfEmpty(30);
    }
// 100보다 큰 값이 없기 때문에 30의 값을 가지는 
// Publisher Mono로 대체하여 60출력 
public Mono<Integer> switchIfEmpty1() {
        return Mono.just(100)
                .filter(i -> i > 100)
                .switchIfEmpty(Mono.just(30).map(i -> i * 2));
    }

2-7. merge / zip

여러 Publisher의 데이터를 병합하여 동시에 방출할 수 있다. 각 Publisher에서 데이터가 준비되는 대로 방출되므로 비동기적인 병합이 가능하다.

Merge

//1,2,3 리스트를 가지는 Publisher와 4를 가지는 Publisher Merge
public Flux<String> fluxMerge() {
        return Flux.merge(Flux.fromIterable(List.of("1","2","3")), Flux.just("4"))
                .log();
    }
// just 값을 가지는 Publisher들의 mergeWith를 사용한 Merge    
public Flux<String> monoMerge() {
        return Mono.just("1").mergeWith(Mono.just("2")).mergeWith(Mono.just("3"));
    }	

Zip

//각 Publusher들의 인덱스에 해당하는 요소를 getT1,getT2로 추출하여 결합 
public Flux<String> fluxZip() {
        return Flux.zip(Flux.just("a", "b", "c"), Flux.just("d", "e", "f"))
                .map(i -> i.getT1() + i.getT2())
                .log();
    }

0개의 댓글