[코루틴에 관한 정리] 블로그에 퍼진 흔한 오해들

4

어떤 블로그들을 확인하다 보면, 다소 혼란스러운 말들로 적어놓은것들을 볼 수 있습니다.
이런 몇몇 것들을 직접 정리하면서 해소하고자 합니다.

1. repeatOnLifecycle 의 오해

repeatOnLifecycle을 사용할때, 파라미터의 state가 어떻게되든 onStop일때 cancel이 된다고 생각할때가 있습니다.
하지만 state에 따라 종료되는 위치도 달라집니다.

    suspendCancellableCoroutine<Unit> { cont ->
        // Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
        // cancels when it falls below that state.
        val startWorkEvent = Lifecycle.Event.upTo(state)
        val cancelWorkEvent = Lifecycle.Event.downFrom(state)
        val mutex = Mutex()
        observer = LifecycleEventObserver { _, event ->
            if (event == startWorkEvent) {
                // Launch the repeating work preserving the calling context
                launchedJob = this@coroutineScope.launch {
                    // Mutex makes invocations run serially,
                    // coroutineScope ensures all child coroutines finish
                    mutex.withLock {
                        coroutineScope {
                            block()
                        }
                    }
                }
                return@LifecycleEventObserver
            }
            if (event == cancelWorkEvent) {
                launchedJob?.cancel()
                launchedJob = null
            }
            if (event == Lifecycle.Event.ON_DESTROY) {
                cont.resume(Unit)
            }
        }
        this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
    }
} finally {
    launchedJob?.cancel()
    observer?.let {
        this@repeatOnLifecycle.removeObserver(it)
    }
}

위의 코드는 repeatOnLifeCycle입니다.

내부적으로 suspendCancellableCoroutine을 사용하고 있습니다.

       val startWorkEvent = Lifecycle.Event.upTo(state)
        val cancelWorkEvent = Lifecycle.Event.downFrom(state)
        val mutex = Mutex()
        observer = LifecycleEventObserver { _, event ->
            if (event == startWorkEvent) {
                // Launch the repeating work preserving the calling context
                launchedJob = this@coroutineScope.launch {
                    // Mutex makes invocations run serially,
                    // coroutineScope ensures all child coroutines finish
                    mutex.withLock {
                        coroutineScope {
                            block()
                        }
                    }
                }
                return@LifecycleEventObserver
            }
            //여기 주목
            if (event == cancelWorkEvent) {
                launchedJob?.cancel()
                launchedJob = null
            }
            if (event == Lifecycle.Event.ON_DESTROY) {
                cont.resume(Unit)
            }

cancelWorkEvent일때 해당 job을 cancel 시킵니다.

cancelWorkEvent는 아래와 같은 코드로 되어있습니다.

        public static Event downFrom(@NonNull State state) {
            switch (state) {
                case CREATED:
                    return ON_DESTROY;
                case STARTED:
                    return ON_STOP;
                case RESUMED:
                    return ON_PAUSE;
                default:
                    return null;
            }
        }

downFromstate가 뭔지에따라 어떤 생명주기에서 job을 cancel 시킬지 정해줍니다.

suspendCancellableCoroutine 은 상위 코루틴이 cancel 되거나 resumeWithException, resume등이 호출될때까지 suspend됩니다.
이를 통해 결국 startWorkEvent와 맞는 생명주기일때 job을 relaunch 시키고,
cancelWorkEvent일때 해당 job을 cancel시킵니다.

            if (event == Lifecycle.Event.ON_DESTROY) {
                cont.resume(Unit)
            }
        }
        this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
    }
} finally {
    launchedJob?.cancel()
    observer?.let {
        this@repeatOnLifecycle.removeObserver(it)
    }
}

이 행위에서 빠져나오는 구간은 ON_DESTROY가 될때 뿐이며, resume을 받아 suspendCancellableCoroutine을 종료하여 아래의 finally구문으로 이동합니다.

finally를 통해 observer를 안전하게 종료시킵니다.

2. Hotflow는 not complete하고 Cold flow는 complete

