안드로이드 코루틴 취소와 타임아웃, 서스펜딩함수, 코루틴컨텍스트와 디스패처, CHE와 슈퍼바이저잡 정리

SSY·2022년 12월 15일
0

Coroutine

목록 보기
2/8
post-thumbnail

1. 취소와 타임아웃

1. 취소

실행중인 코루틴을 취소시키는 방법은 간단하다. launchasync로부터 반환받는 Job객체에 .cancel를 호출해주는 것이다.

fun cancellingCoroutineExecution() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancel()
    job.join()
    println("main: Now I can quit.")
}

위 코드는 1개의 코루틴 내, 1,000개의 반복문 0.5초 간격으로 실행시키고 있으며, 코루틴이 시작된 후, 1.3초 후에 이를 중단한다. 취소 결과, 순조롭게 종료됨을 확인 가능하다.

Log
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
내용을 입력하세요.

하지만 취소할 때, 간과하지 말아야할 게 있다. 그것은 코루틴을 안정적으로 취소하기 위해선 내부에 suspend function을 사용해야만 한다는 것이다. 그렇지 않을 경우 취소가 안된다.

아래 코드의 경우, launch{}블럭 내, suspend function을 사용하지 않은 연산을 진행하고 있으며, 위와 동일하다. 0.5초 간격으로 딜레이를 주고 있으며, 코루틴 시작 이후 1.3초 후 취소 명령을 내리고 있다. 하지만 결과는 어떨까

변경된 점
반복하는 launch블록 내, deleay()같은 suspend function을 사용하지 않는다.

fun cancellationIsCooperative() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 10) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
  
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin()
    println("main: Now I can quit.")
}

그리고 결과는? 예상했듯, 코루틴이 중단되지 않는다.

Log
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
job: I'm sleeping 5 ...
job: I'm sleeping 6 ...
job: I'm sleeping 7 ...
job: I'm sleeping 8 ...
job: I'm sleeping 9 ...
main: Now I can quit.

여기서 주목해야할 점은 바로, cancel()을 호출해주었는데도 불구하고, launch블록 내부가 중단되지 않고 끝까지 실행되었다는 점이다. 뿐만 아니라 이러한 점은 단순히 코드 겉면만 보고는 무엇이 문제인지 진단하는 것은 불가능하다는 것이다. 겉보기엔 전혀 이상이 없다. 왜 이런 문제가 발생하는 걸까?

코틀린 공식 문서에 보면 다음과 같이 답을 내려주고 있다.

[Kotlin 공홈]
However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled

즉, 코루틴 cancel()을 온전하게 진행하기 위해서는 코루틴 Job의 상태를 판단할 수 있는 isActive 필드를 통해서 검사를 진행해주거나 suspend function을 사용해야 한다는 것이다.(확장함수를 사용하면 그 내부에서 isActive'를 검사하기에 온전히 종료시킬 수 있는 것이다.)

Coroutine 안전한 종료를 위한 필수 지식
1. isActive를 사용하여 검사를 진행하거나
2. suspend function을 사용해야 한다.

fun makingComputationCodeCancellable() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        
        // ⭐️ `isActive`를 통한 코루틴 실행 여부 검사
        while (isActive) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin()
    println("main: Now I can quit.")
}

위 코드를 보면 알겠지만, isActive를 사용하여 조건을 체크해주고 있다.

fun makingComputationCodeCancellableUsingYield() = runBlocking {
   
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 20) {
        
        	// ⭐️ `yield()`를 통한 코루틴 자원 이관
            yield()
            
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
  
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin()
    println("main: Now I can quit.")
}

또는 suspend확장 함수인 yield를 사용한 결과 종료에 문제가 없음을 확인할 수 있다.

Log
// reulst
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

2. 타임아웃

정확히 몇 초 후에 코루틴을 종료시키고 싶을때가 있다. 그럴 땐 withTimeout() or withTimeoutOrNull 사용을 고려해봄직 하다.

2.1. withTimeout

