Kotlin - Flow

DevOks·2021년 7월 14일
3

kotlin

목록 보기
2/2
post-thumbnail

Summary : Coroutine + Flow 개념 정리 위한 문서

Flow (Representing multiple values)


Concepts


  • Flow는 코루틴에서 여러 값을 순차적으로 내보낼 수 있는 비동기식 데이터 스트림 처리.

Feature


  • 코루틴에서 비동기적으로 호출하기 위한 suspend function과 동일하다. 그렇지만 flow에서는 suspend 를 붙일 필요가 없다.
  • Flow는 Cold Stream 이다. Flow 내부 코드는 collect() 가 호출되어야 실행된다.
  • Rxjava의 데이터 스트림처리 및 제공 함수 등 유사한 점이 많다. (ex: combine(),debounce() 등)
  • 데이터베이스,파일등의 저장소에서 데이터 변경시마다 실시간으로 수신할 수 있다.
  • flow 빌더에서는 생산자가 다른 CoroutineContext의 값을 emit할 수 없다.

API Guide


  1. 생성 - Flow Builders

    • 간단히 flow { ... } 를 사용하여 생성 하거나 flowOf() , asFlow() 등의 함수로 생성 가능하다.
    • channelFlow { ... } , MutableStateFlow, MutableSharedFlow 등으로 도 생성 가능.
    ex1) fun flowBuilder() : Flow<String> = flow {
    
    		for(i in 1..3) {
    			delay(1000L)
    			emit("new Data $i")
    		}
    
    }.flowOn(Dispatchers.Main)
    
    flowBuilder.collect { value -> println(value) }
    
    ex2) flowOf("1", "2", "3", "4").collect { value -> println(value) }
    
    ex3) (1..3).asFlow().collect { value -> println(value) }
  2. 제약 - Flow Constraints

    • Context 특성과 예외 투명성(Exception transparency) 의 특성을 이해하고 구현해야 한다.
    1. Context 특성

      • 동일한 코루틴 Context에서 데이터가 방출 및 수집이 되어야만 한다.
      • Context를 변경이 필요한 경우 유일한 연사자인 flowOn() 을 사용 한다. 여러개의 코루틴에서 수집과 방출이 필요 할때는 channelFlow {}를 사용 하면 된다.
    2. 예외 투명성(Exception transparency)

      • 예외처리는 try { ... } catch 블록으로 래핑하지 않고 , catch() 연산자로 처리해야 한다.
      • 업스트림 flow에서 발생하는 예외만 catch 하도록 설계되었다.
      simple()
          .onEach { value ->
              check(value <= 1) { "Collected $value" }                 
              println(value) 
          }
          .catch { e -> println("Caught $e") }
          .collect()
  3. 기능 - Flow Functions

    • Context

      • flowOn : 이 Flow가 실행되는 context를 지정된 context로 변경하여 동작 하도록 한다.
        (context 보존속성으로 인하여 emit을 하는 context와 수신하는 context가 다르면 IllegalStateException이 발생한다.)
    • Creator

      • launchIn : 데이터 수집을 다른 scope 코루틴에서 수행 할 수있게 해주는 flow 연산자.
      • produceIn : 지정된 flow를 수집하도록 produce 코루틴을 생성 하는 flow 연산자. flow.produceIn(scope)
      • shareIn : ColdFlow를 지정된 코루틴 범위에서 시작되는 HotSharedFlow로 변환하여 여러 다운스트림 구독자와 업스트림 흐름의 단일 실행 인스턴스에서 방출을 공유하고 지정된 수의 Replay 값을 새 구독자에게 발행.
      • stateIn : ColdFlow를 지정된 코루틴 범위에서 시작되는 HotStateFlow로 변환하여 업스트림 흐름의 단일 실행 인스턴스에서 가장 최근에 내보낸 값을 여러 다운스트림 구독자와 공유.
    • Collection Teriminal Operator

      • collect : 방출되는 스트림 데이터를 수집 하는 action 코드를 작성 하는데 사용되는 정지 함수. 코루틴 내에서 실행 되어야 함.
      • collectindexed : collect와 동일하고, index가 추가되어 원하는 index에 맞는 작업이 가능함.
      • collectLatest : 반복적으로 스트림이 방출되면서, 이전에 방출된 action 코드가 취소되고, 새로운 action 블록이 실행 됨. 즉 마지막 action 만 이 완전히 수행 됨.
      • single : 1개의 데이터만 수집하고 중단함. 스트림이 2 번째 방출될 시 에는 exception 발생.
      • singleOrNull : single과 동일하지만 , 에러발생시 null이 반환되어지는것이 다름.
      • first : Flow에서 방출되는 첫 번째 값을 반환한 다음 Flow를 취소하는 터미널 연산자. Flow가 비어 있는 경우에는 NoSuchElementException 예외 발생.
      • firstOrNull : Flow에서 방출되는 첫 번째 값을 반환한 다음 Flow를 취소하는 터미널 연산자. Flow가 비어 있으면 null을 반환한다.
         fun flowBuilder() : Flow<String> = flow {
      
                 for(i in 1..3) {
                     delay(1000L)
                     emit("new Data $i")
                 }
      
         }.flowOn(Dispatchers.Main)
      
         flowBuilder().collect {
                 println("collect : $it")
         }
         ----
         fun flowBuilder() : Flow<String> = flow {
             emit("new Data $i")
         }
         val singleData = flowBuilderOneEmit().single() // emit 1회가 일어나는 flow에서만 사용.
      
         val firstData = flowBuilderOneEmit().first() // 첫번째 방출 되는 값을 수집하고 중단. 
    • Composing multiple flows

      • zip : 기준 Flow(this)를 기준으로 other flow와 인자로 제공된 transform 함수를 사용하여 other flow와 함께 현재 Flow(this)의 값을 압축합니다. 두 Flow중 하나가 완료되는 즉시 Flow가 완료되고 나머지 Flow는 취소된다.

        val flow = flowOf(1, 2, 3).onEach { delay(10) }
        val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
        flow.zip(flow2) { i, s -> i.toString() + s }.collect {
            println(it) // Will print "1a 2b 3c"
        }
      • combine : 각 Flow 별로 가장 최근에 방출된 값을 조합하여 transform 함수와 함께 값을 방출하는 Flow를 반환한다.

        val flow = flowOf(1, 2).onEach { delay(10) }
        val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
        flow.combine(flow2) { i, s -> i.toString() + s }.collect {
            println(it) // Will print "1a 2a 2b 2c"
        }
      • combineTransform : 각 Flow 별로 가장 최근에 방출된 값을 처리하는 transform 에 의해 조합된 값을 방출하는 Flow를 반환한다.

        val flow = requestFlow()
        val flow2 = searchEngineFlow()
        flow.combineTransform(flow2) { request, searchEngine ->
            emit("Downloading in progress")
            val result = download(request, searchEngine)
            emit(result)
        }
    • Flattening flows

      • flatMapConcat : Flow 사이를 시퀀셜하게 연결하는 operator로, 내부의 predicate Flow가 완료되어야만 다음 수집이 시작된다.

        fun requestFlow(i: Int): Flow<String> = flow {
            emit("$i: First")
            delay(500) // wait 500 ms
            emit("$i: Second")
        }
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapConcat { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            }
      • flatMapMerge : flatMapConctat과 동일하게 predicate Flow를 수행하지만, 순서를 보장하지 않고 외부, 내부의 Flow가 각각 (동시에) 수행되는 형태로 동작한다.

        fun requestFlow(i: Int): Flow<String> = flow {
            emit("$i: First")
            delay(500) // wait 500 ms
            emit("$i: Second")
        }
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapMerge { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            }
      • flatMapLatest : collectLatest와 비슷하게 동작한다. 새로운 emit발생시에 이전 대기중이거나 동작중인 Flow는 취소 된다.

      • flattenConcat : flatMapConcat 동일하게 flattening한다. 단, transform 처리가 없다.

      • flattenMerge : flatMapMerge 동일하게 flattening한다. 단, transform 처리가 없다.

    • Transform Operator

      • transform : 주어진 Flow의 각 값에 변환 함수를 적용합니다.변환의 수신기는 Flow Collector이므로 변환은 방출된 요소를 변환하거나 건너뛰거나 여러 번 방출할 수 있는 유연한 함수이다.

      • transformLatest : transform 과 동일하지만 , 기존 Flow가 새로 값을 방출하면 이전 transform 블록이 취소된다.

      • transformWhile : 지정된 predicate 함수가 true를 반환하는 동안 지정된 flow의 각 값에 transform 함수를 적용한다.

      • map : 원래 흐름의 각 값에 transform 함수를 적용한 결과를 포함하는 Flow를 반환한다.

      • mapLatest : map 과동일한 처리 후 Flow를 반환. 하지만, 기존 Flow가 새 값을 방출하면 이전 값에 대한 변환 블록 계산이 취소된다.

        flow {
            emit("a")
            delay(100)
            emit("b")
        }.mapLatest { value ->
            println("Started computing $value")
            delay(200)
            "Computed $value"
        }
        will print "Started computing a" and "Started computing b", 
        but the resulting flow will contain only "Computed b" value.
      • mapNotNull : 원래 흐름의 각 값에 transform 함수를 적용한 결과중 null이 아닌경우만 포함하는 Flow를 반환한다.

      • withIndex : 값 및 해당 인덱스(0부터 시작)를 포함하여 각 요소를 IndexedValue로 래핑하는 Flow를 반환한다. 반환값 = Flow<IndexedValue>

    • Buffering

      • conflate : 통합 채널을 통해 스트림 방출을 통합하고 별도의 코루틴에서 수집을 수행합니다. 그 결과 수집기가 느려서 스트림 방출이 일시 중단된 적은 없지만 수집기는 항상 가장 최근에 방출된 값을 얻는다.

        val flow = flow {
            for (i in 1..30) {
                delay(100)
                emit(i)
            }
        }
        Applying conflate() operator to it allows a collector that delays 1 second 
        on each element to get integers 1, 10, 20, 30:
        
        val result = flow.conflate().onEach { delay(1000) }.toList()
        assertEquals(listOf(1, 10, 20, 30), result)
      • buffer : Flow의 emit 과 collect의 순차처리가 별도의 코루틴으로 채널을 이용해서 처리하므로, 전체 처리 시간을 감소 시킬 수 있다.

        flowOf("A", "B", "C")
            .onEach  { println("1$it") }
            .buffer()  // <--------------- buffer between onEach and collect
            .collect { println("2$it") }
    • Filter Operator

      • filter : 지정된 predicate와 일치하는 값만 포함하고 있는 Flow를 반환한다.

      • filterInstance : 지정된 유형의 인스턴스인 값만 포함하는 Flow를 반환한다.

      • filterNot : 지정된 predicate와 일치하지 않는 값만 포함하고 있는 Flow를 반환한다.

      • filterNotNull : null이 아닌 값만 포함하고 있는 Flow를 반환한다.

      • take : 첫 번째 카운트 요소를 포함하는 Flow를 반환. 카운트 요소가 소비되면 원래 Flow가 취소된다.

      • takeWhile : 지정된 predicate( 조건)을 충족하는 첫 번째 요소가 포함된 흐름을 반환합니다.반환되는 Flow에는 predicate가 false를 반환한 요소가 포함되지 않는다.

      • distinctUntilChanged : 동일한 값의 이후의 모든 반복된 값은 필터링되는 Flow를 반환한다

      • distinctUntilChangedBy : 동일한 키의 모든 후속 반복이 필터링되는 Flow를 반환. 여기서 입력 인자인 keySelector 함수로 추출되는 Flow이다.

      • debounce : 원래 흐름을 미러링하는 Flow를 반환하지만 지정된 시간 제한 내에 새 값이 뒤에 오는 값은 필터링합니다. 최신 값이 항상 방출된다.

      • drop : 인자로 주어진 count 수 만큼의 값은 무시하는 Flow를 반환합니다.

      • reduce : 첫번째 값부터 시작하여 계속해서 값을 누적하고 최종 누적된 값을 반환한다.

      • fold : 초기 입력값을 인자로 받고, 그 값에 계속해서 값을 누적하고 최종 누적된 값을 반환한다.

      • count : Flow의 조건에 따른 카운트를 반환한다. predicate 인자를 넣어주면 predicate 조건에 해당하는 카운트만 반환.

      • sample : 주어진 샘플링(period) 기간 동안 원래 흐름에서 방출된 최신 값만 방출하는 Flow를 반환한다.

        flow {
            repeat(10) {
                emit(it)
                delay(110)
            }
        }.sample(200)
        // will produces the following emissions
        // 1, 3, 5, 7, 9
      • scan : 방출된 기존 flow 값으로 입력된 초기값을 포함하여, 지정된 action을 통해 처리된 새로운 Flow를 반환한다.

        flowOf(1, 2, 3)
        .scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
        // will produce [[], [1], [1, 2], [1, 2, 3]]
    • Actions

      • onStart : Flow가 수집되기 전에 지정된 action을 먼저 수행하는 Flow를 반환한다.

      • onEach : 스트림이 방출되기전에 지정된 action을 수행하는 Flow를 반환한다.

      • onCompletion : Flow가 완료,취소 또는 예외 발생시에 지정된 action을 호출 하는 Flow를 반환한다.

      • onEmpty : Flow가 스트림을 방출하지 않는경우에 , 지정된 action에서 값을 직접 방출할 수 있는 Flow를 반환한다.

      • retry : 지정된 predicate와 일치하는 예외가 발생되는경우 지정된 retries:최대 재시도 횟수까지 수집을 재시도한다

      • retryWhen : 지정된 predicate와 일치하는 예외가 발생되는 경우와 인자로 들어오는 attempt(시도 횟수) 로 직접 재시도를 결정합니다 return boolean (true 반환시에만 재시도 수행)

        flow.retryWhen { cause, attempt ->
            if (cause is IOException) {    // retry on IOException
                emit(RetryWrapperValue(e))
                delay(1000)                // delay for one second before retry
                true
            } else {                       // do not retry otherwise
                false
            }
        }
      • cancellable : 각 배출물에 대한 취소 상태를 확인하는 흐름을 반환하고 흐름 수집기가 취소된 경우 해당 취소 원인을 발생시킵니다. Flow Builder 및 SharedFlow의 모든 구현은 기본적으로 취소할 수 있다.

Documents


References


profile
[Android] Software Engineer

0개의 댓글