[Android] 상태 관리 특화 Coroutine

성승모·2025년 10월 20일

Android

목록 보기
5/8

Channel

개요

ChannelBlockingQueue와 같은 컨셉을 지닌다. 하지만 스레드를 막는 대신에 suspend 시킨다는 코루틴의 특성을 가진다.

produce

 보통 생성자-소비자 패턴으로 많이 만들어진다. 또한 이를 직접 만드는 것은 귀찮기 때문에 코루틴 빌더인 produce를 제공한다. 이를 통해 간편하게 생성자-소비자 패턴을 구현할 수 있다.

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")

close()를 통해 닫을 수 있다. 이 때 Channel은 close token을 받게 되고, 해당 토큰 또한 queue에 들어간다. 따라서, close()를 호출하더라도 소비자 쪽에서는 호출 전 생성된 요소들을 모두 활용할 수 있도록 보장되어있다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close()
}
for (y in channel) println(y)
println("Done!")

Channel<T>

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

 하지만 보통 이용하는 것은 Channel<T>이다. produce를 이용한 것은 사용자의 행위를 기다리는 실제 앱 사용에는 맞지 않는다. 따라서, SendChannelReceiveChannel를 모두 갖는 Channel<T>이 사용자의 행위를 Send 하고 이를 Receive 하여 이벤트를 적절히 처리하면 된다. 생성자의 파라미터를 확인해보자.

fun <E> Channel(
    capacity: Int = RENDEZVOUS, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, 
    onUndeliveredElement: (E) -> Unit? = null
): Channel<E>
  • capacity: 채널의 버퍼 용량이다.
  • onBufferOverflow
    ⸰ SUSPEND: 버퍼가 꽉 차면 멈춘다.
    ⸰ DROP_OLDEST: 멈추지 않고 제일 오래된 값을 버린다.
    ⸰ DROP_LATEST: 멈추지 않고 제일 최신의 값을 버린다.
  • onUndeliveredElement: 값이 보내지지 않았을 때 어떻게 처리할지 정하는 선택적인 람다이다.

다음은 SendChannel에서 송신 시 사용할 수 있는 함수에 대해 알아보자.

  1. send(element: E)
    : 값을 무조건 보내야하며, suspend하여 발신한다. 코루틴이 suspending되긴 하지만 송신을 보장할 수 있다.
  2. trySend(element: E)
    : 동기적으로 작동하며, 송신 결과를 Result 객체로 반환한다.

다음은 ReceiveChannel에서 수신 시 사용할 수 있는 함수에 대해 알아보자.

  1. receive(element: E)
    : 값을 무조건 받아야하며, suspend하여 대기한다. 코루틴이 suspending되긴 하지만 수신을 보장할 수 있다.
  2. tryReceive(element: E)
    : 동기적으로 작동하며, 수신 결과를 Result 객체로 반환한다.

 또한 close()는 위에선 produce에서 설명했듯이 token을 보내 close 호출 전 있었던 요소들을 이용할 수 있다. 반면에 cancel()은 채널 자체를 즉시 취소하며 버퍼 내 모든 값들을 없애기 때문에 주의해서 사용해야 한다.

Flow

interface Flow<out T>

flow { ... }

 위 빌더를 통해 간편하게 Flow를 만들 수 있다. 또한 Flow는 Cold Stream이기 때문에 수집 행위(Collect)가 있어야 작동한다. 또한 작동은 각 수집 행위마다 독립적으로 존재한다. 따라서, Cold Flow는 매 collect 시 마다 블록 내부가 재실행되므로, 데이터 소스(fetch, DB call 등)가 반복 실행됨에 유의해야 한다.

// Flow 생성
fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    // 첫번째 수집 행위 시작
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    // 두번째 수집 행위 시작
    flow.collect { value -> println(value) } 
}

위 코드의 결과는 다음과 같다.

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

Channel vs Flow
: Flow가 Cold Stream인 것에 비해 Channel은 Hot Stream이다. 수집 행위(Channel의 경우 receive)가 없어도 send()한 요소들은 Channel에 존재하고 있다.

연산자

 연산자를 적용해 수신한 값을 가공하여 새로운 Flow로 만들 수 있다.

  1. 변환
  • map: 각 요소를 변환 ex) flow.map { it * 2 }
  • transform: emit과 함께 다양한 로직 가능 ex) flow.transform { emit(it); emit(it*2) }
  • 그 밖에 map 시리즈 많음.
  1. 필터링
  • filter: 조건에 맞는 값만 통과 ex) flow.filter { it > 0 }
  • distinctUntilChanged: 연속 중복 제거 ex) flow.distinctUntilChanged()
  • take: 지정 개수만 가져오기 ex) flow.take(5)
  • drop: 건너뛰기 ex) flow.drop(3)
  1. 결합
  • zip: 두 Flow 결합 ex) flow1.zip(flow2) { a,b -> a+b }
  • combine: 두 Flow를 최신값 기준으로 결합 ex) flow1.combine(flow2) { a,b -> a*b }
  1. 수집
  • collect: 최종 소비 ex) flow.collect { println(it) }
  • collectLatest: 이전 collect는 취소하고 최신 값만 수집 ex) flow.collectLatest { process(it) }
  • launchIn(scope): Flow를 특정 CoroutineScope에서 실행 ex) flow.onEach { ... }.launchIn(viewModelScope)
  1. 스케줄링/버퍼
  • sample: 지정 시간 간격으로 최신값만 emit ex) flow.sample(1000L)
  • debounce: 마지막 emit으로부터 지연 후 전달 ex) flow.debounce(300L)

SharedFlow

interface SharedFlow<out T> : Flow<T> 

abstract val replayCache: List<T>
abstract suspend override fun collect(collector: FlowCollector<T>): Nothing

 Hot Flow로 요약할 수 있다. 모든 수신자 사이에서 브로드 캐스트로 데이터를 송수신하며 각각의 수신자들은 모든 값을 받는다. Channel과 가장 큰 차이는 여러 명의 수신자가 존재할 수 있다는 것이다. 프로퍼티 중 replayCache는 buffer 크기를 나타내며, Channel의 capacity와 유사하다.

 또한 구독이 상과없는 무한 스트림이기 때문에 종료라는 것이 없다. 그렇기 때문에 launchIn(scope) 함수를 이용할때는 scope의 라이프사이클에 유의하여 사용해야 하며, launchIn(scope)로 collect할 때 scope의 생명주기(LifecycleScope, ViewModelScope)에 의존하여 메모리 누수를 방지해야 한다.
또한 toList, toSet 같은 종단 연산자들을 SharedFlow에 적용하게 되면 무한한 크기를 갖게 될 수 있기 때문에 절대 사용하지 말아야 한다. 반면에 절단 연산자는 크기를 제한하기 때문에 사용해도 괜찮다.

StateFlow

interface StateFlow<out T> : SharedFlow<T> 

 똑같이 Hot Flow이며, 무한 스트림 특성을 갖는다. 하지만 항상 상태를 최신 값 하나만 갖고 있는 Flow이며, 초기 값이 필요하다. 또한 같은 값이 방출될 경우 자동으로 무시하며 이는 distinctUntilChanged와 유사하다. 따라서, SharedFlow는 이벤트를 위해서 많이 사용하고, StateFlow는 상태 관리를 위해 많이 사용된다. collectAsState()로 State로 바꾸어 Compose에서 이용 가능하다.

val uiState = MutableStateFlow(UiState())
val state by uiState.collectAsState()
profile
안녕하세요!

0개의 댓글