CompletableFuture

이연중·2021년 2월 2일
0

JAVA

목록 보기
13/20

자바에서 비동기 프로그래밍을 가능하게 하는 인터페이스

자바 Concurrent 프로그래밍

Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어
  • 예) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있음
  • 예) 녹화를 하면서 IntelliJ로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있음

자바에서 지원하는 Concurrent 프로그래밍

  • 멀티프로세싱(ProcessBuilder)
  • 멀티쓰레드

자바 멀티쓰레드 프로그래밍

  • Thread/Runnable

Thread 상속

public static void main(String[] args){
	HelloThread helloThread= new HelloThread();
    helloThread.start(); //쓰레드 생성
    System.out.println("hello : "+Thread.currentThread().getName); 
}

static class HelloThread extends Thread{
    @Override
    public void run(){
        System.out.println("world : "+Thread.currentThread().getName);
    }
}

출력

world : Thread-0
hello : main    

Runnable 구현 또는 람다

Thread thread= new Thread(new Runnable(){
    @Override
    public void run(){
		System.out.println("world : "+Thread.currentThread().getName());
    }
});
thread.start();
System.out.println("hello : "+Thread.currentThread().getName());
//Runnable구현
-----------------------------------------------------------------------------------------------------------
Thread thread= new Thread(()->System.out.println("world : "+Thread.currentThread().getName()));
thread.start();
System.out.println("hello : "+Thread.currentThread().getName());
//람다 구현

