// data 모듈
fun checkCurrentUser(uid: String): Flow<Boolean> = callbackFlow {
dataSource.getUser(uid).addOnSuccessListener { snapshot ->
if (snapshot.exists()) {
trySend(true)
} else {
dataSource.createUser(uid).addOnCompleteListener { result ->
trySend(result.isSuccessful)
}
}
}.addOnFailureListener {
trySend(false)
}
awaitClose()
}
callback 안에서 flow를 전달해야 할 때 사용하며 사용하며, send 대신 trySend를 사용한다.
awaitClose()
를 사용하지 않으면 오류가 발생한다.
java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.
callbackFlow 블록의 끝에는 'awaitClose'을(를) 사용해야 합니다.
그렇지 않으면 외부 취소 시 콜백/수신기가 누출될 수 있습니다.
callbackFlow 설명
Using awaitClose is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed.
flow collection이 취소될 때 메모리 누수를 방지하려면 awaitClose를 사용해야 합니다. 그렇지 않으면 flow collector가 이미 완료된 경우에도 콜백이 계속 실행될 수 있습니다.
결국 확인은 못했지만 멘토님의 말씀으로는 따로 종료하지 않으면 계속 살아있게 된다고 한다.
awaitClose의 내부 코드
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
try {
suspendCancellableCoroutine<Unit> { cont ->
invokeOnClose {
cont.resume(Unit)
}
}
} finally {
block()
}
}
공식 문서에 있는 callbackFlow의 사용 예시
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onNextValue(value: T) {
// To avoid blocking you can configure channel capacity using
// either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
trySendBlocking(value)
.onFailure { throwable ->
// Downstream has been cancelled or failed, can log here
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
/*
* Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
* or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
* In both cases, callback will be properly unregistered.
*/
awaitClose { api.unregister(callback) }
}
작업이 완료되면 flow를 종료해서 메모리 누수를 방지해야 한다.
종료 방법은 close와 cancel이 있는데 무엇을 어떤 상황에 사용해야 할까?
db.collection("").get()
.addOnSuccessListener { }
.addOnFailureListener { }
.addOnCompleteListener { }
.addOnCanceledListener { }
public fun close(cause: Throwable? = null): Boolean
public fun CoroutineScope.cancel(cause: CancellationException? = null)
// kotlinx.coroutines.JobCancellationException: ProducerCoroutine was cancelled; job=ProducerCoroutine{Cancelled}@944b4c7