Sink

김기현·2025년 8월 4일

Spring WebFlux

목록 보기
21/28

Sink란

Sink는 Reactor 프레임워크에서 프로그래밍 방식으로 리액티브 스트림(Mono/Flux)에 데이터를 발행(emit)하는 데 사용되는 도구이다.

쉽게 말해서 외부 소스에서 발생한 이벤트를 리액티브 스트림 파이프라인으로 흘려보내는 "입구" 역할을 한다.


  • 일반적으로 리액티브 스트림은 데이터를 비동기적으로 생성(ex: Flux.interval, WebClient)하거나, 다른 리액티브 소스(ex: Flux.fromIterable, Mono.fromCallable)에서 가져온다.
  • 하지만 때로는 콜백 기만 API, 레거시 시스템, 메시지 큐 리스너 등 논-리액티브 코드에서 발생하는 이벤트를 리액티브 스트림으로 전환해야 할 때가 있다.
  • 이때 Sinks가 유용하게 사용된다.

Sink의 주요 특징 및 목적

  • 외부 이벤트의 주입: 비동기적으로 발생하는 이벤트를 리액티브 스트림으로 변환하여 이벤트를 구독하는 모든 리액티브 소비자에게 전달할 수 있게 한다.
  • 명확한 역할 분리: Sinks는 데이터를 발행(emitting)하는 역할에만 집중한다. 데이터를 소비하거나 변환하는 로직은 Sink에서 얻는 MonoFlux를 통해 처리해야 한다.
  • 안전한 발행: Sinks는 스레드 안정성을 보장하며, tryEmitXXX()같은 메소드를 통해 발행 성공 여부를 즉시 반환하여 발행 중 발생할 수 있는 오류나 백프레셔 상황을 안전하게 처리할 수 있도록 돕는다.
  • 다양한 발행 전략: Sinks.One, Sinks.Many와 같은 다양한 팩토리 메소드를 통해 단일 값 발행, 다중 값 발행 등 스트림의 종류와 발행 전략(ex: unicast, multicast, replay)을 선택할 수 있다.
  • 백프레셔 지원: 구독자의 처리 속도에 맞춰 데이터 발행 속도를 조절하는 백프레셔 메커니즘을 지원한다.

Sink의 종류

Sinks는 발행하려는 데이터의 양과 구독자 수에 따라 크게 두 가지 종류라 나뉜다.

Sinks.One<T>

  • 단 하나의 값을 발행하고 완료되거나 오류를 발생시키는 Mono<T>와 연결된다.
  • 주로 단일 응답이 필요한 비동기 작업에 사용된다.
  • ex: Sinks.one()

Sinks.Many<T>

  • 여러 개의 값을 발행하거나 무한한 스트림을 제공하는 Flux<T>와 연결된다.
  • 구독자 관리 방식에 따라 추가적인 분류가 있다.
    • unicast(): 단 하나의 구독자만 허용한다. 첫 구독자가 연결되면 Sink는 "활성" 상태가 되고 다른 구독자가 연결을 시도하면 오류를 발생시킨다.
    • multicast(): 여러 구독자를 허용한다. 발행되는 데이터는 연결된 모든 구독자에게 전달된다.
    • replay(): 여러 구독자를 허용하며, Sink에 발행된 과거 데이터를 새로 구독하는 구독자에게도 다시 발행한다. 캐싱과 유사한 효과를 가진다.

각 분류에 따른 예제

@SpringBootTest
public class SinkTest {
    @Test
    void sinksManyUnicastOnBackpressure() throws InterruptedException {
        System.out.println("--- Unicast Sink Example ---");

        // Unicast Sink 생성: 단 하나의 구독자만 허용
        Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();

        // 첫 번째 구독자 연결
        unicastSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 1 (Unicast): " + data),
                        error -> System.err.println("구독자 1 (Unicast) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 1 (Unicast) 완료")
                );

        // 데이터 발행
        unicastSink.tryEmitNext("데이터 A");
        unicastSink.tryEmitNext("데이터 B");

        // 두 번째 구독자 연결 시도 (에러 발생 예상)
        try {
            unicastSink.asFlux()
                    .subscribe(
                            data -> System.out.println("구독자 2 (Unicast): " + data),
                            error -> System.err.println("구독자 2 (Unicast) 에러: " + error.getMessage()),
                            () -> System.out.println("구독자 2 (Unicast) 완료")
                    );
        } catch (IllegalStateException e) {
            System.err.println("두 번째 구독자 연결 시도 실패: " + e.getMessage());
        }

        unicastSink.tryEmitNext("데이터 C"); // 여전히 첫 번째 구독자에게만 발행

        Thread.sleep(100); // 비동기 처리를 위한 대기

        unicastSink.tryEmitComplete(); // 스트림 완료
        System.out.println("Unicast Sink 완료.");

