Flow and Channel examples

참치돌고래·2022년 12월 15일
0

Understanding flow builder

public fun <T> flowOf(vararg elements : T) : Flow<T> = flow {
	for (element in elements) {
    	emit(element) 
       	}
    }
}

만약 flow 도중 collect 함수를 사용할 때

fun <T> flow (
	block : suspend FlowCollector<T>.() -> Unit
    ) : Flow<T> = object : Flow<T>() {
    	override suspend fun collect(collector: FlowCollector<T>){
        	collector.block()
    }
}

fun interface FlowCollector<in T> {
	suspend fun emit(value :T)
}

Flow를 Channel 처럼 사용하기

data class User(val name: String)
interface UserApi {
    suspend fun takePage(pageNumber: Int): List<User>
}
class FakeUserApi : UserApi {
    private val users = List(20) { User("User$it") }
    private val pageSize: Int = 3
    override suspend fun takePage( pageNumber: Int
    ): List<User> {
        delay(1000) //
            return users
            .drop(pageSize * (pageNumber))
            .take(pageSize)
    }
}
fun allUsersFlow(api: UserApi): Flow<User> = flow {
    var page = 0
    do {
        println("Fetching page $page")
        val users = api.takePage(page++) // suspending
        emitAll(users.asFlow())
    } while (users.isNotEmpty()) }
suspend fun main() {
    val api = FakeUserApi()
    val users = allUsersFlow(api)
    val user = users
        .first {
            println("Checking $it")
            delay(1000) // suspending
            it.name == "User7"
        }
    println(user)
}

output

Fetching page 0
Checking User(name=User0)
Checking User(name=User1)
Checking User(name=User2)
Fetching page 1
Checking User(name=User3)
Checking User(name=User4)
Checking User(name=User5)
Fetching page 2
Checking User(name=User6)
Checking User(name=User7)
User(name=User7)

receiver를 기다리지않고 독립적인 coroutine을 통해 user의 정보를 받아오는 것을 확인할 수 있다.

ChannelFlow 통해서 구현하기

un allUsersFlow(api: UserApi): Flow<User> = channelFlow {
    var page = 0
    do {
        println("Fetching page $page")
        val users = api.takePage(page++) // suspending
        users.forEach{send(it)}
    } while (users.isNotEmpty()) }

위의 코드에서 allUsersFlow 메소드의 부분만 channelFlow와 send로 바꾸어준다.

output

Fetching page 0
Checking User(name=User0)
Fetching page 1
Checking User(name=User1)
Fetching page 2
Checking User(name=User2)
Fetching page 3
Checking User(name=User3)
Fetching page 4
Checking User(name=User4)
Fetching page 5
Checking User(name=User5)
Fetching page 6
Checking User(name=User6)
Fetching page 7
Checking User(name=User7)
User(name=User7)

flow를 통해 구현한 것과 다르게 send 1번에 main문이 1번씩 도는 것을 알 수 있다.
channelFlow ---implements--> ProducerScope ---implements--> CoroutineScope
그렇기 때문에 우리는 channelFlowlaunch 블록 통해서 새로운 코루틴을 생성할 수 있다.

profile
안녕하세요

0개의 댓글