Dispatcher

정승훈·2024년 9월 17일
0

Coroutine

목록 보기
2/2
post-thumbnail

본론에 들어가기 앞서 한 가지 질문을 하고자 한다.

suspend fun defaultDispatcher() = coroutineScope {
    repeat(64) {
        launch(Dispatchers.Default) {
            Thread.sleep(1000)
        }
    }
}

suspend fun ioDispatcher() = coroutineScope {
    repeat(64) {
        launch(Dispatchers.IO) {
            Thread.sleep(1000)
        }
    }
}

두 함수 중 어떤 함수가 먼저 종료될까??

  • ❗️두 함수에서 다른점은 launch()에 전달되는 Dispatcher뿐이다.

두 함수의 차이점을 설명하지 못하겠다면 이 글을 읽어보자. 정답은 마지막에 공개하겠다.
이미 Dispatcher에 대해 충분히 알고 있어도 다시 읽어보자. 반복 학습은 언제나 최고다.

Dispatcher란?

Dispatcher는 사전적으로 운행 관리원 이라는 뜻을 가지고 있다. 운행관리원? 코틀린에서 운행 관리원은 무엇을 뜻할까?

공식문서에는 코루틴이 실행될 스레드스레드 풀을 결정한다고 나와있다.

스레드 풀이란?

스레드를 효율적으로 사용할 수 있도록 관리하는 방법이다.

  • 스레드 풀은 일정 갯수의 스레드를 가지고 있다.
  • 매 작업마다 스레드를 생성/해제하지 않고 작업이 완료된 대기 상태의 스레드를 재활용한다.
  • 각 Dispatcher는 각기 다른 스레드 풀을 사용하는데, 스레드풀마다 가지고 있는 스레드 갯수가 다르다.

1. Dispatchers.Default

가장 기본적으로 사용되는 디스패처이다.

  • 코드를 실행하는 컴퓨터의 CPU 코어 갯수만큼의 스레드가 있는 스레드 풀을 사용한다.
    ex) 4코어 8스레드 CPU -> 스레드 4개
  • CPU 집약적인 작업 (정렬, 연산) 등에 적합하다.
suspend fun defaultDispatcher() = coroutineScope {
    val count = AtomicInteger(0)
    repeat(64) {
        launch {
            count.incrementAndGet()
            println("${Thread.currentThread().name} $count")
            Thread.sleep(1000)
        }
    }
}

실행 컴퓨터 기준 코어 수는 8개이다. (Apple M1)
따라서 코루틴은 한 번에 최대 8개까지 실행될 수 있다.
64 / 8 == 8, 코루틴 하나당 스레드를 1초 정지시키기 때문에, 총 8초가 걸렸다

suspend fun main(): Unit = coroutineScope {
    measureTime{ defaultDispatcher() }.also(::println)
}

참고로 코드는 이렇게 작성된 main()에서 실행된다.

2. Dispatchers.IO

데이터베이스 접근, 서버 요청 등 입출력 작업에 적합한 디스패처이다.

  • 기본적으로 64개의 스레드를 가지고 있는 스레드풀을 사용한다.
  • 만약 컴퓨터의 CPU 코어 갯수가 이보다 많다면, 코어 갯수만큼의 스레드를 사용한다.
suspend fun iODispatcher() = coroutineScope {
    val count = AtomicInteger(0)
    repeat(64) {
        launch(Dispatchers.IO) {
            count.incrementAndGet()
            println("${Thread.currentThread().name} $count")
            Thread.sleep(1000)
        }
    }
}

Default 디스패처와 다르게 1초만에 끝났다.
스레드 64개로 64개의 코루틴을 한 번에 실행할 수 있기 때문이다.

만약 65개의 코루틴을 실행한다면, 남은 하나의 코루틴을 실행할 스레드가 부족하게 된다.
따라서 스레드의 작업이 끝날 때까지 기다려야 하기 때문에, 1초가 더 걸린다.

  • 이는 Dispatchers.Default일 때도 마찬가지이다.

3. Dispatchers.Main

메인스레드에서 동작하는 디스패처이다.

  • 안드로이드에서 사용하는 경우 UI 스레드인 메인 스레드에서 동작한다.
  • 별도의 의존성이 없는 경우 실행되지 않는다.
suspend fun mainDispatcher() = coroutineScope {
    val count = AtomicInteger(0)
    repeat(64) {
        launch(Dispatchers.Main) {
            count.incrementAndGet()
            println("${Thread.currentThread().name} $count")
            Thread.sleep(1000)
        }
    }
}

이 코드도 실행해 보려고 했지만, 다음과 같은 오류가 발생하며 실행되지 않았다.

Exception in thread "main" java.lang.IllegalStateException: Module with the Main dispatcher had failed to initialize. For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used
	at kotlinx.coroutines.internal.MissingMainCoroutineDispatcher.missing(MainDispatchers.kt:115)
	at kotlinx.coroutines.internal.MissingMainCoroutineDispatcher.isDispatchNeeded(MainDispatchers.kt:96)
	at kotlinx.coroutines.test.internal.TestMainDispatcher.isDispatchNeeded(TestMainDispatcher.kt:27)
	at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:317)

