Spring의 @Async와 CompletableFuture

김예지·2024년 1월 31일
0

Java/Spring

목록 보기
2/3

Spring Framework 비동기 처리 방식

Spring에서는 간단히 비동기를 구현 할 수 있도록 @Async 어노테이션이 구현되어 있다. 이전 게시글에서 설명한 CompletableFuture와 조합해서 사용할 수 있는 @Async는 어떻게 동작하는지 알아보고자 한다.

@Async

  • @Async 어노테이션은 메서드에 적용해 해당 메서드를 비동기적으로 실행하게 한다.
  • TaskExecutor 빈을 사용해 스레드 풀을 구성할 수 있다.
  • return 값을 Future / ListenableFuture / CompletableFuture 로 반환할 수 있다.

사용 시 주의 사항

  1. @Async는 별도의 설정이 없으면 스프링의 AOP를 가져간다. 따라서, AOP와 관련된 제약 사항을 가지며 프록시 패턴에서 발생하는 한계점이 있다.
  • public 메소드에만 사용 가능: proxy에서는 private으로 접근 불가하기 때문이다.
  • 자가 호출(self-invocation) 불가: 자가 호출 시 proxy를 거치지 않으므로 재귀적 호출은 불가하다.
  1. @Async는 새로운 스레드를 생성하여 기존 스레드의 ThreadLocal를 사용하지 못하므로, 데이터를 복사해서 전달해 주어야 한다.

  2. @Async는 기본적으로 void만 반환한다. 따라서, 비동기 스레드에서 발생한 Error는 메인까지 반환하지 못하므로, 메서드 내에서 별도로 처리하거나 Future / ListenableFuture / CompletableFuture 로 반환해 처리해야 한다.

Future

Future은 비동기 블로킹방식이므로 잘 사용하지 않는다.

ListenableFuture

ListenableFuture은 콜백 메소드를 통해 논블로킹 처리가 가능하다. 첫번째 파라미터는 성공시 실행할 것을, 두번째 파라미터는 실패시 실행할 것을 지정해주면 된다.

CompletableFuture

CompletableFuture은 Java 8에서 추가된 클래스로, 비동기 작업의 결과를 처리하기 위한 기능을 제공한다. 비동기 작업 이후의 연결, 조합, 변환을 처리할 수 있다.

@Async 사용법

Application 클래스에 적용

@EnableAsync
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

}

@EnableAsync를 Applcation에 바로 적용할 수 있으나, 해당 방법은 SimpleAsyncTaskExecutor을 사용하여 스레드 풀에서 스레드를 가져오는 것이 아니라, 스레드를 생성하는 역할만 한다. 따라서 스레드를 제대로 관리할 수 없으므로 추천하지 않는다.

AsyncConfig 작성

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer{
	private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 10000;
    
    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){
    	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        taskExecutor.setThreadNamePrefix("Executor-");
        
        //1. 데코레이터 적용
        taskExecutor.setTaskDecorator(new CustomDecorator());
        //2. 거부 작업 처리
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        return taskExecutor;
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
    	//3. 핸들러 생성해 예외처리
    	return new AsyncExceptionHandler();
    }
}

1. 데코레이터 적용 - ThreadLocal 데이터 복사

