[자바] 자바의 비동기 프로그래밍

been·2021년 2월 2일
0

TIL2

목록 보기
8/14
post-thumbnail

##Sync VS Async

어원

  • SynchronousSyntogether이란 뜻이고, chronotime이다. 따라서 Synchronous는 함께 시간을 맞춘다라는 뜻으로 해석된다.
  • Asynchronous는 앞에 A라는 접두사가 붙어 부정하는 형태가 되어 시간을 맞추지 않는 것이라 해석할 수 있다.

>> Sync와 Async는 대상이 누구인지, 그 대상들의 시간은 어떻게 다루어지는지 두 가지를 살펴봐야한다.

1. Synchronous (동기)

  • 동기는 두 가지 이상의 대상(함수, 애플리케이션 등)이 서로 시간을 맞춰 행동하는 것이다.

  • 예를들어 호출한 함수가 호출된 함수의 작업이 끝나서 결과값을 반화하기를 기다리거나, 지속적으로 호출된 함수에게 확인 요청을하는 경우가 있다.

어떤 대상 A와 B가 있을 때 동기적으로 처리하는 방법 두 가지

  1. A와 B가

시작 시간 또는 종료 시간이 일치

하면 동기이다.

  • A, B 쓰레드가 동시에 작업을 시작하는 경우 (예를 들면 자바에서 CyclicBarrier)
  • 메서드 리턴 시간(A)결과를 전달받는 시간(B)일치하는 경우
  1. A가 끝나는 시간과 B가 시작하는 시간이 같으면 동기

    이다.

  • 예를 들어 자바에서 synchronized와 BlockingQueue가 위와 같은 경우이다.

2. Asynchronous (비동기)

  • 비동기는 동기와 반대로 대상이 서로 시간을 맞추지 않는 것을 말한다.

  • 예를 들어 호출하는 함수가 호출되는 함수에게 작업을 맡겨놓고 신경을 쓰지 않는 것을 말한다.

##Blocking VS Non-Blocking

  • 블록킹/논블록킹을 동기/비동기와 같이 생각하는 경우가 많은데, 이는 서로 관점이 다르다.

  • 블록킹/논블록킹은 직접 제어할 수 없는 대상을 처리하는 방법에 따라 나눈다.

  • 직접 제어할 수 없는 대상은 대표적으로 IO, 멀티쓰레드 동기화가 있다.

1. Blocking

  • Blocking은 직접 제어할 수 없는 대상의 작업이 끝날 때까지 제어권을 넘겨주지 않는 것이다.
  • 예를 들어 호출하는 함수가 IO를 요청했을 때 IO처리가 완료될 때까지 아무 일도 하지 못한 채 기다리는 것을 말한다.

2. Non-Blocking

  • Non-Blocking은 Blocking과 반대되는 개념이다.
  • 직접 제어할 수 없는 대상의 작업 처리 여부와 상관이 없다.
  • 예를 들어 호출하는 함수가 IO를 요청한 후 IO처리 완료 여부와 상관없이 바로 자신의 작업을 할 수 있다.

##Sync/Async, Blocking/Non-Blocking 예제

동기/비동기와 블록킹/논블록킹은 전혀 다른 개념으로 서로 독립적으로 바라봐야한다.

ExecutorService es = Executors.newCachedThreadPool();
 
String res = es.submit(() -> "Hello Async").get();

위 코드에서 동기/비동기, 블록킹/논블록킹을 분석해보자.

  • es.submit(() -> "Hello Async") : 비동기
    • submit() 메서드 리턴 시간과 Callable의 실행 결과를 받는 시간이 일치하지 않는다.
    • 블록킹/논블록킹을 고려할 대상이 아니다.
  • get() : 동기, 블록킹
    • get() 메서드 리턴 시간과 결과를 가져오는 시간이 일치한다. (동기)
    • 다른 쓰레드의 작업이 완료될 때까지 대기한다. (블록킹)

1. 애플리케이션 관점에서 Sync/Async

예를들어 물건을 주문하는 기능에서 물건을 주문하는데 0.5초가 소요되고, 주문 완료 후 메일을 발송하는데 2초가 소요된다고 하자. 동기와 비동기는 이를 처리하는 시간이 서로 다르다.

  • Sync 응답 속도 = 2.5초
  • Async 응답 속도(메일 발송을 Async로 처리) = 0.5초 + 알파

2. I/O 관점에서 Sync/Async, Blocking/Non-Blocking

1) Synchronous Blocking I/O

device = IO.open()
//# 이 thread는 데이터를 읽을 때까지 아무 일도 할 수 없음
data = device.read()
print(data)
  • Synchronous: read() 메서드(애플리케이션)가 리턴하는 시간과 커널에서 결과를 가져오는 시간이 일치한다.
  • Blocking: 커널의 작업이 완료될 때까지 대기한다.