아래 코드를 보면 withTimeout 인자값에 1300L을 넣었다. 이는 정확히 1.3초 이후에 해당 코루틴을 종료해달라는 뜻이다. 그리고 try - catch 문으로 감싼 이유는, 해당 코루틴이 종료되면 TimeoutCancellationException을 리턴하기 때문이다.

runBlocking {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                Log.i("studyTest", "I'm sleeping $i ....")
                delay(500L)
            }
        }
    } catch (e: TimeoutCancellationException) { }
}

2.2. withTimeoutOrNull

withTimeout 함수와 동작은 똑같다. 다만 다른점이 있다면, 해당 함수는 코루틴 작업이 취소된다 하더라도 Exception을 던지지 않는다는 것이다. 그리고 OrNull이름에 알맞게 null을 반환한다. (그래서 try_catch문으로 감쌀 필요가 없다.)

runBlocking {
    val result = withTimeout(1300L) {
        repeat(1000) { i ->
            Log.i("studyTest", "I'm sleeping $i ....")
            delay(500L)
        }
    }
}

2. 서스펜딩 함수

suspend함수는 Coroutine Builder -> suspend함수 내에서 사용되는 하나의 비동기 작업 단위라고 볼 수 있다. 다음과 같이 말이다.

2.1. 코루틴 빌더 내에서 suspend함수를 사용하는 경우

CoroutineScope(Dispatchers.IO).launch {
    val userInfo = getUserInfo()
}

2.2. suspend함수 내에서 suspend함수를 사용하는 경우

suspend fun suspendProcess_A(): String {
    return "A_Result"
}
suspend fun suspendProcess_B(value: String): String {
    return "B_Result"
}
suspend fun suspendProcess_C(value: String): String {
    return "C_Result"
}
suspend fun getUserInfo() {
    val result_a = suspendProcess_A()
    val result_b = suspendProcess_B(
        value = result_a
    )
    suspendProcess_C(
        value = result_b
    )
}

그리고 이러한 작업들은 '하나의 비동기 작업 단위'가 되어 순차적으로 처리될 수 있게 된다. 그 처리 방법은 일전에 정의해두었던 CPS방식으로 빌드가 된다.
참고 : [코틀린 코루틴] 코루틴의 중단과 재개: CPS

3. CoroutineContext와 Dispatcher

3.1. CoroutineContext

CoroutineContext란, 코루틴의 전반적인 상태 정보를 알려주는 Element들의 집합이다. 이러한 Element에는 Coroutine의 에러를 처리하는 ExceptionHandler, 이름을 정의하는 CoroutineName, 코루틴의 상태정보에 접근할 수 있는 Job, 코루틴의 동작 스레드를 결정하는 Dispatcher들이 operator plus 연산자를 통해 조합될 수 있다. 이는 아래와 같이 도식화가 가능하다.

위 그림을 보면 luaunch 블럭 내, CoroutineContext가 들어가고 있는데 이는 여러 요소들이 + 연산자로 더해지고 있는걸 확인할 수 있다. 위에서 설명한 4가지 요소들을 바로 ‘Element’들이라고 한다. 그럼 코루틴 컨텍스트를 좀 더 싶도깊게 이해하기 위해 코드를 직접 확인해보자.

public operator fun <E : Element> get(key: Key<E>): E?

public fun <R> fold(initial: R, operation: (R, Element) -> R): R

public operator fun plus(context: CoroutineContext): CoroutineContext = ...impl...

public fun minusKey(key: Key<*>): CoroutineContext

a. get()

이 함수는 오퍼레이터 함수로, 이미 설정되어 있는 Coroutine Context를 반환한다.

class MyCoroutineScope: CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO +
                Job() +
                CoroutineName("hello_AOS_Study") +
                CoroutineExceptionHandler { _, exception ->
            Log.i("ExceptionLog", "코루틴 익셉션 발생!")
        }
}

MyCoroutineScope 클래스는 CoroutineScope를 상속받고, 그에 대한 멤버인 operator fun get()을 사용해서 custom한 Coroutine Context를 지정하고 있다.

