✏️ [Java] Spring WebFlux - 비동기 처리 이해하기

박상민·2025년 1월 9일
1

JAVA

목록 보기
4/6

AI-SW Developers 공모전이 다음주인 1월 16일에 최종 발표를 한다.
최종 발표전 지금까지 작성했던 코드를 분석하면서 공부를 하고 있는데 이해가 부족한 부분들이 보였다. 오늘은 몇 가지 키워드와 함께 Spring WebFlux와 비동기 처리를 공부하려고 한다.

공부 키워드
Spring WebFlux, Redis 퍼블리싱, WebClient, Reactor, Mono 타입, 리액티브 프로그래밍

⭐️ Spring WebFlux

Why was Spring Webflux created?
Spring Webflux가 만들어진 이유 중 하나는 적은 수의 스레드로 동시성을 처리하고 더 적은 하드웨어 리소스로 확장하기 위해 비동기(non-blocking) 웹 스택이 필요했기 때문이다.

Servlet non-blocking I/O는 동기적(synchronous)으로 동작하거나 블로킹(blocking) 방식으로 설계된 Servlet API(예: Filter, Servlet, getParameter, getPart)와는 방향성이 다르다. 이는 특히 Netty와 같은 비동기, 비차단 방식의 서버에서 중요한 역할을 한다.

두번째 이유는 함수형 프로그래밍이다.
Java 5에서 애너테이션이 추가되면서 애너테이션 기반 REST 컨트롤러나 단위 테스트 같은 새로운 기회가 생긴 것처럼, Java 8의 람다 표현식 추가는 Java에서 함수형 API를 구현할 기회를 열어주었다.
이러한 변화는 비동기 애플리케이션과 Continuation 스타일 API에 큰 이점을 제공한다.
프로그래밍 모델 관점에서 Java 8은 Spring WebFlux가 애너테이션 기반 컨트롤러와 더불어 함수형 웹 엔드포인트를 제공할 수 있게 했다.

요약하면, Spring WebFlux는 비동기적 확장성과 함수형 프로그래밍의 가능성을 활용하기 위해 만들어진 프레임워크라는 의미이다.

WebFlux 주요 특징
WebFlux는 '반응형 프로그래밍(Reative Programming)'을 구현하기 위해 Reactor라는 라이브러리를 이용한다. Reactor는 Netty 서버를 통해 비동기식 이벤트 기반의 서버 환경을 제공하며, 이를 이용하여 비동기 방식의 non-blocking 요청을 통해 이벤트 기반의 반응형 스트림으로 데이터를 주고받는다.

📌 반응형 프로그래밍(Reative Programming)

반응형 프로그래밍(Reative Programming)

  • Spring WebFlux는 반응형 프로그래밍(Reactive Programming) 방식을 통해 이벤트 기반의 비동기식 애플리케이션을 구축할 수 있다.
  • 이 이벤트는 비동기적으로 처리되며 새로운 이벤트가 발생하면 이벤트 스트림이 생성되며, 스트림을 구족하면 이벤트를 처리할 수 있다.

📌 Reactor

반응형 프로그래밍(Reactive Programming)을 구현하기 위한 Reactor는 Reactive 라이브러리 중 하나이다.
Publisher-Subscriber 패턴을 중심으로 동작하며 데이터를 생성, 가공하고 구독자에게 전달하는 역할을 한다.
Reactor에서는 Mono와 Flux의 데이터 스트림 유형을 지원한다.

Publisher-Subscriber 패턴: 반응형 스트림(Reactive Stream)

  • 비동기적 및 이벤트 기반 응용 프로그램을 위한 '스트림 처리 기술'을 의미한다. 해당 기술의 핵심은 'Publisher'가 'Subscriber'에게 데이터를 제공하는 것을 의미한다.

  • 이는 Publisher는 데이터를 생성하고, Subscriber는 이를 처리한다. 이러한 방식으로 스트림을 처리함으로써 데이터를 더 효율적으로 처리할 수 있다.

  • Publisher: 발행자 - 데이터를 생성하고, Subscriber에게 전송한다.

  • Subscriber: 구독자 - Publisher로부터 데이터를 받아들이고, 소비한다.

  • Subscription: 구독 - Subscriber가 처리할 데이터의 양을 정의한다.

📌 Mono