2) Synchronous Non-Blocking I/O

device = IO.open()
ready = False
while not ready:
    print("There is no data to read!")
 
    // 다른 작업을 처리할 수 있음
 
    // while 문 내부의 다른 작업을 다 처리하면 데이터가 도착했는지 확인한다.
    ready = IO.poll(device, IO.INPUT, 5)
data = device.read()
print(data)
  • Synchronous: read() 메서드(애플리케이션)가 리턴하는 시간과 커널에서 결과를 가져오는 시간일치한다.
  • Non-Blocking: 애플리케이션으로부터 요청을 받은 커널은 작업 완료 여부와 상관없이 바로 반환하여 제어권을 애플리케이션에게 넘겨준다. 커널의 작업이 완료되면 작업 결과를 애플리케이션에게 반환한다.
  • 대표적인 예로는 멀티플랙싱을 수행하는 select(), epoll() 함수가 있다.

3) Asynchronous Non-Blocking I/O (AIO)

ios = IO.IOService()
device = IO.open(ios)
 
def inputHandler(data, err):
    "Input data handler"
    if not err:
        print(data)
 
device.readSome(inputHandler)
// 이 thread는 데이터가 도착했는지 신경쓰지 않고 다른 작업을 처리할 수 있다.
ios.loop()
  • Asynchronous: readSome() 메서드(애플리케이션)가 리턴하는 시간과 커널에서 결과를 가져오는 시간이 일치하지 않는다.
  • Non-Blocking: 애플리케이션으로부터 요청을 받은 커널은 작업 완료 여부와 상관없이 바로 반환하여 제어권을 애플리케이션에게 넘겨준다. 작업이 끝나면 애플리케이션에게 시그널 또는 콜백을 보낸다.
  • 대표적인 예로는 윈도우에서 멀티플랙싱을 수행하는 IOCP가 있다.(epoll()보다 성능이 좋다.)

4) Asynchronous Blocking

  • Asynchronous Blocking 조합은 비효율적이라 직접적으로 사용하는 모델은 없다.

  • 하지만 Asynchronous Non-Blocking 모델 중에서 Blocking 으로 동작하는 작업이 있는 경우 의도와 다르게 Asynchronous Blocking으로 동작할 때가 있다고 한다.

  • 대표적인 예로는

Node.js와 MySQL을 함께 사용하는 경우

이다.

  • Node.js는 비동기로 작업하려 하지만 MySQL 드라이버가 Blocking 방식으로 동작하므로 어쩔 수 없이 Asynchronous Blocking 방식으로 동작하게 된다.

##정리

  • Synchronous VS Asynchronous
    • 두 가지 이상의 대상(메서드, 작업, 처리 등)과 이를 처리하는 시간으로 구분한다.
    • Synchronous: 호출된 함수의 리턴하는 시간결과를 반환하는 시간일치하는 경우
    • Asynchronous: 호출된 함수의 리턴하는 시간과 결과를 반환하는 시간이 일치하지 않는 경우
  • Blocking VS Non-Blocking
    • 호출되는 대상이 직접 제어할 수 없는 경우 이를 구분할 수 있다.
    • Blocking: 직접 제어할 수 없는 대상의 작업이 끝날 때까지 기다려야 하는 경우
    • Non-Blocking: 직접 제어할 수 없는 대상의 작업이 완료되기 전에 제어권을 넘겨주는 경우

##자바의 비동기 프로그래밍

Java에서 데이터를 비동기 적으로 처리할 때 어떻게 수행하는지

  • Thread, Runnable, Callable , Future (및 확장 된 ScheduledFuture ),
    CompletableFuture 및 물론 ExecutorService 및 ForkJoinPool에서 시작한다.

##Thread

Java 동시성의 매우 기본적이지만 강력한 구성 요소는 Thread이다.

Java 스레드는 실제로 운영 체제의 스레드와 관련이 있다.

Thread를 생성하는 가장 기본적인 방법 : 확장하고 run 메소드 재정의

public class TestThread extends Thread{
    @Override
    public void run() {
        // Logic
        super.run();
    }
}
TestThread t = new TestThread();
// starts thread
t.start();// starting the thread, causes the run method be called

스레드를 시작하면 run () 메서드가 호출된다.

대부분의 경우 스레드의 다른 메서드를 재정의하는 건 권장않는 이유

  • Thread 클래스를 확장하면 Java가 다중 상속을 지원하지 않기 때문에 확장 클래스는 추가
    확장 기능을 상실한다.

  • 각 스레드는 확장 할 때 자체 개체를 가지며, 확장 된 개체가

