WebFlux로 비동기 데이터 스트림과 실시간 통신 구현하기

궁금하면 500원·2024년 11월 29일
0

Spring WebFlux: 비동기 데이터 처리와 스트리밍

Spring WebFlux는 비동기 논블로킹 방식으로 동작하는 웹 프레임워크입니다.

특히, Reactive Streams 기반으로 대규모 데이터 처리나 이벤트 스트림에 적합합니다.
이번 스터디를 통해서 포스팅을 하게되었습니다.

Flow, Flux, Mono를 사용한 데이터 스트리밍과 비동기 데이터 처리 예제를 통해 WebFlux의 강력한 기능을 소개하겠습니다.

1. 데이터 처리 서비스 (UserService)

먼저 사용자 데이터를 변환하는 서비스와 스트리밍 데이터를 생성하는 로직입니다.

Kotlin의 Flow를 사용해 데이터 스트림을 구성하고, map 연산자를 통해 데이터를 변환합니다.

package com.monos.service

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import org.springframework.stereotype.Service

// 사용자 데이터 모델
data class User(val name: String)

@Service
class UserService {

    // Flow를 통해 사용자 데이터를 변환
    fun processUsers(userList: Flow<User>): Flow<User> {
        return userList.map { user ->
            user.copy(name = user.name.uppercase()) // 이름을 대문자로 변환
        }
    }

    // 스트리밍 데이터 생성
    fun createUserStream(): Flow<String> {
        return flow {
            for (i in 1..10) {
                kotlinx.coroutines.delay(1000) // 1초 대기
                emit("Hello from duplex communication! $i")
            }
        }
    }
}

주요 포인트

  • Flow: Kotlin의 비동기 데이터 스트림 처리 클래스. 하나씩 데이터를 방출하고 처리할 수 있습니다.
  • map 연산자: 데이터를 변환하는 데 사용됩니다.
  • delay: 비동기 작업의 지연을 나타냅니다.

2. 요청 핸들러 (RequestHandler)

HTTP 요청에서 데이터를 비동기적으로 읽고 처리하거나, Flow를 활용해 데이터 스트림을 처리하는 로직입니다.

package com.monos.handler

import com.monos.service.UserService
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.io.buffer.DataBufferUtils
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.nio.charset.StandardCharsets

@Component
class RequestHandler(private val userService: UserService) {

    // HTTP Body를 읽어 처리
    suspend fun handleDataBufferRequest(
        httpReq: ServerHttpRequest,
        httpRes: ServerHttpResponse
    ): Mono<Void> {
        return httpReq.body
            .publishOn(Schedulers.parallel()) // 병렬 처리 스케줄러 사용
            .flatMap { dataBuffer ->
                Mono.fromCallable {
                    val byteArray = ByteArray(dataBuffer.readableByteCount())
                    dataBuffer.read(byteArray)
                    DataBufferUtils.release(dataBuffer)
                    String(byteArray, StandardCharsets.UTF_8)
                }
            }
            .doOnNext { processedData -> 
                println("Received: $processedData") // 수신된 데이터 로그 출력
            }
            .then(Mono.empty())
    }

    // 사용자 Flow를 처리
    fun handleUserFlowRequest(userList: Flow<User>): Flow<User> {
        return userService.processUsers(userList)
    }
}

주요 포인트

  • Mono: HTTP 요청의 완료를 나타냅니다.
  • Schedulers.parallel(): 병렬 스레드 풀을 활용해 데이터 처리 속도를 높입니다.
  • DataBufferUtils.release: 메모리 누수를 방지하기 위해 사용됩니다.

3. REST 컨트롤러 (ReactiveController)

서비스와 핸들러를 이용해 요청을 처리하는 REST 엔드포인트를 정의합니다.

package com.monos.controller

import com.monos.handler
import com.monos.service
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlux
import org.springframework.http.MediaType
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.web.bind.annotation.*
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@RestController
class ReactiveController(
    private val requestHandler: RequestHandler,
    private val userService: UserService
) {
    // 사용자 Flow 처리
    @PostMapping("/request/flow")
    fun processUserFlow(@RequestBody userList: Flow<User>): Flow<User> {
        return requestHandler.handleUserFlowRequest(userList)
    }

    // 스트리밍 데이터 전송
    @GetMapping("/hello", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun streamUserData(): Flux<String> {
        return userService.createUserStream().asFlux()
    }

    // HTTP Body 데이터 처리
    @PostMapping("/process-data")
    suspend fun processData(
        httpReq: ServerHttpRequest,
        httpRes: ServerHttpResponse
    ): Mono<Void> {
        return requestHandler.handleDataBufferRequest(httpReq, httpRes)
    }
}

주요 포인트

  • @PostMapping: HTTP POST 요청을 처리합니다.
  • @GetMapping: HTTP GET 요청을 처리하며 스트리밍 데이터를 반환합니다.
  • TEXT_EVENT_STREAM_VALUE: 서버가 클라이언트에 이벤트 스트림을 지속적으로 전송할 때 사용되는 MIME 타입입니다.

4. 실행 결과

사용자 데이터 변환

  • API Endpoint: /request/flow

  • 입력데이터 (Json)

[
    {"name": "Alice"},
    {"name": "Bob"}
]
  • 출력데이터 (Json)

[
    {"name": "ALICE"},
    {"name": "BOB"}
]

실시간 스트리밍

  • API Endpoint: /hello

  • 결과 (1초 간격 출력)

Hello from duplex communication! 1
Hello from duplex communication! 2
...

HTTP Body 처리

  • API Endpoint: /process-data

  • 로그 출력

Received: {"example":"data"}

결론

Spring WebFlux는 비동기 데이터 처리에 강력한 도구를 제공합니다.

특히, 데이터 스트림을 Flow, Flux를 통해 처리하며 효율적이고 반응적인 애플리케이션을 개발할 수 있습니다.

이번 스터디 처럼 WebFlux를 활용해 다양한 비동기 작업과 데이터 스트리밍에 대해 좀더 공부하는 시간 덕분에 많은것을 배우게 되었습니다.

profile
꾸준히, 의미있는 사이드 프로젝트 경험과 문제해결 과정을 기록하기 위한 공간입니다.

0개의 댓글