[Spring] Spring WebFlux 기초2

donghyeok·2023년 7월 2일

프로젝트 리액터

  • 프로젝트 리액터는 리액티브 스트림의 구현체 중 하나로 스프링의 에코시스템 범주에 포함된 프레임워크이다.
  • 리액티브 스트림 사양을 구현하고 있으므로 리액티브 스트림에서 사용하는 용어와 규칙을 그대로 사용
  • 리액터를 사용하면 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 비동기-논블로킹을 적용할 수 있다.
  • 함수형 프로그래밍의 접근 방식을 사용해 비동기-논블로킹 코드의 난해함을 어느정도 해결
  • 백프레셔를 사용해 시스템의 부하를 효율적으로 조절할 수 있다.

모노와 플럭스

  • 리액터는 리액티브 스트림의 Publisher인터페이스를 구현하는 모노(mono)플럭스(Flux) 두가지 핵심 타입을 제공한다.
  • 모노는 0..1개의 단일 요소 스트림을 통지하는 발행자이다
  • 플럭스는 0..N개로 이뤄진 다수 요소 스트림을 통지하는 발행자이다.
  • 두 타입 모두 리액티브 스트림 데이터 처리 프로토콜대로 onComplete 또는 onError 시그널이 발생할 때까지 onNext를 사용해 구독자에게 데이터를 통지한다.

Mono를 사용한 예제

fun main() {
	val mono: Mono<String> = Mono.just("Hello World")
    mono.subscribe(::println) // "Hello world" 출력
}
  • 모노와 플러스의 연산자는 모두 Lazy하게 동작하여 subscribe를 호출하지 않으면 리액티브 스트림 사양대로 코드가 동작하지 않는다.
  • subscribe는 최종 연산자(Terminal Operator)이다.
  • Java8의 스트림에서 중간 연산자(filter, map..)들이 최종 연산자(collect, count..)가 호출되지 않으면 동작하지 않는 것과 유사하다.

Flux를 사용한 예제

fun main() {
	val iphone = Cellphone(name = "Iphone", price = 100)
    val galaxy = Cellphone(name = "Galaxy", price = 200)
    
    val flux: Flux<Cellphone> = Flux.just(iphone, galaxy)
    flux.subscribe(::println) // iphone, galaxy 객체 정보 출력 
}
  • Flux는 Mono와 다르게 다수의 요소를 통지할 수 있다.

스프링 WebFlux와 스프링 MVC 비교

스프링 MVC

  • 스프링으로 개발된 대부분의 웹 애플리케이션은 서블릿 기반의 스프링MVC이다.
  • 스프링 MVC는 동시성 처리를 전통적 웹 방식인 하나의 스레드가 하나의 요청을 처리하는 Thread per Request Model을 사용한다.
  • 이러한 모델은 DB, Network IO가 발생할 경우 결과를 받기까지 스레드가 블로킹됨.
  • 이러한 문제를 해결하기 위해 스레드풀을 사용해 동시성을 제어한다.
  • 명령어 코드 작성은 코드의 흐름을 쉽게 이해할 수 있고 디버깅하기 쉽다.
  • 대부분의 스프링 웹 애플리케이션이 스프링 MVC 기반이므로 안정성과 풍부한 라이브러리를 지원
  • JPA, JDBC와 같은 블로킹 API를 사용하는 경우 스프링 MVC를 사용하는 것이 낫다.

스프링 WebFlux

  • 스프링 WebFlux는 전동적 웹 프레임워크인 스프링MVC에 대비되는 리액티브 기반의 웹 스택 프레임워크이다.
  • 기본적으로 프로젝트 리액터 기반이며 리액티브 스트림의 다른 구현체인 RxJava나 코루틴으로도 개발이 가능하다.
  • 스프링 WebFlux는 비동기-논 블로킹으로 동작하므로 적은 수의 스레드로 대량의 동시성을 제어할 수 있다.
  • 스프링 MVC와 스프링 WebFlux의 공통점과 각각이 고유하게 지원하는 기능들
  • 함수형 앤드포인트와 애노테이션 컨트롤러 방식을 모두 지원
  • 이벤트 루프 동시성 모델
  • 스프링 MVC에 비해 러닝커브가 높은 편
  • 전 구간 비동기-논플로킹인 경우에 최적의 성능을 보여준다.
  • 스프링 MVC에서도 리액터와 WebFlux 의존성을 추가하여 리액티브 코드와 논블로킹 라이브러리를 사용할 수 있다.
  • 아래 처럼 어쩔 수 없이 블로킹 API를 사용하는 경우 별도의 스케쥴러로 동작시키는 것이 좋다.
