현재 사이프라는 IT 연합동아리에서 Spring Webflux + Coroutines에 대해 스터디를 하는 미션을 하고 있습니다. 1주차에 공부한 내용을 담아 작성하였습니다.
1주차에는 동시성 프로그래밍에서 중요한 Thread부터, Future, Callable, Executor를 실습해보고 내용을 정의한 다음 발표와 피드백을 주고 받는 시간을 가졌습니다. 다만 저는 Java 대신 Kotlin으로 실습한 점 참고부탁드립니다🙏
스레드는 프로세스 내에서 실행되는 작은 단위.
독립적으로 실행될 수 있는 코드의 실행 흐름이라 볼 수도 있다.
JVM 환경에서는 멀티 스레드 환경으로 하나의 프로세스에서 여러 개의 스레드가 존재할 수 있으며, 각 스레드는 동시에 실행된다.
Java에서 java.lang.Thread 클래스를 사용하여 스레드를 생성하게 되는데, 생성된 스레드는 일반적으로 Runtime 환경에서 Runnable 객체로 전달된다.
각 스레드는 우선순위를 가지고, 높은 우선순위를 갖는 스레드는 낮은 우선순위를 갖는 스레드보다 CPU 자원을 더 많이 할당받을 수 있다고 한다.
이렇게 한 프로세스에서 다중 스레드를 다루는 환경에서 여러 스레드 인스턴스가 공유 자원에 동시에 접근할 수 있으니 동기화를 통해 데이터의 무결성을 유지하고 스레드에 대한 안정성을 보장하도록 설계해야 한다.
이렇게 스레드에 대한 동작을 핸들링하기 위해 제공되는 메서드가 존재한다.
public void start() {
synchronized (this) {
// zero status corresponds to state "NEW".
if (holder.threadStatus != 0)
throw new IllegalThreadStateException();
start0();
}
}
예제 코드 중에 Thread start 메서드를 까보았다.
holder? 이게 대체 뭘까??
// Additional fields for platform threads.
// All fields, except task, are accessed directly by the VM.
private static class FieldHolder {
final ThreadGroup group;
final Runnable task;
final long stackSize;
volatile int priority;
volatile boolean daemon;
volatile int threadStatus;
FieldHolder(ThreadGroup group,
Runnable task,
long stackSize,
int priority,
boolean daemon) {
this.group = group;
this.task = task;
this.stackSize = stackSize;
this.priority = priority;
if (daemon)
this.daemon = true;
}
}
private final FieldHolder holder;
스레드는 New, Runnable, Waiting, Timed Waiting, Terminated 5가지 상태가 있다.
Thread에 대한 FieldHolder 인스턴스가 선언된 것을 볼 수 있다.
(volatile은 휘발성인 키워드를 의미하는 거 같다.)
그렇다면 threadStatus가 0인 경우에만 스레드가 실행되는 것을 확인해볼 수 있다.
실행 가능한 상태가 아니라면 IllegalThreadStateException 예외를 발생시킨다.
private native void start0();
실행되는 start0는 어떤 메서드인가??
이 메서드는 JVM에 의해서 호출된다. (native)
생성된 스레드 객체를 스케줄링이 가능한 상태로 전환하도록 JVM에 지시를 한다.
스케줄링에 의해서 스레드가 선택되면 JVM에 의해 run 메서드가 호출된다.
내부적으로 run을 호출하고, 스레드의 상태 역시 Runnable로 바뀌게 된다.
상태가 Runnable로 바뀌기에 start는 1번만 호출 가능하다.
여기서 native란??
Runnable 인터페이스는 1개의 메서드만을 갖는 함수형 인터페이스이다.
스레드를 구현하기 위한 템플릿에 해당한다.
public class Thread implements Rannable {
...
}
Thread 클래스는 반드시 run 메서드를 구현해야 하며, Thread 클래스가 Runnable를 구현하고 있기 때문이다.
@Test
fun runnable() {
val runnable = Runnable {
println("Thread: ${Thread.currentThread().name} is running")
}
val thread = Thread(runnable)
thread.start()
println("Hello: ${Thread.currentThread().name}")
}
// Hello: Test worker
// Thread: Thread-4 is running
위와 같이 Runnable에 대한 run 메서드를 override 하여 사용할 수 있다.
[ Thread와 Runnable 비교 ]
Runnable은 익명 객체 및 람다로 사용할 수 있지만, Thread는 별도의 클래스를 만들어야 한다는 점에서 번거롭다. 또한 Java에서는 다중 상속이 불가능하므로 Thread 클래스를 상속받으면 다른 클래스를 상속받을 수 없어서 좋지 않다. 또한 Thread 클래스를 상속받으면 Thread 클래스에 구현된 코드들에 의해 더 많은 자원(메모리와 시간 등)을 필요로 하므로 Runnable이 주로 사용된다.
기존의 Runnable 인터페이스는 결과를 반환할 수 없다는 한계점이 있다.
반환값을 얻으려면 공용 메모리나 파이프를 사용해야 하는데, 이러한 작업은 번거롭고
제네릭을 사용해 return 받을 수 있는 Callable
이 추가되었다.
@Test
fun callable_void() {
val executorService = Executors.newSingleThreadExecutor()
val callable = Callable<Void> {
// Thread: pool-1-thread-1 is running
println("Thread: ${Thread.currentThread().name} is running")
null
}
executorService.submit(callable)
executorService.shutdown()
}
@Test
fun callable_String() {
val executorService = Executors.newSingleThreadExecutor()
val callable = Callable { "Thread: " + Thread.currentThread().name }
executorService.submit(callable)
executorService.shutdown()
}
동시 여러 요청을 처리해야 하는 경우 매번 스레드를 생성하는 것은 비효율적이다. 그래서 스레드를 미리 만들어두고 재사용하기 위한 Thread Pool이 등장하게 되는데, Executor 인터페이스는 스레드 풀의 구현을 위한 인터페이스이다. 이러한 Executor 인터페이스를 간단히 정리하면
SOLID 원칙 중 I에 해당되는 인터페이스 분리 원칙에 맞는 등록된 작업에 대한 실행만을 수행하는 책임만 존재하며, 존재하는 Runnable을 실행하는 메서드만 가지고 있다.
public interface Executor {
void execute(Runnable command);
}
Executor 인터페이스는 개발자들이 해당 작업의 실행과 스레드의 사용 및 스케줄링 등등 다양한 작업에 대한 것들을 벗어나게 도와준다.
class ExecutorTest {
@Test
fun executorRun() {
val runnable = Runnable {
// Thread: Test worker
println("Thread: ${Thread.currentThread().name}")
}
val executor: Executor = RunExecutor()
executor.execute(runnable)
}
class RunExecutor : Executor {
override fun execute(command: Runnable) {
command.run()
}
}
@Test
fun executorStart() {
val runnable =
Runnable { println("Thread: " + Thread.currentThread().name) }
val executor: Executor = StartExecutor()
executor.execute(runnable)
}
class StartExecutor : Executor {
override fun execute(command: Runnable) {
Thread(command).start()
}
}
}
코드에서는 메인 스레드에서 실행되므로 override된 execute 메서드를 start 메서드를 통해 실행하면 된다.
ExecutorService는 작업(Runnable, Callable) 등록을 위한 인터페이스이며, ExecutorService는 Executor를 상속받아서 작업에 대한 등록뿐만 아니라 실행을 위한 책임
도 갖고 있다. 그래서 스레드 풀은 기본적으로 ExecutorService 인터페이스를 구현한다. 대표적으로 ThreadPoolExecutor가 ExecutorService의 구현체인데, ThreadPoolExecutor 내부에 있는 Blocking Queue에 작업들을 등록해둔다.
같은 크기의 스레드 풀이 있다고 가정하면, 각각의 스레드는 작업들을 할당받아 처리하는데, 만약 사용 가능한 스레드가 없다면 작업은 Blocking Queue에서 계속 대기하게 된다. 그러다가 스레드가 작업을 끝내면 다음 작업을 할당받게 되는 것이다.
ExecutorService는 Executor의 상태 확인과 작업 종료 등 라이프사이클 관리를 위한 메소드들을 제공하고 있다.
List<Runnable>
)을 반환함ExecutorService는 Runnable과 Callbale을 작업으로 사용하기 위한 메소드를 제공한다. 동시에 여러 작업들을 실행시키는 메소드도 제공하고 있는데, 비동기 작업의 진행을 추적할 수 있도록 Future를 반환한다. 반환된 Future들은 모두 실행된 것이므로 반환된 isDone은 true이다. 하지만 작업들은 정상적으로 종료되었을 수도 있고, 예외에 의해 종료되었을 수도 있으므로 항상 성공한 것은 아니다. 이러한 ExecutorService가 갖는 비동기 작업을 위한 메소드들을 정리하면 다음과 같다.
List<Future>
을 반환함ExecutorService의 구현체로는 AbstractExecutorService가 있는데, ExecutorService의 메소드들(submit, invokeAll, invokeAny)에 대한 기본 구현들을 제공한다.
public abstract class AbstractExecutorService implements ExecutorService {
public AbstractExecutorService() {}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException | ExecutionException ignore) {}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
final long nanos = unit.toNanos(timeout);
final long deadline = System.nanoTime() + nanos;
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
int j = 0;
timedOut: try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
break timedOut;
execute((Runnable)futures.get(i));
}
for (; j < size; j++) {
Future<T> f = futures.get(j);
if (!f.isDone()) {
try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
catch (CancellationException | ExecutionException ignore) {}
catch (TimeoutException timedOut) {
break timedOut;
}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
// Timed out before all the tasks could be completed; cancel remaining
cancelAll(futures, j);
return futures;
}
}
invokeAll은 최대 쓰레드 풀의 크기만큼 작업을 동시에 실행시킨다. 그러므로 쓰레드가 충분하다면 동시에 실행되는 작업들 중에서 가장 오래 걸리는 작업만큼 시간이 소요된다. 하지만 만약 쓰레드가 부족하다면 대기되는 작업들이 발생하므로 가장 오래 걸리는 작업의 시간에 더해 추가 시간이 필요하다.
invokeAny는 가장 빨리 끝난 작업 결과만을 구하므로, 동시에 실행한 작업들 중에서 가장 짧게 걸리는 작업만큼 시간이 걸린다. 또한 가장 빠르게 처리된 작업 외의 나머지 작업들은 완료되지 않았으므로 cancel 처리되며, 작업이 진행되는 동안 작업들이 수정되면 결과가 정의되지 않는다.
이런 점에서 진행되는 작업되는 과정에서 트랜잭션에 대한 격리성이 어긋나면 결과가 정의되지 않기에 정합성이 어긋나지 않을까싶다.
Callable 인터페이스의 구현체인 작업(Task)은 가용 가능한 스레드가 없어서 실행이 미뤄질 수 있고, 작업 시간이 오래 걸릴 수도 있다.
그래서 실행 결과를 바로 받지 못하고 미래의 어느 시점에 얻을 수 있는데, 미래에 완료된 Callable의 반환값
을 구하기 위해 사용되는 것이 Future입니다.
즉, Future는 비동기 작업을 갖고 있어 미래에 실행 결과를 얻도록 도와줍니다.
이를 위해 비동기 작업의 현재 상태를 확인하고, 기다리며, 결과를 얻는 방법 등을 제공합니다.
Future의 인터페이스를 살펴보며 다음과 같습니다.
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
여기서 get() 은 blocking 방식으로 결과를 가져오며, 타임아웃 설정이 가능합니다