
리액티브 스트림에서는 데이터 흐름 중 발생하는 에러를 제어하기 위해 다양한 에러 처리 오퍼레이터를 제공한다. 이 오퍼레이터들은 에러 발생 시점에 따라 대체 값을 emit하거나, 시퀀스를 중단하거나 재시작하는 방식으로 유연하게 대응할 수 있도록 돕는다.
error – 의도적으로 에러를 발생시킴Flux.error(...)는 명시적으로 onError 시그널을 발생시켜 시퀀스를 종료시킨다. Java의 throw 키워드와 유사한 역할을 하며, 특정 조건에서 의도적으로 오류를 발생시킬 수 있다.
Flux.range(1, 5)
.flatMap(num -> {
if (num == 3) {
return Flux.error(new RuntimeException("Error at number " + num));
}
return Flux.just(num * 2);
})
.subscribe(
data -> System.out.println("Data: " + data),
error -> System.out.println("Error: " + error.getMessage()),
() -> System.out.println("Completed")
);
출력 결과:
Data: 2 Data: 4 Error: Error at number 3
onErrorReturn – 에러 발생 시 대체 값 반환onErrorReturn은 에러가 발생했을 때 시퀀스를 종료하지 않고, 지정한 단일 값을 emit한 후 완료 신호를 전송한다.
Flux.range(1, 5)
.map(number -> {
if (number == 3) {
throw new RuntimeException("Error at number: " + number);
}
return "Number: " + number;
})
.onErrorReturn("Default Value")
.subscribe(System.out::println);
출력 결과:
Number: 1 Number: 2 Default Value
onErrorResume – 에러 발생 시 대체 Publisher로 전환onErrorResume은 에러 발생 시점에 기존 스트림을 중단하고, 새로운 Publisher로 대체하여 이어나갈 수 있도록 한다. 에러 객체를 활용하여 동적으로 대체 Flux를 생성할 수 있다.
Flux.range(1, 5)
.map(number -> {
if (number == 3) {
throw new RuntimeException("Error at number: " + number);
}
return "Number: " + number;
})
.onErrorResume(error -> {
System.out.println("Error occurred: " + error.getMessage());
return Flux.just("Fallback 1", "Fallback 2");
})
.subscribe(System.out::println);
출력 결과:
Number: 1 Number: 2 Error occurred: Error at number: 3 Fallback 1 Fallback 2
onErrorContinue – 에러 발생 시 해당 요소만 건너뛰고 계속 진행onErrorContinue는 특정 데이터 처리 중 예외가 발생하면 해당 값을 건너뛰고 스트림을 계속 진행할 수 있도록 한다. 콜백으로 에러 객체와 건너뛴 데이터를 전달받는다.
Flux.range(1, 6)
.map(number -> {
if (number == 5) {
throw new RuntimeException("Error at number: " + number);
}
return number;
})
.onErrorContinue((error, value) -> {
System.out.println("Error: " + error.getMessage() + ", skipping value: " + value);
})
.subscribe(data -> System.out.println("Data: " + data));
출력 결과:
Data: 1 Data: 2 Data: 3 Data: 4 Error: Error at number: 5, skipping value: 5 Data: 6
retry – 에러 발생 시 시퀀스를 다시 시작retry(n)은 전체 Flux 시퀀스 실행 중 에러가 발생했을 경우, Flux 전체를 처음부터 재시작하는 동작을 최대 n회까지 허용한다. 항목 단위가 아니라 시퀀스 단위로 재시도된다.
AtomicInteger attempt = new AtomicInteger(1);
Flux.range(1, 3)
.map(num -> {
System.out.println("Attempt " + attempt.get() + ": Processing " + num);
if (num == 2) {
try {
Thread.sleep(Duration.ofMillis(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new RuntimeException("Temporary failure");
}
return "Processed: " + num;
})
.retry(2)
.subscribe(
data -> System.out.println("Data: " + data),
error -> System.out.println("Final Error: " + error.getMessage()),
() -> System.out.println("Completed")
);
Thread.sleep(2000);
출력 결과 (예상 흐름):
Attempt 1: Processing 1 Attempt 1: Processing 2 → 실패 Attempt 2: Processing 1 Attempt 2: Processing 2 → 실패 Attempt 3: Processing 1 Attempt 3: Processing 2 → 실패 Final Error: Temporary failure
※ 위 예제에서는 num == 2일 때 예외가 발생하므로, Flux 시퀀스는 총 3회 실행되고 최종적으로 실패한다.
| 오퍼레이터 | 동작 방식 | 특징 |
|---|---|---|
error | 명시적으로 예외 emit | 스트림 즉시 종료 |
onErrorReturn | 에러 발생 시 대체 값 emit | 단일 fallback 값 반환 |
onErrorResume | 에러 발생 시 대체 Publisher로 전환 | 동적 전환 가능 |
onErrorContinue | 에러 발생 요소만 건너뜀 | 스트림 전체 유지 |
retry | 전체 시퀀스 재시도 | 항목 단위가 아닌 Flux 단위로 재실행 |