[강의] 인프런 : 더 자바, Java8 섹션 6

haeny-dev·2021년 7월 29일
0
post-thumbnail

📌 섹션 6 : CompletableFuture

📚 Java Concurrent Programming

➕ Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어를 말하며 예를 들어, 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑 할 수 있다.

  • 자바에서 지원하는 Concurrent 프로그래밍에는 멀티프로세싱(ProcessBuilder) 와 멀티쓰레드가 있는데, 여기서는 쓰레드에 대해서만 간단하게 학습한다.

  • 자바 멀티쓰레드를 위해 Thread와 Runnable 객체를 이용한다.

➕ Thread/Runnable

1. Thread 구현

// Thread를 상속받아 새로운 스레드 객체를 만드는 경우
static class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread: " + Thread.currentThread().getName());
    }
}

public static void main(String[] args) throws InterruptedException {
    /*
    * Thread 객체를 상속받은 새로운 스레드 객체를 만들어서 사용하는 경우
    * */
    MyThread myThread = new MyThread();
    myThread.start();

    /*
    * Thread 객체를 생성할 때, Runnable 인터페이스를 넘겨주어 람다로 구현하여 사용하는 경우
    * */
    Thread thread = new Thread(() -> {
        try {
            Thread.sleep(1000L);    // 해당 스레드를 1000millis 동안 재운다
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Thread: " + Thread.currentThread().getName());
    });
    thread.start();
}

2. 주요 기능

• sleep

현재 Thread 멈춰두고, 다른 Thread가 처리할 수 있도록 기회를 준다.
그렇다고 락을 놔주진 않는다.

• interrupt

해당 쓰레드를 깨워서 interruptException을 발생시킨다. 단지 그 예외만 발생시키며, 발생했을 때 어떻게 처리하느냐에 따라 다르게 처리될 수 있다.

public static void main(String[] args) throws InterruptedException {
    /*
    * Interrupt 기능
    * */
    Thread interruptThread = new Thread(() -> {
       while(true){
           System.out.println("Thread: " + Thread.currentThread().getName());

           try {
               Thread.sleep(1000L);
           } catch (InterruptedException e) {   // sleep 되는 동안 interrupt를 받으면 발생한다.
               System.out.println("Interrupt!");
               return;
           }
       }
    });
    interruptThread.start();
    interruptThread.interrupt();
}

// 출력결과
Thread: Thread-2
Interrupt!
• join

해당 스레드가 끝날 때까지 기다렸다가 현재 스레드를 진행하게 됩니다.

public static void main(String[] args) throws InterruptedException {
    /*
    * Join 기능
    * */
    Thread joinThread = new Thread(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    });
    joinThread.start();
    
    System.out.println("Hello: " + Thread.currentThread().getName());
    joinThread.join();  // 해당 스레드가 끝날 때까지 기다린다.
    System.out.println(joinThread + " is finished!");
}

// 출력결과
Hello: main
Thread: Thread-3
Thread[Thread-3,5,] is finished!

➕ Executors

Thread가 해야할 작업에 대해서만 정의를 해주고, 그 외 Thread를 만들고 관리하는 작업은 Executor에게 위임한다.

실제, Executor를 사용하기보다 해당 인터페이스를 상속받은 ExecutorService를 사용하여 Callable도 실행시킬 수 있으며, 여러 Callable을 동시에 실행하는 등의 기능을 사용할 수 있다.

/**
 * Runnable 인터페이스의 run 메서드의 경우 반환타입이 void 이다.
 */
private static Runnable getRunnable(String message) {
    return () -> System.out.println(message + " :: " + Thread.currentThread().getName());
}

public static void main(String[] args) {
    /**
     * Executor는 Runnable 객체만 구현하여 넘겨주고 나머지 스레드 관리는 Executor에게 넘겨주는 개념이며,
     * Runnable 객체를 받아서 사용하는 만큼 실행 결과 반환값이 없는 경우일 때 사용한다.
     */
    ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();  // 스레드가 1개인 ExecutorService 생성
    ExecutorService fixedExecutorService = Executors.newFixedThreadPool(2);  // 스레드가 2개인 ExecutorService 생성
    fixedExecutorService.submit(getRunnable("Hello"));
    fixedExecutorService.submit(getRunnable("Haeny"));
    fixedExecutorService.submit(getRunnable("The"));
    fixedExecutorService.submit(getRunnable("Java"));
    fixedExecutorService.submit(getRunnable("Thread"));

    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//        scheduledExecutorService.schedule(getRunnable("HELLO"), 3, TimeUnit.SECONDS);
    scheduledExecutorService.scheduleAtFixedRate(getRunnable("Period"), 1, 2, TimeUnit.SECONDS);

    fixedExecutorService.shutdown();	// 처리중인 작업을 기다렸다가 종료
    // executorServece.shutdownNow(); 	// 당장 종료
}

