
Reactive Programming에서는 다양한 오퍼레이터를 통해 Mono 또는 Flux 시퀀스를 생성할 수 있다.
| 오퍼레이터 | 타입 | 동작 방식 설명 | 특징 요약 |
|---|---|---|---|
justOrEmpty | Mono | null이 아니면 값을 emit, null이면 onComplete 전송 | null-safe 단일 값 생성 |
fromIterable | Flux | Iterable 요소들을 하나씩 emit | 반복 가능한 컬렉션을 Flux로 변환 |
fromStream | Flux | Stream 데이터를 emit (스트림은 재사용 불가, 자동 종료) | 자바 스트림을 Flux로 변환, 자동 close |
range | Flux | 지정한 시작값부터 지정 개수만큼 정수 emit | for-loop 대체, 연속 정수 시퀀스 |
defer | Mono/Flux | 구독 시점에 시퀀스를 생성하여 최신 데이터 emit | HOT → COLD 전환, 지연 평가 |
using | Flux | 리소스를 열고 사용 후 종료 처리 | 리소스 안전 처리 (ex. 파일, DB 커넥션 등) |
generate | Flux | 상태 기반으로 동기적으로 한 건씩 emit | 상태 머신 형태, pull 기반 |
create | Flux | 비동기 방식으로 복수 개 데이터를 자유롭게 emit | push 기반, 이벤트 중심 시나리오에 유리 |
justOrEmptyjustOrEmpty는 just 오퍼레이터의 확장 개념으로, 값이 null인 경우에도 NullPointerException을 발생시키지 않고 onComplete 시그널을 전송한다. 값이 존재하면 해당 값을 emit하는 Mono를 생성한다.
Mono.justOrEmpty(null)
.subscribe(
data -> System.out.println(data),
error -> System.out.println("Error: " + error),
() -> System.out.println("Completed")
);
fromIterablefromIterable은 Iterable 인터페이스를 구현한 객체를 받아, 각 요소를 emit하는 Flux를 생성한다.
Flux.fromIterable(List.of("A", "B", "C"))
.subscribe(data -> System.out.println("Data: " + data));
fromStreamfromStream은 자바 Stream을 기반으로 데이터를 emit하는 Flux를 생성한다. 스트림은 재사용이 불가능하며, cancel, error, complete 이벤트가 발생하면 자동으로 닫힌다.
Flux.fromStream(() -> Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.filter(i -> i % 2 == 0)
.subscribe(data -> System.out.println("Data: " + data));
rangerange는 특정 범위의 정수를 emit하는 Flux를 생성한다. 시작값부터 1씩 증가하는 수를 지정한 개수만큼 생성한다.
Flux.range(1, 10)
.map(number -> number * 2)
.subscribe(data -> System.out.println("Data: " + data));
deferdefer는 Mono나 Flux를 선언 시점이 아닌 구독 시점에 생성하도록 지연시킨다. 이를 통해 최신의 데이터를 구독 시점에 참조할 수 있다.
System.out.println("Example 14.5");
Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now()));
Thread.sleep(2000);
justMono.subscribe(dateTime -> System.out.println("Just Mono: " + dateTime));
deferMono.subscribe(dateTime -> System.out.println("Defer Mono: " + dateTime));
Thread.sleep(2000);
justMono.subscribe(dateTime -> System.out.println("Just Mono: " + dateTime));
deferMono.subscribe(dateTime -> System.out.println("Defer Mono: " + dateTime));
출력 결과:
Example 14.5
Just Mono: 2025-05-02T02:05:37.347993
Defer Mono: 2025-05-02T02:05:39.393091
Just Mono: 2025-05-02T02:05:37.347993
Defer Mono: 2025-05-02T02:05:41.398425
just는 HOT Publisher로 이미 생성된 데이터를 재사용하며, defer는 COLD Publisher로 구독 시마다 새로운 데이터를 생성한다.usingusing은 외부 리소스를 안전하게 사용하는 Flux를 생성할 수 있는 오퍼레이터이다. 리소스 획득, 사용, 해제 과정을 명시적으로 정의할 수 있다.
Path path = Path.of("src/test/java/com/example/webfluxexample/Chapter14/Example14_8.java");
Flux.using(
() -> Files.lines(path),
Flux::fromStream,
Stream::close
).subscribe(System.out::println);
generategenerate는 동기적인 방식으로 하나씩 순차적으로 데이터를 emit하는 Flux를 생성한다. 내부적으로 상태값을 유지하며 상태 기반의 이벤트 발생이 가능하다.
Flux.generate(() -> 0, (state, sink) -> {
sink.next(state);
if (state == 10) {
sink.complete();
}
return ++state;
}).subscribe(data -> System.out.println("data: " + data));
출력:
data: 0
data: 1
...
data: 10
createcreate는 프로그래밍 방식으로 시그널을 발생시키는 오퍼레이터이며, 비동기적으로 여러 데이터를 emit할 수 있다. generate와 달리 더 유연한 데이터 생성이 가능하다.
Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
sink.next("Item " + i);
}
sink.complete();
}).subscribe(System.out::println);
이러한 오퍼레이터들을 적절히 활용하면 다양한 데이터 시퀀스를 유연하게 구성할 수 있으며, 특히 동기/비동기 방식의 시그널 흐름을 제어할 때 유용하다.