        Thread.sleep(100);
    }

    @Test
    void sinkManyMulticastOnBackpressure() throws InterruptedException {
        System.out.println("\n--- Multicast Sink Example ---");

        // Multicast Sink 생성: 여러 구독자 허용
        Sinks.Many<String> multicastSink = Sinks.many().multicast().onBackpressureBuffer();

        // 첫 번째 구독자 연결
        multicastSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 1 (Multicast): " + data),
                        error -> System.err.println("구독자 1 (Multicast) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 1 (Multicast) 완료")
                );

        // 구독 전에 데이터 발행
        multicastSink.tryEmitNext("사전 발행 데이터 X");

        // 데이터 발행
        multicastSink.tryEmitNext("데이터 D");
        multicastSink.tryEmitNext("데이터 E");

        Thread.sleep(50); // 구독자 2 연결 전 발행 데이터 수신 대기

        // 두 번째 구독자 연결 (이전 데이터는 받지 못함)
        multicastSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 2 (Multicast): " + data),
                        error -> System.err.println("구독자 2 (Multicast) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 2 (Multicast) 완료")
                );

        // 두 번째 구독자 연결 후에 데이터 발행
        multicastSink.tryEmitNext("데이터 F");
        multicastSink.tryEmitNext("데이터 G");

        Thread.sleep(100);

        multicastSink.tryEmitComplete(); // 스트림 완료
        System.out.println("Multicast Sink 완료.");

        Thread.sleep(100);
    }

    @Test
    void sinksManyReplayLatest() throws InterruptedException {
        System.out.println("\n--- Replay Sink Example (latest) ---");

        // Replay Sink (latest): 마지막 발행된 데이터만 재발행
        Sinks.Many<String> replayLatestSink = Sinks.many().replay().latest();

        // 데이터 발행 (구독자 연결 전)
        replayLatestSink.tryEmitNext("초기 데이터 1");
        replayLatestSink.tryEmitNext("초기 데이터 2"); // 마지막 데이터

        // 첫 번째 구독자 연결 (마지막 데이터 2만 받음)
        replayLatestSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 1 (Replay Latest): " + data),
                        error -> System.err.println("구독자 1 (Replay Latest) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 1 (Replay Latest) 완료")
                );

        Thread.sleep(50);

        // 추가 데이터 발행
        replayLatestSink.tryEmitNext("데이터 H");
        replayLatestSink.tryEmitNext("데이터 I"); // 이 시점의 마지막 데이터

        Thread.sleep(50);

        // 두 번째 구독자 연결 (데이터 H, I를 받음)
        replayLatestSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 2 (Replay Latest): " + data),
                        error -> System.err.println("구독자 2 (Replay Latest) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 2 (Replay Latest) 완료")
                );

        Thread.sleep(100);

        replayLatestSink.tryEmitComplete(); // 스트림 완료
        System.out.println("Replay Latest Sink 완료.");

        Thread.sleep(100);
    }

    @Test
    void sinksManyReplayAll() throws InterruptedException {
        System.out.println("\n--- Replay Sink Example (all) ---");

        // Replay Sink (all): 모든 과거 데이터 재발행
        Sinks.Many<String> replayAllSink = Sinks.many().replay().all();

        // 데이터 발행 (구독자 연결 전)
        replayAllSink.tryEmitNext("모든 초기 데이터 X");
        replayAllSink.tryEmitNext("모든 초기 데이터 Y");

        // 첫 번째 구독자 연결 (X, Y 모두 받음)
        replayAllSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 1 (Replay All): " + data),
                        error -> System.err.println("구독자 1 (Replay All) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 1 (Replay All) 완료")
                );

        Thread.sleep(50);

        // 추가 데이터 발행
        replayAllSink.tryEmitNext("모든 데이터 Z");

        Thread.sleep(50);

        // 두 번째 구독자 연결 (X, Y, Z 모두 받음)
        replayAllSink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 2 (Replay All): " + data),
                        error -> System.err.println("구독자 2 (Replay All) 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 2 (Replay All) 완료")
                );

        Thread.sleep(100);

        replayAllSink.tryEmitComplete(); // 스트림 완료
        System.out.println("Replay All Sink 완료.");

        Thread.sleep(100);
    }
}

Sink 사용 예시

Sinks.Many를 사용하여 외부 이벤트를 리액티브 스트림으로 변환하는 예제이다.

package com.webfluxpractice;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

@SpringBootTest
public class SinkTest {

    @Test
    void sink() throws InterruptedException {
        // 1. Sinks.Many 생성 (여러 구독자를 허용하는 Multicast Sink)
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

        // 2. Sink에서 Flux를 얻어 구독자 연결
        // 이 Flux는 sink를 통해 데이터가 발행될 때마다 이를 수신합니다.
        sink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 1: " + data),
                        error -> System.err.println("구독자 1 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 1 완료")
                );

        sink.asFlux()
                .subscribe(
                        data -> System.out.println("구독자 2: " + data),
                        error -> System.err.println("구독자 2 에러: " + error.getMessage()),
                        () -> System.out.println("구독자 2 완료")
                );

        // 3. 외부 이벤트(가상)를 Sink를 통해 발행
        System.out.println("데이터 발행 시작...");
        EmitResult result1 = sink.tryEmitNext("Hello"); // 데이터 발행 시도
        System.out.println("Emit 'Hello' result: " + result1);

        Thread.sleep(100); // 비동기 처리를 위한 짧은 대기

        EmitResult result2 = sink.tryEmitNext("World");
        System.out.println("Emit 'World' result: " + result2);

        Thread.sleep(100);

        // 4. 모든 데이터 발행이 끝났음을 알리고 스트림 완료
        EmitResult completeResult = sink.tryEmitComplete();
        System.out.println("Emit complete result: " + completeResult);

        // 완료된 Sink에 데이터를 다시 발행하면 실패합니다.
        EmitResult failResult = sink.tryEmitNext("This will fail");
        System.out.println("Emit 'This will fail' result: " + failResult); // EXPECTED: FAIL_TERMINATED

        Thread.sleep(100); // 모든 구독자의 완료 메시지 출력을 위한 대기
    }
}
profile
백엔드 개발자를 목표로 공부하는 대학생

0개의 댓글