- 반드시 해당 지식들이 있어야 WebFlux를 사용할 수 있는 것은 아닙니다.
- 하지만, 결국 Java의 thread 관련 기술들을 알아야 깊은 WebFlux의 필요성과 이해를 할 수 있다고 생각해서 추가하게되었습니다.
Java의 Concurrent ?
- Java에서 지원하는 Concurrent 프로그래밍
- 멀티 프로세싱(
process
) => ProcessBuilder
- 멀티 쓰레딩(
thread
) => Thread
, Runnable
, Callable
, Executor
등
- 본 글에서는 Java의
멀티 쓰레딩 관련 개념
들을 살펴본다
Executor와 구현체들
[ Executor ]
[ 개념 ]
- Java 5부터 Java Concurrency api를 뒷받침하기 위해 제공된 인터페이스(interface)
- 비동기적으로 호출을 실행하는 메서드를 제공
- 하위 구현체와 인터페이스들을 통해 실제 사용
[ 구조 ]
- 수행할 작업인 Runnable 을 받아서 실행
package java.util.concurrent.Executor;
public interface Executor {
void excute(Runnable command);
}
[ Executor 인터페이스와 하위 관계 ]
[ ExecutorService ]
[ 개념 ]
- Executor 인터페이스를 확장해서 라이프 사이클(시작 / 중지 / 종료)을 관리할 수 있는 기능을 정의한 인터페이스(interface)
- Thread의 라이프사이클이나 발생할 수 있는 low level의 고려사항을 개발자가 신경쓰지 않게 해준다
- ExecutorService에 Task(작업)을 지정해주면, 가진 ThreadPool을 이용해서 Task를 실행
- 만약, Thread Poool의 Thread 수 보다 Task가 많다면, Task는 큐(Queue)에서 대기하게 된다
[ 구성 ]
< T > Future< T >submit(Callable< T > task)
Future<?> submit(Runnable task)
< T > Future< T > submit(Runnable task, T result)
< T > List<Future< T > > invokeAll(Collections<? extends Callable< T > > tasks)
< T > List<Future< T > > invokeAll(Collections<? extends Callable< T > > tasks , long timeout, TimeUnit unit)
< T > invokeAny(Collection<? extends Callable< T > > tasks)
T invokeAny(Collections<? extends Callable< T > > tasks, long timeout, TimeUnit unit)
void shutdown()
List< Runnable > shutdownNow()
boolean isShutdown()
boolean isTerminated()
boolean awaitTermination(long timeout, TimeUnit unit)
[ ThreadPoolExecutor ]
[ 개념 ]
- Spring에서는
ThreadPoolExecutor
를 상속
받은 ThreadPoolTaskExecutor
를 제공해서 더 편리하게 사용 가능
[ 핵심 파라미터 ]
- corePoolSize
최초 생성
되는 쓰레드 사이즈
corePoolSize
가 될 때 까지는 계속 쓰레드를 유지
- 계속 유지가 되기 때문에, 적절한 값을 찾아서 적용하는 것이 중요
- maximumPoolSize
- 해당 풀에
최대로 유지
할 수 있는 개수
corePoolSize = maximumPoolsize
이면 fixed-size thread pool
과 동일
- keepAliveTime
corePoolSize
를 넘어 maximumPoolSize
까지 증가하는 과정에서 KeepAliveTime
동안 idle
상태에 있으면 다시 corePoolSize
로 개수를 줄인다
- unit
- workQueue
corePoolSize
보다 많아졌을 경우, Task
가 쌓이며 기다리는 BlockingQueue
의 work
방식을 설정
- 종류
SynchronousQueue
: task를 큐에 유지하지 않고, 바로 스레드로 넘기는 방식의 큐
LinkedBlockingQueue
: 크기 제한이 없는 큐
ArrayBlockingQueue
: 크기 제한이 있는 큐
[ 생성 ]
final int corePoolSize = 3;
final int maximumPoolSize = 5;
final int queueCapacity = 3;
final ThreadPoolExecutor executor
= new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
1L,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(queueCapacity));
ExecutorService executorService = Executors.newSingleThreadExecutor()
ExecutorService executorService = Executors.newFixedThreadPool(4)
ExecutorService executorService = Executors.newCachedThreadPool()
ExecutorService executorService = Executors.newWorkStealingPool(10)
...
[ Executors ]
Executors.newSingleThreadExecutor()
Executors.newFixedThreadPool(int Threads)
Threads
갯수 만큼 고정된 쓰레드 풀을 생성
Executors.newCachedThreadPool()
- 필요할 때, 필요한 만큼의 쓰레드 풀을 생성 (계속 증가 가능 => 위험성 존재)
- 이미 생성된 쓰레드를 재활용할 수 있기 때문에 성능상의 이점이 있을 수 있음
Executors.newWorkStealingPool(int)
- 시스템에 가용 가능한 만큼 쓰레드를 활용하는 풀을 생성
Executors.newWorkStealingPool(int parallelism)
- Java8 에서 도입된 Thread Pool
- 지정된
parallelism
을 지원할 만큼 충분한 Thread를 유지하고, 여러 Queue를 사용하여 경합을 줄임
- Thread를 동적으로 늘리고 줄인다
- 작업이 실행되는 순서를 보장하지 않음
[ 그 외 ]
ScheduledExecutorService
- 지정한 스케쥴에 따라 작업을 수행할 수 있는 기능이 추가된
ExecutorService
인터페이스
ScheduledThreadPoolExecutor
Runnable / Callable
[ Runnable ]
[ 개념 ]
- 다중 스레드 작업을 나타내기 위해 제공되는 핵심 인터페이스
void
반환 타입을 가진다
=> 실행 후 결과를 받지 X
[ 구성 ]
public interface Runnable {
public void run();
}
[ Callable ]
[ 개념 ]
- Runnable과 동일하게, 다중 스레드 작업을 나타내기 위해 제공되는 핵심 인터페이스
<V>
반환 타입 (특정 객체 리턴)
=> 실행 후 결과를 객체로 받음
=> 기존 Runnable에서는 실행 결과를 받기 위해 공용 메모리
나 파이프
와 같은 것들이 필요했다
[ 구성 ]
public Interface Callable<V> {
V call() throws Exception
}
[ 특징 ]
- 최상단 인터페이스인 Executor에는 Runnable만 매개변수로 가진다
- 하지만, Executor를 상속받은 ExecutorService에 Callable을 변수로 가지는 메소드들이 정의
=> submit()
/ invokeAll()
/ invokeAny()
등
Future 타입
[ Future ]
[ 설명 ]
- 비동기적인 Task의 현재 상태를 조회하거나 결과를 가져오기 위한 객체
- Runnable / Callable의 상태를 조회하거나 결과를 확인하기 위해 사용
- FutureTask라는 구현체를 가진다
- 시간이 걸릴 수 있는 작업을 Future 내부에 작성하고,
호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 할 수 있음
=> 실행을 맞기고 미래 시점에 결과를 얻는 것으로 이해 가능
[ 구성 ]
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
boolean isCancelled();
boolean isDone();
boolean cancel(boolean mayInterruptIfRunning)
}
[ 한계 ]
- 복잡한 로직 구현 불가
- isDone() / isCanceled처럼 기본 사항들만 체크할 수 있기 때문에 복잡한 로직 구현에 한계가 존재
- get() 뒤에만 콜백을 지정할 수 있었다
- get()을 통한 강제 Blocking 발생
- 결과가 보장되지 않는 상태라서 콜백을 Future를 정의하며 등록할 수 없었다
- 여러 Future 조합 불가능
- 예외 처리용 API를 제공하지 않음
[ CompletableFuture ]
[ 설명 ]
[ 비동기 작업 실행 ]
- 특징
- 직접 쓰레드를 생성하지 않는다
--> 내부적으로 ForkJoinPool.commonPool()
의 쓰레드를 수행해서 비동기 작업 수행
[ 콜백 제공 ]
thenApply(Function
)
비동기 로직이 수행
된 후 결과값
을 받고, 값을 반환(return)
한다
Function
이니까 입력값 1개
/ 출력값 1개
thenAccept(Consumer
)
비동기 로직이 수행
된 후 결과값을 받고
, 값을 반환(return)하지 않는다
Consumer
니까 입력값 1개
/ 출력값 0개
thenRun(Runnable
)
비동기 로직이 수행된 후
결과값을 받지 않고
, 값을 반환(return)하지도 않는다
Runnable
이니까 입력값 0개
/ 출력값 0개
[ 조합하기 ]
thenCompose()
연관성이 있는 2개의 작업
을 처리
할 때 사용
해당 작업을 처리한 후
결과를 받아서
다음 비동기 작업을 처리
할 때 사용
thenCombine()
연관성이 없는 2개의 작업
을 처리
할 때 사용
allOf()
연관성이 없는 다수의 작업
을 수행
할 때 사용
각 작업의 반환(return)값
이 같다고 보장할 수 없기 때문
에 반환값을 가지지 않는다
만약 모든 작업의 결과
를 ArrayList로 저장
하려면 아래처럼 결과 값에 접근
해서 만들어야 한다
(에러처리의 복잡함
때문에 get()
대신 join()
사용)
anyOf()
연관성이 없는 다수의 작업
들 중 먼저 끝나는 하나가 있으면 종료
하는 방식
[ 예외처리 ]
exceptionally(Function)
비동기 작업을 하는 중
에 에러가 발생
했을 때, 에러를 처리
하기 위한 용도
handle(BiFunction)
비동기 작업을 하는 중
에 에러가 발생
했을 때, 결과 값
과 에러
모두를 처리
하기 위한 용도
[ ListenableFuture ]
[ 설명 ]
- Spring에서 제공하는 클래스로 콜백 메소드를 추가할 수 있도록 Future를 확장한 인터페이스
- Future가 이미 완료된 상태에서 콜백이 추가된다면 콜백은 즉시 호출
[ 구성 ]
public interface ListenableFuture<T> extends Future<T> {
void addCallback(ListenableFutureCallback<? super T> callback);
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture(this);
this.addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
ref