코루틴 뮤텍스, 액터 정리

SSY·2022년 12월 15일
0

Coroutine

목록 보기
3/7
post-thumbnail

목차
1. 시작하며
2. Mutex
3. Actor

1. 시작하며

지금 당장에 진행하고 있는 프로젝트에는 적용되어 있는 부분은 아니다. 하지만 비동기 라이브러리를 다루려고 하고, 제대로 다루고자 한다면, 여러가지 스레드가 동시에 도는 환경에서 이를 안전하게 제어할 수 있는 '공유 자원에 대한 동시성 처리'에 대해 알 필요가 있다고 본다.

그럼 동시성 처리에 대해 알아가기 전, 문제가 될 수 있는 가상의 경우를 한번 봐보자.

private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
    val n = 100
    val k = 1000
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) {
                    action()
                }
            }
        }
        jobs.forEach { it.join() }
    }
    Log.i("coroutineLog", "${n * k}번 반복문을 돌 예정, 연산 시간 : $time ms")
}

fun main {
    var counter = 0
    runBlocking {
        CoroutineScope(Dispatchers.IO).massiveRun {
            counter++
        }
        Log.i("coroutineLog", "$counter 번 반복문을 돌았다~")
    }
}

우선, 메인을 먼저 봐보자. 메인은 IO디스패쳐에서 돌고 있다. 그리고 알다시피, IO디스패쳐의 경우 최대 64개까지 생성이 가능하다. 즉, 한 번에 여러가지의 스레드들이 돌 수 있다는 것이고, 그러한 스레드들이 하나의 자원(여기서는 counter이며, 이를 공유자원이라 한다.)에 접근하려다 문제가 발생한다는 거다.

그리고 동시성 처리에 대해 잘 알지 못하는 사람은 위의 결과값이 당연히 10,000이 찍힐거라고 생각할것이다. 나도 그랬었다. 하지만, 위 코드를 빌드해보면 결과는 다음과 같다.

내 생각
현재 내가 진행했던, 진행하고 있는 프로젝트엔 위와 같은 멀티스레딩 코드를 작성하진 않았다. 하지만 위와 같은 상황이 발생한다면...? 정말 생각만해도 끔찍하다. 그럼 그날부로 퇴근은 고사하고 몇 주 아니 몇 달이나 고생길이 열릴 수도 있다 생각한다. 미래를 위해 꼭 익혀두는게 좋을듯 하다.

그럼 이 문제를 어떻게 해결해야 하는 걸까? 그것은 바로, 공유자원에 대해 적절한 동기화 처리를 해주는 것이다. 코루틴에서는 이러한 동기화 처리 방법 두 가지를 제공한다. 하나는 뮤텍스, 그리고 하나는 액터이다. 하나하나 알아보자.

2. Mutex

출처 입력

아래는 공유자원에 접근을 제한하는 뮤텍스를 적용한 코드이다.

private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
    val n = 100
    val k = 1000
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) {
                    action()
                }
            }
        }
        jobs.forEach { it.join() }
    }
    Log.i("coroutineLog", "${n * k}번 반복문을 돌 예정, 연산 시간 : $time ms")
}

fun main {
    val mutex = Mutex()
    var counter = 0
    runBlocking {
        CoroutineScope(Dispatchers.IO).massiveRun {
            mutex.withLock {
                counter++
            }
        }
        Log.i("coroutineLog", "$counter 번 반복문을 돌았다~")
    }
}

위 코드에서 바뀐 점은 IO디스패쳐 내부 블록에 공유자원을 mutex.withLock으로 동기화 설정을 해주었다는 점이다. 그리고 여러 스레드가 돈다고 해도 해당 자원에는 오로지 하나의 스레드만 접근할 수 있게 되었다. 그리고 그에 대한 결과는 아래와 같다.

3. Actor

액터의 경우는 공유자원에 접근하기 위해 'Channel'을 사용한다. 그리고 그러한 채널은 Queue형태로 이루어져 있기에 sequental한 접근을 보장한다. 우선 코드는 아래와 같다.

private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
    val n = 100
    val k = 1000
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) {
                    action()
                }
            }
        }
        jobs.forEach { it.join() }
    }
    Log.i("coroutineLog", "${n * k}번 반복문을 돌 예정, 연산 시간 : $time ms")
}

fun main() {
    var counter = 0
    runBlocking {
        val actorCounter = actor<ACTION> {
            for (msg in channel) {
                when (msg) {
                    ACTION.INCREATE -> counter ++
                    ACTION.DECREASE -> counter --
                }
            }
        }

        CoroutineScope(Dispatchers.IO).launch {
            repeat(2000) {
                actorCounter.send(ACTION.INCREATE)
            }
            repeat(300) {
                actorCounter.send(ACTION.DECREASE)
            }
        }.join()

        Log.i("coroutineLog", "결과값 : $counter")
    }
}

코드에 대해 간단하게 설명하자면, 여러 스레드에서 동시에 counter변수에 접근해서 덧셈과 뺄셈을 하려한다. 하지만, 이는 여러 스레드에서 접근할 수 있는 엄연한 공유자원이다. 그래서 이를 actor안에서만 동작할 수 있도록 설정해 두었다. 그리고 이를 조작하기 위해선 Channel에 메시지를 보내면(send)된다. 그럼 actor내부는 이러한 메시지를 받아서 공유자원을 변경시킨다.

profile
불가능보다 가능함에 몰입할 수 있는 개발자가 되기 위해 노력합니다.

0개의 댓글