쓰레드 주요 기능

  • 현재 쓰레드 멈춰두기(sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만, 그렇다고 락을 놔주진 않음(잘못하면 데드락 걸릴 수도 있음)

  • 다른 쓰레드 깨우기(interrupt): 다른 쓰레드를 깨워서 interruptedException을 발생시킴. 해당 에러가 발생했을 때, 할 일은 코딩하기 나름(종료 시킬수도, 계속 하던 일을 할수도)

    • Thread thread=new Thread(() -> {
                  while(true){
                      System.out.println("Thread : "+Thread.currentThread().getName());
                      try{
                          Thread.sleep(1000L);
                      }catch(InterruptedException e){//interrupt가 발생하면 쓰레드 중지
                          System.out.println("interrupted!");
                          return;
                      }
                  }
              });
              thread.start();
      
              System.out.println("Hello : "+Thread.currentThread().getName());
              Thread.sleep(3000L);
              thread.interrupt(); //interrupt 발생
      
      /*
      Thread : Thread-0
      Hello : main
      Thread : Thread-0
      Thread : Thread-0
      interrupted! 출력!
      */
  • 다른 쓰레드 기다리기(join): 다른 쓰레드가 끝날 때까지 기다림

Executors

쓰레드가 많아질경우 프로그래머가 직접 코딩으로 이를 관리하는 것은 어려운 일이다. Executors는 프로그래머 대신 쓰레드들을 관리해준다.

고수준(High-Level) Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 어플리케이션에서 분리
  • 해당 기능을 Executors에 위임

Executors가 하는 일

  • 쓰레드 만들기: 어플리케이션이 사용할 쓰레드 풀을 만들어 관리
  • 쓰레드 관리: 쓰레드 생명 주기를 관리
  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공

주요 인터페이스

  • Executor: execute(Runnable)

  • ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능 제공

  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스를 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있음

    • public static void main(String[] args) throws InterruptedException {
              ScheduledExecutorService executorService= Executors.newSingleThreadScheduledExecutor();
              executorService.schedule(getRunnable("Hello"),3, TimeUnit.SECONDS); //3초후 메세지 찍힘
              executorService.scheduleAtFixedRate(getRunnable("Hello"),1,2,TimeUnit.SECONDS);
              //1초 대기후 2초에 한번씩 출력
          }
      
          private static Runnable getRunnable(String msg){
              return ()-> System.out.println(msg+Thread.currentThread().getName());
          }

ExecutorService로 작업 실행하기

ExecutorService executorService= Executors.newSingleThreadExecutor(); //쓰레드 하나 쓰는 Executor
executorService.submit(()->{
    System.out.println("Hello : "+Thread.currentThread().getName());
}); //쓰레드 실행
executorService.shutdown(); //Executor는 쓰레드 실행후 대기 상태이기 때문에 명시적으로 종료를 시켜줘야 함

ExecutorService로 멈추기

executorService.shutdown(); //처리중인 작업을 기다렸다가 종료
executorService.shutdownNow(); //즉시 종료

쓰레드의 개수보다 처리하는 작업의 수가 많을때

  • 구조: main -> ExecutorService[ Blocking-Queue, Thread Pool ]
  • 메인에서 ExecutorService로 넘겨준다. ExecutorService에는 Blocking-Queue와 Thread Pool이 있는데, Blocking-Queue에서는 처리하는 작업들이 대기하고, Thread Pool의 쓰레드들은 큐에 있는 순서에 따라 돌아가며 작업을 수행한다.

Fork/Join 프레임워크

  • ExecutorService의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게끔 도와줌

Callable과 Future

Collable

  • Runnable과 유사하지만 작업의 결과를 받을 수 있음

Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있음

결과 가져오기 get()

ExecutorService executorService= Executors.newSingleThreadException();
Future<String> helloFuture= executorService.submit(()->{
    Thread.sleep(2000L);
    return "Collable";
});
System.out.println("Hello");
String result= helloFuture.get(); //블록킹 콜(결과값을 가져올때까지 기다림)
System.out.println(result);
executorService.shutdown();
  • 블록킹 콜
  • 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있음

작업 상태 확인하기 isDone()

  • 완료 했으면 true 아니면 false 리턴

    • ExecutorService executorService=Executors.newSingleThreadExecutor();
      
      Callable<String> hello=()->{
      	Thread.sleep(2000L);
          return "Hello";
      };
      
      Future<String> helloFuture=executorService.submit(hello);
      System.out.println(helloFuture.isDone());
      System.out.println("Started");
      
      helloFuture.get();
      
      System.out.println(helloFuture.isDone());
      System.out.println("End!");
      executorService.shutdown();
      
      /*
      false
      Started
      true
      End! 출력!
      */

작업 취소하기 cancel()

  • 취소 했으면 true 못했으면 false 리턴

  • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다림

  • ExecutorService executorService=Executors.newSingleThreadExecutor();
    
    Callable<String> hello=()->{
    	Thread.sleep(2000L);
        return "Hello";
    };
    
    Future<String> helloFuture=executorService.submit(hello); helloFuture.cancel(false); //helloFuture 작업 종료
    System.out.println(helloFuture.isDone());
    System.out.println("Started");
            
    System.out.println(helloFuture.isDone());
    System.out.println("End!");
    executorService.shutdown();
    
    /*
    true
    Started
    true
    End! 출력!
    */

여러 작업 동시에 실행하기 invokeAll()

  • 동시에 실행한 작업 중에 제일 오래 걸리는 작업만큼 시간이 걸림

    • ExecutorService executorService=Executors.newSingleThreadExecutor();
      
      Callable<String> hello=()->{
      	Thread.sleep(2000L);
          return "Hello";
      };
      
      Callable<String> java=()->{
      	Thread.sleep(3000L);
          return "Java";
      };
      
      Callable<String> always=()->{
      	Thread.sleep(1000L);
          return "Always"
      };
      
      List<Future<String>> futures=executorService.invokeAll(Arrays.asList(hello,java,always));
              //hello가 2호 java가 3초 always가 1초이다. invokeAll은 시간이 제일 긴 java를 기다리고 모든 결과값을 받아온다
      for(Future<String> f:futures){
      	System.out.println(f.get());
      }
      
      executorService.shutdown();

여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()

  • 동시에 실행한 작업중 제일 짧게 걸리는 작업만큼 시간이 걸림
  • 블록킹 콜

CompletableFuture 1

ForkJoinPool을 사용하기에 따로 Thread Pool을 만들지 않아도 됨

Future로는 하기 어렵던 작업들

  • Future를 외부에서 완료 시킬 수 없음. 취소하거나, get()에 타임아웃을 설정할 수는 있음
  • 블로킹 콜(get())을 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없음
  • 여러 Future를 조합할 수 없음(ex) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기)
  • 예외 처리용 API 제공하지 않음

