callbackFlow의 awaitClose

ChoiUS·2022년 12월 7일
0

Kotlin

목록 보기
2/2
post-thumbnail

Flow 이미지 출처

callbackFlow 사용

  • 예시
    // data 모듈
    fun checkCurrentUser(uid: String): Flow<Boolean> = callbackFlow {
        dataSource.getUser(uid).addOnSuccessListener { snapshot ->
            if (snapshot.exists()) {
                trySend(true)
            } else {
                dataSource.createUser(uid).addOnCompleteListener { result ->
                    trySend(result.isSuccessful)
                }
            }
        }.addOnFailureListener {
            trySend(false)
        }
        awaitClose()
    }

callback 안에서 flow를 전달해야 할 때 사용하며 사용하며, send 대신 trySend를 사용한다.

send

awaitClose() 를 사용하지 않으면 오류가 발생한다.

java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.

callbackFlow 블록의 끝에는 'awaitClose'을(를) 사용해야 합니다.
그렇지 않으면 외부 취소 시 콜백/수신기가 누출될 수 있습니다.

callbackFlow 설명

Using awaitClose is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed.

flow collection이 취소될 때 메모리 누수를 방지하려면 awaitClose를 사용해야 합니다. 그렇지 않으면 flow collector가 이미 완료된 경우에도 콜백이 계속 실행될 수 있습니다.

결국 확인은 못했지만 멘토님의 말씀으로는 따로 종료하지 않으면 계속 살아있게 된다고 한다.

awaitClose의 내부 코드

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            invokeOnClose {
                cont.resume(Unit)
            }
        }
    } finally {
        block()
    }
}

공식 문서에 있는 callbackFlow의 사용 예시

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback { // Implementation of some callback interface
        override fun onNextValue(value: T) {
            // To avoid blocking you can configure channel capacity using
            // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
            trySendBlocking(value)
                .onFailure { throwable ->
                    // Downstream has been cancelled or failed, can log here
                }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    /*
     * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
        awaitClose { api.unregister(callback) }
    }




종료 방법 (close vs cancel)

작업이 완료되면 flow를 종료해서 메모리 누수를 방지해야 한다.

종료 방법은 close와 cancel이 있는데 무엇을 어떤 상황에 사용해야 할까?

db.collection("").get()
    .addOnSuccessListener {  }
    .addOnFailureListener {  }
    .addOnCompleteListener {  }
    .addOnCanceledListener {  }
  • close
    public fun close(cause: Throwable? = null): Boolean
    • close에 값을 전달하지 않으면 정상 종료로 간주한다.
  • cancel
    public fun CoroutineScope.cancel(cause: CancellationException? = null)
    
    // kotlinx.coroutines.JobCancellationException: ProducerCoroutine was cancelled; job=ProducerCoroutine{Cancelled}@944b4c7
    • 작업을 중지하고 exception을 전달한다.





참고 자료

awaitClose

close

cancel

profile
사람을 위한 개발자

0개의 댓글

관련 채용 정보