코루틴을 제대로 활용함의 끝판왕은 멀티 스레드 환경에서의 Race Condition or Deadlock을 제어하는 것이라 볼 수 있다. 그러기 위ㅐ선 멀티 스레드 환경에서 이를 안전하게 제어할 수 있는 '공유 자원에 대한 동시성 처리'에 대해 알 필요가 있다.
그럼 동시성 처리에 대해 알아가기 전, 문제가 될 수 있는 가상의 경우를 한번 봐보자.
private const val COROUTINE_COUNT = 100
private const val COMPUTATION_COUNT = 1000
private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
val time = measureTimeMillis {
val jobs = List(COROUTINE_COUNT) {
launch {
repeat(COMPUTATION_COUNT) {
action()
}
}
}
jobs.forEach { it.join() }
}
println("${COROUTINE_COUNT * COMPUTATION_COUNT}번 반복문을 돌 예정, 연산 시간 : $time ms")
}
fun main() {
var counter = 0
runBlocking {
CoroutineScope(Dispatchers.IO).massiveRun {
counter++
}
println("$counter 번 반복문을 돌았다~")
}
}
우선, fun main()은 IO디스패쳐에서 돌고 있다. 그리고 알다시피, IO디스패쳐의 경우 최대 64개까지 생성이 가능하다. 즉, 한 번에 여러가지의 스레드들이 돌 수 있다는 것이고, 그러한 스레드들이 공유자원(여기서는 counter)에 접근 및 갱신을 하다 'Race Condition' 문제가 발생한다.
'Race Condition'문제를 염두하지 못할 경우, 윗 코드는 정상 동작(결과값이 10,000이 찍힘)할거라 생각할것이다. 하지만, 위 코드를 빌드해보면? 공유자원 동기화가 되지 않아 결과가 올바르지 못하다.

필자 생각
모바일 앱의 비즈니스 로직은 서버로 위임하는 경우가 많다. 또한 많은 수의 사용자로부터 API호출을 처리해야하는 서버의 경우, 'Race Condition'등을 대비하기 위한 코드 작성이 더 많을 수 있지만 모바일 앱은 그렇지 않는 경우가 많다. 하지만 앱이 커지고 업무 프로세스에 얽힌 비즈니스 로직이 많아진다면 동기화 처리가 필요해질때가 온다.
위와 같은 문제를 'Race Condition'문제라 한다.그럼 이 문제를 어떻게 해결해야 하는 걸까? 그것은 바로, 공유자원에 대해 적절한 동기화 처리를 해주는 것이다. 코루틴에서는 이러한 동기화 처리 방법 두 가지를 제공한다. 하나는 뮤텍스, 그리고 하나는 액터이다. 하나하나 알아보자.
[추가]
StateFlow의 경우, 내부update{}메서드나,AtomicType객체는 이러한 'Race Condition'을 방지하기 위한 작업이 돼있다. 이를 CAS라 한다. 아래를 참고하면 좋다.
참고 : RaceCondition의 개념부터 CAS를 활용한 공유자원 동기화를 알아보자
'Race Condition'방지를 위해선, 공유자원엔 1개의 스레드만 접근 및 작업하도록함이 필요하다. 이를 Mutex로 가능하다.
private const val COROUTINE_COUNT = 100
private const val COMPUTATION_COUNT = 1000
private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
val time = measureTimeMillis {
val jobs = List(COROUTINE_COUNT) {
launch {
repeat(COMPUTATION_COUNT) {
action()
}
}
}
jobs.forEach { it.join() }
}
println("${COROUTINE_COUNT * COMPUTATION_COUNT}번 반복문을 돌 예정, 연산 시간 : $time ms")
}
fun main() {
val mutex = Mutex()
var counter = 0
runBlocking {
CoroutineScope(Dispatchers.IO).massiveRun {
mutex.withLock {
counter++
}
}
println("$counter 번 반복문을 돌았다~")
}
}
위 코드에서 바뀐 점은 IO디스패쳐 내부 블록에 공유자원을 mutex.withLock으로 동기화 설정을 해주었다는 점이다. 그리고 여러 스레드가 돈다고 해도 해당 자원에는 오로지 하나의 스레드만 접근할 수 있게 되었다. 그리고 그에 대한 결과는 아래와 같다.

액터의 경우는 공유자원에 접근하기 위해 Channel을 사용한다. 그리고 그러한 채널은 Queue형태로 이루어져 있기에 sequental한 접근을 보장한다.
enum class ACTION {
INCREATE,
DECREASE
}
fun main() {
var counter = 0
runBlocking {
val actorCounter = actor<ACTION> {
for (msg in channel) {
when (msg) {
ACTION.INCREATE -> counter ++
ACTION.DECREASE -> counter --
}
}
}
CoroutineScope(Dispatchers.IO).launch {
launch {
repeat(2000) {
actorCounter.send(ACTION.INCREATE)
}
}
launch {
repeat(300) {
actorCounter.send(ACTION.DECREASE)
}
}
}.join()
println("결과값 : $counter")
}
}

2개의 Coroutine Builder를 선언했다. 1개는 2000번 반복 및 count값을 증가시키고 있고, 또 다른 1개는 300번 반복 및 count값을 감소시킨다. 즉, 멀티 스레드 환경에서 공유자원인 count를 접근하고 있고 덧셈과 뺄셈을 진행한다.
멀티 스레드 환경에서 공유자원에 적절한 동기화를 위해 1개의 actor를 사용하여 이를 안전하게 처리하고 있다. 그리고 이는 Channel에 메시지 보내는 방식인 send()를 간단하게 호출하고 있다. 이에 대한 결과는 또 다시 actor객체를 사용하여 공유자원을 동기화해주고 있다.