Completable(외부에서 명시적으로 Complete 시킬 수 있음) Future

  • Implements Future
  • Implements CompletionStage

비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()

    • CompletableFuture<void> future= CompletableFuture.runAsync(()->{
          System.out.println("Hello "+Thread.currentThread().getName());
      });
      
      future.get();
  • 리턴값이 있는 경우: supplyAsync()

    • CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{
          System.out.println("Hello "+Thread.currentThread().getName());
          return "Hello";
      });
      
      System.out.println(future.get());
  • 원하는 Executor(쓰레드 풀)를 사용해서 실행할 수도 있음(기본은 ForkJoinPool.commonPool())

콜백 제공하기

  • thenApply(Fucntion): 리턴값을 받아 다른 값으로 바꾸는 콜백

    • CompletableFuture<void> future= CompletableFuture.runAsync(()->{
          System.out.println("Hello "+Thread.currentThread().getName());
          return "Hello";
      }).thenApply((s)->{
          System.out.println(Thread.currentThread().getName());
          return s.toUpperCase();
      });
      
      System.out.println(future.get());
  • thenAccept(Consumer): 리턴값을 받아 또 다른 작업을 처리하는 콜백(리턴 없이)

    • CompletableFuture<void> future= CompletableFuture.supplyAsync(()->{
          System.out.println("Hello "+Thread.currentThread().getName());
          return "Hello";
      }).thenAccept((s)->{
          System.out.println(Thread.currentThread().getName());
          System.out.println(s.toUpperCase());
      });
      
      future.get();
  • thenRun(Runnable): 리턴값을 받지 않고 다른 작업을 처리하는 콜백

    • CompletableFuture<void> future= CompletableFuture.supplyAsync(()->{
          System.out.println("Hello "+Thread.currentThread().getName());
          return "Hello";
      }).thenRun(()->{
          System.out.println(Thread.currentThread().getName());
      });
      future.get();
  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있음

CompletableFuture 2

조합하기

  • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합

    • CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
      	System.out.println("Hello"+Thread.currentThread().getName());
          return "Hello";
      });
      
      CompletableFuture<String> future=hello.thenCompose(App::getWorld);
      System.out.println(future.get());
      }
      
      private static CompletableFuture<String> getWorld(String msg){
      	return CompletableFuture.supplyAsync(()->{
      		System.out.println(msg+Thread.currentThread().getName());
              return msg+" World";
          });
      }
  • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행

    • CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
      	System.out.println("Hello "+Thread.currentThread().getName());
      	return "Hello";
      });
      
      CompletableFuture<String> world=CompletableFuture.supplyAsync(()->{
      	System.out.println("World "+Thread.currentThread().getName());
      	return "World";
      });
      
      CompletableFuture<String> future=hello.thenCombine(world,(h,w)->h+" "+w);
      System.out.println(future.get());
  • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

  • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

    • CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
      	System.out.println("Hello "+Thread.currentThread().getName());
      	return "Hello";
      });
      
      CompletableFuture<String> world=CompletableFuture.supplyAsync(()->{
      	System.out.println("World "+Thread.currentThread().getName());
      	return "World";
      });
      
      CompletableFuture<Void> future= CompletableFuture.anyOf(hello,world).thenAccept(System.out::println)
      future.get();

예외처리

  • exeptionally(Function)

    • boolean throwError=true;
      
      CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
      	if(throwError){
      		throw new IllegalArgumentException();
           }
      	System.out.println("Hello "+Thread.currentThread().getName());
      	return "Hello";
      }).exceptionally(ex->{
      	System.out.println(ex);
          return "Error!";
      });
              
      System.out.println(hello.get());
  • handle(BIFunction)

    • boolean throwError=false;
      
      CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
      	if(throwError){
      		throw new IllegalArgumentException();
           }
      	System.out.println("Hello "+Thread.currentThread().getName());
      	return "Hello";
      }).handle((result,ex)->{
          if(ex!=null){
             System.out.println(ex);
             return "Error!";
          }
          return result;
      });
              
      System.out.println(hello.get());

참고

https://www.inflearn.com/course/the-java-java8

profile
Always's Archives

0개의 댓글