JAVA 비동기 프로그래밍: CompletableFuture

Suyeon Jin·2022년 6월 12일
3

CompletableFuture를 이해하기 위해서 자바의 Concurrent 프로그래밍부터 짚어볼 필요가 있다.

1. Concurrent Programming

Concurrent 소프트웨어는 동시에 여러 작업을 할 수 있는 소프트웨어를 의미한다. 예를 들면, 크롬으로 음악을 틀어두고 문서 작업을 할 수 있는 것처럼..~

자바에서 지원하는 Concurrent 프로그래밍에는 멀티 프로세싱멀티스레드가 있는데, CompletableFuture는 멀티스레드와 관련있으므로 이번에는 멀티스레드에 대해서만 정리하였다.
(cf 스프링 프레임워크는 자바의 멀티쓰레드를 사용한다.)

1-1. Thread

Thread를 상속하면 새로운 Thread 클래스를 새롭게 정의할 수 있다.

public class Main {
    public static void main(String[] args) {
        YourThread yourThread = new YourThread();
        yourThread.start();
        
        MyThread myThread = new MyThread();
        myThread.start();

        System.out.println("메인 쓰레드 : " + Thread.currentThread().getName());
    }

    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("마이 쓰레드 : " + Thread.currentThread().getName());
        }
    }

    static class YourThread extends Thread {
        @Override
        public void run() {
            System.out.println("유어 쓰레드 : " + Thread.currentThread().getName());
        }
    }
}
메인 쓰레드 : main
마이 쓰레드 : Thread-1
유어 쓰레드 : Thread-0

Process finished with exit code 0

여러 개의 Thread를 정의할 수 있다. 또한, Thread는 순서가 보장되지 않는다.
코드에서는 YourThread -> MyThread -> main Thread 순으로 호출했지만, 결과는 예상한대로 나오지 않았다. 실행을 할 때마다 매번 호출 순서가 바뀌어 출력된다.

1-2. Runnable

Thread 클래스를 따로 정의하지 않아도, Runnable 이라는 익명 객체를 통해 새로운 Thread 객체를 사용할 수 있다.

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("뉴 쓰레드 : " + Thread.currentThread().getName());
            }
        });
        thread.start();
        
        System.out.println("메인 쓰레드 : " + Thread.currentThread().getName());
    }
}

Thread를 상속하는 방법보다 간단하긴 하다. 자바8부터는 람다식을 사용하여 위의 코드를 간추릴 수 있다.

        Thread thread = new Thread(() -> {
            System.out.println("뉴 쓰레드 : " + Thread.currentThread().getName())
        });
        thread.start();

2. Executor

Thread, Runnable과 같은 low level API를 프로그래머가 직접 다루는 것은 현실적으로 어렵다. (수십,수백 개 쓰레드의 자원을 관리하려면..머리가 꽤 아플 것이다.)
그래서 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리하여 high level API인 'Executor'에 위임하였다.

주요 인터페이스로는, 1) Executor, 2) ExecutorService, 3) ScheduledExecutorService 가 있다.

2-1. Executor

Executor executor = Executors.newSingleThreadExecutor(); // single thread
executor.execute(() -> System.out.println("Thread: " + Thread.currentThread().getName()));

Executor는 쓰레드를 생성하고 처리하는 인터페이스이다.
execute()라는 메서드만을 가지고 있는데, 쓰레드를 처리할 수는 있지만 종료할 수는 없다. ide에서 실행시켜보면 강제종료하기 전까지 계속 실행되는 것을 확인할 수 있다.

2-2. ExecutorService

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> System.out.println("Thread : " + Thread.currentThread().getName()));
executorService.shutdown();


ExecutorService executorService2 = Executors.newSingleThreadExecutor();
executorService2.submit(() -> {
	try {
    	Thread.sleep(30000);
        System.out.println("Thread2 : " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
              System.out.println("Call shutdownNow!");
              e.printStackTrace();
        }
	});
executorService2.shutdownNow();

ExecutorService는 Executor를 상속 받은 인터페이스이다. 쓰레드를 생성하고 처리하고 종료하는 등의 작업을 할 수 있다.
submit()을 사용하면 작업을 ExecutorService가 만든 쓰레드풀에서 처리하고, shutdown()으로 쓰레드 풀을 종료할 수 있다.
shutdown()을 사용하면 작업이 다 종료될 때까지 기다리고 종료하지만, 작업이 끝나든 말든 지금 당장 종료하고 싶다면 shutdownNow()를 사용하면 된다.

ExecutorService는 Executor를 상속 받았기 때문에 execute()와 submit() 모두 호출이 가능하다.
다만, 두 메서드는 받을 수 있는 인자 타입에 차이가 있다.
execute()는 Runnable 인터페이스만 인자로 받을 수 있지만, submit()은 Runnable과 Callable 인터페이스 모두 인자로 받을 수 있다.
또한, 두 메서드는 리턴 타입도 다르다.
execute()는 반환 값이 없는 반면, submit()은 Future 객체를 반환한다.

출처 https://www.geeksforgeeks.org/difference-between-executorservice-execute-and-submit-method-in-java/

2-3. ScheduledExecutorService

ScheduledExecutorService는 ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.schedule(() -> System.out.println("After 3 seconds..."), 3, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();

ScheduledExecutorService scheduledExecutorService2 = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService2.scheduleAtFixedRate(() -> System.out.println("Period is 2 seconds..."), 3, 2, TimeUnit.SECONDS);
Thread.sleep(5000);
scheduledExecutorService2.shutdown();

3. Callable

Runnable은 반환 값이 void 이기 때문에, 작업을 처리하는 것 뿐 작업 결과를 리턴할 수 없었다.
Callable은 Runnable과 유사하지만, 작업의 결과(Future 객체)를 받을 수 있다는 차이가 있다.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
          Thread.sleep(2000);
          return "hello";
        };

        Future<String> result = executorService.submit(hello);
        
        System.out.println("Started!");
        result.get(); // 블로킹
        System.out.println("End!");
        executorService.shutdown();
    }
}

