Spring WebFlux에서의 스트리밍 처리

1. Mono와 Flux의 기본 개념

- Mono
단일 값을 반환하는 비동기적인 결과. 예를 들어, 데이터베이스에서 하나의 레코드를 조회할 때 사용됩니다.

  • Flux
    여러 개의 값을 반환할 수 있는 비동기적인 결과. 예를 들어, 서버에서 지속적으로 데이터를 푸시할 때 사용됩니다.

  • Mono
    아무 값도 반환하지 않는 비동기 작업을 나타냅니다.
    이는 주로 요청이 성공적으로 처리되었음을 나타내지만 클라이언트에 대한 응답을 보내지 않을 때 사용됩니다.

2. 왜 Mono를 사용할까?

Mono는 주로 서버에서 클라이언트에게 직접 응답을 보내지 않고, 다른 방법으로 처리해야 할 경우에 사용됩니다.
예를 들어, WebFlux에서 클라이언트와의 연결을 통해 지속적인 스트리밍을 수행할 때 유용합니다.
이러한 상황에서는 서버가 데이터 전송을 관리하고, 클라이언트는 해당 데이터를 실시간으로 소비합니다.

3. SSE(Server-Sent Events)와 직접 스트리밍

일반적으로 SSE는 브라우저에서 EventSource 객체를 사용하여 구현됩니다.
그러나 이 객체는 요청에 헤더를 추가할 수 없는 제약이 있습니다.
따라서 복잡한 요청을 요구하는 경우, fetch를 사용하여 HTTP 요청을 커스터마이즈해야 합니다.
이러한 이유로 SSE를 사용할 필요가 없는 경우도 많습니다.

4. ServerHttpRequest 및 ServerHttpResponse 활용

  • ServerHttpRequest: 요청의 본문이 Flux 형태로 들어옵니다.
    이 객체를 통해 요청 데이터를 비동기적으로 처리할 수 있습니다.
  • ServerHttpResponse: writeAndFlushWith 메서드를 사용하여 응답을 Flux 형태로 보낼 수 있습니다.
    이를 통해 서버에서 클라이언트에게 지속적으로 데이터를 푸시할 수 있습니다.

5. HTTP 통신의 기본 개념

  • HTTP 1.1: 심플렉스 통신 방식으로, 요청이 끝나야 응답이 가능합니다.
    즉, 한 번에 하나의 요청과 응답만 처리할 수 있습니다.

  • HTTP 2: 멀티플렉스 통신 방식으로, 여러 요청과 응답을 동시에 처리할 수 있습니다.
    이로 인해 더 효율적인 데이터 전송이 가능합니다.

6.코드 예제

@RestController
class Test0 {
    @PostMapping("/test0")
    suspend fun test0(req: ServerHttpRequest, res: ServerHttpResponse): Mono<Void> {
        var responseSink: FluxSink<DataBuffer>? = null

        fun response(body: String) {
            responseSink?.next(
                DefaultDataBufferFactory().wrap(body.toByteArray())
            ) ?: res.writeAndFlushWith(Flux.just(Flux.create { sink ->
                responseSink = sink
                sink.next(DefaultDataBufferFactory().wrap(body.toByteArray()))
            })).timeout(Duration.ofHours(1L)).subscribe()
        }

        val onRequest: (String, String) -> Unit = { curr, all ->
            println("Request: $curr, $all")
            response("Hello, World 2")
        }

        response("Hello, World 1")

        return Mono.create { sink ->
            res.setStatusCode(org.springframework.http.HttpStatus.OK)
            val request = ByteArrayOutputStream()
            req.body.flatMapSequential({ buffer ->
                if (buffer.readableByteCount() > 0) {
                    val curr = ByteArray(buffer.readableByteCount()).also {
                        buffer.read(
                            if (it.size >= 2 &&
                                it[it.size - 2] == '\r'.code.toByte() &&
                                it[it.size - 1] == '\n'.code.toByte()
                            ) it.copyOf(it.size - 2) else it
                        )
                    }
                    request.write(curr)
                    onRequest(String(curr, StandardCharsets.UTF_8), request.toString())
                }
                DataBufferUtils.release(buffer)
                Mono.empty<Void>()
            }, 1)
            .then(Mono.create<Void> {
                onRequest("", request.toString())
                responseSink?.complete()
                it.success()
                sink.success()
            }).subscribe()
        }
    }
}

7. 코드설명

  • responseSink: FluxSink를 통해 비동기적으로 클라이언트에 데이터를 전송합니다.
  • response 함수: 전달된 문자열을 DataBuffer로 감싸서 클라이언트에게 전송합니다.
    클라이언트가 응답을 수신하지 못했을 경우 새로운 Flux를 생성하여 데이터를 푸시합니다.
  • onRequest: 클라이언트로부터 요청을 처리하는 로직입니다.
    각 요청을 처리하고 응답을 반환합니다.

8. 결론

Spring WebFlux를 사용하여 스트리밍 HTTP 통신을 구현하는 것은 비동기적이고 효율적인 데이터 처리 방법을 제공합니다.

그러나 HTTP 프로토콜과 SSE의 제약을 이해하고 상황에 맞는 방법을 선택하는 것이 중요합니다.

Spring WebFlux의 스트리밍 처리 방식에 대한 공부를 하게 된 계기가 되었습니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글