주의
아래 내용들은 오류나 잘못된 내용들을 포함할 수 있으므로 주의해주시기 바랍니다.
만약, 잘못된 부분이 있으면 댓글로 말씀해주시면 감사하겠습니다.
Flow는 비동기적 데이터 스트림입니다. Flow는 연속적으로 데이터를 보냅니다.
데이터 스트림은 원어 그대로 Data Stream, 데이터의 흐름을 말합니다.
단순하게 데이터를 송수신하는 것이 아닌 데이터 생산자와 소비자가 Stream으로 연결 되어 지속적으로 데이터를 주고 받을 수 있습니다.
안드로이드의 경우 보통 Model에서 데이터를 생산한 후에 View에서 데이터를 수집하는 형태로 작성합니다.
중간 연산자 map, filter
생산자(Producer)와 소비자(Consumer) 사이에서 데이터를 변환하는 역할을 합니다.
map, filter, take, zip과 같은 메서드가 있습니다.
중간 연산자를 업스트림 흐름에 적용하면 추가 연산자를 적용할 수 있는 다운스트림 흐름을 반환합니다.
중간 연산자는 flow의 코드(flow를 업데이트 하는 코드)를 실행하지 않으며, suspend 메소드가 아닙니다.
중간 연산자는 단순히 중간 연산만 한 후에 결과값을 신속하게 반환합니다. 이는 Cold Flow의 특성입니다.
여기서 잠깐. Cold Flow랑 Hot Flow는 또 뭔 소린가요? 싶은 사람이 있을 겁니다.
이는 데이터의 흐름이 언제 활성화되는지와 몇몇의 소비자들과 연결될 수 있는지의 차이입니다.
Cold Flow(Cold Stream)의 경우, 터미널 연산자(collect, collectLatest, single, reduce, toList 등등)가 불리기 전까지는 활성화 되지 않습니다.
Cold Flow는 단 한명의 구독자만 존재하며 만약 새로운 구독자가 생길 경우 플로우를 다시 만듭니다.
Cold Flow에서의 수집은 생산자 코드(flow를 업데이트하는 코드)를 트리거합니다.
Cold Flow는 collect 될 때마다 동작합니다. 만약, collect하는 소비자가 없으면 동작하지 않습니다.
flow 블록을 사용한 단순한 Flow들은 모두 Cold Flow에 속합니다.
[View의 코드]
package com.example.retrofitandmodule
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import androidx.databinding.DataBindingUtil
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.ViewModelProvider
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import com.example.retrofitandmodule.databinding.ActivityMainBinding
import com.example.retrofitandmodule.viewmodel.MainViewModel
import kotlinx.coroutines.launch
class MainActivity : AppCompatActivity() {
companion object {
private const val TAG = "로그"
}
private lateinit var binding: ActivityMainBinding
private lateinit var mainViewModel: MainViewModel
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = DataBindingUtil.setContentView(this, R.layout.activity_main)
binding.lifecycleOwner = this
mainViewModel = ViewModelProvider.AndroidViewModelFactory(application).create(MainViewModel::class.java)
binding.mainViewmModel = mainViewModel
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
mainViewModel.catFactFlow.collect {
Log.d(TAG,"MainActivity - collect() called")
binding.textviewCatFact.text = it.fact
}
}
}
}
}
repeatOnLifecycle은 라이프사이클이 STARTED 이상일 때, 소비(collect)를 실행합니다.
만약, onStop이 불리면 소비(collect)를 중지하고 flow 또한 생산을 중단합니다.
[ViewModel의 코드]
package com.example.retrofitandmodule.viewmodel
import androidx.lifecycle.ViewModel
import com.example.catfact.Cat
import com.example.retrofitandmodule.model.CatRepository
import kotlinx.coroutines.flow.*
class MainViewModel: ViewModel() {
private val catRepository = CatRepository()
private val _catFactFlow = catRepository.latestCatFact
val catFactFlow: Flow<Cat>
get() = _catFactFlow
}
[Model의 코드]
package com.example.retrofitandmodule.model
import android.util.Log
import com.example.catfact.util.CatRetrofit
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import retrofit2.await
class CatRepository {
companion object {
private const val TAG: String = "로그"
}
private val catApi = CatRetrofit.api
val latestCatFact = flow {
while (true) {
Log.d(TAG,"CatRepository - latestCatFact() called")
val catFact = catApi.getCatInfo().await()
Log.d(TAG,"CatRepository - catFact : ${catFact.fact}")
emit(catFact)
delay(5 * 1000)
}
}
}
위의 코드는 flow 블록을 사용하여 cold flow를 만듭니다.
따라서, 소비자가 없을 때(위의 activity에서는 repeatOnLifecycle을 사용하였으니 onStop이 불렸을 때) upstream flow는 생산을 중단합니다.
따라서, 위의 코드를 실행하면 화면을 껐을 때 소비(collect)와 생산(cold flow)가 중단됩니다.
앱을 실행하고 화면을 끄면 onStop이 불리면서 소비(collect)를 중단하고, cold flow는 생산을 멈춥니다.
Hot Flow(Hot Stream)의 경우, 하나의 흐름을 여러 명의 소비자와 공유할 수 있습니다.
SharedFlow와 StateFlow의 경우가 이에 속합니다.
Hot Flow에서의 수집은 생산자 코드(sharedFlow나 stateFlow를 업데이트하는 코드)를 트리거하지 않습니다.
Hot Flow는 소비자와 상관 없이 늘 동작합니다.
Hot Flow는 생방송 비디오 피드에 사용되어, 모든 사용자가 같은 타임라인(재생시간)을 볼 수 있게 도와줍니다.
아래에는 간단한 예시를 작성해보겠습니다.
[View의 코드]
package com.example.retrofitandmodule
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import androidx.databinding.DataBindingUtil
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.ViewModelProvider
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import com.example.retrofitandmodule.databinding.ActivityMainBinding
import com.example.retrofitandmodule.viewmodel.MainViewModel
import kotlinx.coroutines.launch
class MainActivity : AppCompatActivity() {
companion object {
private const val TAG = "로그"
}
private lateinit var binding: ActivityMainBinding
private lateinit var mainViewModel: MainViewModel
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = DataBindingUtil.setContentView(this, R.layout.activity_main)
binding.lifecycleOwner = this
mainViewModel = ViewModelProvider.AndroidViewModelFactory(application).create(MainViewModel::class.java)
binding.mainViewmModel = mainViewModel
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
mainViewModel.catFactFlow.collect {
Log.d(TAG,"MainActivity - collect() called")
binding.textviewCatFact.text = it.fact
}
}
}
}
}
액티비티에서는 repeatOnLifecycle을 사용하여, view의 라이프사이클이 started일 때만 소비(collect)를 하게 설정했습니다.
소비를 할 때마다, textView의 내용을 최신화합니다.
[ViewModel의 코드]
package com.example.retrofitandmodule.viewmodel
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.example.catfact.Cat
import com.example.retrofitandmodule.model.CatRepository
import kotlinx.coroutines.flow.*
class MainViewModel: ViewModel() {
private val catRepository = CatRepository()
val catFactFlow = catRepository
.latestCatFact
.stateIn(
viewModelScope,
SharingStarted.WhileSubscribed(10 * 1000),
Cat()
)
}
latestCatFact는 flow 블록을 이용하여 생성한 cold flow입니다. 따라서, stateIn 메소드를 통하여 hot flow로 전환했습니다.
hot flow는 소비자와 상관 없이 계속 동작하므로 이를 막기 위해 WhileSubscribed를 설정해줬습니다.
WhileSubscribed를 설정해주면 flow를 collect(소비)하지 않아도 해당하는 시간이 지나면 flow의 생산을 막아줍니다.
위의 코드는 10 * 1000 = 10초로 설정했습니다.
따라서, 위의 코드는 결과적으로 화면이 꺼지거나 홈으로 이동하면 MainActivity에서 collect가 중단되고 10초가 지나면 flow의 생산을 막습니다.
[Model의 코드]
package com.example.retrofitandmodule.model
import android.util.Log
import com.example.catfact.util.CatRetrofit
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import retrofit2.await
class CatRepository {
companion object {
private const val TAG: String = "로그"
}
private val catApi = CatRetrofit.api
val latestCatFact = flow {
while (true) {
Log.d(TAG,"CatRepository - latestCatFact() called")
val catFact = catApi.getCatInfo().await()
Log.d(TAG,"CatRepository - catFact : ${catFact.fact}")
emit(catFact)
delay(5 * 1000)
}
}
}
모델에서는 flow를 사용하여 cold flow를 만들어줍니다.
위 코드는 5초 간격으로 https://catfact.ninja api에서 고양이의 정보를 요청합니다.
터미널 연산자는 collect, single, reduce, toList, launchIn과 같은 연산자들이 속합니다.
이들은 주어진 스코프 내에서 흐름을 수집합니다. 이들은 upstream flow에 적용되고 모든 작업의 실행을 트리거합니다.
이는 스레드를 블록하지 않는 suspend로 실행되며, 실패나 성공 여부에 따라 다르게 완료됩니다.
try {
flow.collect { value ->
println("Received $value")
}
} catch (e: Exception) {
println("The flow has thrown an exception: $e")
}
기본적으로 흐름은 연속적이며 코루틴 내에서 순서대로 실행됩니다.
단, buffer와 flatMapMerge는 동시성을 플로우 내에 제공하는 메소드들이므로 예외입니다.
Flow Builder(흐름 생산자)에게 전달 되는 suspend block을 Producer Block이라고 합니다.
소비자(Consumer)를 선언하는 데에는 2가지 방법이 있습니다.
launchWhenX를 사용할 경우에는 collect 함수는 잠시 중단(suspend) 됩니다.
그러나, 중단될뿐 flow는 백그라운드에서 계속 동작하므로 자원의 낭비로 이어집니다.
repeatOnLifecycle을 사용할 경우에는 코루틴을 중단합니다.
따라서, 화면을 껐을 때도 자원을 아낄 수 있습니다.
https://pluu.github.io/blog/android/androiddevsummit/2021/11/07/ads21-Kotlin-Flows-in-practice/
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
https://developer.android.com/kotlin/flow/stateflow-and-sharedflow
https://developer.android.com/topic/libraries/architecture/coroutines