Coroutine의 race condition 개선

강현석·2023년 7월 2일
0

article

목록 보기
9/10

본 내용은 Eliminating coroutine races 을 읽고 입맛대로 정리한 글입니다.


문제

fun main() {
    repeat(100) { GlobalScope.launch { Thread.sleep(100) } } // To make processor busy
    test() // sometimes "ab" and sometimes "ba"
    runBlocking { scope.coroutineContext.job.children.forEach { it.join() } }
}

fun test() {
    a()
    b()
}

val scope = CoroutineScope(Dispatchers.Default)

fun a() = scope.launch {
    print("a")
}

fun b() = scope.launch {
    print("b")
}
  • a와 b를 호출할 때, 어떤 것이 가장 먼저 호출될 것인지 확신할 수 없음
    • 각각의 scope에서 launch를 하기 때문
    • "ab" 혹은 "ba"가 찍힘

일반적인 방법

val scope = CoroutineScope(Dispatchers.Default)
val job: Job? = null

fun a() {
    job = scope.launch {
        print("a")
    }
}

fun b() = scope.launch {
    job?.join()
    print("b")
}
  • 다른 프로세스가 완료될 때까지 한 프로세스를 기다리게 하는 가장 간단한 방법
    • 첫 번째 프로세스를 job에 저장
    • join()을 사용하여 첫 번째 프로세스가 완료될 때까지 기다림
    • 항상 "ab"가 찍힘

Listener와 Handler

상황

  • 하나의 코루틴을 사용해서 이벤트 listener 설정하고, 다른 코루틴을 사용해서 이벤트를 전송해야하는 상황
class BaseViewModel : ViewModel() {
    private val eventFlow = MutableSharedFlow<Event>()

    init {
        eventFlow
            // ...
            .onEach(::handleIntent)
            // ...
            .launchIn(viewModelScope)
    }

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            intentFlow.emit(event)
        }
    }

    fun handleIntent(event: Event) {
        // ...
    }
}

class SomeViewModel : BaseViewModel() {
    init {
        sendEvent(Event()) // 처리될 것인가 말 것인가?
    }
}
  • SomeViewModel을 init할 때, 부모 생성자가 먼저 호출되고 eventFlow를 listen한 코루틴 시작
    • 이후에 바로 다른 코루틴을 시작해서 이 flow에 Event()를 보냄(sendEvent)

문제점

  • listener가 설정되기 전에 Event가 전송되면, Event를 받지 못할 수 있음
    • SharedFlow는 기본적으로 replay가 0으로 설정되어 있기 때문

해결책

Job ❌

class BaseViewModel : ViewModel() {
    private val eventFlow = MutableSharedFlow<Event>()
    private val eventFlowJob: Job // 👈

    init {
        eventFlowJob = eventFlow // 👈
            // ...
            .onEach(::handleIntent)
            // ...
            .launchIn(viewModelScope)
    }

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            eventFlowJob.join() // 무한 대기!
            intentFlow.emit(event)
        }
    }

    fun handleIntent(event: Event) {
        // ...
    }
}

class SomeViewModel : BaseViewModel() {
    init {
        sendEvent(Event())
    }
}
  • 여기에서는 job을 활용하여 개선할 수 없음
    • Event를 수신하는 코루틴이 절대로 완료할 수 없기 때문

replay 🤔

class BaseViewModel : ViewModel() {
    private val eventFlow = 
        MutableSharedFlow<Event>(replay = Int.MAX_VALUE) // 👈

    init {
        eventFlow
            // ...
            .onEach(::handleIntent)
            // ...
            .launchIn(viewModelScope)
    }

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            intentFlow.emit(event)
        }
    }

    fun handleIntent(event: Event) {
        // ...
    }
}

class SomeViewModel : BaseViewModel() {
    init {
        sendEvent(Event())
    }
}
  • replay를 설정함으로써, 새로운 observer는 이전 이벤트들을 수신할 수 있음
  • 우리의 flow 동작을 변경하기 때문에, 다른 observer를 사용할 계획이 있다면 문제가 발생할 수 있음
    • 각각의 새로운 observer가 과거의 모든 이벤트를 수신하기 때문

CompletableDeferred와 Job 😎

class BaseViewModel : ViewModel() {
    private val eventFlow = MutableSharedFlow<Event>()
    private val eventFlowListenerStarted = CompletableDeferred<Unit>() // 👈

    init {
        eventFlow
            .onSubscription { eventFlowListenerStarted.complete(Unit) } // 👈
            // ...
            .onEach(::handleIntent)
            // ...
            .launchIn(viewModelScope)
    }

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            eventFlowListenerStarted.join() // 👈
            intentFlow.emit(event)
        }
    }

    fun handleIntent(event: Event) {
        // ...
    }
}
class BaseViewModel : ViewModel() {
    private val eventFlow = MutableSharedFlow<Event>()
    private val eventFlowListenerStarted = Job() // 👈

    init {
        eventFlow
            .onSubscription { eventFlowListenerStarted.complete() } // 👈
            // ...
            .onEach(::handleIntent)
            // ...
            .launchIn(viewModelScope)
    }

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            eventFlowListenerStarted.join() // 👈
            intentFlow.emit(event)
        }
    }

    fun handleIntent(event: Event) {
        // ...
    }
}
  • CompletableDeferred 또는 Job을 활용하여, listener가 설정될 때까지 기다릴 수 있음
  • join을 사용하여 기다리고, complete를 사용하여 완료할 수 있음
  • 가장 좋은 위치는 onSubscription operator
    • 첫 번째 observer가 flow를 subscribe할 때 호출됨
  • onStart operator를 활용할 수도 있음
    • 새로운 observer가 시작될 때 호출됨

subscriptionCount 😅

class BaseViewModel : ViewModel() {
    private val eventFlow = MutableSharedFlow<Event>()

    init {
        eventFlow
            // ...
            .onEach(::handleIntent)
            // ...
            .launchIn(viewModelScope)
    }

    fun sendEvent(event: Event) {
        viewModelScope.launch {
            waitForListener()
            intentFlow.emit(event)
        }
    }

    fun handleIntent(event: Event) {
        // ...
    }

    suspend fun waitForListener() { // 👈
        val subscriptionCount = eventFlow.subscriptionCount
        if (subscriptionCount.value == 0) {
            subscriptionCount
                .filter { it > 0 }
                .first()
        }
    }
}
  • subscriptionCount를 사용하여 해결할 수도 있음
    • 복잡한 방법
profile
볼링을 좋아하는 안드로이드 개발자

0개의 댓글