CompletableFuture[작성중]

김성환·2022년 6월 21일
0

자바

목록 보기
11/12

CompletableFuture란

CompletableFuture란 기존에 Future객체의 한계점을 보안한 인터페이스이다.
CompletableFuture를 알아보기 전에 쓰레드풀 개념부터 알아보자

Thread pool이란?

스레드 풀은 작업처리에 사용되는 스레드를 제한된 개수만큼 정해놓고 작업큐 (Queue)에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다.
쓰레드 풀은 미리 작업처리에 사용되는 쓰레드의 개수를 정하기 때문에 작업처리 요청이 많아도 전체개수가 늘어나지 않아 성능이 급격히 저하되지 않는다.
이러한 쓰레드 풀을 생성하고 사용하기 위해서는 java.util.concurrent 패키지에서 ExecutorService인터페이스와 Executors클래스를 사용해야 한다.

ExecutorService란?

ExecutorService병렬작업 시 여러 개의 작업을 효율적으로 처리하기 위해 제공되는 라이브러리이다.

쓰레드 풀 생성방법

생성방법에는 4가지가 존재한다.(Executors를 사용하여 생성)

  • newFixed ThreadPool(int) : 인자 개수만큼 고정된 쓰레드풀을 만든다.
  • newCachedThredPool() : 필요할때 필요한 만큼 쓰레드풀 생성. 이미 생성된 쓰레드를 재활용 할수 있어 성능 용이.
  • newScheculedThreadPool(int) : 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 작업이 있다면 스케줄스레드 풀을 활용 할 수 있다.
  • newSingleThredExecutor() : 쓰레드 1개인 ExecutorService를 리턴한다. 싱글 스레드에서 동작하는 작업 처리시 사용한다.
    예제코드는 다음과 같다.
ExecutorService executorService = Executors.newCachedThreadPool();

작업생성

작업을 생성한다는 것이 바로 쓰레드를 생성한다는 의미와 동일하다.
작업을 생성하기 위해서는 runnable 객체(run)를 가져오거나 callable객체(call)를 가져와 작업 생성한다.

  • Runnable : 어떤 객체도 리턴하지 않는다.
  • Callable : 특정 타입의 객체를 리턴한다.
// Runnable
Runnable task = () -> {
    // 스레드가 처리할 내용
}
// Callable
Callable<T> task = () -> {
	// 스레드가 처리할 내용
    return T; // 리턴 값 
}

작업처리

작업처리를 하기 위해서는 생성된 쓰레드 풀에 작업들을 넣어야한다.
즉, ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣어야 한다는 것이다.
작업을 처리하기 위해서 ExecutorService는 여러가지 메서드를 제공한다.

  • execute() : 작업의 실행결과나 작업의 상태를 알 수 없음
  • submit() : 작업을 할당하고 Future타입의 결과값을 받는다.(결과를 리턴해야 하므로 callable을 주로 이용)(이 방식을 주로 사용)
  • invokeAny() : Collection을 인자로 받고 실행에 성공한 작업 중 하나의 리턴값을 반환한다.
  • invokeAll() : Collection을 인자로 받고 모든 작업의 리턴값을 List<Future<>> 타입으로 반환한다.

예제코드

public class Result {
    int accumValue;
    public synchronized void addValue(int value) {
        this.accumValue += value;
    }
}
public class Task implements Runnable {
    private final Result result;
    public Task(Result result) {
        this.result = result;
    }
    @Override
    public void run() {
        int sum = 0;
        for (int i = 1; i <= 10; i++) {
            sum += i;
        }
        result.addValue(sum);
    }
}
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );
        System.out.println("[작업 처리 요청]");
        Result result = new Result();
        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);
        Future<Result> future1 = executorService.submit(task1, result);
        Future<Result> future2 = executorService.submit(task2, result);
        try {
            result = future1.get();
            result = future2.get();
            System.out.println("[처리 결과] " + result.accumValue);
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("[실행 예외 발생함] " + e.getMessage());
        }
        executorService.shutdown();
    }
}

