프로젝트 리액터
- 프로젝트 리액터는 리액티브 스트림의 구현체 중 하나로 스프링의 에코시스템 범주에 포함된 프레임워크이다.
- 리액티브 스트림 사양을 구현하고 있으므로 리액티브 스트림에서 사용하는 용어와 규칙을 그대로 사용
- 리액터를 사용하면 애플리케이션에 리액티브 프로그래밍을 적용할 수 있고 비동기-논블로킹을 적용할 수 있다.
- 함수형 프로그래밍의 접근 방식을 사용해 비동기-논블로킹 코드의 난해함을 어느정도 해결
- 백프레셔를 사용해 시스템의 부하를 효율적으로 조절할 수 있다.
모노와 플럭스
- 리액터는 리액티브 스트림의
Publisher인터페이스를 구현하는 모노(mono)와 플럭스(Flux) 두가지 핵심 타입을 제공한다.
- 모노는 0..1개의
단일 요소 스트림을 통지하는 발행자이다
- 플럭스는 0..N개로 이뤄진
다수 요소 스트림을 통지하는 발행자이다.
- 두 타입 모두 리액티브 스트림 데이터 처리 프로토콜대로 onComplete 또는 onError 시그널이 발생할 때까지 onNext를 사용해 구독자에게 데이터를 통지한다.
Mono를 사용한 예제
fun main() {
val mono: Mono<String> = Mono.just("Hello World")
mono.subscribe(::println)
}
- 모노와 플러스의 연산자는 모두 Lazy하게 동작하여 subscribe를 호출하지 않으면 리액티브 스트림 사양대로 코드가 동작하지 않는다.
- 즉
subscribe는 최종 연산자(Terminal Operator)이다.
- Java8의 스트림에서 중간 연산자(filter, map..)들이 최종 연산자(collect, count..)가 호출되지 않으면 동작하지 않는 것과 유사하다.
Flux를 사용한 예제
fun main() {
val iphone = Cellphone(name = "Iphone", price = 100)
val galaxy = Cellphone(name = "Galaxy", price = 200)
val flux: Flux<Cellphone> = Flux.just(iphone, galaxy)
flux.subscribe(::println)
}
- Flux는 Mono와 다르게 다수의 요소를 통지할 수 있다.
스프링 WebFlux와 스프링 MVC 비교
스프링 MVC
- 스프링으로 개발된 대부분의 웹 애플리케이션은 서블릿 기반의 스프링MVC이다.
- 스프링 MVC는 동시성 처리를 전통적 웹 방식인 하나의 스레드가 하나의 요청을 처리하는
Thread per Request Model을 사용한다.
- 이러한 모델은 DB, Network IO가 발생할 경우 결과를 받기까지 스레드가 블로킹됨.
- 이러한 문제를 해결하기 위해 스레드풀을 사용해 동시성을 제어한다.
- 명령어 코드 작성은 코드의 흐름을 쉽게 이해할 수 있고 디버깅하기 쉽다.
- 대부분의 스프링 웹 애플리케이션이 스프링 MVC 기반이므로 안정성과 풍부한 라이브러리를 지원
- JPA, JDBC와 같은 블로킹 API를 사용하는 경우 스프링 MVC를 사용하는 것이 낫다.
스프링 WebFlux
- 스프링 WebFlux는 전동적 웹 프레임워크인 스프링MVC에 대비되는 리액티브 기반의 웹 스택 프레임워크이다.
- 기본적으로 프로젝트 리액터 기반이며 리액티브 스트림의 다른 구현체인 RxJava나 코루틴으로도 개발이 가능하다.
- 스프링 WebFlux는 비동기-논 블로킹으로 동작하므로 적은 수의 스레드로 대량의 동시성을 제어할 수 있다.
- 스프링 MVC와 스프링 WebFlux의 공통점과 각각이 고유하게 지원하는 기능들
- 함수형 앤드포인트와 애노테이션 컨트롤러 방식을 모두 지원
- 이벤트 루프 동시성 모델
- 스프링 MVC에 비해 러닝커브가 높은 편
- 전 구간 비동기-논플로킹인 경우에 최적의 성능을 보여준다.
- 스프링 MVC에서도 리액터와 WebFlux 의존성을 추가하여 리액티브 코드와 논블로킹 라이브러리를 사용할 수 있다.
- 아래 처럼 어쩔 수 없이 블로킹 API를 사용하는 경우 별도의 스케쥴러로 동작시키는 것이 좋다.
val blockingWrapper = Mono.fromCallable {
jpaRepository.findById(id)
}.subscribeOn(Schefulers.boundedElastic())
스프링 WebFlux 구현1 : 함수형 앤드포인트
@Component
class HelloHandler {
fun sayHello(req: ServerRequest) : Mono<ServerResponse> {
return ServerResponse.ok().bodyValue("Hello WebFlux")
}
}
data class User(val id:Long, val email: String)
@Component
class UserHandler {
val users = listOf {
User(id = 1, email = "asdf@gmail.com"),
User(id = 2, email = "asd@gmail.com")
}
fun getUser(req: ServerRequest) : Mono<ServerResponse> =
users.find { req:pathVariable("id").toLong() == it.id }
?.let {
ServerResponse.ok().bodyValue(it)
} ?: ServerResponse.notFound().build()
fun getAll(req: ServerRequest) : Mono<ServerResponse> =
ServerResponse.ok.bodyValue(users)
}
@Configuration
class Router {
@Bean
fun router(handler: HelloHandler) : RouterFunction<ServerResponse> =
route()
.GET("/", handler::sayHello)
.build()
@Bean
fun userRouter(handler: UserHander) : RouterFunction<ServerResponse> =
router {
"/users".nest {
GET("/{id}", handler::getUser)
GET("/", handler::getAll)
}
}
}
- 위 코드 예시처럼 WebFlux의 함수형 엔드포인트는 요청을 분석해 핸들러로 라우팅하는
라우터 함수 (RouterFunction)(:=controller)와 요청 객체를 전달 받아 응답을 제공하는 핸들러 함수 (HandlerFunction)(:=service)로 구성된다.
- 라우터 함수는 URI 패턴 매칭에 따른 분배를 위해
RouterFunctions.route()라는 유용한 빌더를 제공한다.
- 중복된 경로를 그룹화하고 싶은 경우
중첩 라우터(Nested Router)를 사용할 수 있다.
- 핸들러 함수는 라우터 함수로부터 전달받은 요청 객체
ServerRequest를 이용해 로직을 처리한 후 응답 객체 ServerResponse를 생성한 뒤 반환한다. (불변 객체들)
스프링 WebFlux 구현1 : 애노테이션 컨트롤러
data class Book(val id : Int, val name: String, val price: Int)
@Service
class BookService {
val books = mutableListOf(
Book(id = 1, name="이름1", price = 1000),
Book(id = 2, name="이름2", price = 2000)
)
fun getAll() : Flux {
return Flux.fromIterable(books)
}
fun get(id: Int) : Mono<Book> {
return Mono.justOrEmpty(books.find { it.id == id})
}
}
@RestController
class BookController(
private val bookService: BookService,
) {
@GetMapping("/books")
fun getAll(): Flux<Book> {
return bookService.getAll()
}
@GetMapping("/books/{id})
fun get(@PathVariable id: Int): Mono<Book> {
return bookService.get(id)
}
}
- 위와 같이
@Controller(:=Router), @Service(:=Handler)를 스프링 MVC에서 사용하던 것처럼 애노테이션으로 대치하여 사용할 수도 있다.
- 애노테이션 문법을 그대로 사용하되 응답값만
Mono<>, Flux<> 형태로 응답한다.
웹클라이언트
RestTemplate
RestTemplate는 스프링에서 제공하는 블로킹 방식의 HttpClient이다
- 스프링 애플리케이션에서 다른 서버와 통신할 경우 유용하게 사용된다.
- 하지만 Spring5부터 Deprecated되어 WebClient를 사용하길 권고한다.
- RestTemplate의 문제는 요청부터 응답까지 스레드가 블로킹되어 다른 일을 하지 못한다.
- 만일 하나의 API에서 여러 서버의 응답을 받아 처리하는 경우 하나씩 처리하므로 응답이 느려진다 (CompletableFuture 사용 필요)
WebClient
WebClient는 스프링에서 제공하는 리액티브 기반의 논블로킹 HttpClient이다.
- 논블로킹, 블로킹 방식 모두 사용이 가능하다.
- WebClient를 사용하면 비동기처리가 가능하여 여러 서버의 응답을 받아 처리하는 경우 동시에 여러 서버로 호출하며 빠르게 처리가 가능하다.
@GetMapping("/books/nonblock")
fun getBooksNonBlockingWay(): Flux<Book> {
val flux = WebClient.create()
.get()
.uri(url)
.retrieve()
.bodyToFlux(Book::class.java)
return flux
}
스프링데이터 R2DBC
R2DBC
- 전통적인 방식의
JDBC 드라이버는 하나의 커넥션에 하나의 스레드를 사용하는 Thread Per Connection 방식
- 높은 처리량과 대규모 애플리케이션을 위해 비동기-논블로킹 데이터베이스 API에 대한 요구가 생김
- 애플리케이션 로직이 비동기더라도 DB 드라이버가 JDBC라면 필연적으로 블로킹이 발생하여 100% 성능을 내기 어려웠음
R2DBC는 빠르게 성장중인 리액티브 기반 비동기-논블로킹 데이터베이스 드라이버
connection.createStatement("SELECT * FROM employess")
.execute()
.flatMap(r -> r.map((row, metadata) -> {
Employee emp = new Employee();
emp.setId(row.get("emp_id", Integer.class));
emp.setName(row.get("name", String.class));
emp.setPosition(row.get("position", String.class));
emp.setSalary(row.get("salary", Double.class));
return emp;
}))
.close()
.subscribe();
스프링데이터 R2DBC
스프링데이터 R2DBC는 R2DBC 기반의 스프링 데이터 프로젝트이다
- 많은 ORM에서 제공하는 LazyLoadinf, Dirty-Checking, Cache 등을 지원하지 않으므로 ORM으로써의 기능은 적지만 오히려 더 심플하게 사용할 수 있다.
ReactiveCrudRepository는 리액티브를 지원하는 CRUD 인터페이스이다.
- 모든 반환 타입이 Mono, Flux 같은 리액터의 Pusblisher로 이뤄져있다.
- 이외에도 스프링데이터 MongoDB Reactive, 스프링데이터 Redis Reactive가 존재
스프링 WebFlux & 코루틴
- 프로젝트 리액터 기반의 리액티브 프로그래밍은 비동기-논블로킹의 단점인 콜백 헬 문제를 순차적으로 동작하는 연산자를 통해 해결한다.
- 러닝커브가 매우 높아서 코루틴을 도입하게 된다.
코루틴(Coroutine)
- 코루틴은 코틀린에서 비동기-논블로킹 프로그래밍을 명령형 스타일로 작성할 수 있도록 도와주는 라이브러리이다.
- 코루틴은 멀티 플랫폼을 지원하여 코틀린을 사용하는 안드로이드, 서버 등 여러 환경에서 사용할 수 있다.
- 코루틴은
임시 중단 가능 함수(suspend function)을 통해 스레드가 실행을 잠시 중단했다가 중단 시점부터 다시 재개(resume)할 수 있다.
suspend fun combineApi() = coroutineScope {
val response1 = async { getApi1() }
val response2 = async { getApi2() }
return ApiResult {
response1.await()
response2.await()
}
}
스프링 WebFlux의 코루틴 지원
- 스프링 WebFlux 공식문서의 코틀린 예제들을 보면 모두 코루틴 기반의 예제를 소개하고 있다
- 스프링 MVC, 스프링 WebFlux 모두 코루틴을 지원하며 의존성만 추가하면바로 사용 가능
리액티브의 코루틴 변환
fun handler(): Mono<Void> -> suspend fun handler()
fun handler(): Flux<T> -> fun handler(): Flow<T>
코루틴을 적용한 컨트롤러
@RestController
class UserController (
private val userService: UserService,
private val userDetailService: UserDetailService
) {
@GetMapping("/{id}")
suspend fun get(@PathVariable id: Long) : User {
return userService.getById(id)
}
@GetMapping("/users")
suspend fun gets() = withContext(Dispatchers.IO) {
val usersDeffered = async { userService.gets() }
val userDetailsDeffered = async { userDetailService.gets() }
return UserList(usersDeffered.await(), userDetailDeffered.await())
}
}
코루틴을 사용한 WebClient
val result = client.get()
.uri("/persons/{id}", id)
.retrieve()
.awaitBody<Person>()
코루틴 기초
runBlocking
- 코루틴을 생성하는
코루틴 빌더이다
- runBlocking으로 감싼 코드는 코루틴 내부의 코드가 수행이 끝날때까지 스레드가 블로킹된다.
- 일반적으로 코루틴은 스레드를 차단하지 않고 사용해야하므로 runBloking을 사용하는 것은 좋지 않지만 꼭 사용해야하는 경우가 있다.
- 코루틴을 지원하지 않는 경우 ex) 테스트코드, 배치 등
runBloking {
println("Hello")
println(Thread.currentThread().name)
}
println("world")
println(Thread.currentThread().name)
launch
launch는 스레드 차단 없이 새 코루틴을 시작하고 결과로 job을 반환하는 코루틴 빌더이다
- launch는 결과를 만들어내지 않는 비동기 작업에 적합하기 때문에 인자로 Unit을 반환하는 람다를 인자로 받는다.
delay()함수는 코루티 라이브러리에 정의된 일시 중단 함수이며 Thread.sleep()과 유사하지만 현재 스레드를 차단하지 않고 일시 중단 시킨다. 이때 일시 중단된 스레드는 코루틴 내에서 다른 일시 중단 함수를 수행한다.
- launch를 사용해 여러 작업을 병렬적으로 수행할 수 있다.
- launch가 반환하는 Job을 이용해 현재 코루틴의 상태를 확인하거나 실행 또는 취소도 가능하다.
job.cancel()을 호출해 코루틴을 취소할 수 있다.
launch(start = CoroutineStart.LAZY)를 사용해서 start함수를 호출하는 시점에 코루틴을 동작시킬 수 있다.
fun main() = runBlocking<Unit> {
val job1: Job = launch {
val timeMillis = measureTimeMillis {
delay(150)
}
println("async task-1 $timeMillis ms")
}
job1.cancel()
val job2: Job = launch(start = CoroutineStart.LAZY) {
val timeMillis = measureTimeMillis {
delay(100)
}
println("async task-2 $timeMillis ms")
}
println("start task-2")
job2.start()
}
async
async 빌더는 비동기 작업을 통해 결과를 만들어내는 경우에 적합하다
- async는 비동기 작업의 결과로
Deffered라는 특별한 인스턴스를 반환하는데 await라는 함수를 통해 async로 수행한 비동기 작업의 결과를 받아올 수 있다.
- 자바 스크립트나 다른 언어의 async-await는
키워드인 경우가 보통이지만 코틀린의 코루틴은 async-await가 함수인 차이점이 있다.
fun main() = runBlocking<Unit> {
val result1: Deferred<Int> = async {
delay(100)
sum(1, 3)
}
println("result1 : ${result1.await()}")
val result2: Deferred<Int> = async {
delay(100)
delay(100)
sum(2, 5)
}
println("result2 : ${result2.await()}")
}
suspend 함수
- suspend 함수는 코루틴의 핵심 요소로써 일시 중단이 가능한 함수를 말한다.
- suspend 함수는 일반 함수를 마음껏 호출할 수 있지만 일반 함수에선 suspend 함수를 호출할 수 없다. (호출하는 함수로 suspend여야함)
- suspend 함수에서 앞서 학습한 async, launch와 같은 코루틴 빌더를 사용하려면
코루틴 스코프를 사용해야 한다.
- 코루틴 스코프를 사용하면 runBlocking과 다르게 현재 스레드가 차단되지 않고 코루틴이 동작한다.
suspend fun main() {
doSomething()
}
fun printHello() = println("Hello")
suspend fun doSomething() = coroutineScope {
launch {
delay(200)
println("world")
}
launch {
printHello()
}
}
Flow
Flow는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 API이다.
- 코루틴의 suspend 함수는 단일 값을 비동기로 반환하지만 Flow를 사용하면 여러개의 값을 반환할 수 있다.
- 리액티브 스트림과 같이
최종 연산자인 collect를 호출하지 않으면 아무런 일도 일어나지 않는다.
fun main() = runBlocking<Unit> {
val flow = simple()
flow.collect { value -> println(value) }
}
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}