스프링에서의 비동기 처리: Mono 사용법과 원리

SUUUI·2025년 3월 21일
0
  1. Mono란 무엇인가?
    Mono는 Project Reactor에서 제공하는 Publisher 인터페이스의 구현체로, 0-1개의 결과를 비동기적으로 생성한다. Optional과 비슷하게 값이 있거나 없을 수 있지만, 추가로 비동기적 특성을 가진다는 점이 다르다.
    기본적인 특징은 다음과 같다:

최대 1개의 결과만 방출
onNext, onComplete, onError 신호 발생
지연 실행(lazy execution) 지원
풍부한 연산자(operators) 제공

  1. Mono의 생성 방법
    Mono 인스턴스를 생성하는 방법은 여러 가지가 있다.
    2.1 정적 팩토리 메서드 사용
    java복사// 단일 값을 포함하는 Mono 생성
    Mono mono1 = Mono.just("Hello, Reactor!");

// 비어있는 Mono 생성
Mono mono2 = Mono.empty();

// 에러를 포함하는 Mono 생성
Mono mono3 = Mono.error(new RuntimeException("에러 발생"));

// 지연 실행되는 Mono 생성
Mono mono4 = Mono.fromSupplier(() -> generateValue());

// Callable로부터 Mono 생성
Mono mono5 = Mono.fromCallable(() -> computeValue());

// CompletableFuture로부터 Mono 생성
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Future 결과");
Mono mono6 = Mono.fromFuture(future);
2.2 프로그래밍 방식으로 생성
java복사Mono mono = Mono.create(sink -> {
// 비동기 API 호출
someAsyncApi.getData(response -> {
if (response.isSuccess()) {
sink.success(response.getData());
} else {
sink.error(new RuntimeException("데이터 로드 실패"));
}
});
});

  1. Mono의 주요 연산자
    Mono는 함수형 스타일로 연산을 조합할 수 있는 다양한 연산자를 제공한다.
    3.1 변환 연산자
    java복사Mono original = Mono.just("Hello");

// map: 값 변환
Mono lengthMono = original.map(s -> s.length());

// flatMap: 다른 Mono로 변환
Mono userMono = Mono.just("userId")
.flatMap(id -> userRepository.findById(id));
3.2 필터링 연산자
java복사Mono numberMono = Mono.just(42);

// filter: 조건에 맞는지 확인
Mono filteredMono = numberMono.filter(n -> n > 10);

// filterWhen: 비동기 조건 필터링
Mono activatedUserMono = userMono
.filterWhen(user -> permissionService.isActive(user.getId()));
3.3 에러 처리 연산자
java복사Mono dataMono = service.getData();

// onErrorReturn: 에러 발생 시 기본값 반환
Mono withDefault = dataMono.onErrorReturn("기본값");

// onErrorResume: 에러 발생 시 다른 Mono로 대체
Mono withFallback = dataMono
.onErrorResume(e -> backupService.getData());

// doOnError: 에러 로깅 등의 사이드 이펙트
Mono withErrorLogging = dataMono
.doOnError(e -> log.error("에러 발생", e));
3.4 완료 및 구독 연산자
java복사// block: 결과를 동기적으로 기다림 (주의: 가능하면 사용 자제)
String result = dataMono.block();

// subscribe: 비동기 구독 시작
dataMono.subscribe(
data -> System.out.println("데이터: " + data),
error -> System.err.println("에러: " + error),
() -> System.out.println("완료")
);

// then: 앞선 Mono 완료 후 다른 Mono 실행
Mono saveAndNotify = repository.save(entity)
.then(notificationService.notify("저장 완료"));
4. Mono의 내부 동작 원리
Mono의 내부 동작을 이해하려면 리액티브 스트림의 Publisher-Subscriber 모델을 알아야 한다.
4.1 리액티브 스트림 스펙
Mono는 다음 4가지 핵심 인터페이스로 구성된 리액티브 스트림 스펙을 따른다:

Publisher: 데이터를 발행
Subscriber: 데이터를 구독
Subscription: 구독 정보를 관리
Processor: Publisher와 Subscriber 역할을 동시에 수행

4.2 비동기 처리 메커니즘
Mono의 비동기 처리는 다음과 같이 동작한다:

