Dandi의 Spring Batch Job에서 API 호출 성능 개선하며, 멀티 스레드 환경을 구축하고 있었습니다.
CompletableFuture<Weathers> weatherFuture = CompletableFuture.supplyAsync(
() -> weatherRequester.getWeathers(baseDateTime, weatherLocation), executor);
weathersFutures.add(weatherFuture);
if (weathersFutures.size() == threadSize) {
weathersFutures.forEach(weathersFuture -> weathers.add(getFutureValue(weathersFuture)));
weathersFutures.clear();
}
과정에서 위와 같이 CompletableFuture 객체를 반환하는 비동기 스레드 메서드를 사용했습니다.
private Weathers getFutureValue(CompletableFuture<Weathers> weathersFuture) {
try {
return weathersFuture.get();
} catch (Exception e) {
// ...
}
}
CompletableFuture의 get 메서드를 통해 비동기 처리한 결과 값을 얻을 수 있는데요.
/**
* Waits if necessary for this future to complete, and then
* returns its result.
*
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
@SuppressWarnings("unchecked")
public T get() throws InterruptedException, ExecutionException {
Object r;
if ((r = result) == null)
r = waitingGet(true);
return (T) reportGet(r);
}
get 메서드는 비동기 처리한 결과 값이 아직 완료되지 않으면 wait 하고 완료되었을 시에 값을 얻을
수 있습니다. 그 말은, 비동기 처리를 하고 있는 스레드는 값을 얻기 위한 작업을 하고 있다는 것이죠.
해당 결과 값을 얻기를 원하는 스레드는 wait을 하고
비동기 처리를 수행하는 스레드가 Socket에서 응답 대기, 복잡한 연산 등 결과 값을 만들고 있는 것입니다.
해당 메서드에 나와있듯이, get 메서드는 2개의 unchecked exception를 throws 합니다.
먼저, ExecutionException은 비동기 처리 내부에서 발생한 Exception을 wrapping한 것입니다.
그렇다면, InterruptedException은 무엇일까요?
Interrupt는 OS에 존재하는 개념입니다. 아래와 같은 정의를 가지고 있는데요.
마이크로프로세서에서 인터럽트(interrupt), 끼어듦, 또는 가로막기란, 마이크로프로세서(CPU)가 프로그램을 실행하고있을 때, 입출력하드웨어 등의 장치에 예외상황이 발생하여 처리가 필요할 경우에 마이크로프로세서에게 알려 처리할 수 있도록 하는 것을 말한다
한 마디로, 프로세서가 A의 일을 진행하고 있다가 B의 일을 진행해야 할 때, 프로세서에게 알려 B를 수행하도록 하는 개념입니다.
멀티스레드가 가능한 환경인 Java에서 Interrupt는 스레드에게 하던 일을 멈추도록 하는 스레드 매커니즘 중 하나입니다.
CompletableFuture<Weathers> weatherFuture = CompletableFuture.supplyAsync(
() -> weatherRequester.getWeathers(baseDateTime, weatherLocation), executor);
weatherFuture.get(); // 끝나지 않는다면?
위 코드는 제 프로젝트에서 API 통신을 통해 값을 받아오는 비동기 로직인데요.
만약, API 응답이 평생 오지 않고 스레드가 무한정 기다린다면 계속 wait 하는 상태로 남아있는 것입니다.
Thread.interrupt();
이런 스레드에게 하는 일을 그만두라고 제어하는 것이 java에서의 Interrupt
입니다. interrupt 메서드를 통해서 제어할 수 있습니다. 예시에서는 API 응답에 대한 wait을 그만두게 할 것입니다.
따라서, InterruptedException은 스레드 입장에서 내가 뭔일을 하고 있었는데… 누가 그만하라고 했어요! 예외 발생시킬래요!
라는 맥락이라고 생각할 수 있습니다.
그렇다면, InterruptedException은 언제 발생하는 것일까요?
f this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class,
then its interrupt status will be cleared and it will receive an InterruptedException.
interrupt가 호출된다면 interrupt status가 clear되고 InterruptedException이 발생
한다는 주석 설명이 있습니다. 주의 깊게 보셔야 할 것은 interrupt status를 set하는 것이 라니라 clear 한다는 것입니다.
public void interrupt() {
if (this != Thread.currentThread()) {
checkAccess();
// thread may be blocked in an I/O operation
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // set interrupt status
b.interrupt(this);
return;
}
}
}
// set interrupt status
interrupt0();
}
private native void interrupt0();
interrupt 메서드 내부 구현은 위와 같습니다. set interrupt status라는 주석이 있습니다. 여기서 약간의 혼동이 왔는데요. method 설명 주석에는 status가 clear된다고 했는데 메서드 내부에서는 set한다는 주석이 있습니다. interrupt0는 native 메서드라 분석해볼 수 없었습니다.
제 추측에 따른 동작 방식은 아래와 같습니다. 정확한 내용을 아시는 분은 댓글로 알려주시면 감사하겠습니다!
결론적으로, InterruptedException이 발생할 때 interrupt status는 clear
됩니다.
InterruptedException이 발생했다는 것은 스레드가 자신이 해야할 일을 정상적으로 완료하지 못했다는 뜻
입니다. 따라서, catch block을 비워두면서 해당 Exception을 무시한다면 interrupt status는 clear 되었기 때문에 스레드가 Interrupt 되었다는 사실을 알지 못할 것
입니다. 따라서, 절대 무시하면 안되고 처리해줘야 합니다.
정석과 같은 방식이라고 생각합니다.
interrupt 메서드를 다시 호출거나 re-throw 해서 외부로 InterruptedException를 발생시킴으로써, Interrupt 되었다는 사실을 알려
줍니다.
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (InterruptedException e) {
throw e;
}
해당 스레드의 interrupt를 호출
저는 스레드 풀을 Batch에서 StepScope로 사용하고 있는데요. 따라서, Job에서 재시도할 수 없는 FatalException을 던져서 Step을 종료시켜 해당 스레드풀을 소멸시키는 방식
을 사용했습니다.
@Bean
@StepScope
public ExecutorService weatherApiThreadPool(@Value("#{jobParameters[weatherApiThreadSize]}") Long weatherApiThreadSize) {
return Executors.newFixedThreadPool(weatherApiThreadSize.intValue());
}
catch (InterruptedException e) {
throw new WeatherRequestFatalException("(날씨 API Thread InterruptedException)" + e.getMessage());
}
도움 많이 되었어요 잘 보고 갑니다.