어원
>> Sync와 Async는 대상이 누구인지, 그 대상들의 시간은 어떻게 다루어지는지 두 가지를 살펴봐야한다.
동기는 두 가지 이상의 대상(함수, 애플리케이션 등)이 서로 시간을 맞춰 행동하는 것이다.
예를들어 호출한 함수가 호출된 함수의 작업이 끝나서 결과값을 반화하기를 기다리거나, 지속적으로 호출된 함수에게 확인 요청을하는 경우가 있다.
어떤 대상 A와 B가 있을 때 동기적으로 처리하는 방법 두 가지
시작 시간 또는 종료 시간이 일치
하면 동기이다.
A가 끝나는 시간과 B가 시작하는 시간이 같으면 동기
이다.
synchronized
와 BlockingQueue가 위와 같은 경우이다.비동기는 동기와 반대로 대상이 서로 시간을 맞추지 않는 것을 말한다.
예를 들어 호출하는 함수가 호출되는 함수에게 작업을 맡겨놓고 신경을 쓰지 않는 것을 말한다.
블록킹/논블록킹을 동기/비동기와 같이 생각하는 경우가 많은데, 이는 서로 관점이 다르다.
블록킹/논블록킹은 직접 제어할 수 없는 대상을 처리하는 방법에 따라 나눈다.
직접 제어할 수 없는 대상은 대표적으로 IO, 멀티쓰레드 동기화가 있다.
동기/비동기와 블록킹/논블록킹은 전혀 다른 개념으로 서로 독립적으로 바라봐야한다.
ExecutorService es = Executors.newCachedThreadPool();
String res = es.submit(() -> "Hello Async").get();
위 코드에서 동기/비동기, 블록킹/논블록킹을 분석해보자.
es.submit(() -> "Hello Async")
: 비동기submit()
메서드 리턴 시간과 Callable의 실행 결과를 받는 시간이 일치하지 않는다.get()
: 동기, 블록킹get()
메서드 리턴 시간과 결과를 가져오는 시간이 일치한다. (동기)예를들어 물건을 주문하는 기능에서 물건을 주문하는데 0.5초가 소요되고, 주문 완료 후 메일을 발송하는데 2초가 소요된다고 하자. 동기와 비동기는 이를 처리하는 시간이 서로 다르다.
device = IO.open()
//# 이 thread는 데이터를 읽을 때까지 아무 일도 할 수 없음
data = device.read()
print(data)
read()
메서드(애플리케이션)가 리턴하는 시간과 커널에서 결과를 가져오는 시간이 일치한다.device = IO.open()
ready = False
while not ready:
print("There is no data to read!")
// 다른 작업을 처리할 수 있음
// while 문 내부의 다른 작업을 다 처리하면 데이터가 도착했는지 확인한다.
ready = IO.poll(device, IO.INPUT, 5)
data = device.read()
print(data)
read()
메서드(애플리케이션)가 리턴하는 시간과 커널에서 결과를 가져오는 시간이 일치한다.select()
, epoll()
함수가 있다.ios = IO.IOService()
device = IO.open(ios)
def inputHandler(data, err):
"Input data handler"
if not err:
print(data)
device.readSome(inputHandler)
// 이 thread는 데이터가 도착했는지 신경쓰지 않고 다른 작업을 처리할 수 있다.
ios.loop()
readSome()
메서드(애플리케이션)가 리턴하는 시간과 커널에서 결과를 가져오는 시간이 일치하지 않는다.epoll()
보다 성능이 좋다.)Asynchronous Blocking 조합은 비효율적이라 직접적으로 사용하는 모델은 없다.
하지만 Asynchronous Non-Blocking 모델 중에서 Blocking 으로 동작하는 작업이 있는 경우 의도와 다르게 Asynchronous Blocking으로 동작할 때가 있다고 한다.
대표적인 예로는
Node.js와 MySQL을 함께 사용하는 경우
이다.
Java에서 데이터를 비동기 적으로 처리할 때 어떻게 수행하는지
Java 동시성의 매우 기본적이지만 강력한 구성 요소는 Thread이다.
Java 스레드는 실제로 운영 체제의 스레드와 관련이 있다.
Thread를 생성하는 가장 기본적인 방법 : 확장하고 run 메소드 재정의
public class TestThread extends Thread{
@Override
public void run() {
// Logic
super.run();
}
}
TestThread t = new TestThread();
// starts thread
t.start();// starting the thread, causes the run method be called
스레드를 시작하면 run () 메서드가 호출된다.
대부분의 경우 스레드의 다른 메서드를 재정의하는 건 권장않는 이유
Thread 클래스를 확장하면 Java가 다중 상속을 지원하지 않기 때문에 확장 클래스는 추가
확장 기능을 상실한다.
각 스레드는 확장 할 때 자체 개체를 가지며, 확장 된 개체가
많이 Thread 생성 되면 메모리 상태
에 좋지 않다.
>>Java는 Runnable 인터페이스로 이러한 문제를 해결
실제로 Thread에는 Runnable을 사용하는 오버로드 된 메서드가 있다
**인터페이스
: 클래스와 클래스 사이의 상호 작용의 규격을 나타낸 것
인터페이스에는 몸체가 없는 추상 메소드만 정의된다.
public interface RemoteControl {
// 추상 메소드 정의
public void turnOn(); // 가전 제품을 켠다.
public void turnOff(); // 가전 제품을 끈다.
}
implements로 인터페이스를 구현한다.
public class Television implements RemoteControl {
public void turnOn()
{
// 실제로 TV의 전원을 켜기 위한 코드가 들어 간다.
}
public void turnOff()
{
// 실제로 TV의 전원을 끄기 위한 코드가 들어 간다.
}
}
Runnable은 메소드가 하나 뿐인 인터페이스 : run().
Runnable은 기능적 인터페이스이며 해당 인스턴스는 람다 함수로 생성 가능
// With lambda
Runnable runnable = ()->System.out.println("I'm a runnable from lambda.");
// With implemention, we can hold the data and related stuff that we want to process.
// Otherwise we got to manage them in the launching thread
public class RunnableImplemented implements Runnable{
List<Object> mayBeAListOfData;
Object mayBeAService;
Object mayBeADao;
public RunnableImplemented(List<Object> mayBeAListOfData,
Object mayBeAService, Object mayBeADao) {
super();
this.mayBeAListOfData = mayBeAListOfData;
this.mayBeAService = mayBeAService;
this.mayBeADao = mayBeADao;
}
@Override
public void run() {
// code logic
}
}
Runnable에는 run()메소드 가 있지만 스레드가 제어 (전달) 할 때까지는 스레드가 아니라 Java 클
래스 일뿐이다.
스레드가 시작되면 실행 가능한 개체의 run()메서드가 호출됨
public class TestThread {
private static Runnable runnable = ()->System.out.println("I'm a runnable from lambda.");
public static void main(String[] args) {
Thread t = new Thread(runnable);// takes runnable here
t.start();
}
}
private static Callable<Integer> callable = ()-> {
String data = "I'm in callable.";
System.out.println();
return data.length();
};
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> callFuture = executor.submit(callable);
Integer integer = callFuture.get();
}
여기에서 call 메서드는 데이터를 처리하고 실행 후 수집 할 수 있는 값을 반환
그러나 호출하는 데 큰 차이가 있을까?
>> Java는 ExecutorService 인터페이스로 해결하자.
#Runnable과 Callable의 차이점
[Future]
Future는 특정 작업에 대해 결과값을 받아올 수 있는 인터페이스이다.
예를 들어, 특정 비동기 작업을 실행시키는 메소드는 Future형 반환값을 내뱉고 이 Future형 반환값을 통해 나중에 작업이 완료됐을 때 .get()메소드를 통해 작업의 결과를 받아오거나 할 수 있다.
[Executor 인터페이스]
Executor 인터페이스란 특정 작업을 실행하는 execute 메소드를 갖는 Single Abstract Method 인터페이스이다.
Executors 메소드
ExecutorService 메소드
public class ExecutorServiceTest {
public static void main(String args[]) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Job1 " + threadName);
});
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Job2 " + threadName);
});
// 더이상 ExecutorService에 Task를 추가할 수 없습니다.
// 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
executor.shutdown();
// shutdown() 호출 전에 등록된 Task 중에 아직 완료되지 않은 Task가 있을 수 있습니다.
// Timeout을 20초 설정하고 완료되기를 기다립니다.
// 20초 전에 완료되면 true를 리턴하며, 20초가 지나도 완료되지 않으면 false를 리턴합니다.
if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
System.out.println(LocalTime.now() + " All jobs are terminated");
} else {
System.out.println(LocalTime.now() + " some jobs are not terminated");
// 모든 Task를 강제 종료합니다.
executor.shutdownNow();
}
System.out.println("end");
}
}
public class ExecutorServiceTest3 {
public static void main(String args[]) {
//Runtime.getRuntime().availableProcessors()는 현재 사용가능한 core 개수를 리턴
final int maxCore = Runtime.getRuntime().availableProcessors();
final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
final List<Future<String>> futures = new ArrayList<>();
for (int i = 1; i < 5; i++) {
final int index = i;
futures.add(executor.submit(() -> { //executor.submit으로 List에 future 객체 add
System.out.println("finished job" + index);
return "job" + index + " " + Thread.currentThread().getName();
}));
}
for (Future<String> future : futures) {
String result = null;
try {
result = future.get(); //future.get()으로 작업 종료까지 기다림 > 출력
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(result);
}
executor.shutdownNow();
System.out.println("end");
}
}
결과 : 작업은 순서대로 처리되지 않을 수 있지만, 로그는 순차적으로 출력
finished job1
finished job3
finished job2
job1 pool-1-thread-1
job2 pool-1-thread-2
job3 pool-1-thread-3
finished job4
job4 pool-1-thread-4
end
위의 Future의 단점:
BlockingQueue로 단점 해결
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ExecutorServiceTest4 {
public static void main(String args[]) {
ParallelExcutorService service = new ParallelExcutorService();
service.submit("job1");
service.submit("job2");
service.submit("job3");
service.submit("job4");
for (int i = 0 ; i < 4; i++) {
String result = service.take();
System.out.println(result);
}
System.out.println("end");
service.close();
}
private static class ParallelExcutorService {
private final int maxCore = Runtime.getRuntime().availableProcessors();
private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
public ParallelExcutorService() {
}
public void submit(String job) {
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished " + job);
String result = job + ", " + threadName;
try {
queue.put(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
public void close() {
List<Runnable> unfinishedTasks = executor.shutdownNow();
if (!unfinishedTasks.isEmpty()) {
System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
}
}
}
}
결과 : 처리한 순서대로 메인쓰레드에서 로그를 출력함
finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end
참조 : https://dzone.com/articles/async-programming-in-java-part-i
https://velog.io/@codemcd/Sync-VS-Async-Blocking-VS-Non-Blocking-sak6d01fhx