ColdFlow인 Flow에서 상속하고 있는 AbstractFolw는 추상클래스입니다.

여기서 collect가 어떻게 작동하는지 단면을 엿볼수 있습니다.

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    

여기서 봐야할것은 tryfinally입니다.
연산자에 있는 emitcollect하고나면, 알아서 종료됩니다.

하지만 HotFlow인 SharedFlow을 확인해봅시다.


   override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    if (newValue !== NO_VALUE) break
                    awaitValue(slot) // await signal that the new value is available
                }
                collectorJob?.ensureActive()
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

이전의 coldFlow와 다르게 sharedFlow는 while()문이 있습니다.
이것은 언제 끝나는걸까요?
collectorJob?.ensureActive()에서 에러를 방출하면 끝나게 됩니다.

public fun Job.ensureActive(): Unit {
    if (!isActive) throw getCancellationException()
}

ensureActive는 현재 Job이 활성화가 아닐때 Throw를 던집니다.

결국 HotFlow는 현재 CoroutineScope가 cancel 되었을때, 같이 종료됨을 알수 있습니다.

3.HotFlow에서 Error가 일어난다면 어떻게 될까?

    val state: StateFlow<MainState> = events.receiveAsFlow()
        .scan(MainState(), ::reduceState)
        .shareIn(
            viewModelScope,
            SharingStarted.Eagerly,
            0,
        ).onSubscription {
            throw Exception()
        }
        .catch {
            Log.d("test", "test")
        }.stateIn(viewModelScope, SharingStarted.Eagerly, MainState())

위의 Flow를 다이어그램으로 표현해봅시다.

ShareingStarted.Eagerly에 의해 Channel과 SharedFlow, StateFlow의 producer
Viewmodel.destroy될때까지 계속 살아있게 됩니다.

하지만 여기서 만약 never Complete되지 않는 HotFlow에서 Error가 발생하면 어떻게 될까요?

HotFlow에서 Catch되는 순간 HotFlow는 complete 됩니다.

한번 에러가 발생하면 맨아래의 StateFlow는 아무것도 받을수 없게 됩니다.

4. Flow가 Suspend(정지)되는 시점은 언제일까?

Flow는 collect하는 그 시점부터 suspend 된다.
decomplie로 추적하면서 한번 확인 해보자.

    fun a() {
            flow {
                emit(1)
                emit(2)
                emit(3)
            }
    }

여기에 단순한 flowBuilder가 있다.
이걸 한번 디컴파일 해보자.