많이 Thread 생성 되면 메모리 상태

에 좋지 않다.

  • 상속받으면 Thread에 있는 모든 클래스를 사용할 수있게 되므로 자원이 많이듬

>>Java는 Runnable 인터페이스로 이러한 문제를 해결

실제로 Thread에는 Runnable을 사용하는 오버로드 된 메서드가 있다

**인터페이스
: 클래스와 클래스 사이의 상호 작용의 규격을 나타낸 것

  • 인터페이스에는 몸체가 없는 추상 메소드만 정의된다.

    public interface RemoteControl {
    // 추상 메소드 정의
        public void turnOn(); // 가전 제품을 켠다.
        public void turnOff(); // 가전 제품을 끈다.
    }
  • implements로 인터페이스를 구현한다.

    public class Television implements RemoteControl {
        public void turnOn()
        {
            // 실제로 TV의 전원을 켜기 위한 코드가 들어 간다.
        }
        public void turnOff()
        {
            // 실제로 TV의 전원을 끄기 위한 코드가 들어 간다.
        }
    }

##Runnable

Runnable은 메소드가 하나 뿐인 인터페이스 : run().

Runnable은 기능적 인터페이스이며 해당 인스턴스는 람다 함수로 생성 가능

// With lambda
    Runnable runnable = ()->System.out.println("I'm a runnable from lambda.");
// With implemention, we can hold the data and related stuff that we want to process.
// Otherwise we got to manage them in the launching thread
public class RunnableImplemented implements Runnable{
    List<Object> mayBeAListOfData;
    Object mayBeAService;
    Object mayBeADao;
 
    public RunnableImplemented(List<Object> mayBeAListOfData,
                                Object mayBeAService, Object mayBeADao) {
        super();
        this.mayBeAListOfData = mayBeAListOfData;
        this.mayBeAService = mayBeAService;
        this.mayBeADao = mayBeADao;
    }
    @Override
    public void run() {
        // code logic
    }
}

Runnable에는 run()메소드 가 있지만 스레드가 제어 (전달) 할 때까지는 스레드가 아니라 Java 클
래스 일뿐이다.

스레드가 시작되면 실행 가능한 개체의 run()메서드가 호출

public class TestThread {
    private static Runnable runnable = ()->System.out.println("I'm a runnable from lambda.");
     
    public static void main(String[] args) {
        Thread t = new Thread(runnable);// takes runnable here
        t.start();
    }
}

##Callable

  • Callable은 일반 인터페이스이다.
  • 제네릭 유형으로 반환 값의 유형이다.
  • Callable은 너무 기능적인 인터페이스이며 call() 예외를 발생시키고, 제네릭 유형 값을 반환하는 인수가없는 유일한 메서드이다.
private static Callable<Integer> callable = ()-> {
    String data = "I'm in callable.";
    System.out.println();
    return data.length();
};
public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> callFuture = executor.submit(callable);
    Integer integer = callFuture.get();
}

여기에서 call 메서드는 데이터를 처리하고 실행 후 수집 할 수 있는 값을 반환

그러나 호출하는 데 큰 차이가 있을까?

  • ExecutorService 를 사용하여 호출하고, Future를 사용하여 결과를 보유한다.
  • 그 이유는?
    • 스레드 (실행 가능 또는 호출 가능)를 만들고 실행하는 제어 된 동작이 없다.
    • 각 스레드가 OS의 스레드와 관련되어 있으므로 한 번에 실행되는 스레드 수를 제어 할 수 있다.
    • 우리가 실행하는 스레드 수는 사용 가능한 CPU 코어 수보다 적어야한다.

>> Java는 ExecutorService 인터페이스로 해결하자.

#Runnable과 Callable의 차이점

  • Runnable: 어떤 객체도 리턴하지 않습니다. Exception을 발생시키지 않습니다.
  • Callable: 특정 타입의 객체를 리턴합니다. Exception을 발생킬 수 있습니다.

[Future]

Future는 특정 작업에 대해 결과값을 받아올 수 있는 인터페이스이다.

예를 들어, 특정 비동기 작업을 실행시키는 메소드는 Future형 반환값을 내뱉고 이 Future형 반환값을 통해 나중에 작업이 완료됐을 때 .get()메소드를 통해 작업의 결과를 받아오거나 할 수 있다.

[Executor 인터페이스]

Executor 인터페이스란 특정 작업을 실행하는 execute 메소드를 갖는 Single Abstract Method 인터페이스이다.

##ExecutorService

  • 쓰레드풀을 생성하여 병렬처리를 할 수 있음

