Asynchronous Request?

maketheworldwise·2022년 5월 1일
1


이 글의 목적?

백기선님의 강의는 스프링 공식 문서의 내용 중에서 일부를 가져와 다룬 내용들이다. 공식 문서를 보면서 강의에서 다루지 않은 비동기 요청에 대해 정리해보자.

Callable

💡 Runnable?

쓰레드를 구현하는 방법에는 두 가지가 존재한다. 하나는 Thread 클래스를 상속하여 구현하는 방법과 다른 하나는 Runnable 인터페이스를 구현하는 방법이 있다. 자바에서는 다중 상속을 하지 못하기 때문에 대부분 확장성을 고려하여 Runnable 인터페이스를 구현하는 편이다.

추가로, Executor는 기능적으로 보면 Thread와 유사하여 Thread의 대체제로 생각할 수 있지만, 정확하게는 Runnable의 작업을 실행시키는 함수를 담은 인터페이스라고 한다.

공식 문서에서는 DeferredResult 다음으로 Callable에 대한 설명이 기술되어있다. 하지만 Callable에 대해 먼저 설명하는 것이 순서에 맞다. 그 이유는 DeferredResult가 Callable의 대안으로 나왔기 때문이다. Callable 부터 살펴보자.

/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * {@code call}.
 *
 * <p>The {@code Callable} interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * {@code Runnable}, however, does not return a result and cannot
 * throw a checked exception.
 *
 * <p>The {@link Executors} class contains utility methods to
 * convert from other common forms to {@code Callable} classes.
 *
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method {@code call}
 */
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

주석에 따르면, Callable은 concurrent 패키지의 - 단 하나의 인수가 없는 메서드를 가지는 Functional 인터페이스다. 다른 스레드에 의해 인스턴스가 실행될 가능성이 있는 클래스를 위해 설계되었다는 점에서 Runnable과 유사하지만, Callable은 결과를 반환하고 예외를 던질 수 있다는 점에서 차이가 있다.

차이점을 예제로 이해해보자. 먼저 Runnable 예제다.

public class Main {

    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            String calledAt = LocalTime.now().toString();
            System.out.println(calledAt);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Runnable
        MyRunnable runnable = new MyRunnable();
        Thread thread = new Thread(runnable);
        thread.run();
    }
}

Callable 예제는 Runnable과 다르게 FutureTask 객체를 이용하여 호출을 한다. 그리고 futureTask.get() 메서드를 이용하여 Callable의 call() 메서드가 호출되어 결과가 리턴되기를 기다린다.

public class Main {

    static class MyCallable implements Callable {