또한 custom한 Coroutine Context정의는 상속을 활용하는 것만이 아닌, Coroutine Builder내부 파라미터로 주입이 가능하다.

// ⭐️ Coroutine Scope Provider의 파라미터로도 `Coroutine Context`주입이 가능하다.

// ⭐️⭐️ 하지만 Coroutine Builder 내부에 `Coroutine Context`가 새로 주입될 경우, 이를 우선시한다.
CoroutineScope(Dispatchers.IO).launch(
    Job() +
    CoroutineName("hello_AOS_Study") +
    CoroutineExceptionHandler { _, exception ->
            Log.i("ExceptionLog", "코루틴 익셉션 발생!")
    } +
    Dispatchers.Default
)

즉, 위와 같이 설정을 한 후, 코루틴 작업을 실행시키려 할때, 내부적으로 get오퍼레이터 함수를 호출함으로써 ‘현재 설정되어 있는 콘텍스트’를 가져다 사용하는 것이다.

b. fold()

CoroutineScope(Dispatchers.IO).launch(
    Job() +
        CoroutineName("hello_AOS_Study") +
        CoroutineExceptionHandler { _, exception ->
            Log.i("ExceptionLog", "코루틴 익셉션 발생!")
        }
)

위에서도 말했다시피, Coroutine Context는 Coroutine Scope Provider와 Coroutine Builder 2군데에서 설정이 가능하다. 하지만 그럴 경우, fold()메서드를 사용해 이를 누적한 새로운 Coroutine Context를 만들게 된다.

위 코드는 Coroutine Scope Provider에 우선적으로 Dispatchers(Coroutine Context의 Element 중, 1가지)를 설정해 주었다. 그리고 나서, Coroutine Builder에 CoroutineName, CoroutineExceptionHandler를 주입하고 있다. 즉, 위처럼 초기 다른 부분에서 Element를 설정해줄 경우, 내부적으로 fold()를 호출Coroutine Context를 조합하고 반환해준다.

c. plus()

같은 예제를 사용하면 이해가 더욱 쉬워서 또 써볼까 한다.

CoroutineScope(Dispatchers.IO).launch(
    Job() +
    CoroutineName("hello_AOS_Study") +
    CoroutineExceptionHandler { _, exception ->
        Log.i("ExceptionLog", "코루틴 익셉션 발생!")
    }
)

plus오퍼레이터 함수는 직관적으로 알 수 있듯이, 코루틴 콘텍스트 Element들을 더해주는 함수이다. 위처럼 +를 사용하면 CoroutineContext에 내부적으로 각각의 Element들을 더해준다. 그리고 더해진 콘텍스트를 반환해준다.

d. minusKey()

이 또한 직관적으로 알 수 있듯이, 코루틴 콘텍스트 Element들을 제거해주는 녀석이다. 예제를 통해 알아보자.

val exceptionHandler = CoroutineExceptionHandler { _, exception ->
            Log.i("ExceptionLog", "코루틴 익셉션 발생!")
        }
        
val context = 
        Dispatchers.IO + 
        Job() +
        CoroutineName("hello_AOS_Study") +
        exceptionHandler

val resultContext = context.minusKey(exceptionHandler.key)

GlobalScope.launch(resultContext) {
    ...
}

즉, -를 사용해 코루틴 콘텍스트 Element들을 선택적으로 제거할 수 있다. 이렇게 코루틴콘텍스트에 정의되어 있는 4가지 함수들을 통해서 코루틴 콘텍스트에는 어떤 Element들이 있는지, 그리고 이러한 Element들을 서로 더해줄 수 있고, 뺴줌이 가능하다.

3.2. CoroutineDispatcher

그렇다면 이제 Elememnt를 구성하는 요소 중 하나. 디스패쳐에 대해 알아볼까 한다. 디스패처란 무엇일까? 사전적 의미로 찾아보면, 디스패처는 '보내다'라는 뜻을 가지고 있다.