public final void a() {
   FlowKt.flow((Function2)(new Function2((Continuation)null) {
      // $FF: synthetic field
      private Object L$0;
      int label;

      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Integer var10001;
         FlowCollector $this$flow;
         Object var3;
         label25: {
            var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure($result);
                  $this$flow = (FlowCollector)this.L$0;
                  var10001 = Boxing.boxInt(1);
                  this.L$0 = $this$flow;
                  this.label = 1;
                  if ($this$flow.emit(var10001, this) == var3) {
                     return var3;
                  }
                  break;
               case 1:
                  $this$flow = (FlowCollector)this.L$0;
                  ResultKt.throwOnFailure($result);
                  break;
               case 2:
                  $this$flow = (FlowCollector)this.L$0;
                  ResultKt.throwOnFailure($result);
                  break label25;
               case 3:
                  ResultKt.throwOnFailure($result);
                  return Unit.INSTANCE;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            var10001 = Boxing.boxInt(2);
            this.L$0 = $this$flow;
            this.label = 2;
            if ($this$flow.emit(var10001, this) == var3) {
               return var3;
            }
         }

         var10001 = Boxing.boxInt(3);
         this.L$0 = null;
         this.label = 3;
         if ($this$flow.emit(var10001, this) == var3) {
            return var3;
         } else {
            return Unit.INSTANCE;
         }
      }

Flow 내부에서는 label(StateMachine을 이용하여) 순차적으로 값을 처리한다.

    fun c() {
        ""
    }

    fun b() = viewModelScope.launch {
        c()
        c()
    }

우리는 이걸 코루틴을 사용했을때 이런식으로 구조화가 된다는것을 알 수 있었다.

  @NotNull
   public final Job b() {
      return BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object var1) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure(var1);
                  SettingsViewModel.this.c();
                  SettingsViewModel.this.c();
                  return Unit.INSTANCE;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }),

FlowBuilder 내부는 Continuation이 필요한데 우리는 해당Flow를 CoroutineScope로 감싸지 않았다.
정작 필요할때는 Collect할때 Scope()로 감싸게 된다.

5. await를 잘못달면 순차처리?

    val time = measureTimeMillis {
        val a = async {
            log("I'm computing a piece of the answer")
            delay(1000)
            6
        }.await()

        val b = async {
            log("I'm computing another piece of the answer")
            delay(2000L)
            7
        }.await()

//        val  deferredTime = a.await()
//        val deferredTime2 = b.await()


        log("The answer is ${deferredTime * deferredTime2}")
//        log("The answer is ${a.await() * b.await()}")
    }
    log("Completed in $time ms")

위의 코드는 async를 사용했는데도 3초가 걸립니다.
왜일까요? await는 결과값이 나오기까지 Suspend할 수 있기 때문입니다.
await 블록 내부 구현을 확인해봅시다.
suspendCoroutineUninterceptedOrReturn은 상위에서 전달된 코루틴 정보에 접근할수 있도록 만들어진 함수로써 값을 그대로 반환합니다.

여기서 return값으로 COROUTINE_SUSPENDED가 반환된다면, 결과값이 코루틴이 처리된다음 continutation 파라미터를 통해 반환할것이라고 나타내는데요.

    private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
        /*
         * Custom code here, so that parent coroutine that is using await
         * on its child deferred (async) coroutine would throw the exception that this child had
         * thrown and not a JobCancellationException.
         */
        val cont = AwaitContinuation(uCont.intercepted(), this)
        // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
        cont.initCancellability()
        cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
        cont.getResult()
    }

그러한 코드들이 getResult()를 통해 구현되고 있습니다.

    @PublishedApi
    internal fun getResult(): Any? {
        val isReusable = isReusable()
        // trySuspend may fail either if 'block' has resumed/cancelled a continuation
        // or we got async cancellation from parent.
        if (trySuspend()) {
            /*
             * Invariant: parentHandle is `null` *only* for reusable continuations.
             * We were neither resumed nor cancelled, time to suspend.
             * But first we have to install parent cancellation handle (if we didn't yet),
             * so CC could be properly resumed on parent cancellation.
             *
             * This read has benign data-race with write of 'NonDisposableHandle'
             * in 'detachChildIfNotReusable'.
             */
            if (parentHandle == null) {
                installParentHandle()
            }
            /*
             * Release the continuation after installing the handle (if needed).
             * If we were successful, then do nothing, it's ok to reuse the instance now.
             * Otherwise, dispose the handle by ourselves.
            */
            if (isReusable) {
                releaseClaimedReusableContinuation()
            }
            // 완료될떄까지 정지할게요.
            return COROUTINE_SUSPENDED
        }
        // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        if (isReusable) {
            // release claimed reusable continuation for the future reuse
            releaseClaimedReusableContinuation()
        }
        val state = this.state
        if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
        // if the parent job was already cancelled, then throw the corresponding cancellation exception
        // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
        // before the block returns. This getResult would return a result as opposed to cancellation
        // exception that should have happened if the continuation is dispatched for execution later.
        //부모가 캔슬 되었으니 전파할게요.
        if (resumeMode.isCancellableMode) {
            val job = context[Job]
            if (job != null && !job.isActive) {
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                throw recoverStackTrace(cause, this)
            }
        }
        //완료 되었으니 값을 반환할께요.
        return getSuccessfulResult(state)
    }
profile
쉽게 가르칠수 있도록 노력하자

1개의 댓글

comment-user-thumbnail
2023년 5월 24일

좋은 글 감사합니다!

답글 달기