코루틴 플로우 컨텍스트, 버퍼링, 결합, 예외 및 완료처리, 런칭 정리

SSY·2022년 12월 15일
0

Flow

목록 보기
2/5
post-thumbnail

목차
1. 플로우 컨텍스트
2. 플로우 버퍼링
3. 플로우 결합
4. 플로우 예외 및 완료처리
5. 플로우 런칭

1. 플로우 컨텍스트

우리는 플로우를 사용할 때, 코루틴과 함께 사용하곤 한다. 그리고 하나의 코루틴 빌더(launch, async, withContext 등...)내에서 Flow객체를 반환받게 된다.

이때 반환되어지는 Flow도 어떤 스레드에서 돌것인지에 대한 context를 가지게 되는데, 이때 이 Flow context는 자신을 호출한 코루틴의 context를 그대로 가지게 된다. 그리고 이를 context preservasion즉, 콘텍스트 보존이라 부른다. 코드로 한 번 이해해보도록 하자.

fun foo(): Flow<Int> {
    return flow {
       Log.i("flowTest", "Flow 시작, currentThread :${Thread.currentThread().name}")
       for (i in 1..3) {
           emit(i)
       }
    }
}

fun main() {
    runBlocking {
        foo().collect {
            Log.i("flowTest", "collect : $it, currentThread :${Thread.currentThread().name}")
        }
    }
}

위 결과를 보면 알겠지만, 코루틴을 시행한 곳의 스레드는 Main스레드였다. 그리고 해당 코루틴 안에서의 Flow내부도 Main스레드로 찍혀 있다. 이로써, Flow의 context는 자신을 호출한 코루틴의 context를 그대로 가져가는 걸 알 수 있다.

하지만 코루틴을 호출한 스레드와 Flow내부의 스레드를 다르게 해줘야 하는 경우가 있을 수 있다. 가령, 연산이 많이 필요한 작업이여서 Dispatchers.Default를 사용해야 하는 경우나, 네트워크 통신이 필요하여 Dispatchers.IO를 사용해야 하는 경우가 있다. 그럼 이렇게 생각할 수 있겠다.

코루틴을 호출한 스레드와 Flow내부의 스레드를 다르게 해줘야 하는 경우?
1. Dispatchers.Default -> 연산이 많이 필요한 작업
2. Dispatchers.IO -> 네트워크, 파일 입출력 작업

그래서 다음과 같이 잘못 생각할 수 있다

"Flow내부 context는 코루틴의 context를 따라간다고 했으니...

Flow를 호출해주는 코루틴의 콘텍스트를 바꾸어 주면 되겠구나!"

fun foo(): Flow<Int> {
    return flow {
        withContext(Dispatchers.Default) { // Flow의 context를 바꾸기 위해 이녀석을 넣어줌.
            Log.i("flowTest", "Flow 시작, currentThread :${Thread.currentThread().name}")
            for (i in 1..3) {
                emit(i)
            }
        }
    }
}
fun main() {
    runBlocking {
        foo().collect {
            Log.i("flowTest", "collect : $it, currentThread :${Thread.currentThread().name}")
        }
    }
}

라고 생각하고 Flow객체 내부 코루틴의 콘텍스트를 지정해줄 수도 있겠다. 하지만 이를 빌드해보면 아래와 같은 결과가 나온다.

코루틴의 context를 바꾸어주기 위해서는 flowOn함수를 사용하라고 말해주고 있다. 그럼 이쯤에서 감이 잡힐거라 생각한다. Flow의 context를 바꾸어주기 위해선 코루틴의 context를 바꾸어주는 것이 아니다. flowOn함수를 사용해서 바꾸어주는 것이다. 아래는 개선된 코드이다.

Flow의 Context를 바꾸는 법?
flowOn함수를 사용

fun foo(): Flow<Int> {
    return flow {
        Log.i("flowTest", "Flow 시작, currentThread :${Thread.currentThread().name}")
        for (i in 1..3) {
            emit(i)
        }
    }
}
fun main() {
    runBlocking {
        foo()
            .flowOn(Dispatchers.Default)
            .collect {
            Log.i("flowTest", "collect : $it, currentThread :${Thread.currentThread().name}")
        }
    }
}

