목차
1. 시작하며
2. Mutex
3. Actor
지금 당장에 진행하고 있는 프로젝트에는 적용되어 있는 부분은 아니다. 하지만 비동기 라이브러리를 다루려고 하고, 제대로 다루고자 한다면, 여러가지 스레드가 동시에 도는 환경에서 이를 안전하게 제어할 수 있는 '공유 자원에 대한 동시성 처리'에 대해 알 필요가 있다고 본다.
그럼 동시성 처리에 대해 알아가기 전, 문제가 될 수 있는 가상의 경우를 한번 봐보자.
private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) {
action()
}
}
}
jobs.forEach { it.join() }
}
Log.i("coroutineLog", "${n * k}번 반복문을 돌 예정, 연산 시간 : $time ms")
}
fun main {
var counter = 0
runBlocking {
CoroutineScope(Dispatchers.IO).massiveRun {
counter++
}
Log.i("coroutineLog", "$counter 번 반복문을 돌았다~")
}
}
우선, 메인을 먼저 봐보자. 메인은 IO디스패쳐에서 돌고 있다. 그리고 알다시피, IO디스패쳐의 경우 최대 64개까지 생성이 가능하다. 즉, 한 번에 여러가지의 스레드들이 돌 수 있다는 것이고, 그러한 스레드들이 하나의 자원(여기서는 counter이며, 이를 공유자원이라 한다.)에 접근하려다 문제가 발생한다는 거다.
그리고 동시성 처리에 대해 잘 알지 못하는 사람은 위의 결과값이 당연히 10,000이 찍힐거라고 생각할것이다. 나도 그랬었다. 하지만, 위 코드를 빌드해보면 결과는 다음과 같다.
내 생각
현재 내가 진행했던, 진행하고 있는 프로젝트엔 위와 같은 멀티스레딩 코드를 작성하진 않았다. 하지만 위와 같은 상황이 발생한다면...? 정말 생각만해도 끔찍하다. 그럼 그날부로 퇴근은 고사하고 몇 주 아니 몇 달이나 고생길이 열릴 수도 있다 생각한다. 미래를 위해 꼭 익혀두는게 좋을듯 하다.
그럼 이 문제를 어떻게 해결해야 하는 걸까? 그것은 바로, 공유자원에 대해 적절한 동기화 처리를 해주는 것이다. 코루틴에서는 이러한 동기화 처리 방법 두 가지를 제공한다. 하나는 뮤텍스, 그리고 하나는 액터이다. 하나하나 알아보자.
출처 입력
아래는 공유자원에 접근을 제한하는 뮤텍스를 적용한 코드이다.
private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) {
action()
}
}
}
jobs.forEach { it.join() }
}
Log.i("coroutineLog", "${n * k}번 반복문을 돌 예정, 연산 시간 : $time ms")
}
fun main {
val mutex = Mutex()
var counter = 0
runBlocking {
CoroutineScope(Dispatchers.IO).massiveRun {
mutex.withLock {
counter++
}
}
Log.i("coroutineLog", "$counter 번 반복문을 돌았다~")
}
}
위 코드에서 바뀐 점은 IO디스패쳐 내부 블록에 공유자원을 mutex.withLock으로 동기화 설정을 해주었다는 점이다. 그리고 여러 스레드가 돈다고 해도 해당 자원에는 오로지 하나의 스레드만 접근할 수 있게 되었다. 그리고 그에 대한 결과는 아래와 같다.
액터의 경우는 공유자원에 접근하기 위해 'Channel'을 사용한다. 그리고 그러한 채널은 Queue형태로 이루어져 있기에 sequental한 접근을 보장한다. 우선 코드는 아래와 같다.
private suspend fun CoroutineScope.massiveRun(action: suspend() -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) {
action()
}
}
}
jobs.forEach { it.join() }
}
Log.i("coroutineLog", "${n * k}번 반복문을 돌 예정, 연산 시간 : $time ms")
}
fun main() {
var counter = 0
runBlocking {
val actorCounter = actor<ACTION> {
for (msg in channel) {
when (msg) {
ACTION.INCREATE -> counter ++
ACTION.DECREASE -> counter --
}
}
}
CoroutineScope(Dispatchers.IO).launch {
repeat(2000) {
actorCounter.send(ACTION.INCREATE)
}
repeat(300) {
actorCounter.send(ACTION.DECREASE)
}
}.join()
Log.i("coroutineLog", "결과값 : $counter")
}
}
코드에 대해 간단하게 설명하자면, 여러 스레드에서 동시에 counter변수에 접근해서 덧셈과 뺄셈을 하려한다. 하지만, 이는 여러 스레드에서 접근할 수 있는 엄연한 공유자원이다. 그래서 이를 actor안에서만 동작할 수 있도록 설정해 두었다. 그리고 이를 조작하기 위해선 Channel에 메시지를 보내면(send)된다. 그럼 actor내부는 이러한 메시지를 받아서 공유자원을 변경시킨다.