[Reactive Programming] 동기 Flux vs 비동기 Flux

y001·2025년 5월 2일

Reactive Programming

목록 보기
14/30
post-thumbnail

리액티브 프로그래밍을 학습하거나 테스트 코드를 작성하다 보면 어떤 Flux는 결과가 곧바로 출력되지만, 어떤 Flux는 아무것도 출력되지 않고 종료되는 상황을 마주하게 된다.
이 차이는 결국 Flux가 언제 데이터를 emit하느냐, 즉 동기적으로 실행되느냐 비동기적으로 실행되느냐에 따라 달라진다.

이 글에서는 1) 동기 vs 비동기 Flux의 개념, 2) emit 시점에 따른 주요 연산자 정리, 3) 비동기 흐름에서 블로킹이 필요한 이유와 처리 방법을 중심으로 살펴본다.


1. 동기 Flux vs 비동기 Flux

Flux는 기본적으로 비동기 스트림을 위한 Publisher이지만, 실제 실행 시점은 사용된 연산자에 따라 다르다.
어떤 Flux동기적으로 즉시 실행되고, 어떤 FluxScheduler나 delay 설정에 의해 비동기적으로 emit된다.


▶ 동기 Flux

  • 메인 스레드에서 즉시 실행되며 subscribe 시점에 모든 emit과 완료가 처리된다.
  • Thread.sleep() 같은 블로킹 처리는 필요하지 않다.
Flux.fromIterable(List.of("A", "B", "C"))
    .next()
    .subscribe(data -> System.out.println("Data: " + data));

출력 결과:

Data: A

▶ 비동기 Flux

  • interval, delayElements와 같이 Scheduler를 사용하거나 시간 기반 emit을 수행하는 연산자는 비동기로 동작한다.
  • subscribe 이후에도 emit이 지연되기 때문에, 메인 스레드가 먼저 종료되면 아무런 출력도 발생하지 않는다.
Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .subscribe(System.out::println);

Thread.sleep(4000);  // 메인 스레드 유지 필요

출력 결과:

0
1
2

2. Emit 시점에 따른 주요 Operator 분류

Flux가 언제 데이터를 emit하는지는 연산자의 특성에 따라 결정된다. 다음은 주요 연산자들을 emit 시점 기준으로 분류한 표이다.

분류emit 시점주요 연산자설명
동기 emitsubscribe 시 즉시 emitjust, fromIterable, range, empty, errorsubscribe 되자마자 데이터를 emit
비동기 emit일정 시간 또는 delay 이후 emitinterval, timer, delayElements, delaySubscriptionScheduler나 지연 설정을 통해 emit

예시 비교

// 동기 emit
Flux.just("A", "B", "C").subscribe(System.out::println); // 즉시 출력됨

// 비동기 emit
Flux.interval(Duration.ofMillis(500)).take(2).subscribe(System.out::println);
// 블로킹 없이 실행하면 출력되지 않음

3. 타이머 기반 emit은 블로킹이 필요하다

▶ 이유

  • interval, delayElements 등의 연산자는 내부적으로 비동기 Scheduler를 사용한다.
  • Scheduler는 daemon 스레드로 동작하기 때문에, 메인 스레드가 먼저 종료되면 Flux emit이 완료되기 전에 프로그램이 종료된다.
  • 따라서, 결과 출력을 보장하려면 메인 스레드의 생명 주기를 블로킹 코드로 유지해야 한다.

블로킹 처리 방법

1. Thread.sleep() 사용 – 가장 단순한 방식

Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .subscribe(System.out::println);

Thread.sleep(4000);  // 메인 스레드를 강제로 유지

2. CountDownLatch 사용 – 테스트 코드나 복수 subscribe 처리에 유용

CountDownLatch latch = new CountDownLatch(1);

Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .doOnComplete(latch::countDown)
    .subscribe(System.out::println);

latch.await();  // onComplete까지 대기

3. blockLast() 사용 – 마지막 emit까지 자동 대기

Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .doOnNext(System.out::println)
    .blockLast();  // 모든 데이터가 emit될 때까지 대기

0개의 댓글