저번 포스트에 이어서 Spring Reactive Web 의존성을 추가하여 단일 스레드 + 비동기/논블로킹 통신을 알아보자.
WebFlux를 사용하기 위해 필수 의존성을 추가한다.
# build.gradle의 dependencies안에 추가
implementation 'org.springframework.boot:spring-boot-starter-webflux'
# build.gradle의 dependencies안에 추가
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
Spring Reactive Web은 Spring MVC에서 사용하는 Tomcat서버를 사용하지 않고 Netty서버를 사용한다.
Netty 는 비동기 이벤트 기반 네티워크 애플리케이션 프레임워크로, 1개의 이벤트에 대하여 다수의 Worker 스레드로 동작하게 된다.
반면, Tomcat은 블로킹/동기 방식을 사용한다.
WebFlux를 사용하기 위한 간단한 Customer
객체를 하나 생성하자.
@RequiredArgsConstructor
@Data
public class Customer {
@Id
private Long id;
private final String firstName;
private final String lastName;
}
그저, 이름을 받는 객체이다.
그리고, Customer
를 CRUD하기 위한 저장소도 생성한다.
public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> { // 2)
@Query("SELECT * FROM customer WHERE last_name = :lastname") // 쿼리문
Flux<Customer> findByLastName(String lastName); // 1)
}
1) JPA와 같이 find
메서드와 save
메서드 등을 제공한다.
2) JPA와 다르게 ReactiveCrudRepository
로 확장한다.
Customer를 저장하기 위한 테이블을 만든다.
DROP TABLE IF EXISTS CUSTOMER;
CREATE TABLE customer (
id SERIAL PRIMARY KEY,
first_name VARCHAR(255),
last_name VARCHAR(255)
);
DataBase의 값을 조회하기 위해 초기 설정을 해준다.
@Slf4j
@Configuration
public class DBInit {
@Bean
ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) { // 1)
ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
initializer.setConnectionFactory(connectionFactory);
initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));
return initializer;
}
@Bean
public CommandLineRunner demo(CustomerRepository repository) { // 2)
return (args) -> {
// save a few customers
repository.saveAll(Arrays.asList(new Customer("Jack", "Bauer"),
new Customer("Chloe", "O'Brian"),
new Customer("Kim", "Bauer"),
new Customer("David", "Palmer"),
new Customer("Michelle", "Dessler")))
.blockLast(Duration.ofSeconds(10));
};
}
}
프로젝트를 빌드할 때, 데이터베이스의 테이블을 생성하고 초기 값을 넣어주기 위해 initailizer
메서드와 demo
메서드를 Beam으로 등록한다.
클라이언트로부터 요청을 받기 위해 Controller
를 생성한다.
@RestController
public class CustomerController {
private final CustomerRepository customerRepository;
public CustomerController(CustomerRepository customerRepository) {
this.customerRepository = customerRepository;
}
@GetMapping("/customer")
public Flux<Customer> findAll() { // 1)
return customerRepository.findAll().delayElements(Duration.ofSeconds(1)).log();
}
}
1) customerRepository
에 있는 customer 정보를 전부 불러온다.
Spring MVC와 다른 점은, 만약 Spring MVC로 구현했다면 5명의 Customer 데이터를 한번에 가져온 다음 응답한다.
반면, Spring WebFlux는 onNext()
를 통해 Customer 데이터를 1개 가져오면 가져온 값을 브라우저로 바로 전달한다.
마지막 onComplete()
메서드가 실행되면서 응답을 완료한다.
하지만, 실제 개발에서는 딜레이를 걸지 않기 때문에 위 코드만 보면 MVC와 별 차이가 없어 보인다. (실제론 단일 스레드로 처리하기 때문에 차이는 존재한다.)
이 차이를 명확하게 확인하기 위해, SSE 프로토콜과 함께 사용해보자.
Client가 Server로부터 데이터를 받을 수만 있는 방식을 SSE라고 한다.
이번에 내가 구현할 것은 클라이언트의 POST요청을 통해 DB에 접근하면 이벤트가 발생하여 구독하고 있던 클라이언트에게 DB로 들어간 데이터를 보여주고자 한다.
SSE 프로토콜의 간단한 흐름은 아래 그림과 함께 설명하고, 자세한 내용은 다음 포스트에서 다뤄보겠다.
이처럼, 클라이언트가 서버로 요청을 보내고, 서버에서 작성한 로직이나 이벤트가 발생하면 데이터를 즉시 클라이언트로 전달하는 방식이다.
이제, 코드로 넘어가보자.
간단한 시나리오는 다음과 같다.
/customer/sse
를 구독(SSE 프로토콜로 통신하고 {클라이언트가 여러개일 경우}각 Stream을 sink*)한다.Sink의 역할❗
두 개 이상의 Stream의 Sink를 맞춰주는 역할이다.
조금 더 쉽게 설명하면,
Client A와 Client B에서 각자 요청을 보내면 서버는 Flux A와 Flux B를 클라이언트에 맞게 응답한다.
즉, 서로 다른 Stream이 생긴다.
이때, Sink는 서로 다른 Stream을 연결하는 역할이다.
Event(Data)가 발생되면 Sink로 연결된 각 Stream으로 Event를 수신할 수 있다.
/customer/sse
는 Customer가 새로 추가되면 이벤트를 발생하고, 이벤트가 발생하면 새로 추가된 Customer의 정보를 실시간 + 비동기적으로 제공한다./customer
로 요청하여 새로운 Customer 를 저장한다./customer/sse
를 구독하는 모든 사용자에게 새로 추가된 Customer의 정보를 제공한다.@RestController
public class CustomerController {
private final CustomerRepository customerRepository;
private final Sinks.Many<Customer> sink; // 1) 추가
public CustomerController(CustomerRepository customerRepository) {
this.customerRepository = customerRepository;
this.sink = Sinks.many().multicast().onBackpressureBuffer(); // 2) 추가
}
```
중략
```
@GetMapping("/customer/sse")
public Flux<ServerSentEvent<Customer>> findAllSSE() { // 3)
// 4)
return sink.asFlux().map(customer -> ServerSentEvent.builder(customer).build()).doOnCancel(() -> sink.asFlux().blockLast());
}
@PostMapping("/customer")
public Mono<Customer> save() {
return customerRepository.save(new Customer("Hong", "gildong")).doOnNext(sink::tryEmitNext); // 5)
}
}
1. private final Sinks.Many<Customer> sink;
Sinks.Many<Customer>
라는 객체를 선언한다.2. this.sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<Customer>
객체의 생성 옵션과 함께 추가한다.unicast()
, multicast()
, replay()
가 있다.unicast()
: 하나의 Subscriber 만 허용한다. 즉, 하나의 Client 만 연결할 수 있다.multicast()
: 여러 Subscriber 를 허용한다.replay()
: 여러 Subscriber 를 허용하되, 이전에 발행된 이벤트들을 기억해 추가로 연결되는 Subscriber 에게 전달한다.onBackpressureBuffer()
: publisher가 subcriber에 수용할 수 없는 양의 데이터를 전달하면 데이터를 Buffering 해서 저장.3. Flux<ServerSentEvent<Customer>>
findAllSSE
메서드는 비동기 스트림 형태로 Customer 객체들을 담는 Server-Sent Events를 반환한다.4. findAllSSE() 메서드와 내부 로직
Sinks.Many<Customer>
타입의 sink를 Flux<Customer>
타입으로 변환.ServerSentEvent<Customer>
의 형식으로 변환(SSE 객체).doOnCancel()
: 구독을 취소할 때 실행하는 함수sink.asFlux().blockLast()
: 구독이 취소된 후, 현재 데이터 스트림을 종료sink.asFlux().blockLast()
메서드를 사용하지 않고 클라이언트에서 종료하여 구독 취소 한다면? 👉 실질적으로 스트림이 종료되지 않아, 여전히 서버 측 리소스가 사용 중이거나 차단되어 있을 수 있어 새로운 요청이 제대로 처리되지 않는다.5. save() 메서드
글로 조금 설명을 덧붙여보자면,
처음에 /customer/sse
를 구독한 클라이언트는 하나였다.
그리고 Post방식으로 /customer
를 통해 이벤트를 발생시키니 해당 데이터가 바로 구독한 클라이언트로 전송된다.
추가로, 새로운 클라이언트가 /customer/sse
를 구독하고 다시 이벤트를 발생시키니 두 클라이언트 모두에게 데이터가 전달되는 것을 알 수 있다.
WebFlux를 공부하기 위해 뛰어들었지만, 내가 습득하지 못한 여러 개념들이 바탕되었기 때문에 어렵게 공부했다는 느낌이 들었다.
모르거나 아리송한 개념이 있으면 이해할 때까지 찾아보는 습관 덕분인지 이해는 할 수 있었다.
하지만 WebFlux에 대해 완벽한 수준은 아니라서 누군가 이 포스트르 본다면 틀린 부분이 산더미처럼 보일지도 모른다.
피드백은 언제나 환영이기 때문에, 지나가던 고수님들께서 이 글을 보신다면 꼭 피드백을 받고 싶다.
백엔드 개발을 하면서 동기/블로킹 방식의 개발을 자주 했다.
어쩌면, 웬만한 프로젝트는 동기/블로킹으로 개발했던 것 같다.
개발을 하면서도 대규모 트레픽에서 약한 모습을 종종 봐왔기 때문에 WebFlux의 비동기/논블로킹 방식이 필요하다고 생각했다.
물론, 상황마다 WebFlux와 MVC를 다뤄야할 것이다.
그렇기에, WebFlux의 중요성을 다시 한 번 알게 되었고, 두가지 모두 능숙하게 다룰 수 있는 개발자가 되는 날까지 WebFlux에 가까워 지려고 한다.
YouTube : Springboot-WebFlux
알림 기능을 구현해보자 - SSE(Server-Sent-Events)!
Accessing data with R2DBC