Mono는 Reactor 라이브러리에서 제공하는 Reactive Streams의 Publisher 중 하나로 오직 '0개 또는 1개의 데이터 항목 생성'하고 이 결과가 생성되고 나면 스트림이 종료되면 결과 생성을 종료한다.
Mono를 사용하여 비동기적으로 결과를 반환하면 해당 결과를 구독하는 클라이언트는 결과가 생성될 때까지 블로킹하지 않고 다른 작업을 수행할 수 있다.

Mono 사용 예시

Mono.just("Hello, world!")
	.map(String::toUpperCase)
    .flatMap(s -> Mono.just("Mono: " + s))
    .subscribe(System.out::println);
  • "Hello, world!"라는 문자열을 Mono.just를 통해 Mono로 만든 후, map 연산자를 이용해 문자열을 대문자로 변환한다.
  • 이후, flatMap 연산자를 이용해 "Mono: "이라는 문자열과 결합한다.
  • 마지막으로, 비동기적으로 처리된 결과값을 subscribe 메서드를 이용하여 출력한다.

📌 Blocking Request / Non-Blocking Request

일반적으로 WebFlux를 사용하는 경우 Non-Blocking Request를 사용하는 것이 좋다고 한다. Blocking Request를 사용하면 스레드가 차단되어 성능이 저하되기 때문이다.
WebFlux는 Reactive Programming 모델을 위해 설계되었으며, Non-Blocking Request를 사용하면 이러한 모델을 더욱 효과적으로 활용할 수 있다.

Blocking Reqeust (=Synchronous request)

  • 기존 Spring MVC에서 사용되던 Blocking Request를 수행하면서 클라이언트에서 요청을 보내면 결과가 반환될 때까지 대기를 하는 것을 의미한다.

출처 https://reflectoring.io/getting-started-with-spring-webflux/

Non-Blocking Request (=Asynchrouse Request)

  • WebFlux에서 제공하는 Non-Blocking Request 수행하면서 요청을 보내고 결과가 반환되지 않더라고 다른 작업을 수행할 수 있는 것을 의미한다.

출처 https://reflectoring.io/getting-started-with-spring-webflux/

📌 WebClient

WebFlux의 일부인 WebClient는 비동기적인 방식으로 HTTP 요청을 보내고 응답을 받을 수 있는 라이브러리를 의미한다.
다수의 외부 API 호출이나, 다른 서비스들과의 통합 작업에서 유용하다. WebFlux의 WebClient는 비동기적인 방식으로 HTTP 요청을 보내고 응답을 받을 수 있는 라이브러리이다. 이를 통해 Reactive Streams를 이용하여 높은 성능의 네트워크 통신을 구현할 수 있다.

📌 적용 코드

/**
 * AI 질의응답
 */
@PostMapping("/question/{roomId}")
public Mono<ResponseEntity<String>> sendMessage(@PathVariable String roomId,
                                                    @CurrentUser User user,
                                                    @RequestParam("question") String question) {

    return ollamaApiClient.sendQuestion(roomId, user, question)
            .doOnSuccess(response -> {
                // AI의 응답을 Redis에 퍼블리시
                PublishMessage publishMessage = new PublishMessage(roomId, response.content(), LocalDateTime.now());

                log.info("publishMessage: {}", publishMessage.getContent());
                redisTemplate.convertAndSend(topic.getTopic(), publishMessage);
            })
            .then(Mono.just(ResponseEntity.ok("AI Response Success")))
            .onErrorResume(error -> {
                log.error("Error while sending question: ", error);
                return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send question."));
            });
}
    
---------------
/** AI 질의응답 메서드
반환 타입을 Mono로 감싸는 이유
비동기 작업을 외부에서 처리하려면 리액티브 타입으로 반환하는 것이 좋다.
이렇게 하면 호출자가 메서드의 실행이 완료될 때까지 비동기적으로 기다릴 수 있다.
*  */
@Transactional
public Mono<ChatMessageDto.ChatResponse> sendQuestion(String roomId, User currentUser, String question) {
    // 사용자 질문 저장
    chatMessageService.saveMessage(currentUser, question, roomId, SenderType.USER);

    ChatMessageDto.ChatRequest chatRequest = ChatMessageDto.ChatRequest.builder()
            .roomId(roomId)
            .content(question)
            .senderType(SenderType.USER)
            .build();

    return webClient.post()
            .uri("/ai/chat/message")
            .bodyValue(chatRequest)
            .retrieve()
            .bodyToMono(ChatMessageDto.ChatResponse.class);
}

