Summary : Coroutine + Flow 개념 정리 위한 문서
생성 - Flow Builders
ex1) fun flowBuilder() : Flow<String> = flow {
for(i in 1..3) {
delay(1000L)
emit("new Data $i")
}
}.flowOn(Dispatchers.Main)
flowBuilder.collect { value -> println(value) }
ex2) flowOf("1", "2", "3", "4").collect { value -> println(value) }
ex3) (1..3).asFlow().collect { value -> println(value) }
제약 - Flow Constraints
Context 특성
예외 투명성(Exception transparency)
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
기능 - Flow Functions
Context
Creator
Collection Teriminal Operator
fun flowBuilder() : Flow<String> = flow {
for(i in 1..3) {
delay(1000L)
emit("new Data $i")
}
}.flowOn(Dispatchers.Main)
flowBuilder().collect {
println("collect : $it")
}
----
fun flowBuilder() : Flow<String> = flow {
emit("new Data $i")
}
val singleData = flowBuilderOneEmit().single() // emit 1회가 일어나는 flow에서만 사용.
val firstData = flowBuilderOneEmit().first() // 첫번째 방출 되는 값을 수집하고 중단.
Composing multiple flows
zip : 기준 Flow(this)를 기준으로 other flow와 인자로 제공된 transform 함수를 사용하여 other flow와 함께 현재 Flow(this)의 값을 압축합니다. 두 Flow중 하나가 완료되는 즉시 Flow가 완료되고 나머지 Flow는 취소된다.
val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
flow.zip(flow2) { i, s -> i.toString() + s }.collect {
println(it) // Will print "1a 2b 3c"
}
combine : 각 Flow 별로 가장 최근에 방출된 값을 조합하여 transform 함수와 함께 값을 방출하는 Flow를 반환한다.
val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
println(it) // Will print "1a 2a 2b 2c"
}
combineTransform : 각 Flow 별로 가장 최근에 방출된 값을 처리하는 transform 에 의해 조합된 값을 방출하는 Flow를 반환한다.
val flow = requestFlow()
val flow2 = searchEngineFlow()
flow.combineTransform(flow2) { request, searchEngine ->
emit("Downloading in progress")
val result = download(request, searchEngine)
emit(result)
}
flatMapConcat : Flow 사이를 시퀀셜하게 연결하는 operator로, 내부의 predicate Flow가 완료되어야만 다음 수집이 시작된다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
flatMapMerge : flatMapConctat과 동일하게 predicate Flow를 수행하지만, 순서를 보장하지 않고 외부, 내부의 Flow가 각각 (동시에) 수행되는 형태로 동작한다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
flatMapLatest : collectLatest와 비슷하게 동작한다. 새로운 emit발생시에 이전 대기중이거나 동작중인 Flow는 취소 된다.
flattenConcat : flatMapConcat 동일하게 flattening한다. 단, transform 처리가 없다.
flattenMerge : flatMapMerge 동일하게 flattening한다. 단, transform 처리가 없다.
Transform Operator
transform : 주어진 Flow의 각 값에 변환 함수를 적용합니다.변환의 수신기는 Flow Collector이므로 변환은 방출된 요소를 변환하거나 건너뛰거나 여러 번 방출할 수 있는 유연한 함수이다.
transformLatest : transform 과 동일하지만 , 기존 Flow가 새로 값을 방출하면 이전 transform 블록이 취소된다.
transformWhile : 지정된 predicate 함수가 true를 반환하는 동안 지정된 flow의 각 값에 transform 함수를 적용한다.
map : 원래 흐름의 각 값에 transform 함수를 적용한 결과를 포함하는 Flow를 반환한다.
mapLatest : map 과동일한 처리 후 Flow를 반환. 하지만, 기존 Flow가 새 값을 방출하면 이전 값에 대한 변환 블록 계산이 취소된다.
flow {
emit("a")
delay(100)
emit("b")
}.mapLatest { value ->
println("Started computing $value")
delay(200)
"Computed $value"
}
will print "Started computing a" and "Started computing b",
but the resulting flow will contain only "Computed b" value.
mapNotNull : 원래 흐름의 각 값에 transform 함수를 적용한 결과중 null이 아닌경우만 포함하는 Flow를 반환한다.
withIndex : 값 및 해당 인덱스(0부터 시작)를 포함하여 각 요소를 IndexedValue로 래핑하는 Flow를 반환한다. 반환값 = Flow<IndexedValue>
Buffering
conflate : 통합 채널을 통해 스트림 방출을 통합하고 별도의 코루틴에서 수집을 수행합니다. 그 결과 수집기가 느려서 스트림 방출이 일시 중단된 적은 없지만 수집기는 항상 가장 최근에 방출된 값을 얻는다.
val flow = flow {
for (i in 1..30) {
delay(100)
emit(i)
}
}
Applying conflate() operator to it allows a collector that delays 1 second
on each element to get integers 1, 10, 20, 30:
val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)
buffer : Flow의 emit 과 collect의 순차처리가 별도의 코루틴으로 채널을 이용해서 처리하므로, 전체 처리 시간을 감소 시킬 수 있다.
flowOf("A", "B", "C")
.onEach { println("1$it") }
.buffer() // <--------------- buffer between onEach and collect
.collect { println("2$it") }
Filter Operator
filter : 지정된 predicate와 일치하는 값만 포함하고 있는 Flow를 반환한다.
filterInstance : 지정된 유형의 인스턴스인 값만 포함하는 Flow를 반환한다.
filterNot : 지정된 predicate와 일치하지 않는 값만 포함하고 있는 Flow를 반환한다.
filterNotNull : null이 아닌 값만 포함하고 있는 Flow를 반환한다.
take : 첫 번째 카운트 요소를 포함하는 Flow를 반환. 카운트 요소가 소비되면 원래 Flow가 취소된다.
takeWhile : 지정된 predicate( 조건)을 충족하는 첫 번째 요소가 포함된 흐름을 반환합니다.반환되는 Flow에는 predicate가 false를 반환한 요소가 포함되지 않는다.
distinctUntilChanged : 동일한 값의 이후의 모든 반복된 값은 필터링되는 Flow를 반환한다
distinctUntilChangedBy : 동일한 키의 모든 후속 반복이 필터링되는 Flow를 반환. 여기서 입력 인자인 keySelector 함수로 추출되는 Flow이다.
debounce : 원래 흐름을 미러링하는 Flow를 반환하지만 지정된 시간 제한 내에 새 값이 뒤에 오는 값은 필터링합니다. 최신 값이 항상 방출된다.
drop : 인자로 주어진 count 수 만큼의 값은 무시하는 Flow를 반환합니다.
reduce : 첫번째 값부터 시작하여 계속해서 값을 누적하고 최종 누적된 값을 반환한다.
fold : 초기 입력값을 인자로 받고, 그 값에 계속해서 값을 누적하고 최종 누적된 값을 반환한다.
count : Flow의 조건에 따른 카운트를 반환한다. predicate 인자를 넣어주면 predicate 조건에 해당하는 카운트만 반환.
sample : 주어진 샘플링(period) 기간 동안 원래 흐름에서 방출된 최신 값만 방출하는 Flow를 반환한다.
flow {
repeat(10) {
emit(it)
delay(110)
}
}.sample(200)
// will produces the following emissions
// 1, 3, 5, 7, 9
scan : 방출된 기존 flow 값으로 입력된 초기값을 포함하여, 지정된 action을 통해 처리된 새로운 Flow를 반환한다.
flowOf(1, 2, 3)
.scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
// will produce [[], [1], [1, 2], [1, 2, 3]]
Actions
onStart : Flow가 수집되기 전에 지정된 action을 먼저 수행하는 Flow를 반환한다.
onEach : 스트림이 방출되기전에 지정된 action을 수행하는 Flow를 반환한다.
onCompletion : Flow가 완료,취소 또는 예외 발생시에 지정된 action을 호출 하는 Flow를 반환한다.
onEmpty : Flow가 스트림을 방출하지 않는경우에 , 지정된 action에서 값을 직접 방출할 수 있는 Flow를 반환한다.
retry : 지정된 predicate와 일치하는 예외가 발생되는경우 지정된 retries:최대 재시도 횟수까지 수집을 재시도한다
retryWhen : 지정된 predicate와 일치하는 예외가 발생되는 경우와 인자로 들어오는 attempt(시도 횟수) 로 직접 재시도를 결정합니다 return boolean (true 반환시에만 재시도 수행)
flow.retryWhen { cause, attempt ->
if (cause is IOException) { // retry on IOException
emit(RetryWrapperValue(e))
delay(1000) // delay for one second before retry
true
} else { // do not retry otherwise
false
}
}
cancellable : 각 배출물에 대한 취소 상태를 확인하는 흐름을 반환하고 흐름 수집기가 취소된 경우 해당 취소 원인을 발생시킵니다. Flow Builder 및 SharedFlow의 모든 구현은 기본적으로 취소할 수 있다.