코루틴 동시성 문제

James_·2022년 5월 21일
0

코루틴

목록 보기
4/4
post-custom-banner
import kotlin.system.*
import kotlinx.coroutines.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
// 출력
48 ms동안 100000개의 액션을 수행했습니다.
Counter = 92363 // 값이 실행될때 마다 다름

위 예제에서 100개의 coroutine을 띄우고 각 코루틴은 전달받은 action을 1000번 수행한다.

값이 보장되게 하려면 어떻게 해야할까?

Volatile

자바의 volatile 키워드 또는 코틀린의 @Volatile 애노테이션을 변수 선언시 지정할 수 있다. 사전적 의미로는 ‘휘발성의’라는 뜻을 가지며, 변수 선언시 volatile을 지정하면 값을 메인 메모리에만 적재하게 된다.
출처: 찰스님 블로그
어떤 스레드에서 변경을 해도 다른 스레드에게 값을 영향을 준다.

하지만 이 Volatile을 붙인다고 해서 결과가 100,000을 보장하지 않는다.
가시성은 한 쪽에서 수정했을때 다른 쪽에서 값을 볼 수 있다.
동시에 읽고 수정해서 생기는 문제를 해결하지 못한다.

AtomicInteger

스레드에서 안전한 자료구조

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
56 ms동안 100000개의 액션을 수행했습니다.
Counter = 100000

incrementAndGet 메서드로 값을 올리고 가져오는 것을 볼 수 있다.
A 스레드에서 값을 변경하고 있으면 그 때 다른 스레드에서 값을 변경하지 못하게 한다.
문제를 해결할 수 있지만 다른 상황에 적합하지 않을 수 있다.

스레드 한정

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

var counter = 0
val counterContext = newSingleThreadContext("CounterContext")

fun main() = runBlocking {
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

newSingleThreadContext라는 새로운 코루틴 컨텍스트를 만들었다.
특정 스레드를 하나 만들어서 그것만 쓰게 한다.
항상 10만을 출력하는 것을 보장할 수 있다.

뮤텍스

뮤텍스는 상호배제(Mutual exclusion)의 줄임말
운영체제 공부할 때 볼 수 있는 뮤텍스이다.
공유 상태를 수정할 때 임계 영역(Critical Section)을 이용하며, 임계 영역을 동시에 접근하는 것을 허용하지 않는다.

...

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

mutex를 만들고 mutex.withLock으로 간단하게 수정할 수 있다.
여러 스레드 중 한 스레드만 진입하고 다른 스레드는 기다렸다가 한 스레드에서 작업이 완료될 떄까지 기다린 다음 counter를 더하게 된다.

Actor

독점적으로 자료를 가지며 그 자료를 다른 코루틴과 공유하지 않고 액터를 통해서만 접근하게 만든다.

먼저 sealed class를 생성한다.

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

sealed class는 외부에서 확장이 불가능한 클래스이다.
IncCounter object,싱글톤으로 인스턴스를 만들 수 없다. 액터에게 값을 증가시키기 위한 신호로 쓰인다.
GetCounter는 값을 가져올 때 사용하며 CompletableDeferred를 이용해 받아온다.
여기서 CompletableDeferred는 나중에 값을 받겠다라는 의미로 이해하면 좋을 것 같다.
그리고 확장함수를 하나 만든다.

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // 액터 안에 상태를 캡슐화해두고 다른 코루틴이 접근하지 못하게 합니다.
	// Suspension Point 값이 올 때까지 잠들어있다가 값이 오면 꺠어난다.
    for (msg in channel) { // 외부에서 보내는 것은 채널을 통해서만 받을 수 있습니다.(recieve)
        when (msg) {
            is IncCounter -> counter++ // 증가시키는 신호.
            is GetCounter -> msg.response.complete(counter) // 현재 상태를 반환합니다.
        }
    }
}

channel은 송신 측에서 값을 send하여 수신 측에서 receive 할 수 있다.

fun main() = runBlocking<Unit> {
    val counter = counterActor()
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }

    val response = CompletableDeferred<Int>()
    // 채널에게 값을 알려다고 요청
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close()
}

CompletableDeferred라서 값이 언제 오는지 알 수 없기 때문에 await으로 기다려준 후 출력한다.
프로그램이 끝나기 전에 channel close 메서드로 닫아준다.

profile
Android 개발자
post-custom-banner

0개의 댓글