        @Override
        public Object call() throws Exception {
            String calledAt = LocalTime.now().toString();
            return calledAt;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyCallable callable = new MyCallable();
        FutureTask futureTask = new FutureTask(callable);
        Thread thread = new Thread(futureTask);
        thread.start();
        System.out.println(futureTask.get());
    }
}

DeferredResult

DeferredResult는 비동기 요청 처리를 위해 Callable의 대안으로 나왔다. 모든 코드를 넣기에 너무 많으니 주석만 살펴보자. 😅

/**
 * {@code DeferredResult} provides an alternative to using a {@link Callable} for
 * asynchronous request processing. While a {@code Callable} is executed concurrently
 * on behalf of the application, with a {@code DeferredResult} the application can
 * produce the result from a thread of its choice.
 *
 * (생략)

주석에 따르면, Callable이 애플리케이션을 대신하여 동시에 실행되는 동안 DeferredResult를 사용하여 애플리케이션은 선택한 쓰레드에서 결과를 생성할 수 있다고 한다. 도대체 무슨 말일까? 주석을 해석해도 이해가 안되니 구글링을 해보자.

DeferredResult를 직역하면 지연된 결과로 해석할 수 있다. 어떤 요청에 대한 응답 이벤트를 큐에 저장하고 있다가 DeferredResult.setResult() 메서드가 호출되면 DispatcherServlet에 응답을 보내는 형태로 이루어진다고 한다. 단, 쓰레드는 스프링 MVC에서 관리되지 않는다고 한다.

그래도 이해가 되지 않는다... 언제나 그렇듯 일단 받아들이고 문서의 Processing 항목을 읽어보자. 그럼 이해가 되지 않을까 싶다.

Processing

공식 문서에 나온 내용을 해석해보자.

Servlet 비동기 요청 처리

  • ServletRequest는 request.startAsync()를 호출하여 비동기 모드로 전환할 수 있다. 이 경우에는 서블릿 및 모든 필터가 종료가 되어도, 나중에 응답 처리를 완료할 수 있다.
  • request.startAsync()의 호출은 AsyncContext를 반환하는데, 이는 비동기 처리에 대한 추가 제어에 사용할 수 있다. 예를 들어, AsyncContext는 서블릿 API의 포워드 요청과 유사하지만 추가로 서블릿 컨테이너 쓰레드에 애플리케이션에 요청 처리를 재개할 수 있는 Dispatch 메서드를 제공한다.
  • ServletRequest는 현재 DispatcherServletType에 접근할 수 있게 하고 - 초기 요청인지, 비동기 요청인지, 포워드 요청인지, 기타 Dispatch 요청인지 구분할 수 있게 해준다.

DeferredResult 요청 처리 과정

  1. Controller는 DefferedResult를 반환하고 이를 메모리 내 큐나 리스트에 저장해 나중에 접근할 수 있도록 한다.
  2. 스프링 MVC는 request.startAsync()를 호출한다.
  3. 그 동안 DispatcherServlet과 모든 필터는 쓰레드를 종료하지만 응답은 아직 처리되지 않았다.
  4. 애플리케이션은 DeferredResult를 다른 쓰레드에서 처리하게 하고 스프링 MVC는 서블릿 컨테이너에 다시 요청을 Dispatch한다.
  5. DispatcherServlet은 다시 호출되고, 비동기 리턴값의 처리를 다시 재개한다.

Callable 요청 처리 과정

  1. Controller는 Callable을 반환한다.
  2. 스프링 MVC는 request.startAsync()를 호출하고 별도의 쓰레드에서 처리하기 위해 Callable을 내부 TaskExecutor에 전송한다.
  3. 그 동안 DispatcherServlet과 모든 필터는 쓰레드를 종료하지만 응답은 아직 처리되지 않았다.
  4. Callable이 리턴값을 생성하고 스프링 MVC가 서블릿 컨테이너에 처리를 완료하도록 서블릿 컨테이너로 다시 요청을 Dispatch한다.
  5. DispatcherServlet은 다시 호출되고, Callable을 리턴한 값의 처리를 다시 재개한다.

그래서 결국?

맨 처음에 이 글을 읽고 내가 이해한 내용을 정리해보자. 요청이 들어오면 하나의 쓰레드(A)에서 처리가 되는데, request.startAsync()를 만나는 순간 쓰레드(A)는 종료되면서 풀에 반납되고, 다른 쓰레드(B)에게 요청을 처리하도록 전달한다. 쓰레드(B)에서 처리가 완료되면, 결과를 응답으로 처리할 쓰레드(C)를 여는 구조로 진행된다.


어느 정도 흐름이 그려지니 DeferredResult 처리 과정을 하단의 이미지로 다시 이해해보자.

  1. 요청이 들어옴
  2. 요청을 처리할 쓰레드(A)를 생성
  3. 쓰레드(A)에서 요청에 대해 DeferredResult를 반환하고 이를 큐에 저장
  4. 쓰레드(A)를 풀에 반환
  5. 큐에 저장된 DeferredResult 처리 (DeferredResult.setResult() 메서드 호출)
  6. 쓰레드 풀에서 새로운 쓰레드(B) 요청
  7. 쓰레드(B)에서 응답을 처리하여 결과를 반환
  8. 쓰레드(B)를 풀에 반환

그 다음으로는 Callable을 살펴보자.

  1. 요청이 들어옴
  2. DispatcherServlet이 해당 Path에 알맞는 Controller 메서드를 찾아 요청하고 Callable을 반환 받음
  3. DispatcherServlet이 Callable을 비동기 처리하도록 WebAsyncManager에 요청함
  4. WebAsyncManager는 Callable을 특정 TaskExecutor에 할당하여 비동기 처리함
  5. Callable의 작업이 완료되면 WebAsyncManager로 결과값을 돌려줌
  6. WebAsyncManager는 DispatcherServlet에게 Dispatch 요청을 함
  7. DispatcherServlet은 WebAsyncManager에게서 결과값을 받은 다음 적절하게 처리하여 응답을 돌려줌

DeferredResult와 Callable은 요청이 들어왔을 때 - 처음 요청을 처리하기 위한 쓰레드(A)와는 다른 쓰레드(B)에서 생산된 값을 반환한다는 부분은 동일하다. 하지만 DeferredResult는 외부의 몇몇 이벤트로 부터 결과에 대한 응답을 반환한다는 점에서 차이가 있다고 한다. (Long Polling 방식이라고 하더라..)

지금의 내 수준에서 차이점을 이해를 하자면, Callable의 경우 TaskExecutor에서 처리를 할 때 사용하는 쓰레드는 스프링 MVC에서 관리되는 쓰레드지만, DeferredResult의 경우 큐에 저장된 내용을 처리하는 쓰레드는 스프링 MVC가 관리하는 쓰레드가 아닌 외부에서 처리가 된다! 정도다. (왠지 잘못된 지식같지만 지금은 이 정도로 만족하자 🙏🏻)

Callable은 주로 요청 처리가 오래 걸리는 DB 작업, REST API 요청 처리를 하는데 적합하고 DeferredResult는 JMS, AMQP, Scheduler, Redis, 등 다른 HTTP 요청에서 사용된다고 한다.

💡 startAsync()는 어디에서 호출될까?

실제로 startAsync()가 실행되는 지점은 어디에 있을까? RequestMappingHandlerAdapter에서 HandlerMethodReturnValueHandler를 들고 있는데, 이 중에 응답이 DeferredResult인 경우 DeferredResultMethodReturnValueHandler가 사용되고 그 안에서 WebAsyncManager의 startDeferredResultProcessing()이 호출된다.

WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); 

그리고 그 안에서 startAsyncProcessing(processingContext)가 호출되며 그 내부에 ServletRequest의 startAsync()가 호출된다.

HTTP Streaming

DeferredResult와 Callable은 하나의 요청에 하나의 응답만 가능하다. 그렇다면 여러 개의 비동기 값을 가지고 응답을 해야할 때는 어떻게 해야할까?

문서에 따르면, HTTP Streaming을 이용하면 Object, SSE(Server Send Event), Raw Data를 포함한 여러 비동기 리턴 값을 반환할 수 있다고 한다.

Object

Object는 ResponseBodyEmitter를 반환하여 객체 스트림을 생성할 수 있다. 각 객체는 HttpMessageConverter에 의해 직렬화되어 응답에 추가된다.

@GetMapping("/events")
public ResponseBodyEmitter handle() {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    // Save the emitter somewhere..
    return emitter;
}

// In some other thread
emitter.send("Hello once");

// and again later on
emitter.send("Hello again");

// and done at some point
emitter.complete();

SSE

SSE는 ResponseBodyEmitter의 하위 클래스인 SseEmitter를 이용하여 데이터 스트림을 포함한 비동기 리턴값을 반환할 수 있게 해준다.

@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle() {
    SseEmitter emitter = new SseEmitter();
    // Save the emitter somewhere..
    return emitter;
}

// In some other thread
emitter.send("Hello once");

// and again later on
emitter.send("Hello again");

// and done at some point
emitter.complete();

Raw Data

파일 다운로드와 같이 바이트 단위의 입출력을 하는 경우에는 Message Conversion 과정을 수행하지 않고 바로 OutputStream이 처리하도록 데이터 스트림을 반환하는 경우가 있다. 이 경우에는 StreamingResponseBody를 사용한다.

@GetMapping("/download")
public StreamingResponseBody handle() {
    return new StreamingResponseBody() {
        @Override
        public void writeTo(OutputStream outputStream) throws IOException {
            // write...
        }
    };
}

Reactive Types

스프링 MVC는 Controller에서 spring-webflux에서의 WebClient와 같은 Reactive Client 라이브러리를 제공한다고 한다. 이 영역은 아직 내 수준으로는 이해하기 어려우므로 나중에 다시 알아보도록 하자.

Disconnects

SseEmitter나 Reactive Types를 이용해 응답을 기다리는 동안 주기적으로 데이터를 전송하는 과정에서, 클라이언트가 연결을 끊으면 당연히 실패하게 된다. 서블릿에서는 이렇게 클라이언트가 연결을 끊었을 경우에 대한 알림을 제공하지 않는다. 문서에서는 이 경우를 위해 heartbeat 메커니즘을 이용해보라고 한다. 이 영역 역시 나에게는 아직 벅찬 내용이므로 연결이 끊어지는 부분에 대한 해결책이 필요하다! 정도로만 이해하고 넘어가자.

Configuration

비동기 요청 처리 설정 방법에 대한 내용은 생략한다.

이 글의 레퍼런스

profile
세상을 현명하게 이끌어갈 나의 성장 일기 📓

0개의 댓글