깊게 Flow를 알지 못하고, collect만 하면 값이 온다고만 생각했다.
역시 깊게 들어가보니 알것이 많고 어렵지만 알고나니 뿌듯한게 있다.
Flow는 ColdStream
과 HotStream
으로 나뉜다.
Cold Stream
- (누군가 소비하기 시작하면 데이터를 발행한다)응답에 대해 열정적이지 못하고 차갑다. - Lazy
- ONE CUSTOMER(하나의 소비자) -UniCast
- 주로 데이터 베이스를 읽거나, URL의 서버 통신값을 받아올 경우 ColdStream으로 구현한다.
<예시>
- 각 청취자 당 CD 1장 -> 구독자 1명당 스트림 1개
- 청취자가 재생을 중단하면 CD가 멈춘다 -> 구독자가 연결을 끊으면 스트림은 종료된다
- 음악을 듣기 시작하는 시점에 관계없이, 음악은 모든 청취자에게 동일하다 -> 방출되는 데이터는 구독하는 시점에 관계없이 모든 구독자에게 동일하다
Hot Stream
- MoreCustomer(다수의 소비자) - MultiCast
- 구독하는 사람이 없을경우에도 데이터를 발행한다. 아주열정적이다. - eager
<예시>
- 모든 청취자를 위한 1개의 방송국이 있다 -> 모든 구독자를 위한 1개의 스트림이 있다
- 아무도 듣지 않아도 라디오는 계속해서 컨텐츠를 생성한다 -> 구독자가 없어도 스트림은 데이터를 방출한다.
- 라디오를 듣는 순간에 따라 청취자가 듣는 음악은 달라진다 -> 구독자가 구독을 시작하는 순간에 따라 받는 데이터는 달라진다.
위의 사진을 확인해보자. 일단 파트 1에서는 Flow만을 다룰것이다.
우리는 flow을 만들때 flow{ }
라는것을 감싸므로써 만들거나 생성자
를 이용해 만들기도 한다.
Flow에 collect(중단 함수)
를 호출하면서, FlowCollector<T>
에 전달되엇
Flow가 시작(소비한다)시키며, FlowCollect
의 emit()
함수를 통해 데이터를 전달받는다.
//공변
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
//반변
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
이제 우리는 함수를 호출 할수있다.
이때 알아볼 것은 flow{}
빌더인데,
1. flow 빌더는 No return, No parameter 함수인 suspend lamdablock
을 전달하여 SafeFlow
라는 객체가 생겨난다.
이 SafeFlow는 중단함수(Collect)를 멤버함수로 갖게되는데, 이 중단 함수를 호출 할경우,
Collect()
lamda Block이 FlowCollecter
형태로 래핑이 되고,
collect를 호출한 시점의 CoroutineContext
와 FlowCollector
를 멤버로 하는 SafeCollector로 래핑된다.
이제 SafeFlow에서 SafeCollector를 수신자로 호출한다.
더 간략하게 표현하자면, 아래와 같다.
flow의 collect는 중단함수이기 때문에, 코루틴에서 호출되어야 한다.
Flow의 collect를 호출 할때 코루틴이 중단되고, flow.emit()이 호출될때마다
코루틴이 재개된다.(resumed)
Flow
에서 Dispathers.IO
로 작동하게할것이다.flow {
(1..5).forEach { emit(it) }
}
.flowOn(Dispatchers.IO) // Change upstream dispatcher
.collect { value ->
println(value)
}
자 한번 확인해보자. 일단 flow를 통해 safeFlow가 만들어졌고, FlowCollect로 매핑이 되었다.
하지만 flowOn 연산자로 인해 IO디스패처를 사용하도록 지정되었는데 Channel
이라는 것이 추가되어 이곳에서 호출받게 된다.
이후 계속..