kotlin-coroutines-test 의존성이 필요하고, Dispatchers.setMain()으로 Main 디스패처를 정의해주어야 한다고 한다.

4. Dispatchers.Unconfined

실행하는 스레드의 스레드 풀을 이용하는 디스패처이다!

suspend fun main(): Unit = withContext(CoroutineName("Thread1")) {
    launch(Dispatchers.Unconfined) {
        println(Thread.currentThread().name + " " + "1")

        delay(1000)

        println(Thread.currentThread().name + " " + "3")
    }
}
  • delay() 호출 전까지는 메인스레드에서 수행되었다.
  • delay() 호출 후 코루틴이 Default 디스패처에서 수행되며 스레드가 바뀌었다.

스레드 개수 제한하기

코루틴을 실행하는 스레드 갯수를 제한할 수 있다.

val dispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()
println(dispatcher)

자바의 Executors를 이용하여 스레드 갯수 제한을 건 스레드 풀을 생성하는 방식이다.
그래서 dispatcher를 출력해보면 CoroutineDispatcher가 아닌

> java.util.concurrent.ThreadPoolExecutor@2530c12[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

스레드 풀이 출력되는걸 볼 수 있다.

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()
    launch(dispatcher) {
        repeat(64) {
            launch {
                Thread.sleep(1000L)
                println(Thread.currentThread().name)
            }
        }
        println(dispatcher)
    }
}

따라서 위 코드는 64개의 코루틴이 총 8번에 걸쳐 실행되기 때문에 약 8초가 걸린다.

스레드 풀 안에 active threads = 8로 활성화된 스레드 갯수를 볼 수 있다.
실행중인 스레드도 전에 봤던 Dispatcher-worker-n이 아닌 pool-1을 통해 스레드 풀에서 관리하는 스레드를 이용하는 것을 볼 수 있다.

저 방법은 코루틴에 함수가 나오기 전에 사용하던 방법이다.
우리는 더 똑똑하게

val dispatcher = Dispatchers.Default.limitedParallelism(4)

limitedParallelism()을 사용하여 디스패처에 스레드 제한을 추가할 수 있다.

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default.limitedParallelism(4)
    launch(dispatcher) {
        repeat(64) {
            launch {
                Thread.sleep(1000L)
                println(Thread.currentThread().name)
            }
        }
        println(dispatcher)
    }
}

Dispatchers.Default에 제한을 추가했는데, 64 / 4 == 총 16초가 걸렸다.

그리고 Dispatchers.Default 대신 Dispatchers.Default에 스레드 제한이 추가된 LimitedDispatcher를 사용하는 것을 볼 수 있었다.

val ioDispatcher = Dispatchers.IO.limitedParallelism(64)
val defaultDispatcher = Dispatchers.Default.limitedParallelism(8)

현재 사용하는 스레드 갯수와 똑같이 제한을 걸어보았다.

> LimitedDispatcher@311d617d
> Dispatchers.Default

Dispatchers.IO는 LimitedDispatcher로 찍히는 반면 Default는 그대로 찍혔다.

여러 궁금한 경우가 생겨서 직접 테스트를 해보았다. 글이 길어질 것 같아 코드는 따로 첨부하지 않았다.

  1. Default에 스레드 16개 제한 걸기
    -> 16개 제한을 걸고 실행시켰는데 한 번에 8개의 코루틴만 동작했다.
  2. IO에 스레드 128개 제한 걸기
    -> 한 번에 128개의 코루틴이 동작했다.

그래서 추론해본 결과는 다음과 같다.

Default.limit()은 단순히 스레드 제한만 추가, IO.limit()은 새로운 디스패처 생성

그냥 추론한거라 맞을지는 모르겠다.

스레드 갯수를 제한하면 오히려 효율성이 떨어지는거 아닌가?

하지만 여러 스레드가 공유자원에 동시에 접근하는 문제를 예방하기 위해 사용할 수 있다.

suspend fun main(): Unit = coroutineScope {
    val defaultDispatcher = Dispatchers.Default.limitedParallelism(1)
    var count = 0
    launch(defaultDispatcher) {
        repeat(1_000_000) {
            launch {
                count++
            }
        }
    }
    delay(1000L)
    println(count)
}

위의 예시처럼 여러 코루틴에서 특정한 공유자원에 접근해야 할 때 경쟁 상태(race condition) 를 방지하기 위해 사용할 수 있다.

그 외에도 사용자가 버튼을 중복 클릭하는 경우를 방지하거나, 데이터베이스에서 최대 동시 접근 가능 수를 제한하거나 하는 등 여러 경우에 사용할 수 있을 것 같다.

결론

Dispatcher를 상황에 알맞게 사용하는 것이 중요하다.

스레드 갯수를 제한하는 경우에는 limitedParallelism()을 사용하자.
그리고 짐작했겠지만 맨 위 문제는 ioDispatcher()가 더 빨리 끝난다.

참고자료

0개의 댓글