WebFlux 선행 지식 (2) - Spring의 비동기 처리

김정욱·2021년 10월 26일
1

WebFlux

목록 보기
2/5
post-thumbnail

@Async / @EnableAsync

[ @Async ]

정의

  • Spring에서 Task를 별도의 쓰레드에서 처리하는 비동기를 위해 지원하는 애너테이션

원리

  • AOP가 적용되어 spring context에 등록되어 있는 빈 객체의 메서드가 호출되었을 때 스프링이 끼어들어 @Async가 적용되어있다면 메서드를 가로채서 다른 쓰레드(풀)에서 실행시켜주는 매커니즘
  • 즉, @Async를 선언한 메소드를 호출한 호출자는 즉시 리턴하고, 실제 실행Spring TaskExecutor 에 의해 실행

적용

  • @EnableAsync를 선언해서 Spring에게 사용함을 알려줘야 한다
    • 방법 1 : main mathod가 존재하는 class에 추가
    • 방법 2 : 별도의 threadpool 선언 후, Bean등록 후 선언
  • 비동기 처리를 하기 위한 메소드에 @Async 추가

사용되는 내부 로직

  • springframework.aop.interceptor 패키지의 AsyncExecutionAspectSupport 클래스 로 이동 후
  • 처리 로직은 AsyncExecutionAspectSupport 클래스의 doSubmit() 메서드에서 4가지 반환타입에 따라 처리
/* AsyncExecutionAspectSupport의 dosubmit() */
@Nullable
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        /* returnType이 CompletableFuture일 때 처리 */
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable var2) {
                    throw new CompletionException(var2);
                }
            }, executor);
                   /* returnType이 ListenableFuture일 때 처리 */
        } else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor)executor).submitListenable(task);
                   /* returnType이 Future일 때 처리 */
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
           /* returnType이 void일 때 처리 */
        } else {
            executor.submit(task);
            return null;
        }
    }

4가지 반환타입에 의한 처리

  • void
    • Task 수행 후 null 반환
  • Future
    • Future 객체 반환
    • 즉, get()을 통한 블록킹을 발생시켜 결과를 받아야 한다
      => ListenableFuture / CompletableFuture을 통해 콜백 메서드사용
      => Non-Blocking 으로 동작 가능!
  • ListenableFuture
    • ListenableFuture 반환
    • 콜백을 지정해서 Non-Blocking으로 동작 가능
    • 연속적인 비동기 작업 수행을 할 때에는 계속 콜백(Callback)을 중첩시켜 지정해야 한다
      => Callback hell 발생
  • CompletableFuture
    • CompletableFuture을 반환
    • thenAccept() / thenApply() 와 같은 API로 연속되거나, 여러개의 비동기 작업을 처리하도록 조합할 수 있다
    • 람다 표현식과 파이프라이닝을 활용하여 구조적으로 예쁘고 편리하게 구성 가능

주의사항

  • 기본으로 SimpleAsyncTaskExecutor 쓰레드 풀이 사용되는데 권장 X
    => 요청 만큼 쓰레드를 계속 만들어서 던지기만 하기 때문 (캐싱 X, 클리어 X)
    => @EnableAsync와 함께 직접 쓰레드 풀을 정의해서 사용하는 것을 권장 !

[ @EnableAsync ]

정의

  • @Async를 통한 비동기 작업을 활성화 하기 위한 애노테이션

적용 방법 1 : main method가 존재하는 class에 추가

  • 이때 추가적으로 별도의 threadpool을 지정해주지 않으면 SimpleAsyncTaskExecutor 를 사용
    => 별도의 스레드풀 지정을 권장
@EnableAsync
@SpringBootApplication
public class DemoApplication{
  public static void main(String[] args){
    ...
  }
}

적용 방법 2 : 별도의 threadpool 구성 - AsyncConfigurer

@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {

	@Override
	public Executor getAsyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(3);
		executor.setMaxPoolSize(30);
		executor.setQueueCapacity(10);
executor.setThreadNamePrefix("ThreadPoolTaskExecutor-");
		executor.initialize();
		return executor;
	}
}

적용 방법 3 : 별도의 threadpool 구성 - threadpool Bean 생성

  • ThreadPool 생성 Bean 생성 방법
    • 메소드 수준에서 executor를 override하는 것
    • 직접 쓰레드풀을 가지는 Spring Bean등록하는 방법
    • AsyncConfigurer를 구현하는 방법과 다르게 다수의 쓰레드풀 적용 가능
      (dev, cbtphase마다 다르게 설정 가능)
@Configuration
@EnableAsync
public class AsyncConfigure {

  @Bean(name = "tp1")
  public Executor threadPoolTaskExecutor()
  {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(3);
    taskExecutor.setMaxPoolSize(30);
    taskExecutor.setQueueCapacity(10);
    taskExecutor.setThreadNamePrefix("Executor-");
    taskExecutor.initialize();
    return taskExecutor;
  }

  @Bean(name = "tp2")
  public Executor threadPool~~~()
  {
    ...
    return taskExecutor;
  }
}

