
스케줄러를 통해 이메일 전송을 비동기적으로 처리하는 로직이 있었다.
아래와 같이 작성되었다.
@Slf4j
@Component
@RequiredArgsConstructor
public class AutoMailEventPublisher {
private final ApplicationEventPublisher eventPublisher;
public void publishAutoMailSendEventOf(Map<AutoMailTargetAggregate, List<Workbook>> eachExcelDataMap) {
// 각 AutoMailTargetAggregate 에 대해 이메일 이벤트 호출
log.warn("Publishing AutoMailSendEvent for email: {}",
eachExcelDataMap.keySet()
.stream()
.map(autoMailAggregate -> autoMailAggregate.getAdmin().getAdmEmail())
.toList());
eachExcelDataMap.keySet().forEach(autoMailAggregate ->
eventPublisher.publishEvent(
AutoMailSendEvent.of(autoMailAggregate.getAdmin().getAdmEmail(), eachExcelDataMap.get(autoMailAggregate))));
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class AutoMailEventListener {
private final static String TODAY = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
private final static String MAIL_TITLE = "[오토메일링] " + TODAY + "일자 결과 보내드립니다.";
private final static String FILE_NAME = TODAY + "일자 " + "오토메일링 결과.xlsx";
private final static String MAIL_BODY = "오토메일링 결과";
private final AutoMailSendHandlerV1 autoMailSendHandlerV1;
/**
* 이메일 전송 처리
*
* @param event 이메일 전송 이벤트
* @apiNote @{@link AutoMailEventPublisher} 에서 publishAutoMailSendEventOf() 을 통해 호출
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void handleAutoMailSendEvent(AutoMailSendEvent event) {
String admEmail = event.getAdmEmail();
List<Workbook> workbooks = event.getWorkbooks();
try {
log.warn("AutoMailSendEvent received");
autoMailSendHandlerV1.sendEmail(MAIL_TITLE, MAIL_BODY, admEmail, FILE_NAME, workbooks););
} catch (Exception e) {
log.error("Failed to send email", e);
}
}
}
하지만 아래와 같은 에러가 발생하였다.


신기하게도 @Async 를 풀고 Sync 로 처리하면 성공한다.
그렇다면 과연 무엇이 문제였을까?
우리는 두 번째 사진에 주목해봐야한다.

이 에러를 보면 무엇인가로 인해 InterruptedException 이 발생했다는 것을 알 수 있다.
그렇다. 가장 근본원인은 Thread 간 Interrupt가 발생했다는 것이다.
이에 대해서 아래에 대해 살펴봐야 한다.
에러 발생 상황 이해하기
Interrupt 가 발생하는가?@Async 는 어떻게 동작하는가?에러 유발 케이스 분석하기
InterruptedException 발생케이스는 없는가?InterruptedException 발생케이스는 없는가?InterruptedException 발생케이스는 없는가?
Executor 인터페이스에는 실행을 위해 Runnable 인스턴스를 execute 하는 단일 execute 메서드를 가지고 있다.
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));
ExecutorService 인터페이스에는 작업을 제어하고 서비스 종료를 관리하는 많은 메서드가 포함되어 있다.
이 인터페이스를 사용하면 실행을 위해 작업을 submit하고 반환된 Future 인스턴스를 사용하여 작업을 제어할 수도 있다.
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();
Executor, ExecutorService 는 로우 레벨 인스턴스이다. 실질적으로 우리가 사용할 때는 이들을 상위에서 한 번 감싼 ThreadPoolExecutor 를 사용하게된다.
그렇다면 ThreadPoolExecutor 의 역할은 무엇일까?
ThreadPoolExecutor 는 fine tuning 을 위해 파라미터와 설정메서드들을 갖춘 스레드 풀 구현체이다.
여기서 주요 구성 매개변수는 corePoolSize, maximumPoolSize 및 keepAliveTime 이다.
풀은 항상 유지되는 고정된 수의 코어 스레드로 구성된다.
또한 더 이상 필요하지 않을 때 생성되었다가 종료될 수 있는 추가스레드들 또한 포함될 수 있다.
이 때 이 추가스레드들을 조정하기 위해 위 세 가지 변수가 주로 사용된다. (추가로 다른 설정도 처리될 수 있다)
corePoolSize 는 **풀에 유지될 코어 스레드의 수이다.
maximumPoolSize 는 풀이 커질 수 있는 코어 스레드 최대제한수이다.
코어 스레드가 늘어나는 조건은 아래와 같다.
1) 만약 새 작업이 들어올 때
2) 모든 코어 스레드가 사용 중이고
3) internal queue 가 가득 차면
thread pool은 maximumPoolSize 까지 커질 수 있다.
keepAliveTime 는 (corePoolSize를 초과하여 인스턴스화된) 초과 스레드가 유휴 상태로 존재하도록 허용되는 시간 간격이다.
기본적으로 ThreadPoolExecutor는 non-core 스레드만 제거 대상으로 고려한다.
코어 스레드에도 동일한 제거 정책을 적용하려면 allowCoreThreadTimeOut(true) 메서드를 사용할 수 있다.
이외에도 스레드가 가득찼을 때의 정책, 스레드 생성 정책 등등을 설정할 수 있다.
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());
Introduction to Thread Pools in Java | Baeldung

우리가 새 스레드를 생성하게되면 해당 스레드는 NEW 상태가 된다.
프로그램이 start() 메서드를 사용하여 스레드를 시작할 때까지 이 상태가 유지된다.
스레드에서 start() 메서드를 호출하면 스레드는 RUNNABLE 상태가 된다.
이 상태의 스레드는 실행 중이거나 실행할 준비가 된 상태이다.
* 스레드가 monitor lock 을 기다리다가 다른 스레드에 의해 lock 된 code 에 액세스하려고 하면 BLOCKED* 상태가 된다.
바로 이 상태의 스레드들에게 execution 을 submit 하는 것이다.
스레드는 wait() 메서드 호출과 같은 다양한 이벤트에 의해 WAITING 상태가 될 수 있다.
이 상태에서는 스레드가 다른 스레드의 신호를 기다리고 있다.
스레드가 실행을 완료하거나 비정상적으로 종료되면 TERMINATED 상태가 된다.
스레드는 중단될 수 있으며, 스레드가 중단되면 InterruptedException 이 발생하게 된다.
스레드가 대기 중이거나(waiting), 절전 중이거나(sleeping), 다른 방식으로 점유 중인 상태에서 스레드가 중단되면(occupied) InterruptedException이 발생한다.
즉, 일부 코드가 스레드에서 interrupt() 메서드를 호출한 경우이다.
이 exception 은 checked exception 이며, Java의 많은 blocking operation에서 이 예외가 발생할 수 있다.
How to Handle InterruptedException in Java | Baeldung
@Async 는 어떻게 동작하는가?@Async 는 Executor 에 대한 커스텀 설정이 없다면 SimpleAsyncTaskExecuter 를 통해 처리한다.
SimpleAsyncTaskExecuter 는 스레드풀과 같이 일정량의 스레드들을 관리 & 재사용하지 않는다.
SimpleAsyncTaskExecuter 는 요청마다 스레드를 생성하고 이에 따라 컨텍스트 스위칭이 발생한다.
즉, 스레드 생성비용과 유지비용이 발생한다.
하지만 만약 TaskExecutor 를 빈으로 등록해놨다면 @Async처리 시 Spring Context 를 통해 TaskExecutor 이 등록되어있는지를 조회, 해당 TaskExecutor 를 사용한다.
이에 따라 나는 아래와 같이 직접 ThreadPoolTaskExecutor 를 통해 ThreadPoolExecutor 설정을 해주었다.
** ThreadPoolTaskExecutor 은 Java 의 ThreadPoolExecutor 설정을 쉽게하기 위한 Spring 에서 제공하는 Utils Class 라고 보면 된다.
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("EmailAsyncThreadPool")
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("EmailAsync-");
executor.setThreadGroupName("EmailAsyncExecutor");
executor.setCorePoolSize(16);
executor.setMaxPoolSize(64);
,,,
executor.initialize();
return executor;
}
@Bean
public AsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
public static class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
System.out.println("Exception message : " + throwable.getMessage());
System.out.println("Method name : " + method.getName());
for (Object param : obj) {
System.out.println("Parameter value : " + param);
}
}
}
}
InterruptedException 발생케이스는 없는가?있다.
이것이 바로 이 에러의 핵심 원인이었다. (aws sdk 도 retry 도 문제가 아니였다)
정확히 말하자면 아래와 같은 순서로 발생하였다.
즉 Spring Context 로 인한 shutdown() 호출이 문제였다.
Spring Context 의 생명주기 처리과정부터 하나하나 자세히 살펴보자.
Spring Context 는 아래 순서로 빈들을 초기화하고 제거한다.
스프링 빈 생명주기(Bean Lifecycle) 메서드와 실행 순서
Customizing the Nature of a Bean :: Spring Framework
postConstruct
afterPropertiesSet
initTaengPonent
preDestroy
destroy
destoryTaengPonent
스프링 앱 종료 명령이 떨어지면 ThreadPoolTaskExecutor 클래스도 종료를 위해
ExecutorConfigurationSupport 클래스에 정의된 shutdown 메서드가 호출된다.
좀 더 정확하게는, ExecutorConfigurationSupport 추상 클래스가 Bean LifeCycle Callback 중 하나인 DisposableBean을 구현하고 있고 destroy() 메서드가 호출되면서, destory 메서드 안에 있는 shutdown 메서드가 호출되는 구조이다.
** destory() 는 SpringBean 메서드로 ThreadPoolExecutor Bean 자체를 죽임
** shutdown() 은 ThreadPoolTaskExecutor 메서드로 submitted tasks 들을 shutdown 함

이에 따라 ThreadPoolExecutor 의 shutdown() 이 호출되는데
새로운 task 를 받지 않고 이전에 submit 된 task 들을 종료처리한다.
하지만 아래를 볼 수 있듯이 task 들의 종료를 기다려주지 않고 종료처리를 한다.
즉 현재 작업 중인 스레드들을 강제로 interrupt 하여 terminated 상태로 몰아넣는 것이다.

이 프로세스를 그림으로 정리하자면 아래 그림과 같이 정리될 수 있을 것이다.

이제 원인 파악이 완료되었다.
그렇다면 어떻게 execution 의 성공을 보장하면서 shutdown() 을 처리할 수 있을까?
결론부터 말하자면 waitForTasksToCompleteOnShutdown , awaitTerminationSeconds 을 처리하면 된다.
ThreadPoolTaskExecutor 빈이 destroy될 때 shutdown 처리되게 되는데,
기본적으로는 작업을 더 이상받지 않고, 만약 실행 중인 작업이 있다면 중지시키게(interrupt) 된다.
만약 작업이 끝날 때까지 대기하게 하고 싶다면, 다음 설정값으로 조정할 수 있다.
이에 따라 나는 아래와 같이 처리해주었다.
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("EmailAsyncThreadPool")
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("EmailAsync-");
executor.setThreadGroupName("EmailAsyncExecutor");
executor.setCorePoolSize(16);
executor.setMaxPoolSize(64);
executor.setQueueCapacity(32);
executor.setKeepAliveSeconds(60);
executor.setAwaitTerminationSeconds(10);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
바로 성공하는 모습을 볼 수 있었다!!

사실 이 분의 포스트를 참고하여 해결을 하였다. 그런데 이 분의 설명이 기가 막히고 코가 막히다.
더 깊이 알고싶다면 꼭 참고해보자.
ThreadPoolTaskExecutor의 waitForTasksToCompleteOnShutdown 속성 알아보기
Amazon SES API v2 examples using SDK for Java 2.x - AWS SDK for Java 2.x
AWS SES (Simple Email Service) Spring Boot 프로젝트에서 사용하기
Use asynchronous programming - AWS SDK for Java 2.x
How to Handle InterruptedException in Java | Baeldung
Introduction to Thread Pools in Java | Baeldung
What are corePoolSize and maxPoolSize in thread pool configuration? When is maxPoolSize used?
ThreadPoolTaskExecutor corePoolSize vs. maxPoolSize | Baeldung
Introduction to Thread Pools in Java | Baeldung
A Guide to the Java ExecutorService | Baeldung
ThreadPoolExecutor - Java Thread Pool Example | DigitalOcean
ThreadPoolExecutor (Java Platform SE 8 )
ExecutorService Internal Working in Java
@Async 적용@Async 기본 ThreadPool 선택What are the defaults in Spring @Async?
스프링 빈 생명주기(Bean Lifecycle) 메서드와 실행 순서
Customizing the Nature of a Bean :: Spring Framework
ExecutorService - Waiting for Threads to Finish | Baeldung
ThreadPoolTaskExecutor의 waitForTasksToCompleteOnShutdown 속성 알아보기