위는 AI-SW Developers 공모전에서 진행한 SSL 프로젝트의 코드 일부이다.
기능은 사용자의 법률 질문을 받아 외부 AI API에 전달해 답변을 받고 사용자에게 답변을 전달하는 기능이다.

이 코드는 Spring WebFlux를 사용하여 비동기적으로 처리된 코드이다.
1. Mono와 리액티브 프로그래밍

  • Mono는 Spring WebFlux의 리액티브 타입으로, 0 또는 1개의 데이터를 비동기적으로 반환하는 스트림을 나타낸다.
  • 코드에서 반환 타입으로 Mono<ResponseEntity<String>>Mono<ChatMessageDto.ChatResponse>를 사용하고 있는 것은 비동기 작업을 리액티브 방식으로 처리하기 위해서이다.
  1. WebClient
  • WebClient는 Spring WebFlux에서 제공하는 비동기 HTTP 클라이언트이다.
  • 코드에서 sendQuestion 메서드는 WebClient를 사용하여 AI API에 POST 요청을 보내고 있다. 이 요청은 블로킹 없이 비동기로 처리된다.
return webClient.post()
            .uri("/ai/chat/message")
            .bodyValue(chatRequest)
            .retrieve()
            .bodyToMono(ChatMessageDto.ChatResponse.class);
  • 요청의 결과는 Mono로 감싸져 반환되며, AI API의 응답이 준비될 때까지 호출 스레드는 차단되지 않는다.
  1. 비동기 체이닝과 후속 작업
  • sendMessage 메서드에서 ollamaApiClient.sendQuestion 호출 후, .doOnSuccess.then으로 작업 체인을 구성하소 있다.
return ollamaApiClient.sendQuestion(roomId, user, question)
        .doOnSuccess(response -> {
            // 응답 처리
        })
        .then(Mono.just(ResponseEntity.ok("AI Response Success")))
        .onErrorResume(error -> {
            // 에러 처리
        });
  • 이 코드는 응답을 받았을 때 Redis에 메시지를 퍼블리시하고, 응답 성공 또는 실패 시 적절한 결과를 반환하는 비동기 작업 흐름을 구성한다.
  • .doOnSuccess는 성공 이벤트를 처리하기 위한 부수 작업(side effect)으로 사용된다.
  • .then은 성공 이후 새로운 Mono를 반환한다.
  1. 블로킹 호출 없음
  • 코드 전반에서 블로킹 메서드 호출이 없는 것을 알 수 있다.
  • 예를 들어, Redis 퍼블리싱(redisTemplate.convertAndSend)이나 데이터 저장(chatMessageService.saveMessage) 등의 작업은 비동기 작업이 완료된 후에 호출된다.
  • 호출 스레드가 블로킹되지 않으므로 더 많은 요청을 효율적으로 처리할 수 있다.

결론

  • Mono와 WebClinnt를 사용해 AI API와의 통신을 비동기로 처리하고
  • Redis 퍼블리싱과 같은 추가 작업을 체이닝하여 비동기적으로 실행되도록 설계되었다.

이런 방식은 스레드 효율성을 극대화하고, 많은 동시 요청을 처리할 수 있는 확장성을 제공한다.

나의 코드에서 Non-Blocking Request의 이점
사용자의 질문을 AI API에 전송한 뒤 AI에서 답변을 받기까지 어느정도의 시간이 걸릴지 알 수 없다. 즉시 답변을 받을 수도 있으나 대기 시간이 몇 초까지도 걸릴수도 있는 것이다. 이런 경우 AI API의 결과가 반환될 때까지 대기한다면 뒤에 들어온 다른 요청의 처리 또한 대기하게 된다. 이는 불필요한 대기 시간을 만들게 되므로 Non-Blocking Request로 처리해 결과가 반환되지 않더라도 다른 작업을 수행하도록 해 요청 처리를 효율적으로 할 수 있다.

흐름 요약
1. 사용자가 질문을 작성
2. "/question/{roomId}" 경로로 질문이 전달
3. 질문이 데이터베이스에 저장
4. 질문이 AI API로 전송되고 응답을 가져옴
5. AI의 응답이 Redis에 퍼블리싱
6. 최종적으로 클라이언트에게 성공 또는 실패 메시지 반환


출처
https://docs.spring.io/spring-framework/reference/web/webflux.html
https://adjh54.tistory.com/232

profile
스프링 백엔드를 공부중인 대학생입니다!

0개의 댓글

관련 채용 정보