Spring WebFlux는 비동기 논블로킹 방식으로 동작하는 웹 프레임워크입니다.
특히, Reactive Streams 기반으로 대규모 데이터 처리나 이벤트 스트림에 적합합니다.
이번 스터디를 통해서 포스팅을 하게되었습니다.
Flow, Flux, Mono를 사용한 데이터 스트리밍과 비동기 데이터 처리 예제를 통해 WebFlux의 강력한 기능을 소개하겠습니다.
먼저 사용자 데이터를 변환하는 서비스와 스트리밍 데이터를 생성하는 로직입니다.
Kotlin의 Flow를 사용해 데이터 스트림을 구성하고, map 연산자를 통해 데이터를 변환합니다.
package com.monos.service
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import org.springframework.stereotype.Service
// 사용자 데이터 모델
data class User(val name: String)
@Service
class UserService {
// Flow를 통해 사용자 데이터를 변환
fun processUsers(userList: Flow<User>): Flow<User> {
return userList.map { user ->
user.copy(name = user.name.uppercase()) // 이름을 대문자로 변환
}
}
// 스트리밍 데이터 생성
fun createUserStream(): Flow<String> {
return flow {
for (i in 1..10) {
kotlinx.coroutines.delay(1000) // 1초 대기
emit("Hello from duplex communication! $i")
}
}
}
}
HTTP 요청에서 데이터를 비동기적으로 읽고 처리하거나, Flow를 활용해 데이터 스트림을 처리하는 로직입니다.
package com.monos.handler
import com.monos.service.UserService
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.io.buffer.DataBufferUtils
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.nio.charset.StandardCharsets
@Component
class RequestHandler(private val userService: UserService) {
// HTTP Body를 읽어 처리
suspend fun handleDataBufferRequest(
httpReq: ServerHttpRequest,
httpRes: ServerHttpResponse
): Mono<Void> {
return httpReq.body
.publishOn(Schedulers.parallel()) // 병렬 처리 스케줄러 사용
.flatMap { dataBuffer ->
Mono.fromCallable {
val byteArray = ByteArray(dataBuffer.readableByteCount())
dataBuffer.read(byteArray)
DataBufferUtils.release(dataBuffer)
String(byteArray, StandardCharsets.UTF_8)
}
}
.doOnNext { processedData ->
println("Received: $processedData") // 수신된 데이터 로그 출력
}
.then(Mono.empty())
}
// 사용자 Flow를 처리
fun handleUserFlowRequest(userList: Flow<User>): Flow<User> {
return userService.processUsers(userList)
}
}
서비스와 핸들러를 이용해 요청을 처리하는 REST 엔드포인트를 정의합니다.
package com.monos.controller
import com.monos.handler
import com.monos.service
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlux
import org.springframework.http.MediaType
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.web.bind.annotation.*
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@RestController
class ReactiveController(
private val requestHandler: RequestHandler,
private val userService: UserService
) {
// 사용자 Flow 처리
@PostMapping("/request/flow")
fun processUserFlow(@RequestBody userList: Flow<User>): Flow<User> {
return requestHandler.handleUserFlowRequest(userList)
}
// 스트리밍 데이터 전송
@GetMapping("/hello", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamUserData(): Flux<String> {
return userService.createUserStream().asFlux()
}
// HTTP Body 데이터 처리
@PostMapping("/process-data")
suspend fun processData(
httpReq: ServerHttpRequest,
httpRes: ServerHttpResponse
): Mono<Void> {
return requestHandler.handleDataBufferRequest(httpReq, httpRes)
}
}
사용자 데이터 변환
[
{"name": "Alice"},
{"name": "Bob"}
]
[
{"name": "ALICE"},
{"name": "BOB"}
]
Hello from duplex communication! 1
Hello from duplex communication! 2
...
Received: {"example":"data"}
Spring WebFlux는 비동기 데이터 처리에 강력한 도구를 제공합니다.
특히, 데이터 스트림을 Flow, Flux를 통해 처리하며 효율적이고 반응적인 애플리케이션을 개발할 수 있습니다.
이번 스터디 처럼 WebFlux를 활용해 다양한 비동기 작업과 데이터 스트리밍에 대해 좀더 공부하는 시간 덕분에 많은것을 배우게 되었습니다.