
리액티브 프로그래밍에서 filter, skip, take, next 같은 오퍼레이터는 데이터 스트림에서 원하는 데이터만 골라내거나 흐름을 제어할 때 사용된다. 이 글에서는 각 오퍼레이터의 개념과 실제 사용 예제를 기반으로 동작 방식을 정리하고, 실행 시 주의할 점도 함께 설명한다.
filter – 조건에 맞는 데이터만 통과시키기filter 오퍼레이터는 Upstream에서 emit된 데이터 중 Predicate 조건을 만족하는 값만 Downstream으로 전달한다. 조건을 만족하지 않는 값은 단순히 무시된다.
Flux.range(1, 10)
.filter(i -> i % 2 == 0)
.subscribe(data -> System.out.println("Data: " + data));
filterWhen – 비동기 조건 필터링filterWhen은 filter와 유사하지만, 비동기 조건 검사를 수행할 수 있다는 점에서 차이가 있다. Predicate 대신 Publisher<Boolean>을 반환하는 함수를 사용하여 조건을 판단한다.
Flux.range(1, 10)
.filterWhen(data -> Flux.just(data % 2 == 0))
.publishOn(Schedulers.parallel())
.subscribe(data -> System.out.println("Data: " + data));
Flux.just(Boolean)을 사용하므로 비동기 평가가 가능하다.true인 값만 Downstream으로 전달된다.skip – 앞의 데이터를 건너뛰기skip(n)은 Upstream에서 emit된 데이터 중 앞의 n개를 무시하고 그 이후 데이터만 전달한다.
Flux.range(1, 10)
.skip(2)
.subscribe(data -> System.out.println("Data: " + data));
출력 결과:
Data: 3
Data: 4
Data: 5
Data: 6
Data: 7
Data: 8
Data: 9
Data: 10
skipskip(Duration) 형태로 시간 기반 건너뛰기도 가능하다. 지정한 시간 동안 emit된 값은 모두 무시된다.
Flux.interval(Duration.ofMillis(300))
.skip(Duration.ofSeconds(1))
.subscribe(data -> System.out.println("Data: " + data));
Flux.interval()을 사용하므로 비동기 스트림이기 때문에 main 쓰레드가 대기하지 않으면 아무 출력도 보이지 않는다.take – 일정 개수까지만 전달하고 종료take(n)은 Upstream에서 emit된 데이터 중 앞의 n개만 전달한 후 시퀀스를 종료한다.
Flux.interval(Duration.ofSeconds(1))
.take(3)
.subscribe(data -> System.out.println("Data: " + data));
Thread.sleep(4000); // 중요
출력 결과:
Data: 0
Data: 1
Data: 2
Thread.sleep()이 필요할까?Flux.interval()은 별도의 쓰레드에서 데이터를 emit하기 때문에, main 쓰레드가 대기하지 않으면 프로그램이 먼저 종료되어 아무 것도 출력되지 않는다.Thread.sleep() 또는 blockLast() 같은 방식으로 메인 쓰레드가 종료되지 않도록 대기해야 한다.filter는 조건에 맞지 않는 값을 무시만 한다.take는 지정된 개수만큼 emit하고 나면 onComplete 시그널을 발생시키며 스트림 자체를 종료한다.take는 단순한 필터링이 아닌 흐름 제어 및 종료의 의미를 가진다.next – 첫 번째 값만 Mono로 변환next()는 Flux에서 첫 번째 값을 꺼내고, 그 값을 포함한 Mono를 생성한다. 이후 emit은 모두 무시된다. 데이터가 없다면 empty Mono가 된다.
Flux.fromIterable(List.of("A", "B", "C"))
.next()
.subscribe(data -> System.out.println("Data: " + data));
출력 결과:
Data: A
next()는 Flux → Mono 변환 시 자주 사용되며, 첫 데이터만 필요할 때 유용하다.| 오퍼레이터 | 동작 설명 | 시퀀스 종료 여부 | 특징 요약 |
|---|---|---|---|
filter | 조건에 맞는 값만 emit | 종료하지 않음 | 조건 미충족 값은 무시됨 |
filterWhen | 비동기 조건으로 필터링 | 종료하지 않음 | Publisher<Boolean>로 판단 |
skip(n) | 앞의 n개 값을 무시 | 종료하지 않음 | 정수 또는 시간 단위로 skip 가능 |
take(n) | 앞의 n개만 emit 후 시퀀스 종료 | 종료함 (onComplete) | emit 제한 + 시퀀스 종료 |
next() | 첫 값만 emit 후 나머지는 무시, Mono로 래핑 | 종료함 (Mono) | Flux → Mono 변환용 |