// 출력결과
Hello :: pool-2-thread-1
Haeny :: pool-2-thread-2
The :: pool-2-thread-1
Thread :: pool-2-thread-1
Java :: pool-2-thread-2
Period :: pool-3-thread-1
Period :: pool-3-thread-1
Period :: pool-3-thread-1
...

➕ Callable과 Future

1. Callable

Runnable과 유사하지만, 작업의 결과를 받을 수 있다.

2. Future

비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

• get, isDone, cancel
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    /**
     * Callable 객체는 Runnable 객체와 다르게 리턴타입을 정해줄 수 있다.
     */
    Callable<String> hello = () -> {
      Thread.sleep(2000L);
      return "Hello";
    };

    Future<String> helloFuture = executorService.submit(hello);
    System.out.println(helloFuture.isDone());
    System.out.println("Started!");

    helloFuture.get();   // Blocking call
    /*
    * true 경우 interrupt를 발생시키고, false의 경우 해당 작업을 기다린다.
    * cancel을 사용한 경우 get을 통해서 값을 가져올 수 없다.
    * -> 취소한 작업에서 왜 값을 가져오려고 하냐는 예외를 발생시킨다.
    * */
    helloFuture.cancel(false);  //

    System.out.println(helloFuture.isDone());
    System.out.println("End!");
    executorService.shutdown();		
}

// 출력결과
false
Started!
true
End!
• invokeAll, invokeAny

비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Callable<String> java = () -> {
        Thread.sleep(3000L);
        return "java";
    };

    Callable<String> haeny = () -> {
        Thread.sleep(1000L);
        return "haeny";
    };

    List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, haeny));
    for(Future<String> f : futures){
        System.out.println(f.get());
    }

    String s = executorService.invokeAny(Arrays.asList(hello, java, haeny));	// Blocking Call
    System.out.println(s);

    executorService.shutdown();
}

// 출력결과
Hello
java
haeny
haeny

📚 CompletableFuture

➕ 자바에서 비동기 프로그래밍

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 Java8 인터페이스이다. Future를 사용해서 어느정도 가능하였지만, 제한되는 것이 많았다.

Future로 하기 어려운 작업

  • Future를 외부에서 완료 시킬 수 없다.
  • 블로킹 코드(get)를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행 할 수 없다.
  • 여러 Future를 조합할 수 없다.
  • 예외처리용 API를 제공하지 않는다.

➕ 비동기 작업 실행

public static void main(String[] args) throws ExecutionException, InterruptedException {
    /* 기본 생성 방식 */
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("future value");
    System.out.println(future.get());

    CompletableFuture<String> future1 = CompletableFuture.completedFuture("future1 value");
    System.out.println(future1.get());

    /* 리턴값이 없는 경우 */
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
        System.out.println("Void :: " + Thread.currentThread().getName());
    });
    voidCompletableFuture.get();

    /* 리턴값이 있는 경우 */
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("String :: " + Thread.currentThread().getName());
        return "Hello";
    });
    
    System.out.println(stringCompletableFuture.get());
}

// 출력결과
future value
future1 value
Void :: ForkJoinPool.commonPool-worker-1
String :: ForkJoinPool.commonPool-worker-1
Hello

➕ 콜백 제공하기

public static void main(String[] args) throws ExecutionException, InterruptedException {

    /* 리턴값이 있는 경우 */
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("String :: " + Thread.currentThread().getName());
        return "Hello";
    }).thenApply((s) -> {   // 콜백함수를 사용하여 리턴값을 반환할 때
        System.out.println(Thread.currentThread().getName());
        return s.toUpperCase();
    });
    System.out.println(stringCompletableFuture.get());

    /* 콜백함수에서 인자값을 받아서 함수 수행 후 리턴값이 없는 경우 */
    CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
        return "Hello";
    }).thenAccept((s) -> {
        System.out.println(s + " :: " + Thread.currentThread().getName());
    });
    voidCompletableFuture1.get();

    /* 인자값이 필요없고, 리턴값이 없는 콜백함수 실행 */
    CompletableFuture<Void> voidCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello :: " + Thread.currentThread().getName());
        return "Hello";
    }).thenRun(() -> {
        System.out.println(Thread.currentThread().getName());
    });
    voidCompletableFuture2.get();
}

