코틀린 코루틴 2.12 - 디스패처

Seogi·2025년 7월 29일

Kotlin

목록 보기
16/27

디스패처를 이용해 코루틴이 실행되어야 할 스레드(또는 스레드 풀)를 결정할 수 있다.
그리고 디스패처를 정하는 것은 CoroutineContext이다.

기본 디스패처

디스패처를 설정하지 않으면 기본적으로 설정되는 디스패처는 CPU 집약적인 연산을 수행하도록 설계된 Dispatchers.Default이다.

이 디스패처는 CPU 개수와 동일한 수(최소 두 개 이상)의 스레드 풀을 가지고 있다.

suspend fun main() = coroutineScope {
    repeat(1000) {
        launch { // 또는 launch(Dispatchers.Default) {
            // 바쁘게 만들기 위해 실행
            List(1000) { Random.nextLong() }.maxOrNull()

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}

// Running on thread: DefaultDispatcher-worker-1  
// Running on thread: DefaultDispatcher-worker-5  
// Running on thread: DefaultDispatcher-worker-7  
// Running on thread: DefaultDispatcher-worker-6  
// Running on thread: DefaultDispatcher-worker-11  
// Running on thread: DefaultDispatcher-worker-2  
// Running on thread: DefaultDispatcher-worker-10  
// Running on thread: DefaultDispatcher-worker-4  
// ...

runBlocking은 디스패처가 설정되어 있지 않으면 자신만의 디스패처를 사용하기 때문에 Dispatchers.Default가 자동으로 선택되지 않는다.

기본 디스패처 제한하기

Dispatchers.DefaultlimitedParallelism을 사용하면 디스패처가 같은 스레드 풀을 사용하지만 같은 시간에 특정 수 이상의 스레드를 사용하지 못하도록 제한할 수 있다.

메인 디스패처

  • 안드로이드에서 메인 스레드는 UI와 상호작용하는 데 사용하는 유일한 스레드이다.
  • 메인스레드가 블로킹되면 전체 애플리케이션이 멈춘다.
  • 메인스레드에서 코루틴을 실행하려면 Dispatchers.Main을 사용하면 된다.

단위 테스트에서 메인 디스패처를 사용하고 싶으면 Dispatchers.setMain(dispatcher)로 디스패처를 설정해야 한다.

class SomeTest {

    private val dispatcher = Executors
        .newSingleThreadExecutor()
        .asCoroutineDispatcher()

    @Before
    fun setup() {
        Dispatchers.setMain(dispatcher)
    }

    @After
    fun tearDown() {
        // 메인 디스패처를
        // 원래의 Main 디스패처로 되돌린다.
        Dispatchers.resetMain()
        dispatcher.close()
    }

    @Test
    fun testSomeUI() = runBlocking {
        launch(Dispatchers.Main) {
            // ...
        }
    }
}

안드로이드에서는 기본 디스패처로 메인 디스패처를 주로 사용한다.

만약, 시간이 오래 걸리는 I/O작업이나 블로킹 함수가 있는 라이브러리를 사용하는 경우 어떻게 해야 할까?

Dispatchers.IO는 이런 상황에서 필요한 디스패처이다.

IO 디스패처

Dispatcher.IO는 파일을 읽고 쓰는 경우, shared preference를 사용하는 경우, 블로킹 함수를 호출하는 경우처럼 I/O연산으로 스레드를 블로킹할 때 사용하기 위해 설계되었다.

suspend fun main() {
    val time = measureTimeMillis {
        coroutineScope {
            repeat(50) {
                launch(Dispatchers.IO) {
                    Thread.sleep(1000)
                }
            }
        }
    }
    println(time) // ~1000
}

Dispatchers.IO는 64개로 제한이 된다. 따라서, 위 코드의 수행시간은 1초밖에 걸리지 않는다.

Dispatchers.DefaultDispatchers.IO는 같은 스레드 풀을 공유한다. 이는 최적화 측면에서 중요한 사실인데, 스레드는 재사용되고 다시 배분될 필요가 없다.

스레드의 한도는 독립적이기 때문에 다른 디스패처의 스레드를 고갈시키는 경우는 없다.

class DiscUserRepository(
    private val discReader: DiscReader
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(Dispatchers.IO) {
            UserData(discReader.read("userName"))
        }
}

Dispatchers.IO를 사용하는 가장 흔한 경우는 라이브러리에서 블로킹 함수를 호출해야 하는 경우이다. 이런 경우 withContext(Dispatchers.IO)로 래핑해 중단 함수로 만드는 것이 가장 좋다.

커스텀 스레드 풀을 사용하는 IO 디스패처

Dispatchers.IO에는 limitedParallelism함수를 위해 정의된 특별한 작동 방식이 있다. limitedParallelism 함수는 독립적인 스레드 풀을 가진 새로운 디스패처를 만든다.

이 경우, 스레드 수가 64개로 제한되지 않는다.

suspend fun main(): Unit = coroutineScope {
    launch {
        printCoroutinesTime(Dispatchers.IO)
        // Dispatchers.IO took: 2074
    }

    launch {
        val dispatcher = Dispatchers.IO
            .limitedParallelism(100)
        printCoroutinesTime(dispatcher)
        // LimitedDispatcher@XXX took: 1082
    }
}

suspend fun printCoroutinesTime(
    dispatcher: CoroutineDispatcher
) {
    val test = measureTimeMillis {
        coroutineScope {
            repeat(100) {
                launch(dispatcher) {
                    Thread.sleep(1000)
                }
            }
        }
    }
    println("$dispatcher took: $test")
}

100개의 코루틴이 각각 스레드를 1초씩 블로킹하는 경우를 생각해보자.

이러한 코루틴을 Dispatchers.IO에서 실행하면 2초가 걸린다.

동일한 동작을 limitedParallelism으로 100개의 스레드를 사용하는 Dispatchers.IO에서 실행하면 1초가 걸린다.

limitedParallelism을 아래와 같은 방식으로 생각할 수 있다.

정해진 수의 스레드 풀을 가진 디스패처

Executors 클래스를 사용해 스레드의 수가 정해져 있는 스레드 풀이나 캐싱된 스레드 풀을 만들 수 있다. 이렇게 만들어진 스레드 풀은 ExecutorServiceExecutor 인터페이스를 구현하며, asCoroutineDispatcher 함수를 이용해 디스패처로 변환하는 것도 가능하다.

val NUMBER_OF_THREADS = 20
val dispatcher = Executors
    .newFixedThreadPool(NUMBER_OF_THREADS)
    .asCoroutineDispatcher()

ExecutorService.asCoroutineDispatcher()로 만들어진 디스패처의 가장 큰 문제점은 close 함수로 닫혀야 한다는 것이다. 이를 깜빡하면 스레드 누수가 일어날 수 있다.

또 다른 문제는 정해진 수의 스레드 풀을 만들면 스레드를 효율적으로 사용하지 않는다는 것이다. 사용하지 않는 스레드가 다른 서비스와 공유되지 않고 살아있는 상태로 유지되기 때문이다.

싱글스레드로 제한된 디스패처

다수의 스레드를 사용하는 모든 디스패처에서는 공유 상태로 인한 문제를 생각해야 한다.

var i = 0

suspend fun main(): Unit = coroutineScope {
    repeat(10_000) {
        launch(Dispatchers.IO) { // 또는 Default 디스패처
            i++
        }
    }
    delay(1000)
    println(i) // ~9930
}

위 예제에서 값은 10,000이 되어야 하지만 실제로는 이것보다 작은 값을 갖게 되는데, 이는 동일 시간에 다수의 스레드가 공유 상태를 변경했기 때문이다.

이런 문제를 해결하는 다양한 방법이 있는데, 싱글 스레드를 가진 디스패처를 사용하는 방법이 그 중 하나이다.

val dispatcher = Executors.newSingleThreadExecutor()
    .asCoroutineDispatcher()

싱글 스레드를 사용하면 동기화를 위한 조치가 필요하지 않다. Executors를 사용하여 싱글 스레드 디스패처를 만드는 방법이 대표적이다.

하지만 디스패처가 스레드 하나를 액티브한 상태로 유지하고 있으며, 더 이상 사용되지 않을 때는 스레드를 반드시 닫아야 한다는 문제점이 있다.

최근에는 병렬처리를 1로 제한한 Dispatchers.DefaultDispatchers.IO를 주로 사용한다.

var i = 0

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default
        .limitedParallelism(1)

    repeat(10_000) {
        launch(dispatcher) {
            i++
        }
    }

    delay(1000)
    println(i) // 10000
}

단 하나의 스레드만 가지고 있기 때문에 이 스레드가 블로킹되면 작업이 순차적으로 처리되는 것이 가장 큰 단점이다.

var i = 0

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default
        .limitedParallelism(1)

    repeat(10_000) {
        launch(dispatcher) {
            i++
        }
    }

    delay(1000)
    println(i) // 10000
}

