실시간 이미지 스트리밍 서비스에 백프레셔 적용
서버에서 SSE(Server-Sent Events)를 통해 클라이언트에게 이미지 조회 URL을 연속적으로 스트리밍 하는 상황을 가정한다.
이때, 클라이언트의 처리 속도가 서버의 데이터 생성 속도를 따라가지 못하면 데이터 유실이나 메모리 부족과 같은 문제가 발생할 수 있다.
이를 방지하기 위해 백프레셔를 적용하려 클라이언트가 처리할 수 있는 만큼만 데이터를 보내도록 스트리밍 속도를 제어하려고 한다.
클라이언트가 데이터를 요청(pull)할 때마다 서버가 다음 이미지를 URL을 전속하는 방식으로 구현하여 안정적인 이미지 스트리밍을 보장할 것이다.
이를 위해서는 이미지 조회 URL을 보내는 REST API에 백프레셔를 구현해야 한다.
만약 클라이언트가 데이터를 요청하는 속도에 맞춰 서버가 데이터를 보내고 싶다면 limitRate를 사용해야 한다.
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getImageUrls() {
return Flux.just(
"/api/images/webflux.png",
"/api/images/docker.png",
"/api/images/swagger.png"
// 더 많은 이미지 URL이 전송될 수 있다.
)
// 2개씩 데이터를 요청하도록 설정합니다.
// 첫 2개는 즉시 보내고, 클라이언트가 2개를 처리한 후 다시 2개를 요청하게 됩니다.
.limitRate(2)
.subscribeOn(Schedulers.boundedElastic()); // 별도 스레드에서 실행
}
조금 다른 동작 방식
위의
limitRate를 사용하는 방식과는 동작 방식이 조금 다르다.limitRate는 Pull 방식 백프레셔를 사용하여 소비자가 요청을 보내면 생산자가 그만큼의 데이터를 제공하는 방식이다.
하지만 버퍼를 사용하는 방식은 Push-and-Buffer 방식으로 생산자가 빠르게 데이터를 생성하고, 버퍼에 쌓아둔다. 그리고 소비자가 요청하면 버퍼에서 데이터를 가져가는 방식으로 동작한다.
만약 생산자가 데이터를 빠르게 생성하고, 소비자가 느리더라도 일정량의 데이터는 버퍼에 저장해서 처리하고 싶다면 onBackpressureBuffer를 사용해야 한다.
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getImageUrls() {
return Flux.just(
"/api/images/webflux.png",
"/api/images/docker.png",
"/api/images/swagger.png"
)
.onBackpressureBuffer(10, BufferOverflowStrategy.DROP_OLDEST) // 버퍼 크기 10, 오래된 데이터 버림
.subscribeOn(Schedulers.boundedElastic()); // 별도 스레드에서 실행
}
스트림의 실행을 특정 스케줄러(스레드 풀)로 전환하는 역할을 한다. 이는 Mono/Flux스트림의 전체 파이프라인이 어떤 스레드에서 실행될지 결정한다.
subscribeOn은 리액티브 스트림이 구독되는 시점부터 업스트림 연산자들이 어떤 스레드에서 동작할지 결정한다.Flux.just()와 같은 데이터 생성 작업이 boundedElastic스케줄러의 스레드에서 시작되도록 만든다.Schedulers.boundedFlastic()은 이러한 블로킹 작업을boundedElastic스케줄러는 블로킹 I/O 작업에 최적화된 스레드 풀이다. 특징은 다음과 같다.
따라서 subscribeOn(Schedulers.boundedElastic())은 "이 스트림의 모든 블로킹 작업은 별도의 스레드 풀에서 실행되도록 해줘"라는 요청과 같다.