[Project Reactor] 14. Sequence 생성을 위한 오퍼레이터 (1)

y001·2025년 5월 1일

Reactive Programming

목록 보기
15/30
post-thumbnail

Reactive Programming에서는 다양한 오퍼레이터를 통해 Mono 또는 Flux 시퀀스를 생성할 수 있다.

오퍼레이터타입동작 방식 설명특징 요약
justOrEmptyMononull이 아니면 값을 emit, null이면 onComplete 전송null-safe 단일 값 생성
fromIterableFluxIterable 요소들을 하나씩 emit반복 가능한 컬렉션을 Flux로 변환
fromStreamFluxStream 데이터를 emit (스트림은 재사용 불가, 자동 종료)자바 스트림을 Flux로 변환, 자동 close
rangeFlux지정한 시작값부터 지정 개수만큼 정수 emitfor-loop 대체, 연속 정수 시퀀스
deferMono/Flux구독 시점에 시퀀스를 생성하여 최신 데이터 emitHOT → COLD 전환, 지연 평가
usingFlux리소스를 열고 사용 후 종료 처리리소스 안전 처리 (ex. 파일, DB 커넥션 등)
generateFlux상태 기반으로 동기적으로 한 건씩 emit상태 머신 형태, pull 기반
createFlux비동기 방식으로 복수 개 데이터를 자유롭게 emitpush 기반, 이벤트 중심 시나리오에 유리

1. justOrEmpty

justOrEmptyjust 오퍼레이터의 확장 개념으로, 값이 null인 경우에도 NullPointerException을 발생시키지 않고 onComplete 시그널을 전송한다. 값이 존재하면 해당 값을 emit하는 Mono를 생성한다.

Mono.justOrEmpty(null)
    .subscribe(
        data -> System.out.println(data),
        error -> System.out.println("Error: " + error),
        () -> System.out.println("Completed")
    );

2. fromIterable

fromIterableIterable 인터페이스를 구현한 객체를 받아, 각 요소를 emit하는 Flux를 생성한다.

Flux.fromIterable(List.of("A", "B", "C"))
    .subscribe(data -> System.out.println("Data: " + data));

3. fromStream

fromStream은 자바 Stream을 기반으로 데이터를 emit하는 Flux를 생성한다. 스트림은 재사용이 불가능하며, cancel, error, complete 이벤트가 발생하면 자동으로 닫힌다.

Flux.fromStream(() -> Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    .filter(i -> i % 2 == 0)
    .subscribe(data -> System.out.println("Data: " + data));

4. range

range는 특정 범위의 정수를 emit하는 Flux를 생성한다. 시작값부터 1씩 증가하는 수를 지정한 개수만큼 생성한다.

Flux.range(1, 10)
    .map(number -> number * 2)
    .subscribe(data -> System.out.println("Data: " + data));

5. defer

deferMonoFlux를 선언 시점이 아닌 구독 시점에 생성하도록 지연시킨다. 이를 통해 최신의 데이터를 구독 시점에 참조할 수 있다.

System.out.println("Example 14.5");
Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now()));

Thread.sleep(2000);

justMono.subscribe(dateTime -> System.out.println("Just Mono: " + dateTime));
deferMono.subscribe(dateTime -> System.out.println("Defer Mono: " + dateTime));

Thread.sleep(2000);

justMono.subscribe(dateTime -> System.out.println("Just Mono: " + dateTime));
deferMono.subscribe(dateTime -> System.out.println("Defer Mono: " + dateTime));

출력 결과:

Example 14.5
Just Mono: 2025-05-02T02:05:37.347993
Defer Mono: 2025-05-02T02:05:39.393091
Just Mono: 2025-05-02T02:05:37.347993
Defer Mono: 2025-05-02T02:05:41.398425
  • just는 HOT Publisher로 이미 생성된 데이터를 재사용하며,
  • defer는 COLD Publisher로 구독 시마다 새로운 데이터를 생성한다.

6. using

using은 외부 리소스를 안전하게 사용하는 Flux를 생성할 수 있는 오퍼레이터이다. 리소스 획득, 사용, 해제 과정을 명시적으로 정의할 수 있다.

Path path = Path.of("src/test/java/com/example/webfluxexample/Chapter14/Example14_8.java");

Flux.using(
    () -> Files.lines(path),
    Flux::fromStream,
    Stream::close
).subscribe(System.out::println);

7. generate

generate동기적인 방식으로 하나씩 순차적으로 데이터를 emit하는 Flux를 생성한다. 내부적으로 상태값을 유지하며 상태 기반의 이벤트 발생이 가능하다.

Flux.generate(() -> 0, (state, sink) -> {
    sink.next(state);
    if (state == 10) {
        sink.complete();
    }
    return ++state;
}).subscribe(data -> System.out.println("data: " + data));

출력:

data: 0
data: 1
...
data: 10

8. create

create는 프로그래밍 방식으로 시그널을 발생시키는 오퍼레이터이며, 비동기적으로 여러 데이터를 emit할 수 있다. generate와 달리 더 유연한 데이터 생성이 가능하다.

Flux.create(sink -> {
    for (int i = 0; i < 5; i++) {
        sink.next("Item " + i);
    }
    sink.complete();
}).subscribe(System.out::println);

이러한 오퍼레이터들을 적절히 활용하면 다양한 데이터 시퀀스를 유연하게 구성할 수 있으며, 특히 동기/비동기 방식의 시그널 흐름을 제어할 때 유용하다.

0개의 댓글