Mono 인스턴스 생성 - 데이터 소스와 처리 파이프라인 정의
subscribe() 호출 - 실제 데이터 처리 시작
백프레셔(backpressure) 메커니즘으로 데이터 흐름 제어
비동기 처리 스케줄러를 통한 실행

4.3 지연 실행(Lazy Execution)
java복사Mono userMono = userRepository.findById("userId");
System.out.println("Mono 정의 완료"); // 이 시점에는 DB 조회 실행 안됨

// subscribe 호출 시점에 실제 DB 조회 실행
userMono.subscribe(user -> {
System.out.println("사용자: " + user);
});
5. 실전에서의 Mono 사용 패턴
5.1 Spring WebFlux 컨트롤러

@RestController
public class UserController {
    
    private final UserService userService;
    
    @GetMapping("/users/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
        return userService.findById(id)
            .map(user -> ResponseEntity.ok(user))
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }
    
    @PostMapping("/users")
    public Mono<ResponseEntity<User>> createUser(@RequestBody Mono<User> userMono) {
        return userMono
            .flatMap(userService::save)
            .map(savedUser -> ResponseEntity
                .created(URI.create("/users/" + savedUser.getId()))
                .body(savedUser));
    }
}

5.2 서비스 레이어에서의 활용

@Service
public class UserService {

    private final UserRepository userRepository;
    private final KafkaTemplate<String, UserEvent> kafkaTemplate;
    
    public Mono<User> findById(String id) {
        return userRepository.findById(id);
    }
    
    public Mono<User> save(User user) {
        return userRepository.save(user)
            .doOnSuccess(savedUser -> 
                kafkaTemplate.send("user-topic", new UserEvent(savedUser))
            );
    }
    
    public Mono<Void> processUsers() {
        return userRepository.findAll()
            .filter(user -> user.isActive())
            .flatMap(this::processUser)
            .then();
    }
    
    private Mono<Void> processUser(User user) {
        return Mono.fromRunnable(() -> {
            // 사용자 처리 로직
        });
    }
}
  

5.3. 비동기 API 호출 조합하기

public Mono<OrderSummary> getOrderSummary(String orderId) {
    Mono<Order> orderMono = orderService.getOrder(orderId);
    Mono<User> userMono = orderMono
        .map(Order::getUserId)
        .flatMap(userService::findById);
    Mono<List<Product>> productsMono = orderMono
        .flatMapIterable(Order::getProductIds)
        .flatMap(productService::findById)
        .collectList();
    
    return Mono.zip(orderMono, userMono, productsMono)
        .map(tuple -> {
            Order order = tuple.getT1();
            User user = tuple.getT2();
            List<Product> products = tuple.getT3();
            
            return new OrderSummary(order, user, products);
        });
}

그럼 Mono 의 단점은 무엇이 있을까?

  1. 트랜잭션 관리 문제
    Mono의 가장 큰 단점 중 하나는 트랜잭션 관리에 있다. 동기 함수의 트랜잭션 내에서 비동기 작업은 별도로 움직인다. 즉, 동기 함수의 요청은 성공했지만 내부에 존재하는 비동기 작업은 실패할 수도 있다.
    @Transactional
    public void processSomething(String id) {
       // 이 부분은 현재 트랜잭션 내에서 실행됨
       repository.update(id, "PROCESSING");
       
       // 비동기 작업 - 현재 트랜잭션과 분리되어 실행됨
       notificationService.sendNotification(id)
           .subscribe(
               success -> log.info("알림 전송 성공"),
               error -> log.error("알림 전송 실패", error)
           );
       
       // 메인 트랜잭션은 비동기 작업의 결과와 관계없이 커밋됨
    }
     

이 문제를 해결하기 위해서는 TransactionalOperator를 사용하거나, 전체 플로우를 리액티브 방식으로 재구성해야 한다:

 public Mono<Void> processSomethingReactive(String id) {
   return transactionalOperator.execute(tx -> {
       return repository.updateReactive(id, "PROCESSING")
           .then(notificationService.sendNotification(id));
   });
}
 
  1. 디버깅 어려움
    콜 스택이 비동기적으로 실행되기 때문에 디버깅이 복잡해진다. 에러가 발생했을 때 추적이 어려울 수 있다.
profile
간단한 개발 기록

0개의 댓글