이 즉, 하나의 작업(=스레드)를 특정 스레드 또는 스레드풀로 보낸다는 것을 의미한다. 그러기에 코루틴 디스패처를 사용하여 컨텍스트를 구성하는 방법은 크게 두 가지가 있다.

CoroutineDispatcher를 사용해 Context구성하는 방법?
a. 특정 스레드로 제한하기
b. 스레드풀을 만들고 그 곳에 제한하기

a. 특정 스레드로 제한하기

특정 스레드로 제한하기 위한 코루틴 디스패처들은 다음의 것들이 있다.

디스패처 종류
Dispatchers.IO,
Dispatchers.Main,
Dispatchers.Default,
Dispatchers.Unconfined

a.1. Dispatcher.Main

이는 코루틴 작업을 메인스레드에서 작업하겠다는 의미이다. 즉, 안드로이드 관점으로 보자면 UI스레드에서 작업을 하겠다는 의미이다.

CoroutineScope(Dispatchers.Main).launch {
    println("현재 스레드 : [${Thread.currentThread().name}]")
}

또한 동일한 의미로 코루틴 빌더에 콘텍스트를 넘겨주지 않는 경우가 있다.(아래의 경우) 그런 경우엔 해당 코루틴을 호출한쪽의 스레드를 실행시키게 된다. (launch의 경우 runBlocking으로부터 실행되고 있다. 또한 runBlocking의 경우 Main으로부터 실행되고 있다. 즉 메인스레드에서 돌아가게 된다)

runBlocking {
    launch {
        println("현재 스레드 : [${Thread.currentThread().name}]")
    }
}

a.2. Dispatchers.IO

이는 코루틴 작업을 백그라운드에서 작업하겠다는 의미이다. 로컬DB작업, 서버 작업, 파일 입출력 작업을 할때 주로 사용한다. 결과는 예상 가능하니 코드는 첨부하지 않겠다.

a.3. Dispatchers.Default

해당 디스패쳐는 기기가 보유한 CPU의 코어수와 동일하다. 예를 들어, 6코어라면 최대 6개의 스레드만 돌아간다.

repeat(100) {
    CoroutineScope(Dispatchers.Default).launch {
        println("현재 스레드 : [${Thread.currentThread().name}]")
    }
}

Dispatcher.Default이상하면서도 신기방구한 점
이걸 내가 8코어 핸드폰으로 테스트를 했다. 근데.. 그보다 더 많은 스레드 갯수에서 돌아갔다.. (10개?) 실제 측정은 더 많은 스레드가 돌아가는게 보이긴 했지만 그럼에도 불구하고 실무 코딩시엔 최대 스레드 갯수를 보수적으로 잡음이 안전해보인다.

a.4. Dispatchers.Unconfined

이는 코루틴 콘텍스트를 한정하지 않는다는 뜻이다. 즉, 한정하지 않으니 콘텍스트가 바뀔 수 있다는 뜻이고, 이는 실행되는 스레드의 위치가 바뀔 수 있다는 뜻이다. (그래서 조심해야한다)

예를 들어, 처음에 Dispatchers.Unconfined를 지정하고 이를 호출한 스레드는 메인 스레드라고 가정해보자. 그리고 그 이후, 코루틴의 첫 번째 중단 지점(suspend)을 만나고 이때, context가 바뀌었을 때, 바뀐 콘텍스트의 스레드로 코루틴이 실행된다는 뜻이다. 코드를 통해 알아보도록 하자.

runBlocking {
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}

위 코드를 보면 동시에 두개의 코루틴을 실행시켜주고 있다. 첫 번째의 코루틴은 콘텍스트를 Dispatchers.Unconfined로 지정했다. 그리고 이녀석을 통해 실행된 코루틴은 당연히 메인에서 처음엔 돌고 있다.

하지만, 첫 번째 중단 지점(delay)를 만난 직후, 작동하는 스레드 위치가 바뀐것을 확인할 수 있다, 여기서 delay함수는 DefaultExecutor스레드에서 돌게 되는데, 이녀석을 만나자 마자 바로 바뀌어버린 것이다. 위의 코드와 로그를 찬찬히 읽어보면 이해가 갈거라 생각한다.

