A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.
→ 공식 문서 설명에서도 볼 수 있듯이, 비동기적으로 생성되는 value는 한개지만, 이 value의 묶음을 반환하기 위해서는 다른 객체가 필요한데, 그 객체를 flow라고 이야기하고 있다.
→ Sequence는 Flow와 비슷하지만, Thread Blocking
fun simple(): Flow<Int> = flow {
val v = Random.nextInt()
println("Flow started $v")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
suspend fun usefulFunction1(): Int {
delay(2000)
return 1
}
suspend fun usefulFunction2(): Int {
delay(1000)
return 2
}
suspend fun usefulFunction3(): Int {
delay(3000)
return 3
}
suspend fun getFlow(): Flow<Int> = flow {
coroutineScope {
emit(usefulFunction1())
emit(usefulFunction2())
emit(usefulFunction3())
}
}
fun flowTest() {
runBlocking {
val f = getFlow()
f.collect {
println(it)
}
}
}
suspend fun getFlow(): Flow<Int> = flow {
coroutineScope {
launch {
emit(usefulFunction1())
}
launch {
emit(usefulFunction2())
}
launch {
emit(usefulFunction3())
}
}
}
getFlow함수의 내부에서 emit을 launch로 감싸면, 비동기로 실행해주지 않을까?
그럼 async를 걸고 deferred함수에 await을 해서 emit하는건 어떨까?
suspend fun getFlow(): Flow<Int> = flow {
coroutineScope {
val v1 = async { usefulFunction1() }
val v2 = async { usefulFunction2() }
val v3 = async { usefulFunction3() }
emit(v1.await())
emit(v2.await())
emit(v3.await())
}
}
fun flowTest() {
runBlocking {
val f = getFlow()
f.take(1).collect {
delay(2100)
println(it)
}
}
}
suspend fun usefulFunction(i: Int): Int {
if (i==1) {
// when i==1, really long running job
delay(10_000)
}
delay(2000)
println("ready $i")
return i
}
suspend fun getFlow(): Flow<Int> = channelFlow {
for(i in 1..1000) {
launch {
send(usefulFunction(i))
}
}
}
usefulFunction 함수들이 모두 실행된다. 즉 back pressure가 적절하게 작동하지 않게 된다.
val s = Semaphore(5)
suspend fun getFlow(): Flow<Int> = channelFlow {
for(i in 1..100) {
launch {
s.withPermit {
send(usefulFunction(i))
}
}
}
}
fun flowTest() {
runBlocking {
val f = getFlow()
f.collect {
delay(1000)
println(it)
}
}
}
A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
- 왜냐하면 channelFlow는 기본적으로 default buffer size(64)개 만큼의 원소를 미리 send 하는 것을 허용한다. (즉 channel의 사이즈가 64) 따라서 이 채널의 크기를 줄여야 정말 우리가 원하는 만큼의 함수만 미리 실행될 것이다.
- 이 buffer size를 조절하기 위해서는 channelFlow의 끝에 .buffer(
buffer_size)를 추가하면 된다.
suspend fun usefulFunction(i: Int): Int {
if (i==1) {
delay(10_000)
}
delay(2000)
println("ready $i")
return i
}
val s = Semaphore(5)
suspend fun getFlow(): Flow<Int> = channelFlow {
for(i in 1..100) {
launch {
s.withPermit {
send(usefulFunction(i))
}
}
}
}.buffer(0)
fun flowTest() {
runBlocking {
val f = getFlow()
f.collect {
delay(2100)
println(it)
}
}
}
ready 라는 로그는 5개 초과로 연속해서 쌓이지 않는다.suspend fun usefulFunction(i: Int): Int {
if (i==1) {
delay(10_000)
}
delay(2000)
println("ready $i")
return i
}
suspend fun getFlow(): Flow<Int> = flow {
coroutineScope {
for(i in 1..100) {
emit (
async { usefulFunction(i) }
)
}
}
}.buffer(5).transform { emit(it.await()) }
fun flowTest() {
runBlocking {
val f = getFlow()
f.collect {
delay(2100)
println("collect $it")
}
}
}
// Super fast provider
fun getFlow() = flow {
for (i in 1..100) {
delay(1)
println("emit $i")
emit(i)
}
}
suspend fun usefulFunction(i: Int) {
delay(100)
println("useful $i")
}
fun flowTest() {
val f = getFlow()
runBlocking {
f.collect {
usefulFunction(it)
}
}
}
var runCount = 0
suspend fun usefulFunction(i: Int) {
runCount++
if (i==1) {
delay(10_000)
}
delay(1000)
println("useful $i")
if (runCount > 10) {
throw Exception("Function is not available!!")
}
runCount--
}
fun flowTest() {
val f = getFlow()
runBlocking {
f.collect {
launch {
usefulFunction(it)
}
}
}
}
fun flowTest() {
val f = getFlow()
val s = Semaphore(5)
runBlocking {
f.collect {
// does not suspended
launch {
s.withPermit {
usefulFunction(it)
}
}
}
}
}
fun flowTest() {
val f = getFlow()
val s = Semaphore(5)
runBlocking {
f.collect{
s.acquire() // suspended when resource not available
launch {
usefulFunction(it)
s.release() // release it
}
}
}
}
var runCount = 0
suspend fun usefulFunction(i: Int): Int {
runCount++
println("useful $i start")
delay(1000)
if (runCount > 10) {
throw Exception("Function is not available!!")
}
println("useful $i end")
runCount--
return i
}
suspend fun worker(c: Channel<Int>) {
while(true) {
try {
val v = c.receive()
usefulFunction(v)
println("done $v")
} catch (_: ClosedReceiveChannelException) {
break
}
}
}
fun flowTest() {
val f = getFlow()
val c = Channel<Int>(1)
runBlocking {
repeat(5){
launch {
worker(c)
}
}
f.collect {
c.send(it)
}
c.close()
}
}