Spring에서는 간단히 비동기를 구현 할 수 있도록 @Async 어노테이션이 구현되어 있다. 이전 게시글에서 설명한 CompletableFuture와 조합해서 사용할 수 있는 @Async는 어떻게 동작하는지 알아보고자 한다.
@Async는 새로운 스레드를 생성하여 기존 스레드의 ThreadLocal를 사용하지 못하므로, 데이터를 복사해서 전달해 주어야 한다.
@Async는 기본적으로 void만 반환한다. 따라서, 비동기 스레드에서 발생한 Error는 메인까지 반환하지 못하므로, 메서드 내에서 별도로 처리하거나 Future / ListenableFuture / CompletableFuture 로 반환해 처리해야 한다.
Future은 비동기 블로킹방식이므로 잘 사용하지 않는다.
ListenableFuture은 콜백 메소드를 통해 논블로킹 처리가 가능하다. 첫번째 파라미터는 성공시 실행할 것을, 두번째 파라미터는 실패시 실행할 것을 지정해주면 된다.
CompletableFuture은 Java 8에서 추가된 클래스로, 비동기 작업의 결과를 처리하기 위한 기능을 제공한다. 비동기 작업 이후의 연결, 조합, 변환을 처리할 수 있다.
@EnableAsync
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@EnableAsync
를 Applcation에 바로 적용할 수 있으나, 해당 방법은 SimpleAsyncTaskExecutor을 사용하여 스레드 풀에서 스레드를 가져오는 것이 아니라, 스레드를 생성하는 역할만 한다. 따라서 스레드를 제대로 관리할 수 없으므로 추천하지 않는다.
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer{
private int CORE_POOL_SIZE = 3;
private int MAX_POOL_SIZE = 10;
private int QUEUE_CAPACITY = 10000;
@Bean(name = "sampleExecutor")
public Executor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
taskExecutor.setThreadNamePrefix("Executor-");
//1. 데코레이터 적용
taskExecutor.setTaskDecorator(new CustomDecorator());
//2. 거부 작업 처리
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
//3. 핸들러 생성해 예외처리
return new AsyncExceptionHandler();
}
}
public class CustomDecorator implements TaskDecorator{
@Override
public Runnable decorate(Runnable runnable){
//현재 요청의 RequestAttribute를 가져옴
RequestAttributes attributes = RequestContextHolder.currentRequestAttributes();
return () -> {
try{
//작업 실행 전에 RequestAttributes를 설정
RequestContextHolder.setRequestAttributes(attributes);
//작업 실행
runnable.run()
} finally{
//작업 실행 후에 RequestAttributes를 제거
RequestContextHolder.resetRequestAttributes();
}
};
}
}
decorate()를 오버라이드 하여 구현한다.
@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler{
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params){
log.error( ex.getMessage(), ex);
}
}
스레드 풀의 종류를 여러개 사용할 경우, @Async 설정 시 taskExecutor의 Bean 이름과 동일하게 맞춰주면 된다.
@Sl4j
@Service
@RequiredArgsConstructor
pubic class TestService{
@Async("sampleExecutor")
public void asyncMethod(String message){
log.info("Thread " + i);
}
}
@Sl4j
@Service
@RequiredArgsConstructor
pubic class TestService{
@Async("sampleExecutor")
public ListenableFuture<String> asyncMethod(String message){
log.info("Thread " + message);
}
return new AsyncResult<>("성공" + message);
}
@RestController
@RequiredArgsConstructor
public class TestController{
private final TestService testService;
@GetMapping("/test")
public void main(){
for (int i = 1; i <=10; i++){
ListenableFuture<String> listenableFuture = testService.asyncMethod(String.valueOf(i));
listenableFuture.addCallback(result -> log.info(result),
error -> log.info(error.getMessage()))
}
}
}
addCallback() 메소드로 첫번째 파라미터는 성공시 실행할 것, 두번째 파라미터는 실패시 실행할 것을 지정해주면 된다.
@Sl4j
@Service
@RequiredArgsConstructor
pubic class TestService{
@Async("sampleExecutor")
public CompletableFuture<String> asyncMethod(String message){
try{
log.info("Thread " + message);
if (message == "5"){
throw new RuntimeException();
}
return CompletableFuture.completedFuture("성공" + message );
} catch (Exception e){
return CompletableFuture.failedFuture(e);
}
}
}
작업이 성공하면 결과 값을 감싸고, 예외가 발생하면 예외를 감싼 CompletableFuture를 반환한다.
failedFuture()를 사용하지 않고 예외를 바로 던져도 상관없으며, 예외를 명시적으로 처리하기 위해 사용한다.
@RestController
@RequiredArgsConstructor
public class TestController{
private final TestService testService;
@GetMapping("/test")
public void main(){
for (int i = 1; i <=10; i++){
CompletableFuture<String> completableFuture = testService.asyncMethod(String.valueOf(i));
completableFuture.thenApply(result -> {
//result를 사용한 로직 수행
return "로직 수행 결과";
}).exceptionally(e -> {
log.error("예외 발생" + e.getMessage());
return null;
});
}
}
}
1. thenApply(Function< ? super T,? extends U >fn)
2. thenAccept(Consumer< ? super T >action)
3. thenRun(Runnable action)
4. exceptionally(Function< Throwable,? extends T>fn)
5. handle(BiFunction< ? super T,Throwable,? extends U >fn)
6. whenComplete(BiConsumer< ? super T,? super Throwable >action)
7. allOf(CompletableFuture< ? >... cfs)
8. anyOf(CompletableFuture< ? >... cfs)
이 외의 메서드 참고
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html