WebFlux (SubscribeOn, PublishOn)

박찬섭·2024년 12월 20일

스프링

목록 보기
12/14

스레드 스케쥴러
SubscribeOn
PublishOn
스케쥴러 종류
사용 예시

스레드 스케쥴러

스레드 스케줄러는 MonoFlux와 같은 WebFlux의 비동기 타입에서 사용할 수 있는 연산자다.

기본적으로 WebFlux 기반의 자바 코드는 비동기 방식으로 동작한다.
스레드가 이벤트 루프를 순환하면서 작업을 처리한다.
하지만 일부 작업은 동기적으로, 즉 블로킹 방식으로 처리되는 경우가 있다.

예를 들어

  • 데이터베이스 요청
  • 외부 서버 API 호출

이러한 작업에서는 요청과 응답 사이의 대기 시간 동안 스레드가 블로킹된다.

스레드 스케줄러 연산자는 이를 해결하기 위해 사용된다.

SubscribeOn

SubscribeOn은 다음과 같은 상황에서 사용할 수 있다.
해당 작업의 스레드가 블로킹이 되면 지정한 스레드풀에 해당 작업을 넘긴다.
사용 방법은 다음과 같다.

	public Mono<Entity> proccess(Long id) {
        return Mono.fromCallable(() -> entityRepository.findById(id))
                .subscribeOn(Schedulers.boundedElastic())
                ...
                ...
    }

해당 작업은 다음과 같이 해석할 수 있다.

entityRepository.findById(id)

이 부분이 DB와 요청과 응답을 주고 받으며 대기시간이 발생하고 스레드가 블로킹이 된다.

.subscribeOn(Schedulers.boundedElastic())

블로킹이 발생하는 지점부터 Schedulers에 boundedElastic 스레드풀에 해당 작업을 넘긴다.

이렇게 해당 작업을 다른 스레드풀에 넘기게되면 해당 블로킹이 끝나도 그대로 boundedElastic 스레드풀에서 작업을 처리한다.


PublishOn

PublishOn은 다음과 같은 상황에서 사용할 수 있다.
해당 PublishOn이 선언된 부분부터 해당 작업을 원하는 스레드풀에게 넘길 수 있다.

	public Mono<String> fetchDataAndProcess() {
        return Mono.fromCallable(() -> externalApiCall())
                   .publishOn(Schedulers.parallel()) // CPU 집약적인 작업으로 전환
                   .map(data -> processData(data));  // 데이터 처리
    }

    private String externalApiCall() {
        // 외부 API 호출 (가정: I/O 작업)
        return "data-from-api";
    }

해당 작업은 다음과 같이 해석할 수 있다.

.publishOn(Schedulers.parallel())

externalApiCall() 부분까지는 동기적으로 작업되다 이후 부터 병렬스레드로 작업을 연임한다.

둘다 같이 사용해서 조금 더 효율적으로 스레드를 다룰 수 있다.


스케쥴러 종류

스케줄러설명
Schedulers.parallel()고정 크기 스레드 풀, CPU 집약적 작업에 사용.
Schedulers.boundedElastic()동적 스레드 풀, 블로킹 작업(I/O 등)에 사용.
Schedulers.immediate()호출된 스레드에서 바로 실행.
Schedulers.single()단일 스레드에서 실행.

사용 예시

public Mono<String> processWithSubscribeOnAndPublishOn() {
    return Mono.fromCallable(() -> externalApiCall())    // 블로킹 작업
               .subscribeOn(Schedulers.boundedElastic()) // I/O 작업을 별도 스레드에서 실행
               .publishOn(Schedulers.parallel())         // 이후 작업을 병렬 스레드로 전환
               .이후 작업
               //
               //
}

private String externalApiCall() {
    // 블로킹 작업 예시 (외부 API 호출 등)
    return "data-from-api";
}

시나리오
externalApiCall은 boundedElastic 스레드풀에서 작업하고
이후 작업은 parallel 스레드풀에서 처리한다.

profile
백엔드 개발자를 희망하는

0개의 댓글