Cold starts something anew on every subscription, whereas Hot does not.
A Cold Sequence is a sequence whose data flow restarts from the beginning each time a Subscriber subscribes.
// A cold Flux: every subscription replays the full list from the start.
Flux<String> coldFlux =
        Flux
            .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
            .map(String::toLowerCase);

coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
System.out.println("----------------------------------------------------------------------");
Thread.sleep(2000L);
// Subscribing two seconds later still yields all three elements.
coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
// 01:27:07.332 [main] DEBUG- Using Slf4j logging framework
// 01:27:07.340 [main] INFO - # Subscriber1: korea
// 01:27:07.340 [main] INFO - # Subscriber1: japan
// 01:27:07.340 [main] INFO - # Subscriber1: chinese
// ----------------------------------------------------------------------
// 01:27:09.347 [main] INFO - # Subscriber2: korea
// 01:27:09.348 [main] INFO - # Subscriber2: japan
// 01:27:09.352 [main] INFO - # Subscriber2: chinese
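The replay behavior seen in the log above can also be sketched without Reactor. The following is a minimal plain-Java illustration, not Reactor's implementation: the class name ColdSourceSketch and its subscribe method are hypothetical names introduced only for this example, and everything runs synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; it is not part of Reactor.
class ColdSourceSketch {
    private final List<String> items;

    ColdSourceSketch(List<String> items) {
        this.items = List.copyOf(items);
    }

    // Each call walks the whole list again, so every subscriber
    // sees the data from the first element onward.
    void subscribe(Consumer<String> subscriber) {
        for (String item : items) {
            subscriber.accept(item.toLowerCase());
        }
    }

    public static void main(String[] args) {
        ColdSourceSketch cold =
                new ColdSourceSketch(List.of("KOREA", "JAPAN", "CHINESE"));
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        cold.subscribe(first::add);
        cold.subscribe(second::add);
        System.out.println(first);  // [korea, japan, chinese]
        System.out.println(second); // [korea, japan, chinese]
    }
}
```

Because the source holds no per-subscriber state, the moment at which a subscriber arrives makes no difference to what it receives.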
In a Hot Sequence, a Subscriber does not receive data that the Publisher emitted before the subscription occurred.
It receives only the data emitted after it subscribed.
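String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};

log.info("# Begin concert:");
Flux<String> concertFlux =
        Flux
            .fromArray(singers)
            .delayElements(Duration.ofSeconds(1)) // emit one singer per second
            .share();                             // share one source among all subscribers

concertFlux.subscribe(
        singer -> log.info("# Subscriber1 is watching {}'s song", singer)
);

// Subscriber2 joins 2.5 seconds in, after Singer A and Singer B have already been emitted.
Thread.sleep(2500);

concertFlux.subscribe(
        singer -> log.info("# Subscriber2 is watching {}'s song", singer)
);

// Keep the main thread alive until the remaining elements are emitted.
Thread.sleep(3000);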
// 01:31:24.967 [main] INFO - # Begin concert:
// 01:31:25.018 [main] DEBUG- Using Slf4j logging framework
// 01:31:26.064 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
// 01:31:27.070 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
// 01:31:28.080 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
// 01:31:28.081 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
// 01:31:29.087 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
// 01:31:29.091 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
// 01:31:30.101 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
// 01:31:30.101 [parallel-5] INFO - # Subscriber2 is watching Singer E's song
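delayElements(): delays each emitted element by the given duration (one second here). By default it runs on Reactor's parallel scheduler, which is why the log shows parallel-N threads instead of main.
share(): turns the cold Flux into a hot one by multicasting a single upstream subscription to all Subscribers, so a Subscriber that joins late receives only the elements emitted after it subscribes.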
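To make the hot behavior concrete without timers or schedulers, here is a minimal plain-Java sketch. It is only an illustration under simplifying assumptions: HotSourceSketch, subscribe, and emit are hypothetical names that are not part of Reactor, and emission is driven by hand instead of by delayElements().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; it is not part of Reactor.
class HotSourceSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // A new subscriber is only registered; nothing is replayed to it.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // An element goes only to the subscribers registered at this moment.
    void emit(String item) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    public static void main(String[] args) {
        HotSourceSketch concert = new HotSourceSketch();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        concert.subscribe(early::add);
        concert.emit("Singer A");
        concert.emit("Singer B");

        concert.subscribe(late::add); // joins after A and B were emitted
        concert.emit("Singer C");
        concert.emit("Singer D");
        concert.emit("Singer E");

        System.out.println(early); // [Singer A, Singer B, Singer C, Singer D, Singer E]
        System.out.println(late);  // [Singer C, Singer D, Singer E]
    }
}
```

The late subscriber misses Singer A and Singer B, which mirrors what Subscriber2 experiences in the concert log.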
public static void main(String[] args) throws InterruptedException {
    URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
            .host("worldtimeapi.org")
            .port(80)
            .path("/api/timezone/Asia/Seoul")
            .build()
            .encode()
            .toUri();

    // cache() stores the first result and replays it to later subscribers,
    // so only the first subscription triggers the HTTP request.
    Mono<String> mono = getWorldTime(worldTimeUri).cache();
    mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
    Thread.sleep(2000);
    // The second subscriber receives the cached value, so both log the same datetime.
    mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
    Thread.sleep(2000);
}

// Requests the current time and extracts the "datetime" field from the JSON response.
private static Mono<String> getWorldTime(URI worldTimeUri) {
    return WebClient.create()
            .get()
            .uri(worldTimeUri)
            .retrieve()
            .bodyToMono(String.class)
            .map(response -> {
                DocumentContext jsonContext = JsonPath.parse(response);
                return jsonContext.read("$.datetime");
            });
}
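The effect of cache() in the example above is roughly that of memoization: the source runs once and later subscribers get the stored result. The sketch below shows that idea in plain Java. It is an assumption-laden simplification, not how Reactor implements cache(): CachedSupplierSketch is a hypothetical name, the code is synchronous, and the counter stands in for the HTTP call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration class; Reactor's cache() is asynchronous and more involved.
class CachedSupplierSketch<T> implements Supplier<T> {
    private final Supplier<T> source;
    private T value;
    private boolean loaded;

    CachedSupplierSketch(Supplier<T> source) {
        this.source = source;
    }

    // The first call invokes the source; later calls return the stored value.
    @Override
    public synchronized T get() {
        if (!loaded) {
            value = source.get();
            loaded = true;
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger(); // counts simulated requests
        Supplier<String> cached =
                new CachedSupplierSketch<>(() -> "response #" + calls.incrementAndGet());

        System.out.println(cached.get()); // response #1
        System.out.println(cached.get()); // response #1 (served from the cache)
        System.out.println(calls.get());  // 1
    }
}
```

Without the wrapper, each call would run the source again, which corresponds to a cold Mono issuing a new request for every subscriber.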
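A sequence whose data flow restarts from the beginning on every subscription is called a Cold Sequence, and a Publisher that operates as a Cold Sequence is called a Cold Publisher.
A sequence whose data flow does not restart on subscription, so that a Subscriber receives only the data emitted after it subscribes, is called a Hot Sequence, and a Publisher that operates as a Hot Sequence is called a Hot Publisher.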