실행 결과는...!?

결과를 보면 바로 이해가 간다. 코루틴을 호출하는 스레드는 메인이고, Flow가 수행되는 곳은 Default디스패쳐인것을 확인할 수 있다. 즉, 이렇게 코루틴과 Flow의 context를 바르게 해주고 싶을땐, flowOn()메소드를 사용하면 된다.

2. 플로우 버퍼링

버퍼링이란 말은 우리 일상에서도 많이 쓰인다. 유튜브를 보면서 영상 버퍼링을 우린 많이 경험했다. 즉, 데이터들을 수신받는데 있어 작업이 오래 걸릴 경우, 방출이 완료된 데이터는 미리 받고 저장해 줌으로써 시간을 단축해줄 수 있다. 즉, 자료구조의 Queue와 동일하게 동작한다 보면 된다. 아래의 코드로 설명을 시작해볼까 한다.

Flow Buffer 적용 X

fun foo(): Flow<Int> {
    return flow {
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }
}
fun main() {
    runBlocking {
        val time = measureTimeMillis {
            foo().collect {
                delay(3000)
            }
        }
        Log.i("flowTest", "collect time: $time")
}

총 12초가 걸렸다. 위 로직의 흐름은 다음과 같다.

로직의 흐름
foo로부터 1초 후 아이템 방출 -> 3초동안 아이템 수집 ->
foo로부터 1초 후 아이템 방출 -> 3초동안 아이템 수집 ->
foo로부터 1초 후 아이템 방출 -> 3초동안 아이템 수집

즉, (1 + 3) * 3이되니 총 12초가 되는 것이다. 하지만, 플로우 버퍼를 사용한다면, 아이템 수집 작업이 안끝났다 하더라도 퍼버에 이미 받아둔 데이터를 바로바로 수집할 수 있게 해준다.

[ Flow Buffer 적용 O ]

fun foo(): Flow<Int> {
    return flow {
        for (i in 1..3) {
            delay(1000)
            Log.i("flowTest", "emit: $i")
            emit(i)
        }
    }
}
fun main() {
    runBlocking {
        val time = measureTimeMillis {
            foo()
            .buffer()
            .collect {
                Log.i("flowTest", "collect: $it start")
                delay(3000)
                Log.i("flowTest", "collect: $it end")
            }
        }
        Log.i("flowTest", "collect time: $time")
}

즉 위는 다음과 같이 동작하는 것이다.

로직의 흐름
1초 후, 아이템 방출 ->
방출받은 아이템 수집 ->
collect에서 추가 처리(3초) ->
기다리는 동안, 1초 후 아이템 방출 ->
기다리는 동안, 1초 후 아이템 방출 ->
collect에서 추가 처리(3초) ->
collect에서 추가 처리(3초)

따라서 위의 버퍼 기능은 다음과 같이 정의할 수 있다.

buffer()는 언제 사용할까? 딱 정해준다
아이템을 방출하는 시간 < 아이템을 수집하는 시간

조금 더 Scoping해서 정리해보면 다음과 같다.

buffer()는 언제 사용할까? More Scoped
Flow의 방출 시간 < collec내부 연산 시간

이럴 경우, 이미 방출된 데이터를 버퍼에 저장해두고, 데이터 수집작업(=collect)가 끝났을 때, 버퍼에 저장된 데이터를 바로 수집할 수 있는 것이다.

3. 플로우 결합

우리는 보통 코루틴을 사용하며 두개 이상의 Flow들을 병합해야 할 때도 있다. 소제목에서도 그렇듯이 두개 이상의 Flow를 결합하여 새로운 데이터 구조를 좀 더 적은 코드로 만들 수 있는 것이다. 이번에 알아볼 '결합 연산자'는 총 2가지 이다.

Flow 결합 연산자
1. Combine
2. Zip

3.1. Combine

combine 영어 단어 뜻에서도 그렇듯, '결합'이라는 뜻이 있다. 예를 들어 다음의 두 코드가 있다고 가정해보자.

fun foo() {
    val intFlow = flowOf(1, 2, 3).onEach {
        delay(1000)
    }
    val charFlow = flowOf('A', 'B', 'C').onEach {
        delay(1500)
    }
}

우선 첫 번째 Flow. 'intFlow'의 경우는 1초마다 데이터를 방출한다. 그리고 두 번째 Flow. 'charFlow'는 1.5초마다 데이터를 방출한다. 그렇다면 이제 위 두개의 Flow를 통해서 데이터를 방출시켜 보자.

fun foo() {
    runBlocking {
        val intFlow = flowOf(1, 2, 3).onEach {
            delay(1000)
        }
        val charFlow = flowOf('A', 'B', 'C').onEach {
            delay(1500)
        }
        intFlow.combine(charFlow) { num, char ->
            "$num/$char"
        }.collect {
            Log.i("FlowTest", "$it")
        }
    }
}

당연한 결과가 나왔다. 이렇게 나올 수 있는 마블 흐름도를 그려보면 다음과 같다.

3.2. Zip

이 연산자도 위의 combine과 사용법이 '완전 똑같'다. 다만 다른점이 한 가지 있는데, 그것은 두개의 Flow값을 기다렸다가 한 번에 방출한다는 점이다. 똑같이 아래의 코드를 사용하고자 한다.

fun foo() {
    val intFlow = flowOf(1, 2, 3).onEach {
        delay(1000)
    }
    val charFlow = flowOf('A', 'B', 'C').onEach {
        delay(1500)
    }
}

intFlow는 1초, charFlow는 1.5초마다 방출한다. 그리고 이제 방출해보자. 그리고 Build!

fun foo() {
    runBlocking {
        val intFlow = flowOf(1, 2, 3).onEach {
            delay(1000)
        }
        val charFlow = flowOf('A', 'B', 'C').onEach {
            delay(1500)
        }
        intFlow.zip(charFlow) { num, char ->
            "$num/$char"
        }.collect {
            Log.i("FlowTest", "$it")
        }
    }
}

자, zip연산자를 보면 위와 같이 데이터가 세 번만 방출되었다. 그리고 이 마블 차트를 그려보면 다음과 같다.

그림을 좀 개떡같이 그리긴 했지만,,,, 내 스스로의 공부용이기도 하고, 사실 자세히 보면 이해에 무리는 없다!(ㅋㅋㅋ)

4. 플로우 예외 및 완료처리

우리는 try-catch문을 다음과 같은 방식으로 사용하곤 한다.

try {
    // 실행 도중 에러가 날 수 있는 코드부분
} catch (e: Exception) {
    // 에러가 발생했을때 처리하는 부분
} finally {
    // 에러가 나든 상관 없이, 꼭 실행되어야 하는 부분
}

그리고 이러한 처리는 코루틴 Flow를 사용하면서도 동일하게 처리할 수 있다. 아까 써본 코드에서 조금만 변형을 해보자.

fun foo() {
    runBlocking {
        val intFlow = flowOf(1, 2, 3).onEach {
            Log.i("FlowTest", "익셉션 시작")
            throw Exception("플로우 익셉션")
            // delay(1000)
        }
        val charFlow = flowOf('A', 'B', 'C').onEach {
            delay(1500)
        }
        intFlow.zip(charFlow) { num, char ->
            "$num/$char"
        }
            .catch { cause ->
                Log.i("FlowTest", "catch들어옴 : ${cause.message}")
            }
            .onCompletion { cause ->
                Log.i("FlowTest", "onCompletion들어옴 : ${cause?.message}")
            }
            .collect {
                Log.i("FlowTest", "collect들어옴 : $it")
        }
    }
}

첫 시작하기 직전에 '익셉션 시작'이라는 로그를 찍어주었다. 그 후, 강제로 예외를 던졌다. 그러면 당연하게도 catch블록이 호출된다. 그 후, 최종적으로 onCompletion블록을 타게 된다. 같은 맥락으로 익셉션이 발생하지 않는 경우엔 collect -> onCompletion블록이 타는걸 확인할 수 있다.

fun foo() {
    runBlocking {
        val intFlow = flowOf(1, 2, 3).onEach {
            // Log.i("FlowTest", "익셉션 시작")
            // throw Exception("플로우 익셉션")
            delay(1000)
        }
        val charFlow = flowOf('A', 'B', 'C').onEach {
            delay(1500)
        }
        intFlow.zip(charFlow) { num, char ->
            "$num/$char"
        }
            .catch { cause ->
                Log.i("FlowTest", "catch들어옴 : ${cause.message}")
            }
            .onCompletion { cause ->
                Log.i("FlowTest", "onCompletion들어옴 : ${cause?.message}")
            }
            .collect {
                Log.i("FlowTest", "collect들어옴 : $it")
        }
    }
}

다만, 여기서 약간 주의해야할 점이 있다. 그것은 바로 ,'catch'와 'onCompletion'의 위치이다.

강조! catch와 onCompletion위치!!

fun foo() {
    runBlocking {
        val intFlow = flowOf(1, 2, 3).onEach {
            Log.i("FlowTest", "익셉션 시작")
            throw Exception("플로우 익셉션")
              delay(1000)
        }
        val charFlow = flowOf('A', 'B', 'C').onEach {
            delay(1500)
        }
        intFlow.zip(charFlow) { num, char ->
            "$num/$char"
        }
            .onCompletion { cause ->
                Log.i("FlowTest", "onCompletion들어옴 : ${cause?.message}")
            }
            .catch { cause ->
                Log.i("FlowTest", "catch들어옴 : ${cause.message}")
            }
            .collect {
                Log.i("FlowTest", "collect들어옴 : $it")
        }
    }
}

위의 코드는 onCompletion을 먼저 호출한 후, catch블록을 호출해 주었다. 그래서 onCompletion과 catch에서 모두 예외 메시지를 받는걸 확인할 수 있다. (하지만 우린 이미 확인했다. catch를 통해 이미 예외처리를 했다면, onCompletion엔 null을 리턴한다는 것을.)

catch와 onCompletion정리
catch블록에서 에러처리가 이뤄졌는가? 그렇다면 그 이후엔 에러가 전달되지 않는다!

5. 플로우 런칭

플로우 런칭은 우리가 '플로우를 처음에 어떻게 생성하는지?'를 다룬다. 우선 앞전에서 우린 flowOf를 아래와 같은 형식으로 다루었었다.

val intFlow = flowOf(1, 2, 3)
val charFlow = flowOf('A', 'B', 'C')

하지만 이러한 방식 말고도 다음을 사용하여 플로우를 만들 수 있다.

Flow만드는 법
1. asFlow()
2. flow { ... }
3. channelFlow { ... }
4. MutableStateFlow
5. MutableSharedFlow

5.1. asFlow()

(1...3).asFlow()
    .collect{ ... }

앞전에서 다루었던 코드와 간단히 결합해보았다. 이해에 어려움이 없을거라 생각한다.

5.2. flow { ... }

기존 intFlow = flowOf(1, 2, 3)부분을 numbers()라는 함수로 치환했다. 동일한 코드이다.

fun numbers(): Flow<Int> = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}
fun foo() {
    runBlocking {
        val intFlow = numbers()
        val charFlow = flowOf('A', 'B', 'C').onEach {
            delay(1500)
        }
        intFlow.zip(charFlow) { num, char ->
            "$num/$char"
        }
            .onCompletion { cause ->
                Log.i("FlowTest", "onCompletion들어옴 : ${cause?.message}")
            }
            .catch { cause ->
                Log.i("FlowTest", "catch들어옴 : ${cause.message}")
            }
            .collect {
                Log.i("FlowTest", "collect들어옴 : $it")
        }
    }
} 

5.2. ChannelFlow

5.3. MutableStateFlow

5.4. MutableSharedFlow

profile
불가능보다 가능함에 몰입할 수 있는 개발자가 되기 위해 노력합니다.

0개의 댓글