제한받지 않는 디스패처

Dispatchers.Unconfined는 스레드를 바꾸지 않는다는 점에서 이전 디스패처들과 다르다.

suspend fun main(): Unit =
    withContext(newSingleThreadContext("Thread1")) {
        var continuation: Continuation<Unit>? = null

        launch(newSingleThreadContext("Thread2")) {
            delay(1000)
            continuation?.resume(Unit)
        }

        launch(Dispatchers.Unconfined) {
            println(Thread.currentThread().name) // Thread1

            suspendCancellableCoroutine<Unit> {
                continuation = it
            }

            println(Thread.currentThread().name) // Thread2

            delay(1000)

            println(Thread.currentThread().name)
            // kotlinx.coroutines.DefaultExecutor
            // (delay가 사용하는 스레드이다.)
        }
    }

이 디스패처가 시작되면 시작한 스레드에서 실행이 된다. 재개되었을 때는 재개한 스레드에서 실행이 된다.

이는 단위 테스트할 때 유용하다. launch를 호출하는 함수를 테스트해야 된다고 생각해보자.
시간을 동기화하는 건 쉽지 않다. 이런 경우 Dispatchers.Unconfined로 다른 디스패처를 대체하여 사용할 수 있다.

모든 스코프에서 제한받지 않는 디스패처를 사용하면 모든 작업이 같은 스레드에서 실행되기 때문에 연산의 순서를 훨씬 쉽게 통제할 수 있다.

