스프링의 비동기 처리 기술

wwlee94·2023년 1월 8일
0

비동기 처리

목록 보기
2/2
post-thumbnail

스프링의 비동기 처리 기술

지난 시간에는 자바의 비동기 처리 기술에 대해서 알아보았다.

이번 시간에는 스프링에서 제공하는 여러 비동기 기술에 대하여 자세히 알아보자.

ThreadPoolTaskExecutor

기본적으로 스프링에서 제공하는 SimpleAsyncTaskExecutor 라는 ThreadPool이 있지만 비동기 작업이 들어올 때마다 스레드를 새로 생성하여 관리하기 때문에 실무에서는 반드시 직접 ThreadPool을 명시하여 사용 해야한다.

@EnableAsync
@Configuration
public class AsyncConfig {
    @Bean
    ThreadPoolTaskExecutor tp() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);
        te.setMaxPoolSize(100);
        te.setQueueCapacity(200);
        te.setThreadNamePrefix("mythread-");
        te.initialize();
        return te;
    }
}

옵션 종류

setCorePoolSize(int)

  • 첫 작업이 오면 core 만큼 스레드를 생성한다.
  • Runtime 중에도 수정 가능한 옵션

setQueueCapacity(int)

  • 할당할 스레드가 없는 경우 queue 사이즈만큼 대기열 생성

setMaxPoolSize(int)

  • 큐가 꽉차는 경우 max 사이즈 만큼 늘어난다.

setThreadNamePrefix(String)

  • 스레드 Prefix 네이밍 지정

setTaskDecorator(TaskDecorator)

  • 스레드 생성, 반환하는 시점에 callback 걸기 (로깅이나 분석할때)

@Async

Spring 3.0 버전부터 지원되는 비동기 처리 어노테이션으로 @Async 가 붙은 메서드에 대하여 Spring AOP를 적용해 비동기 호출이 가능해졌다.

메서드의 실제 실행은 Spring의 TaskExecutor에 의해서 실행이 되며, 리턴 타입은 java.util.concurrent.Future 로 ListenableFuture 또는 CompletableFuture을 선언 가능하다.

서비스 설정

@Service
public static class MyService {
    @Async
    public Future<String> hello() throws InterruptedException {
        log.info("hello()");
        Thread.sleep(2000);
        return new AsyncResult<>("Hello");
    }
}

메인 코드

@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class JavaPlaygroundApplication {

    private final MyService myService;

    public static void main(String[] args) {
        // 스프링 어플리케이션 실행 후 바로 종료되도록
        try (ConfigurableApplicationContext c = SpringApplication.run(JavaPlaygroundApplication.class, args)) {
        }
    }

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            Future<String> f = myService.hello();
            log.info("Exit :" + f.isDone());
            log.info("Result :" + f.get());
        };
    }
}

실행 결과

2022-12-31 22:56:37.514  INFO 15625 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-12-31 22:56:37.525  INFO 15625 --- [           main] c.j.p.JavaPlaygroundApplication          : Started JavaPlaygroundApplication in 1.265 seconds (JVM running for 1.952)
2022-12-31 22:56:37.526  INFO 15625 --- [           main] c.j.p.JavaPlaygroundApplication          : run()
2022-12-31 22:56:37.529  INFO 15625 --- [           main] c.j.p.JavaPlaygroundApplication          : Exit :false
2022-12-31 22:56:37.533  INFO 15625 --- [      mythread-1] c.j.p.JavaPlaygroundApplication          : hello()
2022-12-31 22:56:39.538  INFO 15625 --- [           main] c.j.p.JavaPlaygroundApplication          : Result :Hello
2022-12-31 22:56:39.548  INFO 15625 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]

ListenableFuture

Spring 4.0 버전부터 지원되는 Interface인데 ListenableFuture를 사용하면 비동기 처리의 결과 또는 예외를 처리할 수 있는 callback 메서드를 편리하게 지정할 수 있다.

서비스 설정

@Service
public static class MyService {
    @Async
    public ListenableFuture<String> hello() throws InterruptedException {
        log.info("hello()");
        Thread.sleep(2000);
        return new AsyncResult<>("Hello");
    }
}

메인 코드