Executors 메소드

  • java.util.concurrent.Executor;
  • 쓰레드 풀을 개수 및 종류를 정하는 메소드 :
    • newFixedThreadPool(int) : 인자 개수만큼 고정된 쓰레드풀을 만듭니다.
    • newCachedThreadPool() : 필요할 때, 필요한 만큼 쓰레드풀을 생성합니다. 이미 생성된 쓰레드를 재활용할 수 있기 때문에 성능상의 이점이 있을 수 있습니다.
    • newScheduledThreadPool(int) : 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 작업이 있다면 ScheduledThreadPool을 고려해볼 수 있습니다.
    • newSingleThreadExecutor() : 쓰레드 1개인 ExecutorService를 리턴합니다. 싱글 쓰레드에서 동작해야 하는 작업을 처리할 때 사용합니다.

ExecutorService 메소드

  • java.util.concurrent.ExecutorService;
  • submit() :
    • 작업처리
    • submit(() → { })은 멀티쓰레드로 처리할 작업을 예약인자로 람다식 전달 가능
  • shutdown()
    • 더 이상 쓰레드풀에 작업을 추가하지 못하도록 함
    • 처리 중인 Task가 모두 완료되면 쓰레드풀을 종료시킴
  • awaitTermination()
    • 이미 수행 중인 Task가 지정된 시간동안 끝나기를 기다림
    • 지정된 시간 내에 끝나지 않으면, false를 리턴
    • 이 때 shutdownNow()를 호출하면 실행 중인 Task를 모두 강제로 종료시킬 수 있음
public class ExecutorServiceTest {
    public static void main(String args[]) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        // 더이상 ExecutorService에 Task를 추가할 수 없습니다.
        // 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
        executor.shutdown();
        // shutdown() 호출 전에 등록된 Task 중에 아직 완료되지 않은 Task가 있을 수 있습니다.
        // Timeout을 20초 설정하고 완료되기를 기다립니다.
        // 20초 전에 완료되면 true를 리턴하며, 20초가 지나도 완료되지 않으면 false를 리턴합니다.
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");
            // 모든 Task를 강제 종료합니다.
            executor.shutdownNow();
        }
        System.out.println("end");
    }
}

##Future

  • 예약된 작업에 대한 결과를 알 수 있다.
  • executor.submit() : Future객체를 리턴
  • future.get() : 작업이 종료될 때 까지 기다림
public class ExecutorServiceTest3 {
    public static void main(String args[]) {
        //Runtime.getRuntime().availableProcessors()는 현재 사용가능한 core 개수를 리턴
        final int maxCore = Runtime.getRuntime().availableProcessors();
        final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        final List<Future<String>> futures = new ArrayList<>();
        for (int i = 1; i < 5; i++) {
            final int index = i;
            futures.add(executor.submit(() -> { //executor.submit으로 List에 future 객체 add
                System.out.println("finished job" + index);
                return "job" + index + " " + Thread.currentThread().getName();
            }));
        }
        for (Future<String> future : futures) {
            String result = null;
            try {
                result = future.get(); //future.get()으로 작업 종료까지 기다림 > 출력
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(result);
        }
        executor.shutdownNow();
        System.out.println("end");
    }
}

결과 : 작업은 순서대로 처리되지 않을 수 있지만, 로그는 순차적으로 출력

finished job1
finished job3
finished job2
job1 pool-1-thread-1
job2 pool-1-thread-2
job3 pool-1-thread-3
finished job4
job4 pool-1-thread-4
end

##BlockingQueue

  • java.util.concurrent.BlockingQueue;
  • 그럼 무엇을 막을까?
    • Queue가 꽉찼을때의 삽입 시도 / Queue가 비어있을때의 추출 시도를 막는다.
    • 이 자동으로 '막는' 기능이 있어 BlockingQueue 의 구현체는 모두 Thread-safe 하다.

위의 Future의 단점:

  • 작업이 늦게 처리된다면 다른 작업에 대한 로그도 늦게 출력이 됨

BlockingQueue로 단점 해결

  • 작업이 끝날 때 BlockingQueue에 결과를 추가하고 메인쓰레드에서 Queue를 기다리면 됨
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
public class ExecutorServiceTest4 {
    public static void main(String args[]) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");
        for (int i = 0 ; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }
        System.out.println("end");
        service.close();
    }
    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        public ParallelExcutorService() {
        }
        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}

결과 : 처리한 순서대로 메인쓰레드에서 로그를 출력함

finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end

참조 : https://dzone.com/articles/async-programming-in-java-part-i

https://velog.io/@codemcd/Sync-VS-Async-Blocking-VS-Non-Blocking-sak6d01fhx

https://codechacha.com/ko/java-executors/

0개의 댓글