Coroutine Flow 란? (2)

HEETAE HEO·2022년 9월 14일
0
post-thumbnail

Android에서의 Flow란?

Android에서 저장소는 일반적으로 UI 데이터 생산자입니다. 이때 사용자 인터페이스는 최종적으로 데이터를 표시하는 소비자입니다. UI 레이어가 사용자 이벤트의 생산자이고 계층 구조의 다른 레이어가 이 이벤트를 사용하기도 합니다. 생산자와 소비자 사이의 레이어는 일반적으로 다음 레이어의 요구사항에 맞게 조정하기 위해 데이터 스트림을 수정하는 중재자 역할을 합니다.

Flow 생성 - Producer

Flow를 만들려면 Flow Builder API를 사용해야합니다. flow builder 하수는 emit 함수를 사용하여 새값을 수동으로 데이터 스트림에 내보낼 수 있는 새 흐름을 만듭니다.

Android developers에 올라와 있는 코드를 가지고 예시를 들어보겠습니다.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

해당 코드는 고정된 간격으로 최신 뉴스를 자동으로 가져옵니다. suspend 함수는 연속된 값을 여러 개 반환할 수 없으므로, 데이터 소스가 이러한 요구사항을 충족하는 흐름을 만들고 반환합니다. 이 경우 데이터 소스가 생산자 역할을 합니다.

flow 빌더는 Coroutine 내에서 실행됩니다. 따라서 동일한 비동기 API의 이점을 활용할 수 있지만 몇 가지 제한사항이 적용됩니다.

  • flow는 순차적입니다. 생산자가 Coroutine에 있으므로 suspend 함수를 호출하면 생산자는 suspend 함수가 반환될 때까지 정지 상태로 유지됩니다. 이 예에서 생산자는 fetchLatestNews 네트워크 요청이 완료될 때까지 정지됩니다. 그런 다음에만 결과를 스트림으로 내보냅니다.

  • flow Builder에서는 생산자는 다른 CoroutineContext의 값을 emit할 수 없습니다. 그러므로 새 Coroutine을 만들거나 코드의 withContext()를 사용하여 다른 CoroutineContext에서 emit를 호출할 수 없습니다. 이런 경우 callbackFlow와 같은 다른 flow builder를 사용하여 처리합니다.

스트림 수정 - Intermediary

중개자는 중간 연산자(Intermediary)를 사용하여 값을 소비하지 않고도 데이터 스트림을 수정할 수 있습니다. 이 연산자는 데이터 스트림에 적용되는 경우 값이 향후에 사용될 때까지 실행되지 않을 작업 체인을 설정하는 함수입니다.

저장소 레이어는 중간 연산자 map을 사용하여 데이터가 View에 표시되도록 변환합니다.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

중간 연산자는 시차를 두고 차례로 적용할 수 있어 항목을 흐름에 내보낼 때 느리게 실행되는 작업 체인을 구성할 수 있습니다. 스트림에 중간 연산자를 적용하는 것만으로 flow 수집이 시작되지는 않습니다.

flow 수집 - Consumer

터미널 연산자를 사용하여 값 수신 대기를 시작하는 흐름을 트리거합니다. 내보낼때 스트림의 모든 값을 가져오려면 collect를 사용합니다.

collect는 suspend 함수이므로 Coroutine 내에서 실행해야합니다. 모든 새 값에서 호출되는 매개변수로 람다를 사용합니다. suspend 함수이므로 collect를 호출하는 Coroutine은 flow가 종료될 때까지 정지될 수 있습니다.

저장소 레이어의 데이터를 사용하는 viewModel을 간단히 구현하면 다음과 같습니다.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

flow를 수집하면 고정된 간격으로 최신 뉴스를 새로고침하고 네트워크 요청 결과를 내보내는 샌산자가 트리거됩니다. 생산자는 while(true) loop로 항상 활성 상태가 유지되므로 ViewModel이 삭제되어 viewModelScope가 취소되면 데이터 스트림이 종료됩니다.

또한 다음과 같은 이유로 flow 수집이 중지될 수 있습니다.

  • 이전 예시에 나온 것처럼 수집된 Coroutine 취소된 경우 이 경우 기본 생산자(producer)도 중지됩니다.

  • 생산자가 항목 방출을 완료한 경우 이 경우 데이터 스트림이 종료되고 collect를 호출한 Coroutine이 실행을 다시 시작합니다.

