
JVM 계열 라이브러리(RxJava, Reactor)가 있음에도 불구하고 왜 코루틴을 배워야할까요?
Kotlin이라는 언어기반으로 개발되어 멀티플랫폼을 지원합니다.
쓰레드의 생성 비용, 컨텍스트 스위칭 비용을 줄일 수 있습니다.
콜백 패턴을 완화할 수있습니다.
// 콜백 패턴의 예
fun showNews() {
getConfig { conf ->
getNews(conf.newsId) { news ->
getUserInfo(news.userId) {
// TODO
}
}
}
}
// 콜백 패턴 완화
suspend fun showNews() {
val config = aysnc { getConfig() }
val news = async { getNews(config.await().newsId) }
val userInfo = aysnc { getUserInfo(news.await().userId) }
}
RxJava와 리액티브 스트림RxJava, Reactor같은 라이브러리는 쓰레드 전환과 동시성 처리를 지원합니다.
fun onCreate() {
dispoasables += getNewsFromApi()
.observeOn(AndroidSchedulers.mainThread())
.map { news ->
news.sortedByDescending { it. publishedAt }
}.subscribe {
view.showNews(it)
}
}
이는 콜백보다는 좋은 방법이지만, 러닝 커브가 높고, 구현하기가 아주 복잡합니다.(오퍼레이터가 너무 복잡하고 Pair 등으로 묶고 관리하는 등)
코루틴은 특정 지점에서 멈추고 이후에 재개할 수 있습니다.
코루틴을 활용하면 하나의 쓰레드에서 각각의 코루틴을 병렬로 실행로 실행할 수 있습니다.
책에서는 코루틴을 중단했다가 다시 실행할 수 있는 컴포넌트라고 칭합니다.
RxJava와 달리 코루틴은 기존 Spring코드와 크게 변화가 없습니다. 하지만 멀티 쓰레드와는 달리 저렴합니다.
@OptIn(ExperimentalTime::class)
@Test
fun coroutineTest() = runTest {
var value = 0
val time = measureTimedValue {
List(100_000) {
async {
++value
delay(1000L)
}
}.awaitAll()
}
println(value)
println(time.duration)
}
// 출력
100000
961.539600ms
@OptIn(ExperimentalTime::class)
@Test
fun threadTest() {
var value = 0
val time = measureTimedValue {
repeat(100_000) {
thread {
++value
Thread.sleep(1000L)
}
}
}
println(value)
println(time.duration)
}
// 출력
100000
21.864589500s
코틀린은 동시성 프로그래밍을 최대한 쉽고 저렴하게 구현할 수 있도록 도와줍니다.
코틀린의 시퀀스는
List나Set과 같은 컬렉션과 비슷한 개념이지만, 코틀린과 정지함수를 활용해 값을 지연(Lazy)처리 합니다.

sequence를 통해 빌드하거나 만들어 낼 수 있으며 yield로 값을 배출합니다.
@Test
fun main() = runTest {
val seq = sequence {
println("Gnerating 1")
yield(1)
println("Gnerating 2")
yield(2)
println("Gnerating 3")
yield(3)
}
for (num in seq) {
println(num)
}
}
// 출력
Gnerating 1
1
Gnerating 2
2
Gnerating 3
3
시퀀스의 yield는 정지 함수로 구현되어 있습니다.
public abstract suspend fun yield(value: T)
따라서
yield(1)에서 1을 방출하고 함수가 정지됩니다. (코틀린의 특징 : 정지 시점을 예측할 수 있음) 이에 따라main함수와sequence제너레이터가 번갈아 가면서 정지, 수행 됩니다.
함수가 정지될 수 있는 이유는
override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}
suspendCoroutineUninterceptedOrReturn 함수는 현재 코루틴의 Continuation을 획득하여 유저의 block()을 마저 수행합니다. 만약 block에서 COROUTINE_SUSPENDED를 반환한다면 이 코루틴은 반환됩니다.
suspendCoroutineUninterceptedOrReturn함수는 취소가능한 모든 코루틴에서 많이 사용됩니다.쓰레드를 이를 구현하기에는 막대한 비용이 듭니다. 코투린을 활용하면 더 빠르고 간단하게 정지가 가능합니다.
중단은 모든 코루틴의 필수적인 요소입니다. 코루틴은 중단될 때 Continuation을 반환합니다. Continuation은 게임의 체크포인트와 비슷합니다. 이를 통해 멈췄던 곳에서 코루틴을 실행할 수 있습니다.
코루틴은 어떤 자원도 사용하지 않고, 다른 쓰레드에서 시작할 수 있고, Continuation 객체는 직렬화와 역직렬화되며 다시 실행됩니다.
코틀린의 코루틴은 stackful과 stackless한 코루틴의 짬뽕입니다. suspend 함수를 실행시키면 일반 함수와 같이 자바 스택프레임에 쌓이지만, 정지하지 않고 함수가 종료된다면 일반 함수처럼 JVM에서 해당 스택을 unwind합니다.
함수가 일시 중단되면 동작 과정이 약간 바뀝니다. 그 시점에서 자바 메서드가 반환되고 JVM 스택이 unwind됩니다. 스택이 사라지고, 콜 체인이 구축되는 동안 heap메모리 위에 Linked list Of Continuation가 생성됩니다. 모든 일시 중단 함수 호출은 스택메모리가 아니라 힙 위의 Java 객체로 바이트코드 수준에서 구현됩니다.
Continuation을 재개하면 가장 앞쪽의 있는 함수를 호출하여 시작됩니다. 함수가 반환되기를 원할 때는 정상적인 방법으로 반환되지 않고 대신 호출자(Caller)의 Continuation을 resume합니다. 이는 호출자에게서도 반복되므로 코루틴의 일시 중단, 실행 로직이 구현될 수 있습니다.