val blockingWrapper = Mono.fromCallable {
	jpaRepository.findById(id)
}.subscribeOn(Schefulers.boundedElastic())

스프링 WebFlux 구현1 : 함수형 앤드포인트

@Component
class HelloHandler {
	fun sayHello(req: ServerRequest) : Mono<ServerResponse> {
    	return ServerResponse.ok().bodyValue("Hello WebFlux")
    }
}

data class User(val id:Long, val email: String)

@Component
class UserHandler {

	val users = listOf {
    	User(id = 1, email = "asdf@gmail.com"),
        User(id = 2, email = "asd@gmail.com")
    }
	
    fun getUser(req: ServerRequest) : Mono<ServerResponse> = 
    	users.find { req:pathVariable("id").toLong() == it.id } 
        	?.let {
            	ServerResponse.ok().bodyValue(it)
            } ?: ServerResponse.notFound().build()
        
    fun getAll(req: ServerRequest) : Mono<ServerResponse> = 
    	ServerResponse.ok.bodyValue(users)
}	

@Configuration
class Router {
	
    @Bean
    fun router(handler: HelloHandler) : RouterFunction<ServerResponse> = 
    	route()
        	.GET("/", handler::sayHello)
            .build()
            
    @Bean
    fun userRouter(handler: UserHander) : RouterFunction<ServerResponse> = 
    	router {
        	"/users".nest {
            	GET("/{id}", handler::getUser)
                GET("/", handler::getAll)
            }
        }
}
// GET '/' 호출 시, Hello WebFlux 본문 응답 
  • 위 코드 예시처럼 WebFlux의 함수형 엔드포인트는 요청을 분석해 핸들러로 라우팅하는 라우터 함수 (RouterFunction)(:=controller)와 요청 객체를 전달 받아 응답을 제공하는 핸들러 함수 (HandlerFunction)(:=service)로 구성된다.
  • 라우터 함수는 URI 패턴 매칭에 따른 분배를 위해 RouterFunctions.route()라는 유용한 빌더를 제공한다.
  • 중복된 경로를 그룹화하고 싶은 경우 중첩 라우터(Nested Router)를 사용할 수 있다.
  • 핸들러 함수는 라우터 함수로부터 전달받은 요청 객체 ServerRequest를 이용해 로직을 처리한 후 응답 객체 ServerResponse를 생성한 뒤 반환한다. (불변 객체들)

스프링 WebFlux 구현1 : 애노테이션 컨트롤러

data class Book(val id : Int, val name: String, val price: Int)

@Service
class BookService {
	val books = mutableListOf(
    	Book(id = 1, name="이름1", price = 1000),
        Book(id = 2, name="이름2", price = 2000) 
    )

	fun getAll() : Flux {
    	return Flux.fromIterable(books) //코틀린의 확장 함수로 대치 가능 books.toFlux
    }
    
    fun get(id: Int) : Mono<Book> {
    	return Mono.justOrEmpty(books.find { it.id == id}) // 확장함수 .toMono로 대치 가능 
    }	
}

