Spring WebFlux로 실시간 데이터 스트리밍 구현하기

궁금하면 500원·2024년 11월 6일

미생의 스프링

목록 보기
22/48

실시간 데이터 전송을 위해 SSE(Server-Sent Events)를 사용하여 데이터를 스트리밍하는 것입니다.

while(true) 루프를 사용해 계속해서 데이터를 스트리밍하지만, 이 방식은 다소 비효율적이고 멀티스레딩 문제를 발생시킬 수 있습니다.

또한, Spring WebFlux와 같은 비동기 방식에서는 단일 스레드에서 작업을 효율적으로 처리할 수 있도록 설계됩니다.

이제 Spring WebFlux를 사용해 비동기적으로 실시간 데이터를 처리할 수 있는 개선된 예제 코드를 작성해 보겠습니다.

1. EventNotify 컴포넌트

SSE 방식으로 실시간으로 데이터를 전송할 수 있도록 상태를 관리하는 컴포넌트입니다.

이 컴포넌트는 새로운 데이터를 추가하고, change 상태를 관리합니다.

@Component
public class EventNotify {

    private final List<String> events = new ArrayList<>();

    @Getter
    @Setter
    private boolean change = false;

    // 데이터 추가
    public void add(final String data) {
        this.events.add(data);
        this.change = true;  // 데이터가 변경되었음을 알림
    }

    // 마지막 이벤트 반환
    public String getLastEvent() {
        return this.events.get(this.events.size() - 1);
    }
}

2. SseController - 실시간 데이터 스트리밍을 위한 SSE 엔드포인트

Spring MVC 환경에서는 Filter를 사용하여 실시간 스트리밍을 구현할 수 있지만, Spring WebFlux에서는 Flux와 같은 비동기 스트림을 사용하여 더욱 효율적으로 데이터를 처리할 수 있습니다.

@RestController
@RequestMapping("/sse")
public class SseController {

    private final EventNotify eventNotify;

    @Autowired
    public SseController(EventNotify eventNotify) {
        this.eventNotify = eventNotify;
    }

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamEvents() {
        return Flux.<String>create(sink -> {
            // change 상태가 true일 때만 데이터를 푸시
            while (true) {
                if (eventNotify.isChange()) {
                    sink.next(eventNotify.getLastEvent());
                    eventNotify.setChange(false);  // 전송 후 상태를 초기화
                }
                try {
                    Thread.sleep(100);  // 실시간 데이터를 처리하는 간격
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).subscribeOn(Schedulers.boundedElastic());  // 비동기 방식으로 처리
    }
}

3. EventNotifyController - 데이터를 추가하는 엔드포인트

이 컨트롤러는 클라이언트가 /add 엔드포인트로 요청을 보내면 EventNotify에 데이터를 추가하고, 이를 기반으로 SSE 엔드포인트(/sse)에서 실시간으로 데이터를 받을 수 있게 됩니다.

@RestController
@RequestMapping("/add")
public class EventNotifyController {

    private final EventNotify eventNotify;

    @Autowired
    public EventNotifyController(EventNotify eventNotify) {
        this.eventNotify = eventNotify;
    }

    @PostMapping
    public ResponseEntity<String> addEvent(@RequestParam String data) {
        eventNotify.add(data);
        return ResponseEntity.ok("Data added: " + data);
    }
}

4. application.properties(.yml)

Spring Boot 애플리케이션에서 WebFlux를 사용하려면 spring-boot-starter-webflux 의존성을 포함하고, 관련 설정을 활성화해야 합니다

# application.properties
spring.main.web-application-type=reactive

실행과정

  1. /add 엔드포인트에 POST 요청을 보내면, EventNotify 컴포넌트에 데이터가 추가됩니다.

  2. /sse 엔드포인트에 GET 요청을 보내면, 클라이언트는 실시간으로 데이터를 스트리밍 받을 수 있습니다.
    이때, Spring WebFlux의 Flux가 데이터를 비동기적으로 처리하여 클라이언트에 전송합니다.

  3. while(true) 루프를 사용하지 않고, WebFlux의 Flux를 사용하여 비동기적으로 데이터를 스트리밍하는 방식으로, 성능 문제를 최소화합니다.

정리하면...

  • 비동기 처리: WebFlux의 Flux와 subscribeOn(Schedulers.boundedElastic())를 사용하여 비동기적으로 데이터를 처리하도록 했습니다.

  • 효율적인 데이터 전송: while(true) 루프 대신 Flux를 사용해 데이터를 효율적으로 처리합니다.

  • SSE 구현: produces = MediaType.TEXT_EVENT_STREAM_VALUE를 사용하여 SSE를 구현합니다.

결론

Spring WebFlux를 활용한 비동기 스트리밍 방식으로 실시간 데이터 전송을 구현한 예시입니다.

이렇게 구성하면 단일 스레드에서 효율적으로 데이터를 스트리밍할 수 있으며, Flux를 사용해 비동기적으로 데이터를 처리할 수 있습니다.

WebFlux를 활용하면 멀티스레딩 문제를 피하고 성능을 개선할 수 있습니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글