일반적으로 플로우는 콜드 데이터이기에 요청할 때마다 값이 계산된다. 이때, 여러 개의 수신자가 하나의 데이터가 변경되는지 감지하는 경우도 있다.
이럴 때 메일링 리스트와 비슷한 개념인 공유플로우(SharedFlow)를 사용한다.
MutableSharedFlow는 브로드캐스트 채널과 비슷하다.
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// 또는 MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
// (1초 후)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (프로그램은 절대 끝나지 않는다.)
공유플로우를 통해 메시지를 보내면(방출하면) 대기하고 있는 모든 코루틴이 수신하게 된다.
만약 그냥 Flow였다면?
suspend fun main(): Unit = coroutineScope {
val flow = flow {
println("Flow started")
emit("Message1")
emit("Message2")
}
launch {
flow.collect { println("#1 received $it") }
}
launch {
flow.collect { println("#2 received $it") }
}
}
// Flow started
// #1 received Message1
// #1 received Message2
// Flow started
// #2 received Message1
// #2 received Message2
Flow는 콜드데이터 스트림으로 collect 할 때마다 새로 시작한다(각 collector가 독립적으로 데이터를 받음).
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 1)
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
launch {
mutableSharedFlow.collect {
println("#3 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message3")
}
//#1 received Message1
//#1 received Message2
//#2 received Message1
//#2 received Message2
//#3 received Message2
//#1 received Message3
//#2 received Message3
//#3 received Message3
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
MutableSharedFlow는 메시지 보내는 작업을 유지할 수도 있다.
replay 인자(기본값은 0)를 설정하면 마지막으로 전송한 값들이 정해진 수만큼 저장된다.
코루틴이 감지를 시작하면 저장된 값들을 먼저 받게 된다.resetReplayCache를 사용하면 값을 저장한 캐시를 초기화할 수 있다.
플로우는 사용자 액션, 데이터베이스 변경, 또는 새로운 메시지와 같은 변화를 감지할 때 주로 사용한다.
만약, 하나의 플로우로 여러 개의 플로우를 만들고 싶다면 어떻게 해야 할까? SharedFlow가 해결책이며, Flow를 SharedFlow로 바꾸는 가장 쉬운 방법이 shareIn 함수를 사용하는 것이다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach { delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
// (1초 후)
// #1 A
// (1초 후)
// #1 B
// #2 B
// (1초 후)
// #1 C
// #2 C
// #3 C
shareIn 함수는 SharedFlow를 만들고 Flow의 원소를 보낸다.
플로우의 원소를 모으는 코루틴을 시작하므로 shareIn 함수는 첫 번째 인자로 코루틴 스코프를 받는다.
세 번째 인자는 기본값이 0인 replay이고, 두 번째 인자인 started는 리스너의 수에 따라 값을 언제부터 감지할 지 결정한다.
즉시 값을 감지하기 시작하고 플로우로 값을 전송한다.
replay 값에 제한이 있고 감지를 시작하기 전에 값이 나오면 일부를 유실할 수 있다.
만약 replay가 0이라면 먼저 들어온 값이 전부 유실된다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it") }
}
print("Done")
}
// (0.1초 후)
// Done
첫 번째 구독자가 나올 때 감지하기 시작한다.
첫 번째 구독자는 내보내진 모든 값을 수신하는 것이 보장되며, 이후의 구독자는 replay 수만큼 가장 최근에 저장된 값들을 받게 된다.
모든 구독자가 사라져도 업스트림(데이터를 방출하는) 플로우는 액티브 상태지만, 구독자가 없으면 replay 수만큼 가장 최근의 값들만 캐싱한다.
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach { delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
}
// (0.1초 후)
// #1 A
// #1 B
// #1 C
// (1초 후)
// #2 D
// #1 D
첫 번째 구독자가 나올 때 감지하기 시작하며, 마지막 구독자가 사라지면 플로우도 멈춘다.
SharedFlow가 멈췄을 때 새로운 구독자가 나오면 플로우가 다시 시작된다.
WhileSubscribed는 기본 값이 0이며, 마지막 구독자가 사라지고 난 뒤 감지할 시간을 나타내는 stopTimeOutMillis와 기본값은 Long.MAX_VALUE이며 멈춘 뒤 리플레이 값을 가지고 있는 시간을 나타내는 replayExpirationMillis라는 설정 파라미터를 가지고 있다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${sharedFlow.first()}") // 하나 가져가고 구독 종료
}
launch {
println("#2 ${sharedFlow.take(2).toList()}") // 두개 가져가고 구독 종료
}
delay(3000)
launch {
println("#3 ${sharedFlow.first()}") // 하나 가져가고 구독 종료
}
}
// (3초 후)
// Started
// (1초 후)
// #1 A
// (1초 후)
// #2 [A, B]
// Finished
// (1초 후)
// Started
// (1초 후)
// #3 A
// Finished
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${sharedFlow.first()}")
}
launch {
println("#2 ${sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
//Started
//#1 A
//#2 [A, B]
//Finished
//Started
//#3 A
//#3 B
//#3 C
//#3 D
//Finished
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 3100),
)
delay(3000)
launch {
println("#1 ${sharedFlow.first()}")
}
launch {
println("#2 ${sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
//
//Started
//#1 A
//#2 [A, B]
//#3 C
//#3 D
//Finished
상태플로우는 공유플로우의 개념을 확장시킨 것으로, replay 인자 값이 1인 공유플로우와 비슷하게 작동한다.
interface StateFlow<out T> : SharedFlow<T> {
val value: T
}
interface MutableStateFlow<T> :
StateFlow<T>, MutableSharedFlow<T> {
override var value: T
fun compareAndSet(expect: T, update: T): Boolean
}
상태플로우는 value 프로퍼티로 접근 가능한 값을 가지고 있다.
suspend fun main(): Unit = coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect { println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect { println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
초기 값은 생성자를 통해 전달되어야 한다.
value 프로퍼티로 값을 얻어올 수도 있고 설정할 수도 있다.
안드로이드에서 상태플로우는 라이브데이터를 대체하는 최신 방식으로 사용되고 있다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
private val _uiState =
MutableStateFlow<NewsState>(LoadingNews)
val uiState: StateFlow<NewsState> = _uiState
fun onCreate() {
scope.launch {
_uiState.value =
NewsLoaded(newsRepository.getNews())
}
}
}
따라서 상태플로우는 뷰모델에서 상태를 나타낼 때 주로 사용된다.
상태플로우의 상태를 감지할 수 있으며, 감지된 상태에 따라 뷰가 보여지고 갱신된다.
suspend fun main(): Unit = coroutineScope {
val state = MutableStateFlow('X')
launch {
for (c in 'A'..'E') {
delay(300)
state.value = c
// 또는 state.emit(c)
}
}
state.collect {
delay(1000)
println(it)
}
}
// X
// C
// E
상태 플로우는 데이터가 덮어 씌어지기 때문에, 관찰이 느린 경우 상태의 중간 변화를 받을 수 없는 경우도 있다.
위 예제 동작 흐름:
1. collect에서 초깃값 'X'를 수신
2. delay(1000)동안 'A', 'B' 유실 -> 'C'가 최신 값이므로 'C'를 수신
3. delay(1000)동안 'D' 유실 -> 'E'가 최신 값이므로 'E'를 수신
모든 이벤트를 다 받으려면 공유플로우를 사용해야 한다.
suspend fun main(): Unit = coroutineScope {
val state = MutableSharedFlow<String>()
state.emit("X")
launch {
for (c in 'A'..'E') {
delay(300)
state.emit(c.toString())
}
}
state.collect {
delay(1000)
println("state: $it")
}
}
// state: A
// state: B
// state: C
// state: D
// state: E
상태플로우는 현재 상태만 나타내기 때문에, 이전 상태에는 아무 관심이 없을 것이다.
stateIn은 Flow<T>를 StateFlow<T>로 변환하는 함수이고, 중단 함수이다.
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect { println("Received $it") }
}
// (1초 후)
// Produced A
// Listening
// A
// Received A
// (1초 후)
// Produced B
// Received B
// (1초 후)
// Produced C
// Received C
값을 명시하지 않았을 때는 첫 번째 값이 계산될 때까지 기다려야 한다.
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect { println("Received $it") }
}
// Empty
// (2초 후)
// Received Empty
// (1초 후)
// Produced A
// Received A
// (1초 후)
// Produced B
// Received B
stateIn의 두 번째 형태는 중단 함수가 아니지만 초기 값과 started 모드를 지정해야 한다.
started 모드는 shareIn과 같은 옵션을 가진다.
class LocationsViewModel(
private val locationService: LocationService
) : ViewModel() {
private val location = locationService.observeLocations()
.map { it.toLocationsDisplay() }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = emptyList(),
)
// ...
}
주로 하나의 데이터 소스에서 값이 변경된 걸 감지하는 경우에 stateIn 함수를 사용한다.
상태플로우로 상태를 변경할 수 있으며, 뷰가 변화를 감지할 수 있게 된다.