[Project Reactor] 14. Sequence 필터링을 위한 오퍼레이터 (2)

y001·2025년 5월 1일

Reactive Programming

목록 보기
16/30
post-thumbnail

리액티브 프로그래밍에서 filter, skip, take, next 같은 오퍼레이터는 데이터 스트림에서 원하는 데이터만 골라내거나 흐름을 제어할 때 사용된다. 이 글에서는 각 오퍼레이터의 개념과 실제 사용 예제를 기반으로 동작 방식을 정리하고, 실행 시 주의할 점도 함께 설명한다.


1. filter – 조건에 맞는 데이터만 통과시키기

filter 오퍼레이터는 Upstream에서 emit된 데이터 중 Predicate 조건을 만족하는 값만 Downstream으로 전달한다. 조건을 만족하지 않는 값은 단순히 무시된다.

Flux.range(1, 10)
    .filter(i -> i % 2 == 0)
    .subscribe(data -> System.out.println("Data: " + data));
  • 위 예제에서는 1부터 10까지의 숫자 중 짝수만 출력된다.

2. filterWhen – 비동기 조건 필터링

filterWhenfilter와 유사하지만, 비동기 조건 검사를 수행할 수 있다는 점에서 차이가 있다. Predicate 대신 Publisher<Boolean>을 반환하는 함수를 사용하여 조건을 판단한다.

Flux.range(1, 10)
    .filterWhen(data -> Flux.just(data % 2 == 0))
    .publishOn(Schedulers.parallel())
    .subscribe(data -> System.out.println("Data: " + data));
  • 내부적으로 Flux.just(Boolean)을 사용하므로 비동기 평가가 가능하다.
  • 조건 결과가 true인 값만 Downstream으로 전달된다.

3. skip – 앞의 데이터를 건너뛰기

skip(n)은 Upstream에서 emit된 데이터 중 앞의 n개를 무시하고 그 이후 데이터만 전달한다.

Flux.range(1, 10)
    .skip(2)
    .subscribe(data -> System.out.println("Data: " + data));

출력 결과:

Data: 3
Data: 4
Data: 5
Data: 6
Data: 7
Data: 8
Data: 9
Data: 10

시간 기반 skip

skip(Duration) 형태로 시간 기반 건너뛰기도 가능하다. 지정한 시간 동안 emit된 값은 모두 무시된다.

Flux.interval(Duration.ofMillis(300))
    .skip(Duration.ofSeconds(1))
    .subscribe(data -> System.out.println("Data: " + data));
  • 위 예제에서는 1초 동안 emit된 데이터는 모두 무시되고 이후 데이터만 출력된다.
  • 단, 이 코드는 Flux.interval()을 사용하므로 비동기 스트림이기 때문에 main 쓰레드가 대기하지 않으면 아무 출력도 보이지 않는다.

4. take – 일정 개수까지만 전달하고 종료

take(n)은 Upstream에서 emit된 데이터 중 앞의 n개만 전달한 후 시퀀스를 종료한다.

Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .subscribe(data -> System.out.println("Data: " + data));

Thread.sleep(4000); // 중요

출력 결과:

Data: 0
Data: 1
Data: 2

❓ 왜 Thread.sleep()이 필요할까?

  • Flux.interval()은 별도의 쓰레드에서 데이터를 emit하기 때문에, main 쓰레드가 대기하지 않으면 프로그램이 먼저 종료되어 아무 것도 출력되지 않는다.
  • 이를 해결하려면 Thread.sleep() 또는 blockLast() 같은 방식으로 메인 쓰레드가 종료되지 않도록 대기해야 한다.

❓ take는 emit만 제한하나? 시퀀스가 종료되는가?

  • filter는 조건에 맞지 않는 값을 무시만 한다.
  • 반면 take는 지정된 개수만큼 emit하고 나면 onComplete 시그널을 발생시키며 스트림 자체를 종료한다.
  • 따라서 take는 단순한 필터링이 아닌 흐름 제어 및 종료의 의미를 가진다.

5. next – 첫 번째 값만 Mono로 변환

next()Flux에서 첫 번째 값을 꺼내고, 그 값을 포함한 Mono를 생성한다. 이후 emit은 모두 무시된다. 데이터가 없다면 empty Mono가 된다.

Flux.fromIterable(List.of("A", "B", "C"))
    .next()
    .subscribe(data -> System.out.println("Data: " + data));

출력 결과:

Data: A
  • next()Flux → Mono 변환 시 자주 사용되며, 첫 데이터만 필요할 때 유용하다.

마무리하면서

오퍼레이터동작 설명시퀀스 종료 여부특징 요약
filter조건에 맞는 값만 emit종료하지 않음조건 미충족 값은 무시됨
filterWhen비동기 조건으로 필터링종료하지 않음Publisher<Boolean>로 판단
skip(n)앞의 n개 값을 무시종료하지 않음정수 또는 시간 단위로 skip 가능
take(n)앞의 n개만 emit 후 시퀀스 종료종료함 (onComplete)emit 제한 + 시퀀스 종료
next()첫 값만 emit 후 나머지는 무시, Mono로 래핑종료함 (Mono)Flux → Mono 변환용

0개의 댓글