[코틀린 코루틴] 코루틴의 중단과 재개: CPS

SSY·2023년 12월 25일
0

Coroutine

목록 보기
6/7
post-thumbnail

시작하며

이전 포스팅에서 Coroutine이란 경량 스레드 라이브러라고 말씀드렸습니다. Co + Routine이라는 어원만 놓고 보면 병렬성 작업 처리 라이브러리가 아닌지 착각할 수 있지만, 이전 포스팅에서 샘플 코드를 활용하여 1개의 스레드 위에서 여럿 코루틴 작업이 진행된다는걸 확인했고, 이를 통해 동시성 라이브러리임을 확신할 수 있었습니다.

이것이 가능한 이유는, Kotlin은 중단 함수를 만났을 시 적절한 '중단'과 '재개' 지점을 기억하고 1개의 스레드 위에서 효율적인 Context Switching을 하기 때문이라고 말씀드렸는데요. 이는 CPS 컴파일 방식때문에 그렇습니다.

1. CPS 정의

CPS란 Continuation Passing Style의 약자입니다. 이는 최 상위에 존재하는 Continuation이라는 1개의 객체를 suspend함수 마지막 파라미터에 주입을 시작으로, 재귀 호출 및 연속적으로 전달한다는 의미입니다.

그러기에 CPS방식으로 컴파일이 진행되는 코루틴들은 각각의 스레드에서 병렬성 작업을 진행시키는 것이 아닌, 1개의 스레드에서 여럿 작업들이 ContextSwitching을 되어가며 진행되는 동시성 작업이라 할 수 있는 것입니다.

그렇다면 Continuation이라는 객체가 각각의 suspend함수에 어떻게 전달되는지 알아보도록 하겠습니다.

2. Continuation객체가 suspend함수에 전달되는 방식?

CPS를 처음 들으시는 분들은 Continuation객체를 처음 들어봤을거라 생각합니다. 그러기에 더더욱, 자신은 Continuation객체를 만든 적도 없을 뿐더러, suspend함수에 Continuation객체를 전달한 적 또한 없다고 생각하실 수 있습니다.

하지만 Kotlin은 suspend키워드가 붙은 함수를 만나면 특별한 방식으로 컴파일을 시킵니다. 그것이 바로, 중단 함수 마지막 파라미터에 Continuation타입의 파라미터를 생성한다는 의미이며, 이를 통해 Continuation객체를 중단 함수 내부로 전달한다는 의미입니다.

[컴파일 전]

suspend fun printUser(token: String) {
    println("Before")
    val userId = getUserId(token) // 중단 함수
    println("Got UserId : $userId")
    val userName = getUserName(userId, token) // 중단 함수
    println(User(userId, userName))
    println("After")
}

[컴파일 후]

fun printUser(token: String, continuation: Continuation) {
    println("Before")
    val userId = getUserId(
        token, 
        continuation // 새로 생성된 continuation파라미터
    ) // 중단 함수
    println("Got UserId : $userId")
    val userName = getUserName(
        userId, 
        token, 
        continuation // 새로 생성된 continuation파라미터
    ) // 중단 함수
    println(User(userId, userName))
    println("After")
}

위를 통해 주목해야할 부분은 printUser메서드의 시그니쳐 입니다. 첫 번째로 suspend키워드가 사라졌습니다. 두 번째로 마지막 파리미터에 Continuation타입 파라미터가 추가되었습니다.

즉, 중단함수는 컴파일되는 즉시, Continuation타입 파리미터의 추가와 동시에 suspend키워드가 사라지게 됩니다. 이로 인해 결국 중단함수도 일반 함수와 동일하게 동작한다는 점이 크게 주목해야할 부분이며, 이런 중단함수의 '중단'과 '재개'를 Continuation객체를 통해 상태 관리를 한다는 점이 가장 주목해야할 점이기도 합니다.

3. Continuation객체의 생성

Continuation객체로 printUser메서드 내부의 '재개'지점과 '상태값'을 반드시 조회할 수 있어야 합니다. printUser중단 함수를 통해 의사 코드를 작성해보겠습니다. 우선, Kotlin 컴파일러는 최 상단에 PrintUserContinuation객체로 Continuation기본 객체를 래핑합니다.

