
connect(),autoConnect(),refCount()
리액터(reactor)의 Flux 스트림은 기본적으로 cold하다.
하지만 .publish()를 사용하면 이를 hot stream(ConnectableFlux)으로 변환할 수 있다. 여기에 연결 시점을 제어하는 connect(), autoConnect(), refCount() 등을 함께 사용하면 언제 데이터가 흐르고, 누가 어떤 타이밍에 받는지를 제어할 수 있게 된다.
publish()만 쓰면 아무 일도 일어나지 않는다ConnectableFlux<Integer> flux = Flux.range(1, 3)
.delayElements(Duration.ofMillis(500))
.doOnNext(d -> System.out.println("Emitting: " + d))
.publish();
publish()는 스트림을 준비만 할 뿐, 실제로 connect()를 호출해야 실행된다connect()를 써야 emit 시작flux.connect(); // connect 호출 시점부터 emit 시작됨
autoConnect() — 자동으로 connect 해주는 오퍼레이터Flux<Integer> flux = Flux.range(1, 3)
.delayElements(Duration.ofMillis(500))
.doOnNext(d -> System.out.println("Emitting: " + d))
.publish()
.autoConnect(); // 구독자가 생기면 자동으로 connect
Subscribed 1
Subscribed 2
Emitting: 1
Subscriber 1: 1
Subscriber 2: 1
...
✅ 특징: 한 번 시작되면, 구독자가 모두 끊겨도 스트림은 멈추지 않는다
refCount(n) — n명 이상 구독 시 connect, 모두 끊기면 disconnectFlux<Long> flux = Flux.interval(Duration.ofMillis(500))
.doOnSubscribe(s -> System.out.println("🔥 refCount - connected"))
.doOnCancel(() -> System.out.println("🛑 refCount - disconnected"))
.publish()
.refCount(1); // 구독자 수가 1명 이상일 때만 실행
▶️ Subscribing 1
🔥 refCount - connected
Subscriber 1: 0
Subscriber 1: 1
❌ Disposing Subscriber 1
🛑 refCount - disconnected
▶️ Subscribing 2
🔥 refCount - connected
Subscriber 2: 0
Subscriber 2: 1
Subscriber 2: 2
✅ 특징: 구독자 수에 따라 시작/종료를 자동 제어
| 항목 | autoConnect() | refCount(n) |
|---|---|---|
| 연결 조건 | 구독자 수 n명 도달 시 | 동일 |
| 구독자가 모두 끊기면? | ❌ 유지됨 | ✅ upstream 종료 |
| 다시 구독하면? | 이전 emit 이어받음 | 0부터 새로 시작 |
| 적합한 경우 | 한 번 시작하면 계속 돌리기 | 리소스 아끼고 싶을 때 |
refCount(1) 시나리오[Subscriber 1 구독] → emit 시작 (0, 1)
[Subscriber 1 해제] → emit 멈춤
[Subscriber 2 구독] → 다시 emit 시작 (0, 1, 2...)
autoConnect() 시나리오[Subscriber 1 구독] → emit 시작 (0, 1)
[Subscriber 1 해제] → emit 계속 진행
[Subscriber 2 구독] → 현재 흐름에 중간 합류 (2부터)