Flow
- reactive programming framework
- 여러 값을 순차적으로 내보낼수 있는 유형
- Flow를 사용하여 데이터베이스에서 실시간 업데이트를 수신할 수 있다
- 코루틴 기반으로 빌드되어 비동기 식으로 계산할수 있는 데이터 스트림
- 기본 스레드를 차단하지 않고 다음 값을 생성할 네트워크 요청을 안전하게 만들수 있다
1. Data Stream
1.1. 생산자
- 스트림에 추가되는 데이터를 생성
- 코루틴 덕분에 비동기적으로 데이터가 생성될 수 있다
1.2. 중계자
- 스트림에 내보내는 각각의 값이나 스트림 자체를 수정할 수 있다
1.3. 소비자
- Repository = UI 데이터 생산자
- UI는 데이터를 표시하는 소비자
- 생산자와 소비자 사이의 레이어는 다음 레이어의 요구사항에 맞게 조정하기 위해 데이터 스트림을 수정하는 중개자의 역할
2. Flow 생성
- flow 빌더 함수는 emit 함수를 사용하여 새 값을 수동으로 데이터 스트림에 내보낼 수 있음
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews)
delay(refreshIntervalMs)
}
}
}
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
- 데이터 소스는 고정된 간격으로 최신 뉴스를 자동으로 가져옵니다
- 정지 함수는 연속된 값을 여러개 반환할 수 없기 때문에 데이터 소스의 요구사항을 충족하는 흐름을 만들고 반환합니다.
fetchLatestNews
네트워크 요청이 완료될때까지 정지하고 난 다음 결과를 스트림으로 내보냅니다
- flow builder 에서 생산자가 다른 CoroutineContext 의 결과를 emit 할 수 없기 때문에
새 코루틴을 만들거나 withContext 블록으로 다른 CoroutineContext에서 emit 하지말고 callbackFlow
같은 다른 빌더를 사용하세요
3. Stream 수정
- 데이터 스트림에 적용되는 경우 값이 향후에 사용될때까지 실행되지 않을 작업 체인을 설정하는 함수
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
}
4. Flow Collect
- collect 는 정지 함수이므로 코루틴 내에서 실행되어야 합니다
- collect 를 호출하는 코루틴은 흐름이 종료될 때까지 정지될 수 있습니다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews.collect { favoriteNews ->
}
}
}
}
- 고정된 간격으로 최신 뉴스를 새로고침하고 네트워크에 요청 결과를 내보내는 생산자가 트리거 됩니다
- while(true) 로 항상 활성 상태이기 때문에 viewmodel이 삭제되어 viewmodelscope가 취소될때 데이터 스트림이 종료됩니다.
- 수집이 종료 될 경우
- collect coroutine이 취소 된 경우 기본 생성자도 중지됩니다
- 생산자가 emit을 완료한 경우 데이터 스트림이 종료되고 collect를 호출한 코루틴이 실행을 다시 시작합니다.
- flow collect 가 여러개 있으면 데이터 소스가 서로 다른 고정된 간격으로 최신뉴스를 여러번 가져옵니다.
- 여러 consumer가 동시에 collect할때 flow를 최적화하려면
shareIn
연산자를 사용합니다
5. Exception
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
}
}
}
}
- 예시에서 예외 발생시 새 항목이 수신되지 않아서 collect 람다를 호출하지 않습니다.
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
.catch { exception -> emit(lastCachedNews()) }
}
- catch 항목을 emit 할 수 도 있습니다. 예외가 발생하면 collect 람다가 호출되므로 예외로 인해 새 항목이 스트림에 내보내집니다.
Other CoroutineContext
- 다른 CoroutineContext 에서 값을 emit 할수 없습니다. viewModelScope가 사용하는 Dispatcher.Main에서 작업을 실행하면 안됩니다.
- CoroutineContext를 변경하려면 중간 연산자
flowOn
을 사용합니다
flowOn
은 업스트림 흐름의 CoroutineContext를 변경합니다.
- 생산자 및 중간 연산자가 flowOn 전에 적용 됩니다.
- 다운 스트림 흐름은 영향을 받지 않으며 collect 할때 사용되는 CoroutineContext 에서 실행됩니다.
- flowOn 연산자가 여러개 있는 경우 각 연산자는 현재 위치에서 업스트림 변경
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news ->
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news ->
saveInCache(news)
}
.flowOn(defaultDispatcher)
.catch { exception ->
emit(lastCachedNews())
}
}
- defaultDispatcher를 사용하는 것은 onEach 및 map 연산자 이다
- viewModelScope 내 Dispatchers.Main 에서 실행되는 것은 catch 연산자
Jetpack Flow
- 실시간 데이터 업데이트 및 무제한 데이터 스트림에 사용
- Flow With Room 을 사용하여 데이터베이스 변경 알림을 받을 수 있음
- @Dao(Data Access Object) 를 사용하는 경우 실시간 업데이트를 받으려면 Flow 로 반환해야 합니다
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Callback API
callbackFlow
는 콜백 기반 API를 흐름으로 변환할 수 있는 흐름 빌더
- 다른 CoroutineContext에서 값을 내보내거나 offer 함수를 사용하여 코루틴 외부로 값을 emit 할 수 있습니다
- 내부적으로 callbackFlow는 개념상 Blocking Queue 와 매우 유사한
channel
을 사용합니다
- callbackflow에서 생성된 채널의 기본 용량은 요소 64개 입니다
- 전체 채널에 새 요소를 추가하려는 경우
send
는 새요소를 위한 공간이 생길때까지 생산자를 정지
offer
는 채널에 요소를 추가하지 않고 즉시 false를 반환
Reference