public class CustomDecorator implements TaskDecorator{
	@Override
    public Runnable decorate(Runnable runnable){
    	//현재 요청의 RequestAttribute를 가져옴
        RequestAttributes attributes = RequestContextHolder.currentRequestAttributes();
        
        return () -> {
        	try{
            	//작업 실행 전에 RequestAttributes를 설정
                RequestContextHolder.setRequestAttributes(attributes);
                
                //작업 실행
                runnable.run()
            } finally{
            	//작업 실행 후에 RequestAttributes를 제거
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

decorate()를 오버라이드 하여 구현한다.

2. 거부 작업 처리 - Rejection Policy

  • AbortPolicy: 작업이 거부되면 RejectedExecution을 던진다.
  • CallerRunsPolicy: Async 메소드를 불렀던 메인 스레드에서 거부된 작업을 실행한다.
  • DiscardOldestPolicy: 큐에서 가장 오래된 task를 제거하고 실행시킨다.
  • DiscardPolicy: Reject된 Task에 대해 어떠한 작업도 하지 않는다.

3. 핸들러 생성해 예외처리(반환 값 void일 때)

@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler{

	@Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params){
    	log.error( ex.getMessage(), ex);
    }
}

@Async 적용 메서드 작성

스레드 풀의 종류를 여러개 사용할 경우, @Async 설정 시 taskExecutor의 Bean 이름과 동일하게 맞춰주면 된다.

1. 반환 값이 없는(void) 경우

@Sl4j
@Service
@RequiredArgsConstructor
pubic class TestService{

	@Async("sampleExecutor")
	public void asyncMethod(String message){
			log.info("Thread " + i);
	}
}

2. ListenableFuture로 return 값 설정

@Sl4j
@Service
@RequiredArgsConstructor
pubic class TestService{

	@Async("sampleExecutor")
	public ListenableFuture<String> asyncMethod(String message){
			log.info("Thread " + message);
	}
    
    return new AsyncResult<>("성공" + message);
}
@RestController
@RequiredArgsConstructor
public class TestController{

	private final TestService testService;
    
    @GetMapping("/test")
    public void main(){
    	for (int i = 1; i <=10; i++){
        	ListenableFuture<String> listenableFuture = testService.asyncMethod(String.valueOf(i));
            listenableFuture.addCallback(result -> log.info(result), 
            	error -> log.info(error.getMessage()))
        }
    }

}

addCallback() 메소드로 첫번째 파라미터는 성공시 실행할 것, 두번째 파라미터는 실패시 실행할 것을 지정해주면 된다.

3. CompletableFuture로 return 값 설정

@Sl4j
@Service
@RequiredArgsConstructor
pubic class TestService{

	@Async("sampleExecutor")
	public CompletableFuture<String> asyncMethod(String message){
    	try{
			log.info("Thread " + message);
            
            if (message == "5"){
            	throw new RuntimeException();
            }
            return CompletableFuture.completedFuture("성공" + message );
        } catch (Exception e){
        	return CompletableFuture.failedFuture(e);
        }
	}
}

작업이 성공하면 결과 값을 감싸고, 예외가 발생하면 예외를 감싼 CompletableFuture를 반환한다.
failedFuture()를 사용하지 않고 예외를 바로 던져도 상관없으며, 예외를 명시적으로 처리하기 위해 사용한다.

@RestController
@RequiredArgsConstructor
public class TestController{

	private final TestService testService;
    
    @GetMapping("/test")
    public void main(){
    	for (int i = 1; i <=10; i++){
        	CompletableFuture<String> completableFuture = testService.asyncMethod(String.valueOf(i));
            
            completableFuture.thenApply(result -> {
            	//result를 사용한 로직 수행
            	return "로직 수행 결과";
            }).exceptionally(e -> {
            	log.error("예외 발생" + e.getMessage());
                return null;
            });
        }
    }
}

1. thenApply(Function< ? super T,? extends U >fn)

  • CompletableFuture의 결과를 가공하고 변환하는 데 사용된다.
  • Function< T, U >를 인자로 받아 결과를 변환한 CompletedFuture< U >를 반환한다.

2. thenAccept(Consumer< ? super T >action)

  • CompletableFuture의 결과를 소비하고 추가 작업을 수행하는 데 사용된다.

3. thenRun(Runnable action)

  • CompletableFuture의 결과를 사용하지 않고 추가 작업을 수행하는 데 사용된다.

4. exceptionally(Function< Throwable,? extends T>fn)

  • CompletableFuture에서 발생한 예외를 처리하는 데 사용된다.
  • Function< Throwable, T >를 인자로 받아 예외를 처리하고 대체 값 CompletableFuture< T >를 반환한다.

5. handle(BiFunction< ? super T,Throwable,? extends U >fn)

  • CompletableFuture의 결과 또는 예외를 처리하는데 사용된다.
  • BiFunction< T,Throwable,U >를 인자로 받아 결과 또는 예외를 처리하고 결과를 반환하는 CompletableFuture< U >를 반환한다.

6. whenComplete(BiConsumer< ? super T,? super Throwable >action)

  • CompletableFuture의 결과 또는 예외를 소비하고 추가 작업을 수행하는 데 사용된다.

7. allOf(CompletableFuture< ? >... cfs)

  • 여러 개의 CompletableFuture을 동시에 실행하고, 모든 CompletableFuture의 완료를 기다리는데 사용된다.
  • 모든 CompletableFuture가 완료되면 CompletableFuture< Void >를 반환한다.

8. anyOf(CompletableFuture< ? >... cfs)

  • 여러 개의 CompletableFuture 중 가장 빨리 완료된 CompletableFuture을 반환하는 데 사용한다.
  • 가장 빨리 완료된 CompletableFuture의 결과 타입을 반환한다.

이 외의 메서드 참고
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

0개의 댓글

관련 채용 정보