위 예제코드는 submit함수를 사용해서 task1,2를 실행하였다.
또한 future1,future2에 그 결과를 집어넣었고, get함수를 이용해서 해당 작업의 결과를 얻었다.
만약, get함수를 사용할 당시에 작업이 아직 안끝날 경우, 해당 작업(get함수를 사용한 작업)이 끝날때까지 다른 작업들이 block되고 기다리게 된다.

CompletableFuture가 등장한 배경

Future인터페이스는 여러가지 한계점을 가지고 있다.

  • Future를 외부에서 완료시킬 수 없다. 취소하거나 get()에 타임아웃을 설정할 수 없다.
  • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    -> Future를 통해 결과값을 만들고 무언가를 하는 작업은 get() 이후에 와야한다.
  • 여러 Future를 조합할 수 없다. 예) 이벤트 정보를 가져온 다음에 이벤트에 참여한 회원 목록 가져오기
  • 예외처리용 API를 제공하지 않는다.

작업을 생성하는 방법

CompletableFuture로 작업을 생성하는 방법은 2가지가 있다.
리턴값이 없는 경우: runAsync()
리턴값이 있는 경우: supplyAsync()
CompletableFuture 문서 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
아래는 예제코드이다.

public class App {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newFixedThreadPool(4);
		CompletableFuture<Void> futureThenRun = CompletableFuture.supplyAsync(() -> {
			System.out.println("Hi ThenRun " + Thread.currentThread().getName());
			return "Hi ThenRun";
		}, executorService).thenRun(() -> {
			System.out.println(Thread.currentThread().getName());
		});
		futureThenRun.get();
		executorService.shutdown();
	}
}

위 코드를 보면 ExecutorService executorService = Executors.newFixedThreadPool(4); 문장으로 쓰레드 풀을 생성하고 futureThenRun = CompletableFuture.supplyAsync() 문장으로 리턴값이 있는 작업을 생성한다.
작업은 람다식으로 작성되었으며, 해당 코드는 다음과 같다.

() -> {
	System.out.println("Hi ThenRun " + Thread.currentThread().getName());
	return "Hi ThenRun"; // 리턴 값 존재
}

콜백함수 실행(작업은 get() 이후에 오지 않아도 됨)

CompletableFuture의 경우 콜백함수를 실행할 수 있는 메서드가 존재한다.
이러한 메서드로 인해 생성된 결과값을 이용해 조합하거나 계산을 할 수 있게 된다.(기존 Future로는 불가능)
thenApply(Function) = 리턴값을 받아서 다른 값으로 바꾸는 콜백(리턴있이)
thenAccept(Consumer) = 리턴값을 또 다른 작업을 처리하는 콜백(리턴없이)
thenRun(Runnable) = 리턴값을 받지 않고 다른 작업을 처리하는 콜백
위의 예제코드에서는 thenRun()함수가 사용되었다.
아래는 thenRun()함수 안에 들어가는 콜백함수이다.

() -> {
	System.out.println(Thread.currentThread().getName());
}

타임아웃 설정하기

orTimeout 메서드는 지정된 시간이 지난 후에 CompletableFuture를 TimeoutException 으로 완료하면서 또 다른 CompletableFuture를 반환할 수 있도록 해준다.
예시코드이다.

Future<Double> futurePriceInUSD = 
  CompletableFuture.supplyAsync(()->shop.getPrice(product))
  .thenCombine(
		CompletableFuture.supplyAsync() -> exchangeService.getRate(Money.EUR, Money.USD)),
		(price, rate) -> price * rate
  )
  .orTimeout(3, TImeUnit.SECONDS);

.orTimeout 해당 메서드를 추가하여 3초후에 작업을 끝내지 못할 경우 TimeoutException이 발생하도록 하였다.

참고
https://rlawls1991.tistory.com/entry/CompletableFuture

profile
개발자가 되고 싶다

0개의 댓글