@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class JavaPlaygroundApplication {

    private final MyService myService;

    public static void main(String[] args) {
        try (ConfigurableApplicationContext c = SpringApplication.run(JavaPlaygroundApplication.class, args)) {
        }
    }

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s -> log.info(s), e -> log.error(e.getMessage()));
            log.info("Exit");
        };
    }
}

실행 결과

2022-12-31 22:55:38.075  INFO 15395 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-12-31 22:55:38.085  INFO 15395 --- [           main] c.j.p.JavaPlaygroundApplication          : Started JavaPlaygroundApplication in 1.443 seconds (JVM running for 1.965)
2022-12-31 22:55:38.087  INFO 15395 --- [           main] c.j.p.JavaPlaygroundApplication          : run()
2022-12-31 22:55:38.091  INFO 15395 --- [           main] c.j.p.JavaPlaygroundApplication          : Exit
2022-12-31 22:55:38.095  INFO 15395 --- [      mythread-1] c.j.p.JavaPlaygroundApplication          : hello()
2022-12-31 22:55:38.099  INFO 15395 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2022-12-31 22:55:38.107  INFO 15395 --- [      mythread-1] c.j.p.JavaPlaygroundApplication          : Hello

메인 스레드가 먼저 전부 종료된 이후에 callback에 등록된 SuccessCallback이 동작한다.

Async Servlet

서블릿 버전 별 주요 변경점

Servlet 3.0 이 등장하면서 어플리케이션에 들어오는 요청을 비동기로 처리할 수 있도록 지원했다.

이를 통해서 요청 자체는 적은 Thread로 많은 요청을 처리할 수 있었지만, 응답 할때 작업 스레드에 기본적인IO(InputStream) 나 DB IO에 의해 발생하는 Blocking 문제를 해결 할 수 없었다.

Servlet 3.0 : 비동기 서블릿

  • HTTP connection은 이미 Non-Blocking IO
  • 서블릿 요청 읽기, 응답 쓰기는 Blocking
  • 비동기 작업 시작 즉시 서블릿 스레드 반납
  • 비동기 작업이 완료되면 서블릿 스레드 재할당
  • 비동기 서블릿 컨텍스트 이용 (AsyncContext)

Servlet 3.1이 등장하면서 Write/Read Listener 를 통해서 Non-Blocking IO로 동작할 수 있게 개선되었다.

  • Callback 방식으로 IO를 처리할 수 있게 되었다

하지만, HTTP connection을 읽고 쓸때, HttpServletRequest, HttpServletResponse 2개가 있는데 내부적으로 InputStream, OutputStream 으로 구현되어 있어 요청 읽기, 응답 쓰기는 Blocking으로 동작하여 완전한 Non-Blocking으로 동작하기에는 한계가 있다.

  • Java에서는 InputStream, OutputStream 는 기본적으로 Blocking이라서 서블릿이 Blocking 구조인 것

그리하여 Spring 5에서는 Servlet을 사용하지 않는 Reactive 기반의 Webflux를 제공하고 있어 완전한 Non-Blocking을 제공한다.

Servlet 3.1 : Non-Blocking IO

  • 서블릿 요청, 응답도 Non-Blocking 처리
  • Callback

번외로 Servlet은 현재 6.0 까지 릴리즈 되어있는데 각 버전의 주요 차이점은 다음과 같다.

Servlet 4.0

  • **HTTP/2** 지원

Servlet 5.0

  • 4.0과 거의 동일하며 javax. 패키지를 jakarta. 로 모두 변경해야한다.

Servlet 6.0

  • 오랫동안 사용되지 않는 메서드 제거
  • 내구성 있는 개체 래퍼 ID 요청 및 응답
  • 잘 정의된 URI 보안 보호
  • API 종속성이 없는 전체 쿠키 기능
  • 향상된 프로그래밍 모델 및 기능

스프링에서 비동기 서블릿 사용 - Callable

@RestController
@RequiredArgsConstructor
@Slf4j
public class MyController {
    @GetMapping("/callable")
    public Callable<String> callable() {
        log.info("callable");
        return () -> {
            log.info("async");
            Thread.sleep(2000);
            return "hello";
        };
    }
}

실행 결과

