지난 시간에는 자바의 비동기 처리 기술에 대해서 알아보았다.
이번 시간에는 스프링에서 제공하는 여러 비동기 기술에 대하여 자세히 알아보자.
기본적으로 스프링에서 제공하는 SimpleAsyncTaskExecutor
라는 ThreadPool이 있지만 비동기 작업이 들어올 때마다 스레드를 새로 생성하여 관리하기 때문에 실무에서는 반드시 직접 ThreadPool을 명시하여 사용 해야한다.
@EnableAsync
@Configuration
public class AsyncConfig {
@Bean
ThreadPoolTaskExecutor tp() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(10);
te.setMaxPoolSize(100);
te.setQueueCapacity(200);
te.setThreadNamePrefix("mythread-");
te.initialize();
return te;
}
}
setCorePoolSize(int)
setQueueCapacity(int)
setMaxPoolSize(int)
setThreadNamePrefix(String)
setTaskDecorator(TaskDecorator)
Spring 3.0 버전부터 지원되는 비동기 처리 어노테이션으로 @Async 가 붙은 메서드에 대하여 Spring AOP를 적용해 비동기 호출이 가능해졌다.
메서드의 실제 실행은 Spring의 TaskExecutor에 의해서 실행이 되며, 리턴 타입은 java.util.concurrent.Future 로 ListenableFuture 또는 CompletableFuture을 선언 가능하다.
서비스 설정
@Service
public static class MyService {
@Async
public Future<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(2000);
return new AsyncResult<>("Hello");
}
}
메인 코드
@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class JavaPlaygroundApplication {
private final MyService myService;
public static void main(String[] args) {
// 스프링 어플리케이션 실행 후 바로 종료되도록
try (ConfigurableApplicationContext c = SpringApplication.run(JavaPlaygroundApplication.class, args)) {
}
}
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
Future<String> f = myService.hello();
log.info("Exit :" + f.isDone());
log.info("Result :" + f.get());
};
}
}
실행 결과
2022-12-31 22:56:37.514 INFO 15625 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-12-31 22:56:37.525 INFO 15625 --- [ main] c.j.p.JavaPlaygroundApplication : Started JavaPlaygroundApplication in 1.265 seconds (JVM running for 1.952)
2022-12-31 22:56:37.526 INFO 15625 --- [ main] c.j.p.JavaPlaygroundApplication : run()
2022-12-31 22:56:37.529 INFO 15625 --- [ main] c.j.p.JavaPlaygroundApplication : Exit :false
2022-12-31 22:56:37.533 INFO 15625 --- [ mythread-1] c.j.p.JavaPlaygroundApplication : hello()
2022-12-31 22:56:39.538 INFO 15625 --- [ main] c.j.p.JavaPlaygroundApplication : Result :Hello
2022-12-31 22:56:39.548 INFO 15625 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
Spring 4.0 버전부터 지원되는 Interface인데 ListenableFuture를 사용하면 비동기 처리의 결과 또는 예외를 처리할 수 있는 callback 메서드를 편리하게 지정할 수 있다.
서비스 설정
@Service
public static class MyService {
@Async
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(2000);
return new AsyncResult<>("Hello");
}
}
메인 코드
@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class JavaPlaygroundApplication {
private final MyService myService;
public static void main(String[] args) {
try (ConfigurableApplicationContext c = SpringApplication.run(JavaPlaygroundApplication.class, args)) {
}
}
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> log.info(s), e -> log.error(e.getMessage()));
log.info("Exit");
};
}
}
실행 결과
2022-12-31 22:55:38.075 INFO 15395 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-12-31 22:55:38.085 INFO 15395 --- [ main] c.j.p.JavaPlaygroundApplication : Started JavaPlaygroundApplication in 1.443 seconds (JVM running for 1.965)
2022-12-31 22:55:38.087 INFO 15395 --- [ main] c.j.p.JavaPlaygroundApplication : run()
2022-12-31 22:55:38.091 INFO 15395 --- [ main] c.j.p.JavaPlaygroundApplication : Exit
2022-12-31 22:55:38.095 INFO 15395 --- [ mythread-1] c.j.p.JavaPlaygroundApplication : hello()
2022-12-31 22:55:38.099 INFO 15395 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2022-12-31 22:55:38.107 INFO 15395 --- [ mythread-1] c.j.p.JavaPlaygroundApplication : Hello
메인 스레드가 먼저 전부 종료된 이후에 callback에 등록된 SuccessCallback이 동작한다.
Servlet 3.0 이 등장하면서 어플리케이션에 들어오는 요청을 비동기로 처리할 수 있도록 지원했다.
이를 통해서 요청 자체는 적은 Thread로 많은 요청을 처리할 수 있었지만, 응답 할때 작업 스레드에 기본적인IO(InputStream) 나 DB IO에 의해 발생하는 Blocking 문제를 해결 할 수 없었다.
Servlet 3.0 : 비동기 서블릿
Servlet 3.1이 등장하면서 Write/Read Listener 를 통해서 Non-Blocking IO로 동작할 수 있게 개선되었다.
하지만, HTTP connection을 읽고 쓸때, HttpServletRequest, HttpServletResponse 2개가 있는데 내부적으로 InputStream, OutputStream 으로 구현되어 있어 요청 읽기, 응답 쓰기는 Blocking으로 동작하여 완전한 Non-Blocking으로 동작하기에는 한계가 있다.
그리하여 Spring 5에서는 Servlet을 사용하지 않는 Reactive 기반의 Webflux를 제공하고 있어 완전한 Non-Blocking을 제공한다.
Servlet 3.1 : Non-Blocking IO
번외로 Servlet은 현재 6.0 까지 릴리즈 되어있는데 각 버전의 주요 차이점은 다음과 같다.
Servlet 4.0
Servlet 5.0
@RestController
@RequiredArgsConstructor
@Slf4j
public class MyController {
@GetMapping("/callable")
public Callable<String> callable() {
log.info("callable");
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
}
실행 결과
2023-01-01 00:42:00.939 INFO 40463 --- [nio-8080-exec-1] c.j.playground.controller.MyController : callable
2023-01-01 00:42:00.946 INFO 40463 --- [ MvcAsync1] c.j.playground.controller.MyController : async
결과에서 보이듯이 요청 받았을 때의 스레드(nio-8080-exec-1) 와 작업 스레드(MvcAsync1)가 다른 것을 볼 수 있다.
WebAsyncTask 도 있는데 Callable과 내부 동작과 구조가 동일하나 Timeout, TaskExecutor 설정이 더 쉽다.
Spring 3.2 버전 이후에 지원되는 비동기 처리 기술로 ‘지연된 결과’ 라는 의미로 응답을 나중에 반환할 수 있다.
@RestController
@RequiredArgsConstructor
@Slf4j
public class MyController {
Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
@GetMapping("/dr")
public DeferredResult<String> dr() {
DeferredResult<String> dr = new DeferredResult<>(600000L);
results.add(dr);
return dr;
}
@GetMapping("/dr/count")
public String drCount() {
return String.valueOf(results.size());
}
@GetMapping("/dr/event")
public String drEvent(String msg) {
for (DeferredResult<String> dr : results) {
dr.setResult("Hello " + msg);
results.remove(dr);
}
return "OK";
}
}
/dr
호출 시, 계속 대기 /dr/count
호출하여 큐 개수 확인/dr/event?msg=Second Message
호출 하면 계속 대기하면 /dr
API 에 응답이 내려지게 된다.
소켓 통신 없이도 간단한 채팅방, 대기 상태가 있다가 여러 대상들에게 결과를 쏴주어야 할 때 유용하다.
가장 중요한 건 워커 스레드가 별도로 만들어지는 것이 아니다.
Spring 4.2 버전부터 지원되는 기술로 비동기 요청의 결과를 여러번 나누어 전달할때 사용되는 기술이다.
StreamingResponseBody의 경우 파일 Byte 데이터를 Stream으로 나누어 전달도 가능하다.
SseEmitter 를 사용하여 Server-Sent-Events(SSE) 기술 적용도 가능하다.
ResponseBodyEmitter 사용 예제
@RestController
@RequiredArgsConstructor
@Slf4j
public class EmitterController {
@GetMapping("/emitter")
public ResponseBodyEmitter emitter() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int i = 1; i <= 50; i++) {
emitter.send("<p> Stream " + i + "</p>");
Thread.sleep(100);
}
} catch (Exception e) {
}
});
return emitter;
}
}
실행 결과
Spring의 비동기 처리 기술에 대해서 개념과 예제로 실제 어떻게 사용되는지 알아보았다.
스프리에서 자주 보이던 @Async 처리부터 비동기 연산의 결과와 예외를 받을 수 있는 ListenableFuture, 비동기 연산 요청을 보낸 후 대기하다가 이벤트를 보내면 대기 중인 클라이언트 모두에게 결과를 반환할 수 있는 DefferredResult, 비동기 요청의 결과를 여러번 분할해서 보낼 수 있는 ResponseBodyEmitter 까지 다양하게 알아 보았다.
평소 자바나 스프링에서 이렇게 많은 기술을 지원하는지는 몰랐는데 여러 비즈니스 니즈에 따라서 다양하게 활용 가능 할 것 같다고 느꼈다.