Coroutine Flow 란? (1)

쓰리원·2022년 9월 4일
2

Coroutine Flow

목록 보기
1/2
post-thumbnail

1. Flow 감 잡아보기

순차적으로 값을 배출해서, 정상적으로 완료하거나 에러를 던지는 비동기 데이터 스트림 입니다. 일시중단할 수 있는 함수는(Suspending function) 비동기적으로 하나의 값을 반환합니다. 하지만 비동기적으로 계산되어진 값을 여러개를 반환할 때 Flow를 사용합니다.

코루틴 플로우를 사용하여 연속적인 데이터 스트림을 구현하기 위해서 필요한 것은 아래의 3가지가 있습니다.

Producer(생산자)
Intermediary(중간 연산자) - 선택사항
Consumer(소비자)

1. Flow 빌더

flow {}를 사용하는 것이 가장 기본적인 플로우 빌더입니다. emit() 이외에 asFlow()를 통하여 Collection 및 Sequence를 Flow로 변환 할 수 있습니다.

(1..10).asFlow().collect { value ->
    println(value)
}

2. Producer(생산자)

먼저 생산자에서는 데이터를 발행하기 위하여 flow {} 코루틴 블록(빌더)을 생성한 후 내부에서 emit()을 통하여 데이터를 생성합니다. 또한 flow {} 블록은 suspend 함수이므로 delay를 호출할 수 있습니다.

fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it) // 0 1 2 3..9
        delay(100L) // 100ms
    }
}

3. Intermediary(중간 연산자)

생산자에서 데이터를 발행을 하였다면 중간 연산자는 생성된 데이터를 수정 할 수 있습니다. 코틀린의 컬렉션의 함수와 같이 대표적으로 map(데이터를 원하는 형태로 변환), filter(데이터 필터링), onEach(데이터를 변경후 수행한 결과를 반환) 등이 있습니다.

flowSomethings().filter {
    it % 2 == 0 // 짝수만 필터링
}
  
flowSomethings().map {
    it * 2 // 값을 2배씩
}

4. Consumer(소비자)

생산자에서 데이터를 발행하고, 중간 연산자(선택)에서 데이터를 가공하였다면 소비자에서는 collect()를 이용하여 전달된 데이터를 소비할 수 있습니다.

fun main() = runBlocking {
    flowSomething().map {
        it * 2
    }.collect { value ->
        println(value)
    }
}
// 0 2 4 6 8 .. 18

2. flow 더 알아보기

Flow는 코루틴을 기반으로 빌드되며 여러 값을 제공할 수 있습니다. Flow는 비동기식으로 계산할 수 있는 데이터 스트림의 개념입니다. 내보낸 값은 동일한 유형이어야 합니다. 예를 들어 Flow는 정수 값을 내보내는 흐름입니다.

Flow는 값 시퀀스를 생성하는 Iterator와 비슷하지만 suspend 함수를 사용하여 값을 비동기적으로 생성하고 사용합니다. 예를 들어 Flow는 기본 스레드를 차단하지 않고 다음 값을 생성할 네트워크 요청을 안전하게 만들 수 있습니다.

1. 간단한 flow 사용

fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it) //0 1 2 3 ... 9
        delay(10L)
    }
}

fun main() = runBlocking {
    flowSomething().collect { value ->
        println(value)
    }
}
  
결과

0
1
2
3  
...
 
9

flow 플로우 빌더 함수를 이용해서 코드블록을 구성하고 emit을 호출해서 스트림에 데이터를 흘려 보냅니다. 플로우는 콜드 스트림이기 때문에 요청 측에서 collect를 호출해야 값을 발생하기 시작합니다.

콜드 스트림 - 요청이 있는 경우에 보통 1:1로 값을 전달하기 시작.
핫 스트림 - 0개 이상의 상대를 향해 지속적으로 값을 전달.

2. 플로우 취소

fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it) //0 1 2 3 ... 9
        delay(100L)
    }
}

fun main() = runBlocking<Unit> {
    val result = withTimeoutOrNull(500L) {
        flowSomething().collect { value ->
            println(value)
        }
        true
    } ?: false
    if (!result) {
        println("취소되었습니다.")
    }
}

withTimeoutOrNull을 이용해 취소할 수 있습니다. withTimeoutOrNull(500L)라고 하게 되면 500ms 가 되었을 때 withTimeoutOrNull이 null을 리턴하기 때문에 결과적으로 false를 반환하게 됩니다.

3. 플로우 빌더 flowOf

flow 이외에도 몇가지 flowOf, asFlow등의 플로우 빌더가 있습니다. flowOf의 경우 여러 값을 인자로 전달해 플로우를 만듭니다.

fun main() = runBlocking<Unit> {
    flowOf(1, 2, 3, 4, 5).collect { value ->
        println(value)
    }
}
  
결과
  
1
2
3
4
5

4. 플로우 빌더 asFlow

asFlow는 컬렉션이나 시퀀스를 전달해 플로우를 만들 수 있습니다.

fun main() = runBlocking<Unit> {
    listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
        println(value)
    }
    (6..10).asFlow().collect {
        println(it)
    }
}
  
결과
  
1
2
3
4
5
6
7
8
9
10

3. flow 연산하기

1. flow와 map

플로우에서 map 연산을 통해 데이터를 가공할 수 있습니다.

fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it)
        delay(10L)
    }
}

fun main() = runBlocking {
    flowSomething().map {
        "$it $it"
    }.collect { value ->
        println(value)
    }
}
  
결과
  
0 0
1 1
2 2
  
...
  
9 9

2. flow와 filter

fun main() = runBlocking<Unit> {
    (1..20).asFlow().filter {
        (it % 2) == 0 //짝수만 필터링
    }.collect {
        println(it)
    }
}

결과
  
2
4
6
  
...
  
20

3. filterNot

만약 홀수만 남기고 싶을 때 코드를 홀수에 맞게 수정할 수 도 있습니다. 하지만 짝수를 출력하는 코드를 그대로 두고 filterNot을 사용할 수도 있습니다.

fun main() = runBlocking<Unit> {
    (1..20).asFlow().filterNot {
        (it % 2) == 0
    }.collect {
        println(it)
    }
}

4. transform 연산자

위의 중간 연산자(map, filter)은 요소마다 1개의 변환밖에 하지 못하지만 변환 연산자(transform)은 예시처럼 emit()을 추가하여 요소마다 여러개의 변환이 가능하게 해줍니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.collect {
        println(it)
    }
}
  
1
2
2
4
3
6
4
8
  
...
  

5. take 연산자

take 연산자는 몇개의 수행 결과만 취합니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.take(5)
    .collect {
        println(it)
    }
}
  
결과
  
1
2
2
4
3

6. takeWhile 연산자

takeWhile을 이용해 조건을 만족하는 동안만 값을 가져오게 할 수 있습니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.takeWhile {
        it < 15
    }.collect {
        println(it)
    }
}

7. drop 연산자

drop 연산자는 처음 몇개의 결과를 버립니다. take가 takeWhile을 가지듯 dropWhile도 있습니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.drop(5)
    .collect {
        println(it)
    }
}

4. reference

https://kotlinlang.org/docs/flow.html#flow-cancellation-basics
https://developer.android.com/kotlin/flow?hl=ko

profile
가장 아름다운 정답은 서로의 협업안에 있다.

0개의 댓글