fun printUser(
    token: String,
    continuation: Continuation<*>
): Any {
    val continuation = continuation as? PrintUserContinuation
        ?: PrintUserContinuation(
        continuation as Continuation<Unit>,
        token
    )
    
    // 다음 코드 계속...
}

하지만 여기서 의문인 점이 있습니다. printUser메서드 또한 어딘가서에 호출되었다는 점입니다. 그리고 호출을 위해선 printUser메서드 파라미터에 Continuation객체의 주입이 필요조건이며, 이 코드의 과정이 어떻게 생겨먹었는지도 대략적으로 알 필요가 있습니다.

4. BaseContinuationImpl객체의 생성

결론부터 말씀드리자면, printUser메서드에는 코루틴 빌더로의 암시적 리시버로부터 참조되는 BaseContinuationImpl객체를 주입받습니다. 그 과정을 설명드려보겠습니다.

우선, printUser메서드 또한 중단함수이기에 반드시 코루틴 빌더 내부에서 실행되었다는 것은 변함없는 사실입니다.

fun main() {
   CoroutineScope(Dispatchers.Main).launch {
       printUser(
           token = "asdf123fsdfa"
       )
   }
}

그리고 위 코드 또한 컴파일 되는 즉시 Continuation 파라미터가 마지막에 뚫립니다. 그 후, 객체의 주입이 이뤄지죠.

fun main() {
   CoroutineScope(Dispatchers.Main).launch {
       printUser(
           token = "asdf123fsdfa"
           continuation = BaseContinuationImpl()
       )
   }
}

하지만 최상위에 선언된 코루틴 빌더는 하위에 어떤 suspend함수가 선언될지 알 수 없습니다. 그러기에 컴파일러는 단순 Continuation인터페이스를 구현한 기본 객체인 BaseContinuationImpl를 넣어주게 됩니다. IntelliJ ByteCode로 디컴파일하여 확인하면 아래와 같이 기본 Continuation구현 객체를 this키워드를 통해 주입하고 있는걸 확인할 수 있습니다.

또한 위 this키워드도 suspend함수 컴파일 시, Continuation파라미터가 뚫렸던 것처럼, 코루틴 빌더 마지막에 Continuation객체를 암시적 리시버로 받는것 또한 확인할 수 있습니다.

하지만 Continuation은 아래 코드를 보면 아시겠지만, 객체가 아닌 인터페이스입니다. 따라서 하위 구현 객체가 어떤 형태인지 알 필요가 있겠습니다.

package kotlin.coroutines

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

조금 더 깊이 들어가기 위해, 대중적으로 쓰이는 launch빌더를 통해 Continuation객체 생성을 추적해 보겠습니다.

제가 작성한 샘플 코드로 코루틴 빌더를 실행시키면 StandaloneCoroutine객체가 만들어 집니다. 그리고 위 객체를 또 다시 타고가면...?

AbstractCoroutine을 상속받는게 보입니다. AbstractCoroutine객체를 또 타고 가볼까요?

그러면 이 클래스는 결국 Continuation 인터페이스를 구현하고 있음을 알 수 있습니다. 즉 이를 통해 Continuation의 첫 번째 구현 객체가 만들어 진다는 것을 알 수 있습니다. (StandaloneCoroutine객체를 의미)

하지만 제가 printUser메서드에 주입되는 Continuation객체는 BaseContinuationImpl이라 말씀드렸던것 기억하시나요? 확인하기 위해 StandaloneCoroutine클래스의 멤버 함수인 start를 추적해보겠습니다. 아래 코드의 2 단계를 타고가보겠습니다.

[StandaloneCoroutine.start메서드 추적]

[CoroutineStart.invoke메서드 추적]

launch빌더 실행 시, 아무 파라미터도 넣지 않았을 경우, DEFAULT옵션이 선택되므로, block.startCoroutineCancellable을 타고가면 됩니다.