다른 중간 연산자를 통해 지정되지 않은 경우 흐름의 상태는 콜드 및 지연입니다. 즉, 흐름에서 터미널 연산자가 호출될 때마다 생산자 코드가 실행됩니다. 이전 예시에서 흐름 수집기가 여러 개 있으면 데이터 소스가 서로 다른 고정된 간격으로 최신 뉴스를 여러 번 가져옵니다. 여러 소비자가 동시에 수집할 때 흐름을 최적화하고 공유하려면 shareIn 연산자를 사용합니다.

예외처리

생산자 구현은 서드 파티 라이브러리에서 가져올 수 있습니다. 따라서 예기치 않은 예외가 발생할 수 있습니다. 이러한 예외를 처리하려면 catch 중간 연산자를 사용해야합니다.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

예외가 발생하는 경우 새 항목이 수신되지 않았기 때문에 collect 람다가 호출되지 않습니다.

catch는 항목을 흐름에 emit할 수도 있습니다. 대신 예제 저장소 레이어는 캐시된 값을 emit할 수 있습니다.

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

다음의 코드에서는 예외가 발생하면 collect 람다가 호출되므로 예외로 인해 새 항목이 스트림에 내보내집니다.

다른 CoroutineContext에서 실행하기

기본적으로 flow builder의 생산자는 수집하는 코루틴의 CoroutineContext에서 실행됩니다.
앞에서 언급한 것처럼 다른 CoroutineContext에서 값을 emit 할 수 없습니다. 이 동작은 경우에 따라 원하지 않는 동작일 수 도있습니다. 예를 들어 주제 전체에 사용된 예에서 저장소 레이어는 viewModelScope가 사용하는 Dispatchers.Main에서 작업을 실행하면 안됩니다.

흐름의 CoroutineContext를 변경하려면 중간 연산자 flowOn을 사용합니다. flowOn은 업스트림 흐름의 CoroutineContext를 변경합니다. 즉, 생산자 및 중간 연산자가 flowOn 전에 적용됩니다.

다운 스트림 흐름(flowOn 이후의 중간 연산자 및 소비자)은 영향을 받지 않으며 흐름에서 collect하는 데 사용되는 CoroutineContext에서 실행됩니다. flowOn 연산자가 여러 개 있는 경우 각 연산자는 현재 위치에서 업스트림을 변경합니다.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

해당 코드에서 onEach 및 map 연산자는 defaultDispatcher를 사용하는 반면 catch 연산자와 소비자는 viewModelScope에 사용되는 Dispatchers.Main에서 실행됩니다.

데이터 소스 레이어가 I/O 작업을 수행하므로, I/O 작업에 최적화된 Dispatcher를 사용해야 합니다.

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack 라이브러리의 Flow

Flow는 많이 사용되는 Android 서드 파티 라이브러리인 다수의 Jetpack 라이브러리에 통합됩니다.

Flow는 실시간 데이터 업데이트 및 무제한 데이터 스트림에 아주 적합합니다.

Room에 Flow를 사용하여 데이터베이스 변경 알림을 받을 수 있습니다.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

해당 코드는 Example 테이블이 변경될 때마다 데이터베이스의 새 항목이 포함된 새 목록이 내보내집니다.

콜백 기반 API를 흐름으로 변환

callbackFlow는 콜백 기반 API를 흐름으로 변환할 수 있는 Flow builder 입니다.

API를 Flow로 변환하고 Firestore 데이터베이스 업데이트를 수신 대기하려면 다음과 같이 코르르 사용할 수 있습니다.

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow builder와 달리 callbackFlow에서는 send 함수를 사용하여 다른 CoroutineContext에서 값을 내보내거나 offer 함수를 사용하여 Coroutine 외부로 값을 내보낼 수 있습니다.

내부적으로 callbackFlow는 개념상 차단 큐와 매우 유사한 채널을 사용합니다. 채널은 버퍼링 가능한 요소의 최대 요소의 최대 수인 용량으로 구성됩니다.

callbackFlow에서 생성된 채널의 기본 용량은 요소 64개 입니다. 전체 채널에 새 요소를 추가하려는 경우 send는 새 요소를 위한 공간이 생길 때까지 생산자를 정지하는 반면 offer는 채널에 요소를 추가하지 않고 즉시 false를 반환합니다.

profile
Android 개발 잘하고 싶어요!!!

0개의 댓글