자바에서 비동기 프로그래밍을 가능하게 하는 인터페이스
Concurrent 소프트웨어
자바에서 지원하는 Concurrent 프로그래밍
자바 멀티쓰레드 프로그래밍
Thread 상속
public static void main(String[] args){
HelloThread helloThread= new HelloThread();
helloThread.start(); //쓰레드 생성
System.out.println("hello : "+Thread.currentThread().getName);
}
static class HelloThread extends Thread{
@Override
public void run(){
System.out.println("world : "+Thread.currentThread().getName);
}
}
출력
world : Thread-0
hello : main
Runnable 구현 또는 람다
Thread thread= new Thread(new Runnable(){
@Override
public void run(){
System.out.println("world : "+Thread.currentThread().getName());
}
});
thread.start();
System.out.println("hello : "+Thread.currentThread().getName());
//Runnable구현
-----------------------------------------------------------------------------------------------------------
Thread thread= new Thread(()->System.out.println("world : "+Thread.currentThread().getName()));
thread.start();
System.out.println("hello : "+Thread.currentThread().getName());
//람다 구현
쓰레드 주요 기능
현재 쓰레드 멈춰두기(sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만, 그렇다고 락을 놔주진 않음(잘못하면 데드락 걸릴 수도 있음)
다른 쓰레드 깨우기(interrupt): 다른 쓰레드를 깨워서 interruptedException을 발생시킴. 해당 에러가 발생했을 때, 할 일은 코딩하기 나름(종료 시킬수도, 계속 하던 일을 할수도)
Thread thread=new Thread(() -> {
while(true){
System.out.println("Thread : "+Thread.currentThread().getName());
try{
Thread.sleep(1000L);
}catch(InterruptedException e){//interrupt가 발생하면 쓰레드 중지
System.out.println("interrupted!");
return;
}
}
});
thread.start();
System.out.println("Hello : "+Thread.currentThread().getName());
Thread.sleep(3000L);
thread.interrupt(); //interrupt 발생
/*
Thread : Thread-0
Hello : main
Thread : Thread-0
Thread : Thread-0
interrupted! 출력!
*/
다른 쓰레드 기다리기(join): 다른 쓰레드가 끝날 때까지 기다림
쓰레드가 많아질경우 프로그래머가 직접 코딩으로 이를 관리하는 것은 어려운 일이다. Executors는 프로그래머 대신 쓰레드들을 관리해준다.
고수준(High-Level) Concurrency 프로그래밍
Executors가 하는 일
주요 인터페이스
Executor: execute(Runnable)
ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능 제공
ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스를 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있음
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService= Executors.newSingleThreadScheduledExecutor();
executorService.schedule(getRunnable("Hello"),3, TimeUnit.SECONDS); //3초후 메세지 찍힘
executorService.scheduleAtFixedRate(getRunnable("Hello"),1,2,TimeUnit.SECONDS);
//1초 대기후 2초에 한번씩 출력
}
private static Runnable getRunnable(String msg){
return ()-> System.out.println(msg+Thread.currentThread().getName());
}
ExecutorService로 작업 실행하기
ExecutorService executorService= Executors.newSingleThreadExecutor(); //쓰레드 하나 쓰는 Executor
executorService.submit(()->{
System.out.println("Hello : "+Thread.currentThread().getName());
}); //쓰레드 실행
executorService.shutdown(); //Executor는 쓰레드 실행후 대기 상태이기 때문에 명시적으로 종료를 시켜줘야 함
ExecutorService로 멈추기
executorService.shutdown(); //처리중인 작업을 기다렸다가 종료
executorService.shutdownNow(); //즉시 종료
쓰레드의 개수보다 처리하는 작업의 수가 많을때
Fork/Join 프레임워크
Collable
Future
결과 가져오기 get()
ExecutorService executorService= Executors.newSingleThreadException();
Future<String> helloFuture= executorService.submit(()->{
Thread.sleep(2000L);
return "Collable";
});
System.out.println("Hello");
String result= helloFuture.get(); //블록킹 콜(결과값을 가져올때까지 기다림)
System.out.println(result);
executorService.shutdown();
작업 상태 확인하기 isDone()
완료 했으면 true 아니면 false 리턴
ExecutorService executorService=Executors.newSingleThreadExecutor();
Callable<String> hello=()->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture=executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started");
helloFuture.get();
System.out.println(helloFuture.isDone());
System.out.println("End!");
executorService.shutdown();
/*
false
Started
true
End! 출력!
*/
작업 취소하기 cancel()
취소 했으면 true 못했으면 false 리턴
parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다림
ExecutorService executorService=Executors.newSingleThreadExecutor();
Callable<String> hello=()->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture=executorService.submit(hello); helloFuture.cancel(false); //helloFuture 작업 종료
System.out.println(helloFuture.isDone());
System.out.println("Started");
System.out.println(helloFuture.isDone());
System.out.println("End!");
executorService.shutdown();
/*
true
Started
true
End! 출력!
*/
여러 작업 동시에 실행하기 invokeAll()
동시에 실행한 작업 중에 제일 오래 걸리는 작업만큼 시간이 걸림
ExecutorService executorService=Executors.newSingleThreadExecutor();
Callable<String> hello=()->{
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java=()->{
Thread.sleep(3000L);
return "Java";
};
Callable<String> always=()->{
Thread.sleep(1000L);
return "Always"
};
List<Future<String>> futures=executorService.invokeAll(Arrays.asList(hello,java,always));
//hello가 2호 java가 3초 always가 1초이다. invokeAll은 시간이 제일 긴 java를 기다리고 모든 결과값을 받아온다
for(Future<String> f:futures){
System.out.println(f.get());
}
executorService.shutdown();
여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()
ForkJoinPool을 사용하기에 따로 Thread Pool을 만들지 않아도 됨
Future로는 하기 어렵던 작업들
Completable(외부에서 명시적으로 Complete 시킬 수 있음) Future
비동기로 작업 실행하기
리턴값이 없는 경우: runAsync()
CompletableFuture<void> future= CompletableFuture.runAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
});
future.get();
리턴값이 있는 경우: supplyAsync()
CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
});
System.out.println(future.get());
원하는 Executor(쓰레드 풀)를 사용해서 실행할 수도 있음(기본은 ForkJoinPool.commonPool())
콜백 제공하기
thenApply(Fucntion): 리턴값을 받아 다른 값으로 바꾸는 콜백
CompletableFuture<void> future= CompletableFuture.runAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).thenApply((s)->{
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(future.get());
thenAccept(Consumer): 리턴값을 받아 또 다른 작업을 처리하는 콜백(리턴 없이)
CompletableFuture<void> future= CompletableFuture.supplyAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).thenAccept((s)->{
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
future.get();
thenRun(Runnable): 리턴값을 받지 않고 다른 작업을 처리하는 콜백
CompletableFuture<void> future= CompletableFuture.supplyAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).thenRun(()->{
System.out.println(Thread.currentThread().getName());
});
future.get();
콜백 자체를 또 다른 쓰레드에서 실행할 수 있음
조합하기
thenCompose(): 두 작업이 서로 이어서 실행하도록 조합
CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
System.out.println("Hello"+Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future=hello.thenCompose(App::getWorld);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorld(String msg){
return CompletableFuture.supplyAsync(()->{
System.out.println(msg+Thread.currentThread().getName());
return msg+" World";
});
}
thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world=CompletableFuture.supplyAsync(()->{
System.out.println("World "+Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> future=hello.thenCombine(world,(h,w)->h+" "+w);
System.out.println(future.get());
allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world=CompletableFuture.supplyAsync(()->{
System.out.println("World "+Thread.currentThread().getName());
return "World";
});
CompletableFuture<Void> future= CompletableFuture.anyOf(hello,world).thenAccept(System.out::println)
future.get();
예외처리
exeptionally(Function)
boolean throwError=true;
CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
if(throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).exceptionally(ex->{
System.out.println(ex);
return "Error!";
});
System.out.println(hello.get());
handle(BIFunction)
boolean throwError=false;
CompletableFuture<String> hello= CompletableFuture.supplyAsync(()->{
if(throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).handle((result,ex)->{
if(ex!=null){
System.out.println(ex);
return "Error!";
}
return result;
});
System.out.println(hello.get());