이제 위의 마자막 메서드인 resumeCancellableWith()BaseContinuationImpl객체를 반환하는지만 확인하면 됩니다. 우선, BaseContinuationImpl구현체 입니다. (해당 부분은 난독화되어있어 코드로 작성합니다.)

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    return if (this is BaseContinuationImpl) {
        create(
            value = receiver,
            completion = completion,
        )
    } else {
        createCoroutineFromSuspendFunction(completion = completion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

createCoroutineUnintercepted메서드의 리시버인 (suspend R.() -> T)의 형태가 BaseContinuationImpl에 해당하면BaseContinuationImpl을 반환하고 있습니다.

[의문 : (suspend R.() -> T)의 타입이 BaseContinuationImpl이 맞을까?]
그렇습니다. 신기하게도 디버깅 로그를 찍어보면 그렇습니다.

이로 인해 첫 번째로 생성하는 Continuation객체는 BaseContinuationImpl객체임이 확인되었습니다. 그 후, 호출되는 intercepted()resumeCancellableWith()를 통해 결국 resumeWith()가 호출됨으로써 CPS 프로세스가 시작되는것 또한 알 수 있습니다.

intercepted()호출을 통한 BaseContinuationImpl반환

resumeCancellableWith()호출을 통한 resumeWith()의 시작

위와 같은 프로세스를 거침으로써 코루틴 시작시 BaseContinuationImpl객체가 만들어진다는 것을 확인할 수 있으며, 이를 코루틴 빌더 안에서 this로 참조할 수 있다는 것 또한 알 수 있습니다.

그럼 다시 printUser메서드로 돌아오겠습니다. CPS컴파일이 진행되면 해당 메서드의 파라미터에 당연히 BaseContinuationImpl객체가 주입된다는 것을 납득할 수 있게 됩니다.

따라서 printUser메서드는 주입받는 Continuation객체의 타입 판별을 진행합니다. BaseContinuationImpl객체인지? PrintUserContinuation객체인지? 말이지요.

fun printUser(
    token: String,
    continuation: Continuation<*>
): Any {

    // 상위로부터 주입받는 Continuation객체의 구현 타입을 알 수 없다. 
    // 따라서 타입 체크 후, printUser에 알맞는 Continuation객체로 변환해주어야만 한다.
    
    // continuation객체가 재귀 호출을 통해 다시 전달받은 객체라면 그대로 둔다.
    // 그게 아니라면 새로운 PrintUserContinuation객체를 만들어 준다.
    val continuation = continuation as? PrintUserContinuation
        ?: PrintUserContinuation(
        continuation as Continuation<Unit>,
        token
    )
    
    // 다음 코드 계속...
}

그리고 위에 해당하는 PrintUserContinuation객체는 어떤 모습일까요? 아래와 같이 유추해볼 수 있습니다.

5. PrintUserContinuation객체의 형태

[Continuation객체의 상태]
이전에 Continuation객체는 '재개 지점'과 'suspend함수의 내부 상태'를 가진다고 여럿 말씀드렸습니다. 이를 통해, Continuation인터페이스 구현 객체는 아래의 모양새를 유추할 수 있습니다.

class PrintUserContinuation(
   val completion: Continuation<Unit>,
   val token: String
): Continuation<String> {
    override fun val context: CoroutineContext
        get() = completion.context
        
    // 중단 함수의 다음 재개 지점을 저장하는 로컬 프로퍼티    
    var label = 0
    // 중단 함수의 최종 결과를 반환하는 Result타입의 로컬 프로퍼티
    var result: Result<Any>? = null
    // printUser메서드의 내부 상태 저장을 위한 로컬 프로퍼티
    var userId: String? = null
    
    // PrintUserContinuation객체는 resumeWith메서드를 호출함으로써 동시성 작업을 시작시킨다.
    override fun resumeWith(result: Result<String>) {
        this.result = result
        val res = try {
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.fauliure(e)
        }
        completion.resumeWith(res)
    }
}

위 코드엔 labeluserId라는 로컬 프로퍼티가 선언되어 있습니다. lebel은 다음 중단 함수가 다시 재개되어야 하는 지점을 의미하며 userId는 printUser메서드 내부에서 가지고 있어야 하는 상태값을 의미합니다.

같은 원리로 Continuation객체는 코루틴 빌더 하위에 존재하는 모든 중단 함수 내부에 정의됩니다.(이는 특정 중단 함수의 전용 Continuation객체라는 의미와도 같습니다.) 그리고 해당 메서드의 label(다음에 재개되어야 하는 지점)과 userId와 같은 내부 상태값들을 가지게 됩니다.

그렇다면 위 PrintUserContinuation메서드를 통해 printUser메서드 전체는 어떻게 컴파일이 진행될까요?

6. printUser메서드의 CPS 컴파일 진행

fun printUser(
    token: String,
    // 1. 재귀호출을 위한 Continuation타입 파라미터의 생성
    continuation: Continuation<*>
): Any {

    // 2. 주입받은 파라미터의 타입을 체크하여 Continuation객체 생성 
    val continuation = continuation as? PrintUserContinuation
        ?: PrintUserContinuation(
        continuation as Continuation<Unit>,
        token
    )
    
    var result: Result<Any>? = continuation.result
    var userId: String? = continuation.userId
    val userName: String
    
    // 3. 주입받은 continuation객체의 다음 재개 지점을 
    // `label`로컬 프로퍼티를 활용해 계산한다. 
    // 이는 continuation객체의 내부 상태값(`label`과 `내부 로컬 프로퍼티`)에 따라 달라진다.
    
    // printUser메서드를 처음 들어왔다면 label은 반드시 0이다. 
    // 아래 지점을 실행한다.
    if (continuation.label == 0) {
        println("Before")
        
        // 다음 재개 지점을 초기화한다.
        continuation.label = 1
        val res = getUserId(token, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    // printUser메서드를 재귀호출을 통해 들어왔고, getUserId의 결과값을 수신받은 직후라면 label은 1이다,
    // 아래 지점을 실행한다.
    if (continuation.label == 1) {
    
        // continuation객체가 보유한 내부 상태값을 읽어들인다.
        userId = result!!.getOrThrow() as String
        println("Got UserId: $userId")
        
        // 다음 재개 지점을 초기화한다.
        continuation.label = 2
        continuation.userId = userId
        val res = getUserName(userId, token, continuation)
        if (res = COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    // printUser메서드를 재귀호출을 통해 들어왔고, getUserName의 결과값을 수신받은 직후라면 `label`은 2이다.
    // 아래 지점을 실행한다.
    if (continuation.label == 2) {
        userName = result!!.getOrThrow() as String
        println(User(userId as String, userName))
        println("After")
        
        // 모든 계산이 다 끝난다면 Unit을 반환함과 동시에 
        // CPS프로세스를 통한 동시성 작업을 끝낸다.
        return Unit
    }
    error("Impossible")
}

위 코드 주석에 추가적인 설명을 덧붙여보자면 아래와 같습니다.

우선, 여러번 말씀드렸다시피, 중단 함수 마지막 파라미터에 Continuation 타입의 파라미터가 뚫립니다. 그 후, 해당 파라미터로 받은 객체가 최 상위 중단 함수의 전용 Continuation객체(ex. PrintUserContinuation)인지 체크합니다. 만약 아니라면 이는 printUser메서드를 처음 실행시킨 것이므로 새로 생성해줍니다. 아니라면, 기존의 객체를 하위 타입으로 캐스팅을 하고 그대로 사용합니다.

그 후, Continuation객체를 통해 다음 재개지점이 어디인지 알아야 합니다. 만약, printUser메서드를 처음 실행시킨 것이라면, continuation객체의 label필드(다음 실행 지점)는 반드시 0이 되는데, 이떈 최 상위 if문을 실행시킵니다.

[다음 재개 지점의 저장]
continuation.label = 1 와 같은 코드를 통해 다음 재개 지점이 저장되는걸 알 수 있습니다.

반면, continuation객체가 재귀호출을 통해 다시 들어왔을 경우, 이는 label필드는 다음 재개 지점을 반드시 가지고 있습니다. 따라서 1 이상인 경우에 해당하는 조건문만 진행시키게 됩니다.

그 후, label필드가 끝까지 다다랐다면, printUser메서드는 최종적으로 Unit을 리턴하며 동시성 작업을 끝내게 됩니다.

7. Continuation의 CallStack

printUser(), getUserId(), 가 만약 일반함수이며, 순차적으로 호출할 때 콜스택은 어떻게 될까요? 너무 당연하게도 아래 그림과 같은 콜스택이 만들어집니다.

하지만 Continuation을 사용한 콜스택은 어떻게 될까요? 결론부터 말하자면 아래와 같은 콜스택이 만들어집니다.

왜 그럴까요? 우선적으로, Continuation객체는 BaseContinuationImpl객체로 만들어지며, 코루틴 빌더 내부의 메서드 중, resumeWith를 사용하여 코루틴 작업이 시작된다는걸 확인한 바 있습니다. 하지만 이 객체는 중단함수를 만나면서 래핑이 계속 이루어집니다.

main()함수에서 BaseContinuationImpl객체 생성을 시작으로 printUser()를 호출하면 PrintUserContinuation으로 래핑됩니다.(윗 코드로 확인 가능)

더 나아가, printUser()내에서 getUserId()를 호출하면, 어떻게 될까요? PrintUserContinuation()객체가 getUserId()메서드의 continuation파라미터로 들어가 GetUserIdContinuation으로 래핑될 것입니다.

이처럼, 중단함수를 연쇄적으로 호출하게되면 Continuation객체는 양파껍질처럼 지속적으로 래핑이 진행되고, 결국, 대형 Continuation객체가 탄생하게 됩니다.

그럼 이 객체를 시작할땐 어떻게 할까요? 아까 코루틴 빌더 내, startCoroutineCancellable()안의 resumeCancellableWith()객체를 호출하여 최종적으로 resumeWith()를 호출하는 것을 확인할 수 있었습니다. 그리고 이것이야말로 양파껍질처럼 거대해진 Continuation객체를 시작시키는 출발점인 것입니다.

또한 이렇게 거대해진 ContinuationresumeWith()가 끝이나면 자신을 호출했던 continuationresumeWith()를 다시 호출합니다. 아래의 코드처럼 말이지요.

class GetUserIdContinuation(
   val completion: Continuation<Unit>,
   val token: String
): Continuation<String> {

    // resumeWith 이외 코드 생략
    override fun resumeWith(result: Result<String>) {
        this.result = result
        val res = try {
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.fauliure(e)
        }
        
        // getUserId의 작업이 끝나면, 
        // 자신을 호출하였던 printUserContinuation의 resumeWith를 호출합니다.
        completion.resumeWith(res)
    }
}

위와 같은 흐름으로 인해, 코루틴의 콜스택은 일반 함수의 콜스택과는 정 반대로 진행된다는 것을 확인할 수 있습니다.

마치며

CPS는 동시성 작업을 수행하기 위해, Continuation객체를 활용 및 재귀호출을 진행하여 여럿 작업을 처리하는 컴파일 방법이라는 것을 알아봤습니다.

또한 Continuation객체가 가지고 있어야만 하는 값은 labelsuspend메서드 내부 상태값이라는 것도 알아봤으며, 재귀호출을 통해 해당 값들이 지속적으로 업데이트 된다는 것 또한 알아봤습니다.

하지만 저 또한 CPS작업 프로세스를 완전히 이해했다고 말씀드리긴 어려울것 같습니다. suspend메서드를 IntelliJ에서 ByteCode로 디컴파일한다 해도, 가독성이 쉽지 않을 뿐더러, 코드를 트래킹하기에도 한계가 있기 때문입니다.

하지만 저의 학습 뿐만 아니라 이 글을 읽으며 공부하시는 분들을 위해 최대한 사실을 담아내려 노력하였습니다. 틀린 부분이나 부족한 부분 있으면 비판은 언제든 환영입니다.

감사합니다.

profile
불가능보다 가능함에 몰입할 수 있는 개발자가 되기 위해 노력합니다.

0개의 댓글