Coroutine에 대해 공부했을 때, Flow와 최대한 분리해서 공부하려니 실사용적 측면에서 크게 와닿지가 않았다.
Coroutine과 항상 함께 나오는 Flow에 대해 공부해보자!
Flow에 대해 검색해보면 어느 블로그에서나 볼 수 있는 사진이다.
위 사진은 데이터 스트림의 3가지 항목이다. 그렇다면 데이터 스트림은 무엇일까?
데이터 스트림(data stream)은 연결지향통신에서, 전송된 정보를 수집하거나 정보를 전송할 때 사용되는 디지털 방식으로 암호화 된 일관된 신호의 흐름을 말한다.
잘 와닿지가 않는다.
flow에서 의미하는 데이터 스트림에 대해 좀 더 알아보고 정리해본 것은
데이터 소비자가 필요시 마다 반복적으로 데이터 요청을 하지 않고, 소비자가 데이터 발행자에게 데이터 변경을 알려달라고 하면 데이터 변경이 있을때 마다 발행자가 소비자에게 데이터를 지속적으로 발행하는 것.
즉 Flow는 Coroutine을 기반으로 빌드 된, 안드로이드 리액티브 프로그래밍에서 사용하는 데이터 스트림이다.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
안드로이드에서 보통 데이터 생산자가 가져오는 데이터는 Room등 에서 가져오는 Local Data와 API를 통해 서버에서 가져오는 Remote Data가 있다.
위의 코드는 Api를 통해 데이터를 가져오고 있다.
우선 Flow는 flow 블록 내부에서 emit()을 통해 데이터를 생성한다.
위의 코드에서 flow{} 내부에 있는 api를 통해 가져온 latestNews라는 데이터를 emit을 통해 생성하고 있다.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
생산자가 발행한 데이터를 그대로 사용하지 않는 경우가 많다.
위의 코드에서는 filer 함수를 사용해 FavoriteTopic만 필터링하고 있다.
위와 같이 filter, map, onEach 등의 함수를 사용해 소비자가 데이터 가공을 거치지 않고 사용할 수 있도록 한다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Flow에서 데이터 소비는 collect를 통해 이루어진다.
collect는 suspend function이기 때문에 CoroutineScope 내에서 실행된다.
그렇기 때문에 CoroutineScope가 취소되면 생산자의 데이터 생산도 중지된다.
그렇다면 Flow에서 발생하는 예외는 어느 부분에서 catch해야 할까?
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
개인적인 생각으로는 생산자, 중간 연산자 어디서든 잡을 수 있을 것 같은데(아닐 수 있음) 위와 같이 공식 문서에서는 소비자에서 예외를 잡고있다.
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
또한 중간 연산자에서 예외 발생시 캐싱된 데이터를 다시 emit 하도록 하는 구조를 권장하는 듯 하다.
안드로이드 공식 문서를 통해 Flow의 개념과 간단한 수준의 사용 방법을 공부했다.
Flow도 발전을 거치며 여러 종류의 flow가 생겼고, 유용하게 사용할 함수들이 많은 것 같다. 다음에는 보다 심화적인 flow를 공부해야겠다.