Flow

노준혁·2023년 2월 8일
0

https://developer.android.com/kotlin/flow
https://velog.io/@eoqkrskfk94/코루틴-Channel채널-Flow플로우


  • Flow
    코루틴에서 Flow는 단일 값만 반환하는 suspend function과는 달리 여러 value들을 순차적으로 내보낼 수 있는 타입.

    • ex) Flow를 사용해 DB로부터 실시간 업데이트를 수신할 수 있음.

    • 코루틴을 기반으로 빌드됨.

    • 여러 값을 제공할 수 있음.

    • 비동기식으로 계산될 수 있는 데이터 스트림의 개념.

    • emit한 값은 동일한 타입이어야 함.

    • ex) Flow는 정수 값을 내보내는 Flow

  • Flow는 value들의 sequence를 생성하는 Iterator와 매우 비슷하지만 suspend functions을 사용하여 value들을 비동기적으로 생성하고 사용함.

    • ex) 기본 스레드를 차단하지 않고 다음 value를 생성할 네트워크 요청을 안전하게 만들 수 있음.

  • 데이터 스트림에는 3가지 항목이 존재.

    • Producer:
      스트림에 추가되는 데이터를 생산함.
      코루틴과의 연계로 Flow는 비동기적으로 데이터가 생산되게 할 수 있음.
    • (Optional) Intermediaries
      스트림에 emit하는 각각의 값이나 스트림 자체를 수정할 수 있음.
    • Consumer
      스트림으로부터 값을 사용.
  • Android에서 Repository는 일반적으로 UI 데이터 Producer.

  • 이때 사용자 인터페이스(UI)는 최종적으로 데이터를 표시하는 Consumer.

  • Producer와 Consumer 사이의 레이어는 일반적으로 다음 레이어의 요구사항에 맞게 조정하기 위해 데이터 스트림을 수정하는 Intermediaries의 역할을 함.


  • Creating a flow

Flow를 만들기 위해 flow builder API를 사용한다.
Flow 빌더 함수는 emit 함수를 사용하여 새 값을 수동으로 데이터 스트림에 내보낼 수 있는 새로운 flow을 만든다.

ex)
DataSource는 고정된 간격으로 최신 뉴스를 자동으로 가져오는 예.
suspend function은 연속된 값을 여러 개 반환할 수 없으므로, DataSource가 이러한 요구사항을 충족하는 Flow을 만들고 반환함.
이 경우 DataSource가 Producer의 역할을 한다.

class NewsRemoteDataSource(
    private val newsApi: NewsApi, // 최신 뉴스를 가져오는 Api
    private val refreshIntervalMs: Long = 5000 // 고정된 간격
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

  • Modifying the stream

Intermediaries는 Intermediate 연산자를 사용하여 값을 소비하지 않고도 데이터 스트림을 수정할 수 있음. 이 연산자는 데이터 스트림에 적용되는 경우 값이 향후에 사용될 때까지 실행되지 않을 작업 체인을 설정하는 함수의 역할.

ex)
Repository 레이어에서 Intermediate 연산자 map을 사용하여 데이터가 View에 표시되도록 변환하는 예.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Flow에 변환된 결과를 적용하는 favorite latest new를 리턴함.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // favorite topic들을 필터링하는 Intermediate operation
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // 캐시에 latest new들을 저장하는 Intermediate operation
            .onEach { news -> saveInCache(news) }
}

Intermediate 연산자는 시차를 두고 차례로 적용할 수 있어 item을 Flow에 내보낼 때 느리게 실행되는 작업 체인을 구성할 수 있음. 스트림에 Intermediate 연산자를 적용하는 것만으로 flow collection이 시작되지는 않음.

  • flow collection이 시작되도록 하려면 아래 예를 참고.

  • Collecting from a flow

terminal 연산자를 사용하여 값들을 listening하기 시작하는 Flow을 트리거함.
내보낼 때 스트림의 모든 값을 가져오려면 collect함수를 사용한다.

collect는 suspend function이므로 코루틴 내에서 실행해야 함.
모든 새 값에서 호출되는 매개변수로 람다를 사용.
suspend functions이므로, collect를 호출하는 코루틴은 Flow가 종료될 때까지 정지될 수 있음.

ex)
Repository 레이어의 데이터를 사용하는 ViewModel의 예.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch { // 코루틴 내에서 실행되어야 함.
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
            	/*
                // Update View with the latest favorite news
                 code....
                */
            }
        }
    }
}

Flow를 collect하면 고정된 간격으로 최신 뉴스를 새로고침하고 네트워크 요청 결과를 내보내는 Producer가 트리거됨.
Producer는 while(true) 루프로 항상 active 상태가 유지되므로 ViewModel이 삭제되어 viewModelScope가 취소될 때 데이터 스트림이 종료됨.

  • 다음과 같은 이유로 Flow collection이 중지될 수 있음.
    • collect된 코루틴이 취소된 경우, 기본 Producer도 중지됨.
    • Producer가 item들의 emitting을 완료한 경우, 데이터 스트림이 종료되고 collect를 호출한 코루틴이 실행을 다시 시작함.

다른 intermediate operator를 통해 지정되지 않은 경우 Flow의 State는 Cold 및 Lazy임. 즉, Flow에서 terminal 연산자가 호출될 때마다 Producer 코드가 실행됨.
Flow collector가 여러 개 있으면 데이터 소스가 서로 다른 고정된 간격으로 최신 뉴스를 여러 번 가져오게 됨. 만약, 여러 Consumer가 동시에 collect할 때 Flow를 최적화하고 공유하려면 shareIn 연산자를 사용하면 됨.


  • Catching unexpected exceptions
    Producer의 구현은 서드 파티 라이브러리에서 가져옴.
    이에 따라 예기치 않은 예외가 발생할 수 있기에 관련 예외를 처리하려면 catch intermediate 연산자를 사용해야 함.
class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

catch 중간 연산자가 없는 경우, 예외가 발생하면 새 item이 수신되지 않았기 때문에 collect 람다가 호출되지 않는다.
그러나,
catch는 item을 Flow에 emit할 수도 있습니다.

ex)
아래 예시는 cached된 값들을 emit하는 예.

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

이 예에서는 예외가 발생하면 collect 람다가 호출되므로 예외로 인해 새 item이 스트림에 내보내짐.


  • Flows in Jetpack libraries

Flow는 많이 사용되는 Android 서드 파티 라이브러리인 다수의 Jetpack 라이브러리에 통합되어 사용될 수 있음.
Flow는 실시간 데이터 업데이트 및 무제한 데이터 스트림에 적합.
(때문에 LiveData 대신 Flow를 사용할 수 있다.)

Flow with Room을 사용하여 데이터베이스 변경 알림을 받을 수도 있으며,
데이터 액세스 객체(DAO)를 사용하는 경우 실시간 업데이트를 받으려면 Flow 타입으로 반환하면 됨.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Example 테이블이 변경될 때마다(Flow<List>) 데이터베이스의 새 item이 포함된 새로운 List가 emitting되어짐.

profile
https://github.com/nohjunh

0개의 댓글