Chapter07. Cold Sequence와 Hot Sequence

김신영·2023년 7월 30일
0

Spring WebFlux

목록 보기
7/13
post-thumbnail

Cold와 Hot의 의미

Cold는 무언가를 새로 시작하고, Hot은 무언가를 새로 시작하지 않는다.

  • Hot Swap
    • 컴퓨터 시스템의 전원이 켜져 있는 상태에서 디스크 등의 장치를 교체할 경우 시스템을 재시작하지 않고서도 바로 장치를 인식하는 것
  • Hot Deploy
    • 서버를 재시작하지 않고서 응용 프로그램의 변경 사항을 적용할 수 있는 기능
  • Hot Wallet
    • 인터넷에 연결되어 있기 때문에, 즉시 사용이 가능하지만 보안에 취약
  • Cold Wallet
    • 인터넷과 단절되어 있어서 사용성은 떨어지지만 보안이 강화된다는 특성

Cold Sequence

Subscriber가 구독할 때 마다 데이터 흐름이 처음부터 다시 시작되는 Sequence

image

Flux<String> coldFlux =
    Flux
        .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
        .map(String::toLowerCase);

coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
System.out.println("----------------------------------------------------------------------");
Thread.sleep(2000L);
coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));

// 01:27:07.332 [main] DEBUG- Using Slf4j logging framework
// 01:27:07.340 [main] INFO - # Subscriber1: korea
// 01:27:07.340 [main] INFO - # Subscriber1: japan
// 01:27:07.340 [main] INFO - # Subscriber1: chinese
// ----------------------------------------------------------------------
// 01:27:09.347 [main] INFO - # Subscriber2: korea
// 01:27:09.348 [main] INFO - # Subscriber2: japan
// 01:27:09.352 [main] INFO - # Subscriber2: chinese

Hot Seqeunce

구독이 발생한 시점 이전에 Publisher로부터 emit 된 데이터는 Subscriber가 전달받지 못한다.
구독이 발생한 시점 이후에 emit된 데이터만 전달받을 수 있다.

image

String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};

log.info("# Begin concert:");
Flux<String> concertFlux =
        Flux
            .fromArray(singers)
            .delayElements(Duration.ofSeconds(1))
            .share();

concertFlux.subscribe(
        singer -> log.info("# Subscriber1 is watching {}'s song", singer)
);

Thread.sleep(2500);

concertFlux.subscribe(
        singer -> log.info("# Subscriber2 is watching {}'s song", singer)
);

Thread.sleep(3000);

// 01:31:24.967 [main] INFO - # Begin concert:
// 01:31:25.018 [main] DEBUG- Using Slf4j logging framework
// 01:31:26.064 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
// 01:31:27.070 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
// 01:31:28.080 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
// 01:31:28.081 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
// 01:31:29.087 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
// 01:31:29.091 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
// 01:31:30.101 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
// 01:31:30.101 [parallel-5] INFO - # Subscriber2 is watching Singer E's song
  • delayElements()
    • 데이터 소스로 입력된 각 데이터의 emit을 일정시간 동안 지연시키는 Operator
  • share()
    • Cold Sequence를 Hot Sequence로 동작하게 해주는 Operator
    • 여러 Subscriber가 하나의 원본 Flux를 공유한다.

cache() Operator

  • Cold Sequence로 동작하는 Publisher를 Hot Sequence로 변경해준다.
  • emit된 데이터를 캐시한 뒤, 구독이 발생할 때 마다 캐시된 데이터를 전달한다.
public static void main(String[] args) throws InterruptedException {
    URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
            .host("worldtimeapi.org")
            .port(80)
            .path("/api/timezone/Asia/Seoul")
            .build()
            .encode()
            .toUri();

    Mono<String> mono = getWorldTime(worldTimeUri).cache();
    mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
    Thread.sleep(2000);
    mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

    Thread.sleep(2000);
}

private static Mono<String> getWorldTime(URI worldTimeUri) {
    return WebClient.create()
            .get()
            .uri(worldTimeUri)
            .retrieve()
            .bodyToMono(String.class)
            .map(response -> {
                DocumentContext jsonContext = JsonPath.parse(response);
                return jsonContext.read("$.datetime");
            });
}

정리

  • Subscriber의 구독 시점이 달라도, 구독할 때마다 Publisher가 데이터를 처음부터 emit하는 과정을 Cold Sequence라고 한다.
  • Cold Sequence 흐름으로 동작하는 Publisher를 Cold Publisher라고 한다.
  • Publisher가 데이터를 emit하는 과정이 한 번만 일어나고, Subscriber가 각각의 구독 시점 이후에 emit된 데이터만 전달받는 것을 Hot Sequence라고 한다.
  • Hot Sequence 흐름으로 동작하는 Publisher를 Hot Publisher 라고 한다.
  • share(), cache() 등의 Operator를 사용해서 Cold Sequence를 Hot Seqeunce로 변환할 수 있다.
profile
Hello velog!

0개의 댓글