[Spring Webflux] 20. Reactive Streaming 데이터 처리: SSE

y001·2025년 5월 9일

Reactive Programming

목록 보기
30/30
post-thumbnail

실시간 데이터를 브라우저에 전달하는 기술은 다양하다. 그중에서도 Spring WebFlux 기반으로 Server-Sent Events(SSE)를 구현하는 방식은 비교적 간단한 구조로도 반응형 스트리밍을 완성할 수 있는 강력한 방법이다. 이 글에서는 Spring WebFlux의 Flux와 SSE를 결합하여 Reactive Streaming 흐름을 처리하는 방법을 살펴보고, 실제 코드 예제와 함께 어떻게 동작하는지 자세히 설명한다.


1. Spring WebFlux + SSE

Spring WebFlux는 Reactive Streams 사양을 구현한 Reactor 기반의 논블로킹 웹 프레임워크다. 이 구조 안에서 Flux는 N개의 데이터를 비동기적으로 흘려보낼 수 있는 반응형 스트림이며, SSE는 이 Flux를 HTTP 응답을 유지한 채로 클라이언트에 점진적으로 전달할 수 있는 이상적인 방법이다.

SSE는 text/event-stream 콘텐츠 타입을 사용하며, 클라이언트가 먼저 연결을 열고 서버는 그 연결을 끊지 않은 채 이벤트 데이터를 순차적으로 내려보낸다. WebSocket처럼 복잡한 프로토콜 업그레이드 없이도 단방향 푸시가 가능하다는 점에서 간단하고 안정적인 기술이다.


2. BookService: 데이터 스트리밍 처리

먼저, 데이터베이스에서 Book 엔티티를 조회하여 Flux 형태로 반환하는 서비스를 구성한다.

@Slf4j
@Validated
@Service
@RequiredArgsConstructor
public class BookService {
    private final @NonNull R2dbcEntityTemplate template;

    public Flux<Book> streamingBooks() {
        return template
            .select(Book.class)
            .all()
            .delayElements(Duration.ofSeconds(2L)); // 2초 간격 emit
    }
}

설명

  • R2dbcEntityTemplate.all()을 통해 전체 Book 데이터를 Flux로 조회한다.
  • delayElements(Duration.ofSeconds(2))를 통해 각 데이터 emit 간격을 2초로 설정한다.
  • 이 Flux는 이후 Controller 또는 Router에서 SSE로 클라이언트에 전달된다.

3. BookRouter: SSE 라우팅 설정

Spring WebFlux에서는 RouterFunction을 통해 명시적으로 라우팅을 설정할 수 있다. 아래는 /v1/streaming-books 경로에서 SSE 방식으로 Flux\<BookDto.Response>를 내려주는 라우터 설정이다.

@Configuration
public class BookRouter {
    @Bean
    public RouterFunction<?> routeStreamingBook(BookService bookService, BookMapper mapper) {
        return route(RequestPredicates.GET("/v1/streaming-books"),
            request -> ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(
                    bookService.streamingBooks()
                        .map(book -> mapper.bookToResponse(book)),
                    BookDto.Response.class
                ));
    }
}

설명

  • MediaType.TEXT_EVENT_STREAM은 SSE 스트리밍을 위한 핵심 설정이다.
  • bookService.streamingBooks()로부터 Flux을 얻고, BookDto.Response로 변환한 후 그대로 응답 본문으로 스트리밍한다.
  • 클라이언트는 이 스트림을 수신하여 알림이나 상태 갱신에 활용할 수 있다.

4. BookWebClient: Reactive 클라이언트에서 SSE 수신

서버에서 내려주는 SSE 스트림을 수신하려면, 클라이언트 측에서도 WebClient로 응답 스트림을 구독해야 한다.

@Slf4j
@Configuration
public class BookWebClient {
    @Bean
    public ApplicationRunner streamingBooks() {
        return args -> {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<BookDto.Response> response = webClient
                .get()
                .uri("/v1/streaming-books")
                .retrieve()
                .bodyToFlux(BookDto.Response.class);

            response.subscribe(book -> {
                log.info("bookId: {}", book.getBookId());
                log.info("titleKorean: {}", book.getTitleKorean());
                log.info("titleEnglish: {}", book.getTitleEnglish());
                log.info("description: {}", book.getDescription());
                log.info("author: {}", book.getAuthor());
                log.info("isbn: {}", book.getIsbn());
                log.info("publishDate: {}", book.getPublishDate());
                log.info("================================");
            }, error -> log.error("error: ", error));
        };
    }
}

설명

  • bodyToFlux(BookDto.Response.class)를 통해 SSE로 수신되는 스트림을 Flux로 처리
  • subscribe()를 통해 각 Book 정보를 받아 로그로 출력
  • 서버가 delayElements로 2초마다 데이터를 보내기 때문에, 클라이언트도 그 주기로 데이터를 수신

5. 실행 결과

실제로 위 구조를 실행하면 클라이언트 콘솔에 2초 간격으로 도서 정보가 한 건씩 출력되는 것을 확인할 수 있다. 이는 서버에서 HTTP 연결을 닫지 않고, 지속적으로 응답을 보내주는 SSE의 동작 방식 덕분이다.

예시 출력:

INFO - bookId: 1
INFO - titleKorean: 도서1
...
INFO - bookId: 2
INFO - titleKorean: 도서2
...

6. 정리

Spring WebFlux에서 SSE를 활용하면, 서버에서 발생하는 데이터를 클라이언트로 끊김 없이, 실시간으로 스트리밍할 수 있다. 이 방식은 일반적인 REST API와 달리, 응답을 한 번에 끝내지 않고 연결을 유지한 채 필요한 시점마다 데이터를 내려주는 방식이기 때문에, 실시간 데이터 전달에 매우 적합하다.

특히 Flux<T>를 기반으로 동작하기 때문에, Reactor의 다양한 연산자들과 조합하여 시간 제어, 필터링, 매핑 등 다양한 스트림 처리가 가능하다. 복잡한 실시간 시스템을 상대적으로 간단하게 구현할 수 있다는 점이 큰 장점이다.

0개의 댓글