예외 처리

  • 메서드 반환 유형이 Future 타입이면 예외를 throw해서 처리하기가 쉽다
    (Future, ListenableFuture, CompletableFuture)
  • 그 외의 void 반환 유형일 경우 예외가 호출 스레드로 전파되지 않는다
    => 따라서 추가 구성이 필요하다
  • AsyncUncaughtExceptionHandler 사용자 정의 예외 핸들러 만들기
    • AsyncUncaughtExceptionHandler 인터페이스를 상속한 사용자 정의 예외 핸들러 작성
    • AsyncConfigurergetAsyncUncaughtExceptionHandler()재정의 하여 해당 오류를 반환
    • (예외 발생 이후) 해당 예외를 @ExceptionHandler로 잡아서 처리
/* AsyncUncaughtExceptionHandler 사용자 정의 예외 처리기 생성 */
public class CustomAsyncExceptionHandler
  implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(
      Throwable throwable, Method method, Object... obj) {

        System.out.println("Exception message - " + throwable.getMessage());
        System.out.println("Method name - " + method.getName());
        for (Object param : obj) {
            System.out.println("Parameter value - " + param);
        }
    }
}


/* AsyncConfigurer 인터페이스의 getAsyncUncaughtExceptionHandler()로 핸들링 */
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new CustomAsyncExceptionHandler();
}

TaskExecutor / ThreadPoolTaskExecutor

[ TaskExecutor 설명 ]

  • JDK의 Executor, CommonJ WorkManager, QuartzTask 실행기에 대한 어댑터를 지원하는 Spring 서비스 추상화 모델
  • Spring의 Task의 실행(Executor)과 스케줄링(Scheduling)
    ref : https://blog.outsider.ne.kr/1066

[ TaskExecutor 종류 ]

  • ThreadPoolTaskExecutor
    • Java 5 환경의 java.util.concurrent.ThreadPoolExecutor 기반의 TaskExecutor
    • 가장 일반적으로 사용하는 TaskExecutor
  • SimpleAsyncTaskExecutor
    • Task를 수행할 스레드를 재사용하지 않고 호출 마다 새로운 스레드를 시작
    • 동시접속 제한(concurrency limit)을 넘어서면 빈 공간이 생길 때 까지 Block
  • WorkManagerTaskExecutor
    • CommonJ WorkManager 기반의 TaskExecutor

[ ThreadPoolTaskExecutor 핵심 파라미터 ]

  • ThreadPoolExecutor 기반이라서 파라미터가 비슷하지만 추가적인 필드가 존재
    • queueCapacity : BlockingQueue의 크기
  • 파라미터 적용 예시
    • 최초 CorePoolSize10 만큼 할당
    • Task가 CorePoolSize보다 많아지면 QueueCapacity200이 될 때 까지 큐에 적재
    • QueueCapacity200을 넘어가면 MaxPoolSize만큼 증가
   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
   executor.setCorePoolSize(10); 
   executor.setMaxPoolSize(100);
   executor.setQueueCapacity(200);
   executor.setBeanName("DemoThreadPool");
   executor.initialize();

AutoConfiguration

[ 개요 ]

  • SpringBoot는 Spring과 마찬가지로 component-scan을 통해 component를 찾고 Bean으로 등록
  • 이러한 과정에서, SpringBoot가 미리 작성해둔 auto configuration에 있는 것들도 함께 Bean으로 등록
  • ex) Spring에서는 ThreadPoolExecutor를 사용하기 위해선 Bean으로 직접 등록해야 했다
    => SpringBoot에서는 AutoConfiguration에 의해 자동 등록

[ 정의 ]

  • SpringBoot에서 제공하는 다양한 자동 설정 기능
  • @EnableAutoConfiguration 으로 활성화
    (일반적으로 @ComponentScan과 함께 사용)

[ 자동 등록 대상 ]

[ 충돌 ]

  • 사용자가 직접 정의하는 Bean과 AutoConfiguration에서 같은 Bean을 생성시 충돌이 날 수 있다
    => @Conditional / @Conditionspring.factories에 있는 AutoConfigurationImportFilter 설정으로 보완 가능
/* spring.factories의 필터 부분 */
...
# Auto Configuration Import Filters
org.springframework.boot.autoconfigure.AutoConfigurationImportFilter=\
org.springframework.boot.autoconfigure.condition.OnBeanCondition,\
org.springframework.boot.autoconfigure.condition.OnClassCondition,\
org.springframework.boot.autoconfigure.condition.OnWebApplicationCondition
...
  • 예시
    • @ConditionalOnMissingBean ( OnBeanCondition에 포함 )
      • 특정 Bean이 사전에 생성되지 않은 경우 조건이 만족
      • @Bean과 함께 사용해서 이미 생성된 Bean이 없을 때 Bean을 생성하게 설정 가능
/* ThreadPoolTaskExecutor 빈을 생성하는 TaskExecutionAutoConfiguration.java */
...(생략)
@Configuration(proxyBeanMethods = false)
public class TaskExecutionAutoConfiguration {

    /**
     * Bean name of the application {@link TaskExecutor}.
     */
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    ...(생략)

    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
        AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
    /* Bean이 없을 때 생성되도록 설정 */
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }
}

refs

profile
Developer & PhotoGrapher

0개의 댓글