2023-01-01 00:42:00.939  INFO 40463 --- [nio-8080-exec-1] c.j.playground.controller.MyController   : callable
2023-01-01 00:42:00.946  INFO 40463 --- [      MvcAsync1] c.j.playground.controller.MyController   : async

결과에서 보이듯이 요청 받았을 때의 스레드(nio-8080-exec-1) 와 작업 스레드(MvcAsync1)가 다른 것을 볼 수 있다.

  • 테스트용으로 Spring의 기본 스레드 풀(SimpleAsyncTaskExecutor)을 사용한 것이고 실무에서는 AsyncSupportConfigurer를 설정하여 별도로 명시된 TaskExecutor를 사용 해야한다.

WebAsyncTask 도 있는데 Callable과 내부 동작과 구조가 동일하나 Timeout, TaskExecutor 설정이 더 쉽다.

DefferredResult

Spring 3.2 버전 이후에 지원되는 비동기 처리 기술로 ‘지연된 결과’ 라는 의미로 응답을 나중에 반환할 수 있다.

  • DefferredResult 큐를 사용하면, 별도로 작업 스레드를 생성하지 않아도 처리 가능하다.

@RestController
@RequiredArgsConstructor
@Slf4j
public class MyController {
    Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();

    @GetMapping("/dr")
    public DeferredResult<String> dr() {
        DeferredResult<String> dr = new DeferredResult<>(600000L);
        results.add(dr);
        return dr;
    }

    @GetMapping("/dr/count")
    public String drCount() {
        return String.valueOf(results.size());
    }

    @GetMapping("/dr/event")
    public String drEvent(String msg) {
        for (DeferredResult<String> dr : results) {
            dr.setResult("Hello " + msg);
            results.remove(dr);
        }
        return "OK";
    }
}
  1. /dr 호출 시, 계속 대기

  1. /dr/count 호출하여 큐 개수 확인

  1. /dr/event?msg=Second Message 호출 하면 계속 대기하면 /dr API 에 응답이 내려지게 된다.


소켓 통신 없이도 간단한 채팅방, 대기 상태가 있다가 여러 대상들에게 결과를 쏴주어야 할 때 유용하다.

가장 중요한 건 워커 스레드가 별도로 만들어지는 것이 아니다.

ResponseBodyEmitter

Spring 4.2 버전부터 지원되는 기술로 비동기 요청의 결과를 여러번 나누어 전달할때 사용되는 기술이다.

StreamingResponseBody의 경우 파일 Byte 데이터를 Stream으로 나누어 전달도 가능하다.

SseEmitter 를 사용하여 Server-Sent-Events(SSE) 기술 적용도 가능하다.

  • 클라이언트 측에서 폴링을 따로 사용하지 않고, HTTP 커넥션을 통해 서버에서 이벤트 발생 시 클라이언트 측으로 데이터를 푸시하는 기술

ResponseBodyEmitter 사용 예제

@RestController
@RequiredArgsConstructor
@Slf4j
public class EmitterController {
    @GetMapping("/emitter")
    public ResponseBodyEmitter emitter() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                for (int i = 1; i <= 50; i++) {
                    emitter.send("<p> Stream " + i + "</p>");
                    Thread.sleep(100);
                }
            } catch (Exception e) {
            }
        });

        return emitter;
    }
}

실행 결과

마무리

Spring의 비동기 처리 기술에 대해서 개념과 예제로 실제 어떻게 사용되는지 알아보았다.

스프리에서 자주 보이던 @Async 처리부터 비동기 연산의 결과와 예외를 받을 수 있는 ListenableFuture, 비동기 연산 요청을 보낸 후 대기하다가 이벤트를 보내면 대기 중인 클라이언트 모두에게 결과를 반환할 수 있는 DefferredResult, 비동기 요청의 결과를 여러번 분할해서 보낼 수 있는 ResponseBodyEmitter 까지 다양하게 알아 보았다.

평소 자바나 스프링에서 이렇게 많은 기술을 지원하는지는 몰랐는데 여러 비즈니스 니즈에 따라서 다양하게 활용 가능 할 것 같다고 느꼈다.

레퍼런스

https://www.youtube.com/watch?v=aSTuQiPB4Ns

profile
우엉이의 개발 블로그 📝

0개의 댓글