Future의 get()은 작업 결과를 리턴하는 함수인데, 블로킹 콜이라 리턴될 때까지 리소스를 잡고 있는다. 결과를 최대한으로 기다릴 시간(타임아웃)을 설정할 수 있다.

Future의 isDone()함수로는 작업 상태를 확인할 수 있다. 작업이 완료되었으면 true, 아니면 false를 리턴한다.

future.isDone();

Future의 cancel()함수를 사용하면 해당 스레드의 작업을 취소할 수 있다.
인자로는 true/false를 줄 수 있다. true는 현재 진행중인 작업을 interrupt하면서 종료하는 것이고, false는 현재 진행중인 작업이 끝날때까지 기다렸다가 종료하는 것이다.
false를 줬다고 해서 결과를 얻을 수 있는건 아니다. 단지 작업을 기다려주는 것 뿐이다.

future.cancel(true);

executorService.invokeAll()을 사용하면 여러 작업을 Collections으로 넘겨 동시에 실행할 수 있다. 다만, 동시에 실행한 작업 중 제일 오래 걸리는 작업 만큼 시간이 걸린다.
Thread pool에 있는 Callable 동작들을 각 Thread가 처리하고 모든 작업이 끝나면, 즉 마지막 작업까지 종료된 4초 뒤에야 작업의 결과를 가져올 수 있다.

public class StudyCallable {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
          Thread.sleep(2000);
          return "hello";
        };

        Callable<String> so = () -> {
            Thread.sleep(3000);
            return "so";
        };

        Callable<String> bye = () -> {
            Thread.sleep(4000);
            return "bye";
        };

        List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, so, bye));
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }

        executorService.shutdown();
    }

4. CompletableFuture

Future를 사용해서도 비동기 프로그래밍이 가능했지만, Future만으로 힘든 작업들도 많았다.

  • Future는 취소하거나 get()에 타임아웃을 설정할 수는 있지만, 외부에서 완료시킬 수는 없다.
  • 블로킹 코드(get())를 사용하지 않고는 작업이 끝났을 때 콜백을 실행할 수 없다.
  • 여러 Future를 조합할 수 없다. (ex. 유튜브 영상 정보를 가져오고 해당 영상의 댓글 목록 가져오기)
  • 예외 처리용 API를 제공하지 않는다.

CompletableFuture는 Future만으로는 힘들었던 비동기 작업을 가능하게 하는 인터페이스이다. CompletableFuture를 사용하면 ExecutorService 객체와 Future 객체를 따로 만들지 않아도 된다.

CompletableFuture를 사용하여 비동기 작업을 실행해보자.
Runnable처럼 리턴값 없이 사용할 때는 runAsync()를 사용한다.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("No Return"));

리턴값이 있는 경우에는 supplyAsync()를 사용한다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Return");
            return "ParamReturn";
});
System.out.println(future.get()); // 결과 가져오기

CompletableFuture를 사용하여 Asynchronous하게 콜백을 실행시킬수도 있다. 리턴값을 받아서 다른 값으로 바꾸는 콜백을 구현하기 위해 thenApply()를 사용할 수 있다.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Return: " + Thread.currentThread().getName());
            return "ParamReturn";
        }).thenApply((s) -> {
            System.out.println("CallBack: " + Thread.currentThread().getName());
            return s.toUpperCase();
        });
        System.out.println(future.get()); // 결과 가져오기
    }
}

thenApply() 말고도 콜백을 구현할 수 있는 함수들이 있다.
thenAccept()을 사용하면 리턴값을 받아 리턴 없이 또 다른 작업을 처리하는 콜백을 구현할 수 있고,
thenRun()을 사용하면 리턴값을 받지 않고 또 다른 작업을 처리하는 콜백을 구현할 수 있다.

Executor를 해당 스레드에서 사용하게 할 수도 있다.
runAsync() OR supplyAsync()의 두 번째 인자로 ExecutorService를 주면 해당 스레드 풀 내에서 작업을 할당해서 처리한다.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello: " + Thread.currentThread().getName());
            return "HelloReturn";
        }, executorService).thenAccept((s) -> {
            System.out.println("HelloCallBack: " + Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        });

        future.get();
        executorService.shutdown();
    }
}

스레드의 콜백작업을 또 다른 스레드에서 처리할 수 있는데! then~() 에 Async를 붙인 then~Async()함수를 사용하고 두 번째 인자로 ExecutorService를 주면 된다.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Bye: " + Thread.currentThread().getName());
            return "ByeReturn";
        }, executorService).thenAcceptAsync((s) -> {
            System.out.println("ByeCallBack: " + Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        }, executorService);
        
        future.get();
        executorService.shutdown();
    }
}

0개의 댓글