데이터 스트림을 생성하고 소비하는 방식에 따라 콜드(Code)또는 핫(Hot) 시퀸스로 구분한다.
CD나 DVD를 재생하는 것에 비유할 수 있다. 각 시청자가 재생 버튼을 누르면 처음부터 영화가 시작된다.
콜드 시퀸스는 각 구독자(Subscriber)가 스트림을 구독할 때마다 처음부터 데이터 생산을 시작하는 스트림이다.
즉, 각 구독자는 자신만의 독립적인 데이터 흐름을 받게 된다.
Observable.just(), Flowable.fromIterable(), Mono.just(), Flux.fromIterable() 등 기본 생성 연산자.라디오 방송이나 TV 생방송에 비유할 수 있다. 채널을 돌리는 시점부터 방송을 볼 수 없으며 놓친 이전 내용은 들을 수 없다. 그리고 모든 사람이 같은 방송을 듣는다.
핫 시퀸스는 데이터 생산을 구독 여부와 관계없이 독립적으로 시작하며, 모든 구독자에게 동일한 데이터 흐름을 공유한다.
구독 시점 이후에 발생한 데이터만 받게 된다.
PublishSubject, ReplaySubject, BehaviorSubject 또는 connectable 연산자를 사용하여 콜드 시퀀스를 핫 시퀀스로 변환한Spring WebFlux에서 Mono와 Flux는 기본적으로 콜드 시퀸스이다.
Mono나 Flux를 생성하는 것은 데이터 파이프라인을 정의하는 것이지 즉시 데이터를 생산하는 것이 아니다.subscribe()메소드가 호출되거나 WebFlux 핸들러에서 Mono또는 Flux를 반환하여 클라이언트에 의해 구독될 때 시작된다.
@Slf4j
@RestController
public class FluxController {
@GetMapping("/cold-stream")
public Flux<String> getColdStream() {
return Flux.interval(Duration.ofSeconds(1)) // 1초마다 숫자를 발행하는 Flux 정의
.map(i -> "Item " + i)
.take(5) // 5개까지만 발행
.doOnSubscribe(s -> log.info("새로운 콜드 스트림")) // 구독 시점 로그
.doOnNext(item -> log.info("Producing: {}", item)); // 데이터 발행 시점 로그
}
}
/cold-stream엔드포인트에 매번 새로운 HTTP 요청이 올 때마다 Flux.interval은 처음부터 다시 시작된다.Item 0, Item 1, Item 2, Item 3, Item 4를 독립적으로 받는다.doOnSubscribe훅을 통해 각 요청마다 새로운 콜드 스트림로그가 찍히는 것을 확인할 수 있다.Spring WebFlux에서 콜드 시퀸스를 핫 시퀸스로 전환하는 방법이 있으며 이는 주로 다수의 구독자에게 동일한 데이터 스트림을 공유하고자 할 때 사용한다.
Mono나 Flux에 publish(), share(), replay()등의 연산자를 적용하여 핫 시퀸스로 만들 수 있다.
보통 아래 세 가지 방법을 사용한다.
이 방법은 특정 수의 구독자가 스트림에 연결될 때까지 데이터 생산을 기다리고 싶을 때 유용하다.
첫 N명의 구독자가 연결되는 순간 스트림이 시작되며 그 이후의 모든 구독자는 진행 중인 스트림을 공유한다.
@Slf4j
@SpringBootTest
public class ColdSequenceToHotSequenceTest {
@Test
void autoConnect() throws InterruptedException {
// 콜드 시퀀스 정의 (1초마다 숫자 발행)
Flux<Long> coldSource = Flux.interval(Duration.ofSeconds(1))
.doOnSubscribe(s -> log.info("콜드 스트림 구독 시작"))
.doOnNext(val -> log.info("콜드 스트림에서 발행: {}", val));
// 콜드 시퀀스를 핫 시퀀스로 변환 (2명의 구독자가 연결되면 시작)
Flux<Long> hotStream = coldSource.publish().autoConnect(2); // 2명이 구독해야 시작
log.info("첫 번째 구독자 연결 시도");
hotStream.subscribe(data -> log.info("구독자 1: {}", data));
Thread.sleep(2500); // 2.5초 대기 (아직 1명만 구독 중이므로 스트림 시작 안 함)
log.info("두 번째 구독자 연결 시도");
hotStream.subscribe(data -> log.info("구독자 2: {}", data));
Thread.sleep(5000); // 스트림이 시작된 후 5초 동안 데이터 수신
log.info("세 번째 구독자 연결 시도 (진행 중인 핫 스트림에 참여)");
hotStream.subscribe(data -> log.info("구독자 3: {}", data));
Thread.sleep(5000); // 스트림 종료 대기
}
}
콜드 스트림 구독 시작은 두 번째 구독자가 연결되는 순간 한 번만 출력된다.autoConnect(2)에 의해 두 번째 구독자가 연결된 후에야 데이터를 받기 시작한다.share()는 publish().refCount(1)과 동일하다.
첫 번째 구독자가 연결될 때 콜드 스트림을 시작하고 모든 구독자가 연결을 끊으면 스트림을 종료하여 자원을 해제한다.
@Slf4j
@SpringBootTest
public class ColdSequenceToHotSequenceTest {
@Test
void shareHotSequence() throws InterruptedException {
// 콜드 시퀀스 정의
Flux<Long> coldSource = Flux.interval(Duration.ofSeconds(1))
.doOnSubscribe(s -> log.info("----- 콜드 스트림 구독 시작 -----"))
.doOnNext(val -> log.info("원본 발행: {}", val))
.doOnCancel(() -> log.info("----- 콜드 스트림 취소 (구독자 없음) -----"));
// share()를 사용하여 핫 시퀀스로 변환
Flux<Long> hotStream = coldSource.share();
log.info("첫 번째 구독자 연결 시도");
Disposable subscribe1 = hotStream.subscribe(data -> log.info("구독자 1: {}", data));
Thread.sleep(3500); // 3.5초 대기
log.info("두 번째 구독자 연결 시도 (진행 중인 핫 스트림에 참여)");
Disposable subscribe2 = hotStream.subscribe(data -> log.info("구독자 2: {}", data));
Thread.sleep(3500); // 3.5초 대기
subscribe1.dispose();
subscribe2.dispose();
log.info("1,2 번째 구독자 연결 해제");
// 모든 구독자가 해제되면 스트림이 종료되는 것을 보여주기 위함
Thread.sleep(5000);
log.info("세 번째 구독자 연결 시도 (스트림이 종료되었다가 다시 시작)");
hotStream.subscribe(data -> log.info("구독자 3: {}", data));
Thread.sleep(3000);
}
}
콜드 스트림 구독 시작은 첫 번째 구독자가 연결될 때 출력된다.콜드 스트림 취소는 모든 구독자가 해제될 때 출력된다.replay()는 핫 시퀸스를 만들면서 동시에 과거의 일정 수량 또는 일정 기간의 이벤트를 캐시하여 늦게 합류한 구독자에게도 캐시된 데이터를 먼저 제공할 수 있게 한다.
@Slf4j
@SpringBootTest
public class ColdSequenceToHotSequenceTest {
@Test
void replayHotSequence() throws InterruptedException {
// 콜드 시퀀스 정의
Flux<Long> coldSource = Flux.interval(Duration.ofSeconds(1))
.take(10) // 최대 10개만 발행
.doOnSubscribe(s -> log.info("----- 콜드 스트림 구독 시작 -----"))
.doOnNext(val -> log.info("원본 발행: {}", val));
// replay(3)을 사용하여 핫 시퀀스로 변환: 최근 3개의 이벤트를 캐시
Flux<Long> hotStream = coldSource.replay(3).autoConnect(); // autoConnect는 첫 구독 시 바로 연결
log.info("첫 번째 구독자 연결 시도");
hotStream.subscribe(data -> log.info("구독자 1: {}", data));
Thread.sleep(4500); // 4.5초 대기 (Item 0, 1, 2, 3 발행)
log.info("두 번째 구독자 연결 시도 (캐시된 과거 데이터 + 현재 데이터 받음)");
hotStream.subscribe(data -> log.info("구독자 2: {}", data));
Thread.sleep(5000); // 5초 대기 (나머지 데이터 수신)
}
}
publish()만 호출하면 ConnectableFlux를 반환하며, 이 ConnectableFlux는 connect()메소드가 호출되기 전까지 데이터를 생산하지 않는다.
이 방식은 스트림을 시작할 시점을 개발자가 직접 제어하고 싶을 때 사용한다.
@Slf4j
@SpringBootTest
public class ColdSequenceToHotSequenceTest {
@Test
void connectableFluxHotSequence() throws InterruptedException {
// 콜드 시퀀스 정의
Flux<Long> coldSource = Flux.interval(Duration.ofSeconds(1))
.doOnSubscribe(s -> log.info("----- 콜드 스트림 구독 시작 -----"))
.doOnNext(val -> log.info("원본 발행: {}", val));
// publish()만 사용하여 ConnectableFlux로 변환
ConnectableFlux<Long> hotStream = coldSource.publish();
log.info("첫 번째 구독자 연결 시도 (아직 connect() 안 함)");
hotStream.subscribe(data -> log.info("구독자 1: {}", data));
Thread.sleep(2000); // 2초 대기 (아직 connect() 안 했으므로 데이터 발행 없음)
log.info("두 번째 구독자 연결 시도...");
hotStream.subscribe(data -> log.info("구독자 2: {}", data));
Thread.sleep(2000); // 2초 대기 (여전히 데이터 발행 없음)
log.info("connect() 호출 - 이제 스트림이 시작됩니다!");
hotStream.connect(); // 스트림 시작
Thread.sleep(5000); // 5초 동안 데이터 수신 대기
}
}
콜드 스트림 구독 시작은 connect()가 호출되는 순간에 한 번만 실행된다.connect()가 호출되기 전까지는 아무리 많은 구독자가 붙어도 데이터가 발행되지 않는다.connect()이후부터 모든 구독자가 동일한 스트림에서 데이터를 받는다.autoConnect(N)/refCount(N): 특정 수의 소비자가 모여야만 스트림을 시작하거나 소비자가 모두 떠나면 스트림을 종료해야 할 때.share(): 가장 일반적인 핫 시퀸스 변환. 여러 구독자가 스트림을 공유하지만, 스트림의 수명 주기가 구독자 수에 연동될 때.replay(): 늦게 합류한 구독자에게도 과거 데이터를 제공해야 할 때.publish().connect(): 개발자가 스트림 시작 시점을 명시적으로 제어해야 할 때.| 콜드 시퀸스 | 핫 시퀸스 | |
|---|---|---|
| 사용 사례 | 각 요청이 독립적인 데이터를 필요로 할 때 ex: 데이터베이스 쿼리 결과, 파일 다운로드, 사용자 맞춤형 API 응답 | 여러 클라이언트가 동일한 실시간 이벤트를 공유해야 할 때 ex: 실시간 채팅, 주식 시세 |
| 장점 | - 요청별 격리 - 단순한 구현 | - 자원 효율성(데이터 스트림을 한 번만 생성하고 공유) - 실시간 데이터 브로드 캐스팅 |
| 단점 | 자원 소모가 각 요청마다 발생할 수 있다(동일한 데이터라도 여러 번 생성) | - 복잡성 증가(스트림의 생명주기 관리, 캐시 정책 등) - 늦게 합류한 구독자는 이전의 데이터를 놓칠 수 있음( replay()로 보완 가능) |