성능적인 측면에서 보면 스레드 스위칭을 일으키지 않는다는 점에서 비용이 가장 저렴하다. 하지만 블로킹 호출을 하는데도 실수로 Main 스레드에서 실행한다면 어떻게 될까? 이러한 이유로 현업에서는 사용하기 쉽지 않다.

따라서 코루틴이 실행될 스레드에 대해서 신경 쓸 필요가 없을 때 사용하는 것이 좋다.

메인 디스패처로 즉시 옮기기

코루틴을 배정하는 것에도 비용이 든다.
withContext가 호출되면 코루틴은 중단되고 큐에서 기다리다가 재개된다.
스레드에서 이미 실행되고 있는 코루틴을 배정하면 작지만 불필요한 비용이 든다고 할 수 있다.

suspend fun showUser(user: User) =
    withContext(Dispatchers.Main) {
        userNameElement.text = user.name
        // ...
    }

위 함수가 이미 메인 디스패처에서 호출이 되었다면 다시 배정하는 데 쓸데없는 비용이 발생했을 것이다.

게다가 메인 스레드를 기다리는 큐가 쌓여있었다면 withContext 때문에 사용자 데이터는 약간의 지연이 있은 뒤에 보여지게 된다.

이런 경우를 방지하기 위해 필요할 경우에만 배정을 하는 Dispatchers.Main.immediate가 있다.

suspend fun showUser(user: User) =
    withContext(Dispatchers.Main.immediate) {
        userNameElement.text = user.name
        // ...
    }

메인 스레드에서 위 함수를 호출하면 스레드 배정 없이 즉시 실행된다.

메인 디스패처 외의 다른 디스패처에서는 즉시 배정하는 옵션을 현재 지원하지 않고 있다.

0개의 댓글