@RestController
class BookController(
	private val bookService: BookService,
) {
	@GetMapping("/books")
    fun getAll(): Flux<Book> {
    	return bookService.getAll()
    }
    
    @GetMapping("/books/{id})
    fun get(@PathVariable id: Int): Mono<Book> {
  		return bookService.get(id)	
    }
}
  • 위와 같이 @Controller(:=Router), @Service(:=Handler)를 스프링 MVC에서 사용하던 것처럼 애노테이션으로 대치하여 사용할 수도 있다.
  • 애노테이션 문법을 그대로 사용하되 응답값만 Mono<>, Flux<> 형태로 응답한다.

웹클라이언트

RestTemplate

  • RestTemplate는 스프링에서 제공하는 블로킹 방식의 HttpClient이다
  • 스프링 애플리케이션에서 다른 서버와 통신할 경우 유용하게 사용된다.
  • 하지만 Spring5부터 Deprecated되어 WebClient를 사용하길 권고한다.
  • RestTemplate의 문제는 요청부터 응답까지 스레드가 블로킹되어 다른 일을 하지 못한다.
  • 만일 하나의 API에서 여러 서버의 응답을 받아 처리하는 경우 하나씩 처리하므로 응답이 느려진다 (CompletableFuture 사용 필요)

WebClient

  • WebClient는 스프링에서 제공하는 리액티브 기반의 논블로킹 HttpClient이다.
  • 논블로킹, 블로킹 방식 모두 사용이 가능하다.
  • WebClient를 사용하면 비동기처리가 가능하여 여러 서버의 응답을 받아 처리하는 경우 동시에 여러 서버로 호출하며 빠르게 처리가 가능하다.
@GetMapping("/books/nonblock")
fun getBooksNonBlockingWay(): Flux<Book> {
	val flux = WebClient.create()
    	.get()
       	.uri(url)
        .retrieve()
        .bodyToFlux(Book::class.java)
    return flux
}

스프링데이터 R2DBC

R2DBC

  • 전통적인 방식의 JDBC 드라이버는 하나의 커넥션에 하나의 스레드를 사용하는 Thread Per Connection 방식
  • 높은 처리량과 대규모 애플리케이션을 위해 비동기-논블로킹 데이터베이스 API에 대한 요구가 생김
  • 애플리케이션 로직이 비동기더라도 DB 드라이버가 JDBC라면 필연적으로 블로킹이 발생하여 100% 성능을 내기 어려웠음
  • R2DBC는 빠르게 성장중인 리액티브 기반 비동기-논블로킹 데이터베이스 드라이버
connection.createStatement("SELECT * FROM employess")
        .execute()
        .flatMap(r -> r.map((row, metadata) -> {
            Employee emp = new Employee();
            emp.setId(row.get("emp_id", Integer.class));
            emp.setName(row.get("name", String.class));
            emp.setPosition(row.get("position", String.class));
            emp.setSalary(row.get("salary", Double.class));
          return emp;
        }))
        .close()
        .subscribe();

스프링데이터 R2DBC

  • 스프링데이터 R2DBC는 R2DBC 기반의 스프링 데이터 프로젝트이다
  • 많은 ORM에서 제공하는 LazyLoadinf, Dirty-Checking, Cache 등을 지원하지 않으므로 ORM으로써의 기능은 적지만 오히려 더 심플하게 사용할 수 있다.
  • ReactiveCrudRepository는 리액티브를 지원하는 CRUD 인터페이스이다.
  • 모든 반환 타입이 Mono, Flux 같은 리액터의 Pusblisher로 이뤄져있다.
  • 이외에도 스프링데이터 MongoDB Reactive, 스프링데이터 Redis Reactive가 존재

스프링 WebFlux & 코루틴

  • 프로젝트 리액터 기반의 리액티브 프로그래밍은 비동기-논블로킹의 단점인 콜백 헬 문제를 순차적으로 동작하는 연산자를 통해 해결한다.
  • 러닝커브가 매우 높아서 코루틴을 도입하게 된다.

코루틴(Coroutine)

  • 코루틴은 코틀린에서 비동기-논블로킹 프로그래밍을 명령형 스타일로 작성할 수 있도록 도와주는 라이브러리이다.
  • 코루틴은 멀티 플랫폼을 지원하여 코틀린을 사용하는 안드로이드, 서버 등 여러 환경에서 사용할 수 있다.
  • 코루틴은 임시 중단 가능 함수(suspend function)을 통해 스레드가 실행을 잠시 중단했다가 중단 시점부터 다시 재개(resume)할 수 있다.
suspend fun combineApi() = coroutineScope {
	val response1 = async { getApi1() }
    val response2 = async { getApi2() }
    
    return ApiResult {
    	response1.await()
        response2.await()
    }
}

스프링 WebFlux의 코루틴 지원

  • 스프링 WebFlux 공식문서의 코틀린 예제들을 보면 모두 코루틴 기반의 예제를 소개하고 있다
  • 스프링 MVC, 스프링 WebFlux 모두 코루틴을 지원하며 의존성만 추가하면바로 사용 가능

리액티브의 코루틴 변환

//Mono -> suspend
fun handler(): Mono<Void> -> suspend fun handler()

//Flux -> Flow
fun handler(): Flux<T> -> fun handler(): Flow<T>

코루틴을 적용한 컨트롤러

@RestController
class UserController (
	private val userService: UserService,
    private val userDetailService: UserDetailService
) {
	@GetMapping("/{id}")
    suspend fun get(@PathVariable id: Long) : User {
    	return userService.getById(id)
	}
    
    @GetMapping("/users")
    suspend fun gets() = withContext(Dispatchers.IO) {
    	val usersDeffered = async { userService.gets() }
        val userDetailsDeffered = async { userDetailService.gets() }
        return UserList(usersDeffered.await(), userDetailDeffered.await())
    }
}

코루틴을 사용한 WebClient

val result = client.get()
		.uri("/persons/{id}", id)
        .retrieve()
        .awaitBody<Person>()

코루틴 기초

runBlocking

  • 코루틴을 생성하는 코루틴 빌더이다
  • runBlocking으로 감싼 코드는 코루틴 내부의 코드가 수행이 끝날때까지 스레드가 블로킹된다.
  • 일반적으로 코루틴은 스레드를 차단하지 않고 사용해야하므로 runBloking을 사용하는 것은 좋지 않지만 꼭 사용해야하는 경우가 있다.
    - 코루틴을 지원하지 않는 경우 ex) 테스트코드, 배치 등