// 출력결과
String :: ForkJoinPool.commonPool-worker-1
main
HELLO
Hello :: main
Hello :: ForkJoinPool.commonPool-worker-1
main

➕ ThreadPool 제공하기

public static void main(String[] args) throws ExecutionException, InterruptedException {
    /*
     * CompletableFuture 는 기본적으로 ThreadPool을 생성해주지 않아도, ForkJoinPool 을 사용한다.
     * 그러나, 직접 ThreadPool 을 지정해 주고싶다면, ExecutorService 를 생성하여 인자값으로 넣어주어 설정해줄 수 있다.
     * */
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    CompletableFuture<Void> voidCompletableFuture3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Executor :: " + Thread.currentThread().getName());
        return "Executor";
    }, executorService).thenRunAsync(() -> {
        System.out.println("Executor :: " + Thread.currentThread().getName());
    }, executorService);

    voidCompletableFuture3.get();
    executorService.shutdown();
}

// 출력결과
Executor :: pool-1-thread-1
Executor :: pool-1-thread-2

➕ 조합하기

• thenCompose

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });

    /* thenCompose 사용 경우 */
    CompletableFuture<String> composeFuture = hello.thenCompose(App2::getWorld);
    System.out.println(composeFuture.get());
}

private static CompletableFuture<String> getWorld(String message) {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("World " + Thread.currentThread().getName());
        return message + " World";
    });
}

// 출력결과
Hello ForkJoinPool.commonPool-worker-1
World ForkJoinPool.commonPool-worker-1
Hello World

• thenCombine

public static void main(String[] args) throws ExecutionException, InterruptedException {

    /* thenCombine 사용 경우 */
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });
    
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });

    CompletableFuture<String> combineFuture = hello.thenCombine(world, (h, w) -> h + " " + w);
    System.out.println(combineFuture.get());
}

// 출력결과
World ForkJoinPool.commonPool-worker-1
Hello World

• 두 개 이상의 task 처리

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });
    
    /* 2개 이상의 task를 합쳐서 실행 시키는 경우 */
    /* 모든 task 결과를 다 기다리는 경우 */
    CompletableFuture<Integer> integer = CompletableFuture.supplyAsync(() -> {
        return 100;
    });

    List<CompletableFuture> futures = Arrays.asList(hello, world, integer);
    CompletableFuture[] futureArray = futures.toArray(new CompletableFuture[futures.size()]);

    CompletableFuture<List<Object>> results = CompletableFuture.allOf(futureArray)
            .thenApply(v -> futures.stream()
                    .map(CompletableFuture::join)   // join 을 하면 Unchecked Exception 이 발생함
                    .collect(Collectors.toList()));

    results.get().forEach(System.out::println);

    /* 아무거나 먼저 끝나는 결과를 받아오는 경우 */
    CompletableFuture<Void> anyOfFuture = CompletableFuture.anyOf(futureArray).thenAccept(System.out::println);
    anyOfFuture.get();
}

// 출력결과
Hello
World
100
Hello

➕ 예외처리

• exceptionally

public static void main(String[] args) throws ExecutionException, InterruptedException {
    boolean throwError = true;

    /* exceptionally */
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        if(throwError){
            throw new IllegalArgumentException();
        }

        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).exceptionally(ex -> {
        System.out.println(ex);
        return "Error!";
    });
    
    System.out.println(hello.get());
}

// 출력결과
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!

• handle

public static void main(String[] args) throws ExecutionException, InterruptedException {

    /* handle */
    boolean throwError2 = false;
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        if(throwError2){
            throw new IllegalArgumentException();
        }

        System.out.println("World " + Thread.currentThread().getName());
        return "World";
    }).handle((result, ex) -> {
       if(ex != null){
           System.out.println(ex);
           return "ERROR!";
       }
       return result;
    });

    System.out.println(world.get());
}

// 출력결과
World ForkJoinPool.commonPool-worker-1
World

📖 REFERENCE

Lesson: Concurrency - https://docs.oracle.com/javase/tutorial/essential/concurrency/
Thread (Java Platform SE 8) - https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt--
Executors - https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html
Future (Java Platform SE 8) - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html
CompletableFuture (Java Platform SE 8) - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
CompletionStage (Java Platform SE 8) - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html
ForkJoinPool (Java Platform SE 8) - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html

0개의 댓글