Mono, Flux를 사용하면서 리액티브라는 용어를 정확히 알고있다고 생각하지 않아, 기본 개념부터 하나 씩 이해해보려고 한다.
JS를 할때는 V8 engine이 어떻게 이벤트 루프를 처리하는지 등의 내용을 읽어봤는데, Java의 동작 방식은 잘 나와있지 않아 이에대한 궁금증도 풀어보려한다.
JavaScript로 시작하면서 Async/Sync, Blocking/Non-blocking에 대한 개념은 학습했으나, Java로 언어를 바꾸면서 Asnyc annotation을 사용하거나 Multi threading이 아직은 어색했다.
Mono, Flux API를 사용하면 요청을 Non blocking으로 활용할 수 있다.
변화에 반응하는 것을 중심에 두고 만든 프로그래밍 모델로 데이터를 비동기적으로 처리하고 이벤트 기반 아키텍쳐를 통해 실시간으로 데이터의 변화에 반응할 수 있게 프로그래밍을 할 수 있다.
Reactive stream의 주 목적은 subscriber가 publisher의 데이터 생산 속도를 제어하는데 있다.
backpressure : publisher의 생산 속도를 subscriber가 따라잡지 못하는 상황을 말한다. Reactor에서의 Backpressure 처리 방식은 뒤에서 알아본다.
spring-web
module은 Spring-Webflux에 기본이 되는 reactive foundation (HTTP 추상화, Reactive Stream adapter)를 가지고 있다.
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);
// RouterFunction은 들어오는 요청을 핸들러 함수로 라우팅한다.
// 라우터 함수가 매칭되면 핸들러 함수를 반환하고, 매칭되는 것이 없으면 빈 Mono를 반환
RouterFunction<ServerResponse> route = route()
.GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson)
.GET("/person", accept(APPLICATION_JSON), handler::listPeople)
.POST("/person", handler::createPerson)
.build();
public class PersonHandler {
private final PersonRepository repository;
public PersonHandler(PersonRepository repository) {
this.repository = repository;
}
public Mono<ServerResponse> listPeople(ServerRequest request) { (1)
Flux<Person> people = repository.allPeople();
return ok().contentType(APPLICATION_JSON).body(people, Person.class);
}
public Mono<ServerResponse> createPerson(ServerRequest request) { (2)
Mono<Person> person = request.bodyToMono(Person.class);
return ok().build(repository.savePerson(person));
}
public Mono<ServerResponse> getPerson(ServerRequest request) { (3)
int personId = Integer.valueOf(request.pathVariable("id"));
return repository.getPerson(personId)
.flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
제일 많이 봤던 그림
SpringApplication.setWebApplicationType(WebApplicationType.REACTIVE)
Spring MVC와 Spring Webflux가 둘 다 annotated controller를 지원한하고 해도, 동시성 모델과 blocking, thread의 기본 전략이 다르다.
궁금했던 내용이 나와서 너무 기쁨...
Invoking a Blocking API
blocking 라이브러리를 사용하려면, Reactor/RxJava 모두 다른 스레드로 요청을 처리해주는 publishOn operator를 지원한다.
Mutable State
Reactive pipeline은 구분된 환경에서 data가 sequential하게 처리된다.
파이프라인 안의 코드는 절대 동시에 실행되지 않아 mutable state를 신경쓰지 않아도 된다.
Threading Model
그러면 Spring Webflux를 사용하는 Application은 무슨 스레드를 얼마나 실행하고 있을까
reactor-http-nio-
로 시작하는 쓰레드를 볼 수 있다.CPU 사용이 많은 작업 및 Blocking IO가 많은 경우
-> runnable 한 thread가 CPU를 점유하고 있으면 이벤트 루프가 이벤트 queue에 있는 작업을 처리할 수 없다.
앞서 Backpressure는 Publisher가 빠르게 데이터 emit 시 Subscriber의 처리 속도가 느려서 처리되지 못한 데이터가 계속 쌓이는 현상을 해결하려는 것이다.
방법 1. Subscriber가 처리할 수 있는 수준의 양을 Publisher에게 알려주는 방식 -> subscriber가 이벤트 처리에 주도권을 가진다.
방법2. Backpressure 전략을 사용한다 (버퍼에 가득차면 Exception 발생시키기, 버퍼 밖에 대기하는 데이터 Drop 시키기 등..)
Netty의 Event loop가 어떻게 생겼는지 볼 차례다.
참고
https://mumomu.tistory.com/176
https://velog.io/@youngmin-mo/%EC%96%B4%EB%96%BB%EA%B2%8C-%EB%8B%A4%EB%A5%B8-%EC%84%9C%EB%B2%84%EC%99%80-%ED%86%B5%EC%8B%A0%ED%95%A0%EA%B9%8C%EC%9A%94
https://madplay.github.io/post/spring-webflux-references-functional-endpoints
https://www.youtube.com/watch?v=I0zMm6wIbRI
https://dzone.com/articles/spring-webflux-eventloop-vs-thread-per-request-mod
https://stackoverflow.com/questions/51377675/dont-spring-boot-starter-web-and-spring-boot-starter-webflux-work-together