runBloking {
	println("Hello")
    println(Thread.currentThread().name)
}
println("world")
println(Thread.currentThread().name)

//Hello
//main @coroutine#1
//world
//main

launch

  • launch는 스레드 차단 없이 새 코루틴을 시작하고 결과로 job을 반환하는 코루틴 빌더이다
  • launch는 결과를 만들어내지 않는 비동기 작업에 적합하기 때문에 인자로 Unit을 반환하는 람다를 인자로 받는다.
  • delay()함수는 코루티 라이브러리에 정의된 일시 중단 함수이며 Thread.sleep()과 유사하지만 현재 스레드를 차단하지 않고 일시 중단 시킨다. 이때 일시 중단된 스레드는 코루틴 내에서 다른 일시 중단 함수를 수행한다.
  • launch를 사용해 여러 작업을 병렬적으로 수행할 수 있다.
  • launch가 반환하는 Job을 이용해 현재 코루틴의 상태를 확인하거나 실행 또는 취소도 가능하다.
  • job.cancel()을 호출해 코루틴을 취소할 수 있다.
  • launch(start = CoroutineStart.LAZY)를 사용해서 start함수를 호출하는 시점에 코루틴을 동작시킬 수 있다.
fun main() = runBlocking<Unit> {
	val job1: Job = launch {
    	val timeMillis = measureTimeMillis {
        	delay(150)   
    	}
        println("async task-1 $timeMillis ms")
    }
    job1.cancel() //
    
    val job2: Job = launch(start = CoroutineStart.LAZY) {
        val timeMillis = measureTimeMillis {
			delay(100)
		}
        println("async task-2 $timeMillis ms")
    }
    println("start task-2")
    job2.start()
}

async

  • async 빌더는 비동기 작업을 통해 결과를 만들어내는 경우에 적합하다
  • async는 비동기 작업의 결과로 Deffered라는 특별한 인스턴스를 반환하는데 await라는 함수를 통해 async로 수행한 비동기 작업의 결과를 받아올 수 있다.
  • 자바 스크립트나 다른 언어의 async-await는 키워드인 경우가 보통이지만 코틀린의 코루틴은 async-await가 함수인 차이점이 있다.
fun main() = runBlocking<Unit> {
	val result1: Deferred<Int> = async {
        delay(100)
		sum(1, 3) 
    }
    println("result1 : ${result1.await()}")
    val result2: Deferred<Int> = async {
        delay(100)
		delay(100)
		sum(2, 5) 
    }
    println("result2 : ${result2.await()}")
}

suspend 함수

  • suspend 함수는 코루틴의 핵심 요소로써 일시 중단이 가능한 함수를 말한다.
  • suspend 함수는 일반 함수를 마음껏 호출할 수 있지만 일반 함수에선 suspend 함수를 호출할 수 없다. (호출하는 함수로 suspend여야함)
  • suspend 함수에서 앞서 학습한 async, launch와 같은 코루틴 빌더를 사용하려면 코루틴 스코프를 사용해야 한다.
  • 코루틴 스코프를 사용하면 runBlocking과 다르게 현재 스레드가 차단되지 않고 코루틴이 동작한다.
suspend fun main() {
	doSomething()
}

fun printHello() = println("Hello")

suspend fun doSomething() = coroutineScope {
	launch {
    	delay(200)
        println("world")
    }
    
    launch {
    	printHello()
    }
}

Flow

  • Flow는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 API이다.
  • 코루틴의 suspend 함수는 단일 값을 비동기로 반환하지만 Flow를 사용하면 여러개의 값을 반환할 수 있다.
  • 리액티브 스트림과 같이 최종 연산자collect를 호출하지 않으면 아무런 일도 일어나지 않는다.

fun main() = runBlocking<Unit> {
    val flow = simple()
    flow.collect { value -> println(value) }
}
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
		delay(100)
		emit(i) 
     }
}
//Flow started
//1
//2
//3 

0개의 댓글