최대 1개의 결과만 방출
onNext, onComplete, onError 신호 발생
지연 실행(lazy execution) 지원
풍부한 연산자(operators) 제공
// 비어있는 Mono 생성
Mono mono2 = Mono.empty();
// 에러를 포함하는 Mono 생성
Mono mono3 = Mono.error(new RuntimeException("에러 발생"));
// 지연 실행되는 Mono 생성
Mono mono4 = Mono.fromSupplier(() -> generateValue());
// Callable로부터 Mono 생성
Mono mono5 = Mono.fromCallable(() -> computeValue());
// CompletableFuture로부터 Mono 생성
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Future 결과");
Mono mono6 = Mono.fromFuture(future);
2.2 프로그래밍 방식으로 생성
java복사Mono mono = Mono.create(sink -> {
// 비동기 API 호출
someAsyncApi.getData(response -> {
if (response.isSuccess()) {
sink.success(response.getData());
} else {
sink.error(new RuntimeException("데이터 로드 실패"));
}
});
});
// map: 값 변환
Mono lengthMono = original.map(s -> s.length());
// flatMap: 다른 Mono로 변환
Mono userMono = Mono.just("userId")
.flatMap(id -> userRepository.findById(id));
3.2 필터링 연산자
java복사Mono numberMono = Mono.just(42);
// filter: 조건에 맞는지 확인
Mono filteredMono = numberMono.filter(n -> n > 10);
// filterWhen: 비동기 조건 필터링
Mono activatedUserMono = userMono
.filterWhen(user -> permissionService.isActive(user.getId()));
3.3 에러 처리 연산자
java복사Mono dataMono = service.getData();
// onErrorReturn: 에러 발생 시 기본값 반환
Mono withDefault = dataMono.onErrorReturn("기본값");
// onErrorResume: 에러 발생 시 다른 Mono로 대체
Mono withFallback = dataMono
.onErrorResume(e -> backupService.getData());
// doOnError: 에러 로깅 등의 사이드 이펙트
Mono withErrorLogging = dataMono
.doOnError(e -> log.error("에러 발생", e));
3.4 완료 및 구독 연산자
java복사// block: 결과를 동기적으로 기다림 (주의: 가능하면 사용 자제)
String result = dataMono.block();
// subscribe: 비동기 구독 시작
dataMono.subscribe(
data -> System.out.println("데이터: " + data),
error -> System.err.println("에러: " + error),
() -> System.out.println("완료")
);
// then: 앞선 Mono 완료 후 다른 Mono 실행
Mono saveAndNotify = repository.save(entity)
.then(notificationService.notify("저장 완료"));
4. Mono의 내부 동작 원리
Mono의 내부 동작을 이해하려면 리액티브 스트림의 Publisher-Subscriber 모델을 알아야 한다.
4.1 리액티브 스트림 스펙
Mono는 다음 4가지 핵심 인터페이스로 구성된 리액티브 스트림 스펙을 따른다:
Publisher: 데이터를 발행
Subscriber: 데이터를 구독
Subscription: 구독 정보를 관리
Processor: Publisher와 Subscriber 역할을 동시에 수행
4.2 비동기 처리 메커니즘
Mono의 비동기 처리는 다음과 같이 동작한다:
Mono 인스턴스 생성 - 데이터 소스와 처리 파이프라인 정의
subscribe() 호출 - 실제 데이터 처리 시작
백프레셔(backpressure) 메커니즘으로 데이터 흐름 제어
비동기 처리 스케줄러를 통한 실행
4.3 지연 실행(Lazy Execution)
java복사Mono userMono = userRepository.findById("userId");
System.out.println("Mono 정의 완료"); // 이 시점에는 DB 조회 실행 안됨
// subscribe 호출 시점에 실제 DB 조회 실행
userMono.subscribe(user -> {
System.out.println("사용자: " + user);
});
5. 실전에서의 Mono 사용 패턴
5.1 Spring WebFlux 컨트롤러
@RestController
public class UserController {
private final UserService userService;
@GetMapping("/users/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.findById(id)
.map(user -> ResponseEntity.ok(user))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping("/users")
public Mono<ResponseEntity<User>> createUser(@RequestBody Mono<User> userMono) {
return userMono
.flatMap(userService::save)
.map(savedUser -> ResponseEntity
.created(URI.create("/users/" + savedUser.getId()))
.body(savedUser));
}
}
5.2 서비스 레이어에서의 활용
@Service
public class UserService {
private final UserRepository userRepository;
private final KafkaTemplate<String, UserEvent> kafkaTemplate;
public Mono<User> findById(String id) {
return userRepository.findById(id);
}
public Mono<User> save(User user) {
return userRepository.save(user)
.doOnSuccess(savedUser ->
kafkaTemplate.send("user-topic", new UserEvent(savedUser))
);
}
public Mono<Void> processUsers() {
return userRepository.findAll()
.filter(user -> user.isActive())
.flatMap(this::processUser)
.then();
}
private Mono<Void> processUser(User user) {
return Mono.fromRunnable(() -> {
// 사용자 처리 로직
});
}
}
5.3. 비동기 API 호출 조합하기
public Mono<OrderSummary> getOrderSummary(String orderId) {
Mono<Order> orderMono = orderService.getOrder(orderId);
Mono<User> userMono = orderMono
.map(Order::getUserId)
.flatMap(userService::findById);
Mono<List<Product>> productsMono = orderMono
.flatMapIterable(Order::getProductIds)
.flatMap(productService::findById)
.collectList();
return Mono.zip(orderMono, userMono, productsMono)
.map(tuple -> {
Order order = tuple.getT1();
User user = tuple.getT2();
List<Product> products = tuple.getT3();
return new OrderSummary(order, user, products);
});
}
그럼 Mono 의 단점은 무엇이 있을까?
@Transactional
public void processSomething(String id) {
// 이 부분은 현재 트랜잭션 내에서 실행됨
repository.update(id, "PROCESSING");
// 비동기 작업 - 현재 트랜잭션과 분리되어 실행됨
notificationService.sendNotification(id)
.subscribe(
success -> log.info("알림 전송 성공"),
error -> log.error("알림 전송 실패", error)
);
// 메인 트랜잭션은 비동기 작업의 결과와 관계없이 커밋됨
}
이 문제를 해결하기 위해서는 TransactionalOperator를 사용하거나, 전체 플로우를 리액티브 방식으로 재구성해야 한다:
public Mono<Void> processSomethingReactive(String id) {
return transactionalOperator.execute(tx -> {
return repository.updateReactive(id, "PROCESSING")
.then(notificationService.sendNotification(id));
});
}