public fun <T> flowOf(vararg elements : T) : Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
}
만약 flow 도중 collect
함수를 사용할 때
fun <T> flow (
block : suspend FlowCollector<T>.() -> Unit
) : Flow<T> = object : Flow<T>() {
override suspend fun collect(collector: FlowCollector<T>){
collector.block()
}
}
fun interface FlowCollector<in T> {
suspend fun emit(value :T)
}
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNumber: Int): List<User>
}
class FakeUserApi : UserApi {
private val users = List(20) { User("User$it") }
private val pageSize: Int = 3
override suspend fun takePage( pageNumber: Int
): List<User> {
delay(1000) //
return users
.drop(pageSize * (pageNumber))
.take(pageSize)
}
}
fun allUsersFlow(api: UserApi): Flow<User> = flow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // suspending
emitAll(users.asFlow())
} while (users.isNotEmpty()) }
suspend fun main() {
val api = FakeUserApi()
val users = allUsersFlow(api)
val user = users
.first {
println("Checking $it")
delay(1000) // suspending
it.name == "User7"
}
println(user)
}
output
Fetching page 0
Checking User(name=User0)
Checking User(name=User1)
Checking User(name=User2)
Fetching page 1
Checking User(name=User3)
Checking User(name=User4)
Checking User(name=User5)
Fetching page 2
Checking User(name=User6)
Checking User(name=User7)
User(name=User7)
receiver를 기다리지않고 독립적인 coroutine을 통해 user의 정보를 받아오는 것을 확인할 수 있다.
un allUsersFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // suspending
users.forEach{send(it)}
} while (users.isNotEmpty()) }
위의 코드에서 allUsersFlow 메소드의 부분만 channelFlow와 send로 바꾸어준다.
output
Fetching page 0
Checking User(name=User0)
Fetching page 1
Checking User(name=User1)
Fetching page 2
Checking User(name=User2)
Fetching page 3
Checking User(name=User3)
Fetching page 4
Checking User(name=User4)
Fetching page 5
Checking User(name=User5)
Fetching page 6
Checking User(name=User6)
Fetching page 7
Checking User(name=User7)
User(name=User7)
flow를 통해 구현한 것과 다르게 send 1번에 main문이 1번씩 도는 것을 알 수 있다.
channelFlow
---implements--> ProducerScope
---implements--> CoroutineScope
그렇기 때문에 우리는 channelFlow
를 launch
블록 통해서 새로운 코루틴을 생성할 수 있다.