suspend함수는 말 그대로 코루틴을 중단할 수 있는 함수입니다. 중단 함수는 코루틴 내부 또는 다른 suspend 함수에 의해 호출되어야 합니다.
@Test
fun main() = runTest {
println("Before")
someSuspendFunction() // 코루틴 스코프 내부에서 실행 가능
println("After")
}
@Test
fun main() = runTest {
println("Before")
suspendCoroutine<Unit> { }
println("After")
}
// 출력
Before
...
위의 예제는 After은 출력되지 않으며, 코드는 실행된 상태로 유지됩니다.
suspendCoroutine은 현재Continuation인스턴스를 가져오고 현재 실행 중인 코루틴을 suspend합니다.따라서
Continuation.resume및Continuation.resume WithException를 활용해 개발자가 직접 일시 코루틴의 재개 부분을 지정할 수 있습니다.
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
}
참고로 suspendCoroutine은 코루틴 스코프나 정지함수 내에서 실행이 가능하지만 내부는 코루틴으로 이루어져 있지 않습니다.

따라서 I/O가 필요 없는 곳, 굳이 suspend되지 않을 곳에서 suspendCoroutine을 활용하는 것을 권장합니다.
@Test
fun main() = runTest {
println("Before")
suspendCoroutine<Unit> { c : Continuation<Unit> ->
println("This coroutines does not switch..")
c.resumeWith(Result.success(Unit))
}
println("After")
}
// 출력
Before
This coroutines does not switch..
After
책의 예제가 다음과 같이 제공하는데 마음에 안듬
@Test
fun main() = runTest {
println("Before")
val user = suspendCoroutine<String> { c ->
requestUser { // DB, Network I/O
c.resumeWith(Result.success(it.name))
}
}
println("After")
}
requestUser가 콜백을 제공할 때 까지 I/O를 제공할 때 까지 코루틴을 멈추고 값을 반환합니다.
중단 함수는 코루틴이 아니고, 단지 코루틴을 중단할 수 있는 함수입니다.
@Test
fun main() = runTest {
println("Before")
suspendCoroutine {
continuation = it
}
continuation?.resume(Unit)
println("After")
}
// 출력
Before
해당 함수는 예상대로 종료되지 않습니다. main()이라는 함수가 아닌 suspendCoroutine을 중단시키기 때문입니다.
suspend function은 코틀린 컴파일러에 의해 상태 머신을 생성합니다.
Continuation 객체는 상태를 나타내는 숫자와 로컬 데이터를 가지고 있습니다.
함수의 컨티뉴에이션 객체가 이 함수를 부르는 다른 함수의 컨티뉴에이션 객체를 장식합니다. 모든 컨티뉴에이션 객체는 실행을 재개하거나 재개된 함수를 완료할 떄 사용되는 콜 스택으로 사용됩니다.
CSP(Continuation Passing Style) 방식suspend fun getUser(): String {
return "해찬"
}
@Nullable
public final Object getUser(@NotNull Continuation $completion) {
return "해찬";
}
코틀린 컴파일러는 컴파일 시 Continuation객체를 마지막 인자로 끼워 넣습니다.
또한 반환값이 Object (Kotlin의 Any)로 변환됨을 알 수 있습니다. 이는 해당 중단 함수가 결과 값("해찬")을 반환할 수도 있지만 중단됬을 때 마커인 COROUTINE_SUSPENDED를 반환할 수도 있기 때문입니다.
suspend fun myFunction() {
println("Before")
delay(1000) // 중단 가능 지점
println("After")
}
컴파일러는 다음과 같은 코드를 아래와 같이 컴파일합니다.
fun myFunction(continuation: Continuation<Unit>): Any {
val continuation = continuation as? MyFunctionContinuation ?: MyFunctionContinuation(continuation)
if (continuation.label == 0) {
println("Before")
// 중단 가능 지점
if(delay(1000, continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
if (continuation.label == 1) {
println("After")
return Unit
}
error("Impossible")
}
class MyFunctionContinuation(
val completion: Continuation<Unit>
) {
override val context: CoroutineContext
get() = completion.context
` var label = 0
var result: Result<Any>? = null
override fun invokeSuspended(result: Result<Unit>) {
this.result = result
val res = try {
val r = myFunction(this)
if (r == COROUTINE_SUSPENDED) return
Result.succes(r as Unit)
} catch (e: Throwable) {
Result.failure(e)
}
completion.resumeWith(res)
}
}
만약 값을 반환하는 중단함수가 있다면 모든 값들은 상태 머신에 저장됩니다.
suspend fun main(token: String) {
val userId = getUserId(token)
val uesrName = getUserName(token)
// ...
}
fun myFunction(continuation: Continuation<Unit>): Any {
val continuation = continuation as? MyFunctionContinuation ?: MyFunctionContinuation(continuation)
if (continuation.label == 0) {
val userId = getUserId(token, continuation)
continuation.userId = userId
continuation.label = 1
}
// ...
}
class MyFunctionContinuation(
val completion: Continuation<Unit>
) {
` var label = 0
var result: Result<Any>? = null
var userId: String? = null // 특정 값을 반환..
var userName: String? = null // 특정 값을 반환...
}
함수 a가 함수 b를 호출하면 일반적인 JVM은 스택 자료구조에 주소값을 저장하고 점프를 뜁니다. 코루틴의 경우는 컨텍스트 스위칭을 할 때 해당 주소값을 저장하는 스택이 없습니다. 코루틴의 경우는 Linked List of Continuation 객체가 콜 스택의 역할을 대신합니다.
[의견] : 쓰레드에 상관없이 코루틴이 실행될 때는 힙 메모리 위에 있는
Continuation객체를 가져다가 씁니다. (CoroutineDispatcher가 해당 쓰레드로 객체를 던짐코루틴이 가볍다고 의미하는 이유는 쓰레드의 TCB보다
Continuation의 크기가 작기 때문이 아닐까 생각됩니다.
Linked list Continuation객체라고 생각 -> Continuation객체는 중단이 되었을 떄의 상태(label)과 함수의 지역 변수, 파라미터 필드, 그리고 중단 함수를 호출한 함수가 재개될 위치정보를 가지고 있습니다.
Continuation은 한 객체가 다른 하나를 참조하고, 참조된 객체가 다른Continuation을 참조하는 거대한 양파같은 구조입니다.
AContinuation(
i = 4,
label 1,
completion = BContinuation(
i = 4,
label = 1,
completion = Continuation(
userId = 4,
label = 2,
completion = ...
)
)
)
이러한 콜백은 Continuation의 resume or resumeWith을 활용하여 실행됩니다.
Continuation 구현체 실제 코드
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
// 완료되지 않은 상태에서 Continuation을 재개하면 실패
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
// 상태 머신이 종료되는 중일 때 호출
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// 최상위 컨티뉴에이션 객체에 도달했을 때
completion.resumeWith(outcome)
return
}
}
}
}
resumeWith 함수는 재귀를 통해 현재 코루틴을 재개하고, 필요한 경우 상위 코루틴으로 계속해서 재귀적으로 이동합니다.
함수를 상태로 나누는 것은 숫자를 비교하는 것만큼 쉬운 일
컨티뉴에이션 객체에 상태를 저장하는 것 또한 간단
멀티쓰레드보단 싸다.
코틀린 언어 차원에서는 코루틴을 최소한으로 지원하고, 좀 더 많은 코루틴을 활용하기 위해서는 라이브러리가 필수
| 언어차원에서의 지원 | kotlinx.coroutines 라이브러리 |
|---|---|
| 코틀린 기본 라이브러리에 포함 | 의존성 별도 추가 필요 |
| Continuation, suspendCoroutines 등 기본적인 것들 제공 | launch, async 등 다양한 기능 제공 |
| 직접 사용하기 어렵다. | 직접 사용하기 쉽다. |
https://www.youtube.com/watch?v=YrrUCSi72E8
https://stackoverflow.com/questions/67483210/how-does-method-in-coroutine-block-work-in-kotlin