a.5. newSingleThreadContext

위처럼 디스패처를 사용해서 스레드를 지정할 수도 있고, context에 newSingleThreadContext를 넣어줌으로써 새로운 스레드를 지정할 수도 있다. 다음과 같이 말이다.

fun main() {
    runBlocking {

        val dispatcher = newSingleThreadContext(name = "ServiceCall")
        val task = GlobalScope.launch(dispatcher) {
            printCurrentThread()
        }
        task.join()
    }
}

a.6. newFixedThreadPoolContext

위의 경우는 우리가 어떤 스레드에서 코루틴을 시작할지를 결정할 수 있었다. 하지만 우리가 작동시킬 스레드풀의 갯수를 지정하고 그 안에서 코루틴을 실행시키는 작업 또한 가능하다. 아래와 같이 말이다. 또한 이는 부하분산을 알아서 해주기에 따로 할 작업은 없다

val dispatcher = newFixedThreadPoolContext(4, "mypool")
repeat(100) {
    GlobalScope.launch(dispatcher) {
        Log.i("스레드테스트", "start in ${Thread.currentThread().name}")
        delay(1000)
        Log.i("스레드테스트", "resume in ${Thread.currentThread().name}")
    }
}

위 사진을 봐서 알겠지만, 최대 스레드풀을 4개로 한정해주었다. 그리고 100번 반복문을 돌렸다. 예상했듯, 스레드풀은 4개안에서만 돌아간다는걸 알 수 있다.

만약 위 스레드풀 갯수를 4개에서 8개로 올리면?
가용스레드풀은 8까지만 로그에 찍힐 것이다.

4. CEH와 슈퍼바이저잡

4.1. CHE

CEH의 뜻은 Coroutine Exception Handling이란 뜻이다. 이름에 맞게 코루틴을 사용하다 오류가 났을 때 에러를 핸들링한다는 뜻이다. 다음과 같은 구조의 코루틴이 있다고 해보자

위와 같은 상황에서 자식 코루틴에 에러가 생기면 어떻게 될까? Child Coroutine1에 에러가 생긴다고 가정하면, 그 에러는 Parent Coroutine에게 전달된다. 그리고 부모는 이 에러를 모든 자식들에게 전파시키게 된다. 그 결과 전체 코루틴은 멈추게 되는 것이다. 아래와 같이 말이다.

suspend fun main() {
    CoroutineScope(Dispatchers.IO).launch {
        val firstChildJob = launch(Dispatchers.IO) {
            throw AssertionError("첫 째 Job이 AssertionError로 인해 취소됩니다.")
        }

        val secondChildJob = launch(Dispatchers.Default) {
            delay(1000)
            println("둘 째 Job이 살아있습니다.")
        }

        firstChildJob.join()
        secondChildJob.join()
    }.join()
}

그러므로, 자식 코루틴에서 에러가 생긴다고 해도, 전체 코루틴을 멈추지 않게 하기 위한 작업이 필요하다. 그 작업을 하기 위해선 SupervisorJob을 알 필요가 있다.

4.2. SupervisorJob

SupervisorJob을 사용하면 해당 자식 코루틴의 에러는 부모로 전파되지 않는다. 그리고 이는 코루틴 빌더(async, launcher, withContext...)에 context를 지정하는 자리에 함께 쓰일 수 있다. 아래와 같이 말이다.

suspend fun main() {
    val supervisor = SupervisorJob()

    CoroutineScope(Dispatchers.IO).launch {
        val firstChildJob = launch(Dispatchers.IO + supervisor) {
            throw AssertionError("첫 째 Job이 AssertionError로 인해 취소됩니다.")
        }

        val secondChildJob = launch(Dispatchers.Default) {
            delay(1000)
            println("둘 째 Job이 살아있습니다.")
        }

        firstChildJob.join()
        secondChildJob.join()
    }.join()
}

profile
불가능보다 가능함에 몰입할 수 있는 개발자가 되기 위해 노력합니다.

0개의 댓글