[Android][Kotlin] coroutine (2)

D.O·2024년 4월 19일
0

이전 글에 이어서 다시 코루틴에 관하여…

코루틴의 예외 처리

개발 중에 예외는 빈번하게 일어남, 코루틴 내부에서 예외가 발생하면 어떻게 될까?

위 예제와 같이 launch 내부에서 예외가 발생하면 예외가 발생하고 앱이 죽음

코루틴에서 예외를 다루는 방법

try-catch

기본적으로는 에러가 나올 수 있는 로직을 try-catch 구문으로 감싸서 처리 가능
대부분의 에러 처리는 이 방식으로 처리하는 것을 권장 합니다.

주의 할 점으로 async의 경우 블록 안에서 예외가 발생하면 이를 즉시 처리하는게 아니라 나중에 await()가 불리는 시점에 처리함. try-catch를 await() 시점에 사용하면 됨

주의 할점은 launch 블럭 내부에서 발생하는 예외를 외부에서 try-catch로 잡으려하는 경우 예외 처리가 되지 않음. launch 내부에서 발생한 에러를 외부에서 처리하려면 CoroutineExceptionHandler를 활용해야 함

CoroutineExceptionHandler

CoroutineExceptionHandler는 코루틴의 전역 예외 처리자로 사용됩니다.
이 핸들러는 코루틴 컨텍스트에 추가되며, 코루틴 내부에서 처리되지 않은 예외가 발생했을 때 호출

Handler가 적용된 Scope 내부에서 에러가 발생하면 Scope에 설정된 에러 핸들러에서 에러를 받아서 처리 가능

그러나 이 핸들러는 최상위 코루틴에서만 작동하며, 자식 코루틴에서는 예외를 직접 처리해야 합니다.

취소에서 부모 자식 관계

그렇다면 이제 상상해보자. launch 내부에서 여러 launch가 수행되고 있다면 어떻게 될까?
취소와 마찬가지로 에러가 발생하면 자식 코루틴들은 취소

하지만 취소와 다르게 자식에서 에러가 발생하면 부모 코루틴도 취소가 됨 → 취소 요청을 받은 부모 코루틴은 다시 모든 자식 코루틴(결과적으로 형제 코루틴)을 취소
결과적으로 계층 구조 내 모든 코루틴에 에러가 전파

아래 코드의 경우 중간에 가장 밑에 있는 자식에서 예외가 발생하지만 전체 취소되는 예제


import kotlinx.coroutines.*

fun main(): Unit {

    runBlocking {
        launch {
            launch {
                delay(500)
                println("After 500ms")
            }

            launch {
                delay(1000)
                throw Exception()
            }

            launch {
                delay(1500)
                println("After 1500ms")
            }
        }

        launch {
            delay(2000)
            println("After 2000ms")
        }
    }
}

부모 자식 관계로 인해 에러가 전파되기 때문에 1000ms 이후에 Scope 내 모든 코루틴이 취소된다.

SupervisorJob

만약 그렇다면 한 코루틴이 죽어도 다른 코루틴이 죽지 않게 하고 싶다면?

즉, 코루틴의 에러 전파 범위를 제어하고 싶다면 SupervisorJob을 사용 하면 됩니다.

SupervisorJob은 코루틴의 부모-자식 계층에서 예외 처리 방식을 변경하는 데 사용되는 코루틴 작업(Job)의 한 유형입니다.

기본적으로 코루틴의 Job은 자식 코루틴에서 발생한 예외를 부모 코루틴으로 전파합니다. 이는 하나의 자식 코루틴에서 발생한 예외가 다른 코루틴들에게도 영향을 줄 수 있음을 의미합니다. 그러나 SupervisorJob을 사용하면 이러한 동작이 변경되어 자식 코루틴의 실패가 자동으로 다른 자식 코루틴에 전파되지 않습니다.

이는 UI와 같이 여러 비동기 작업이 병렬로 실행되면서, 특정 작업의 실패가 다른 작업의 실행에 영향을 미치지 않아야 하는 상황에서 유용합니다.


fun main(): Unit {

    val handler = CoroutineExceptionHandler { _, e ->
        println("catch e : $e")
    }

    val scope = CoroutineScope(EmptyCoroutineContext  + handler)

    scope.launch {
        delay(500)
        throw Exception("Error")
    }

    scope.launch {
        delay(1000)
        println("after 1000ms")
    }

    Thread.sleep(2000)
}

[result]
catch e : java.lang.Exception: Error

이 경우 첫번째 코루틴의 에러를 처리하더라도 전파가 이루어져서 두번째 코루틴 또한 종료됩니다.

아래 코드의 경우 전파가 부모인 스코프에게 전달이 되더라도 다른 자식들에게 전파하지 않습니다.



fun main(): Unit {

    val handler = CoroutineExceptionHandler { _, e ->
        println("catch e : $e")
    }

    val scope = CoroutineScope(EmptyCoroutineContext + SupervisorJob() + handler)

    scope.launch {
        delay(500)
        throw Exception("Error")
    }

    scope.launch {
        delay(1000)
        println("after 1000ms")
    }

    Thread.sleep(2000)
}

코루틴 동기화

코루틴은 기본적으로 한 스레드 내에서 실행될 때 동기화에 대한 문제가 크게 발생하지 않습니다.

코루틴은 스레드와 달리 경량화된 실행 단위로서, 한 스레드 내에서 실행되는 코루틴들은 코루틴 디스패처(Coroutine Dispatcher)에 의해 관리되고, 같은 스레드에서 순차적으로 실행됩니다

따라서, 한 스레드 내에서는 별도의 동기화 없이도 상태의 일관성을 유지할 수 있습니다.

그러나 코루틴이 여러 스레드에서 실행될 경우, 예를 들어 Dispatchers.DefaultDispatchers.IO를 사용하는 경우, 여러 스레드가 동일한 자원에 접근할 수 있으므로 동기화를 고려해야 합니다.

참고로 메인 스레드는 애플리케이션에서 하나의 스레드로 동작하는 반면 Dispatchers.DefaultDispatchers.IO는 다중 스레드 환경에서 코루틴을 실행하도록 설계되어 있습니다.

Dispatchers.Default:

  • 이 디스패처는 CPU 사용량이 많은 작업을 위해 최적화된 디스패처입니다.
  • 기본적으로 사용 가능한 CPU 코어 수에 비례하는 스레드 풀을 가지고 있습니다.

Dispatchers.IO:

  • 이 디스패처는 입출력 중심의 작업, 예를 들어 네트워크 요청이나 파일 입출력을 처리하기 위해 설계되었습니다.
  • Dispatchers.IO 역시 다중 스레드를 사용하여 입출력 작업을 병렬로 수행할 수 있도록 지원합니다.

아래 예제는 1000000번 count가 오르기를 의도하지만 실제로는 여러 코루틴이 여러 쓰레드에서 하나의 count 변수를 접근하면서 ++ 연산이 제대로 되지 않게 됨


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.EmptyCoroutineContext

fun main() {

    val scope = CoroutineScope(EmptyCoroutineContext)
    var count = 0

    runBlocking {
        val job = scope.launch {

            repeat(1000) {

                launch {
                    repeat(1000) {
                        count++
                    }
                }

            }

        }

        job.join()
        println("count : $count")
    }
}

해당 코드에서 count++는 사실상 세 개의 동작으로 구성되어 있습니다:

  1. count의 현재 값을 읽어옵니다.
  2. 이 값을 1 증가시킵니다.
  3. 증가된 값을 다시 count에 저장합니다.

여러 코루틴이 이 연산을 동시에 수행하면, 어느 하나의 코루틴이 값을 읽고, 또 다른 코루틴도 같은 값을 읽은 후 각각이 값을 변경하고 저장할 때, 원래 기대했던 것보다 적은 증가가 일어날 수 있습니다.

kotlin에서는 여러 방법을 통해 동기화 기능을 제공하는데요

동기화

synchronized

synchronized를 사용하여 동기화를 수행할 수 있습니다.


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.EmptyCoroutineContext

fun main() {

    val scope = CoroutineScope(EmptyCoroutineContext)
    var count = 0

    runBlocking {
        val job = scope.launch {
            repeat(1000) {
                launch {
                    repeat(1000) {
                        synchronized(Unit) {
                            count++
                        }
                    }
                }
            }
        }
        job.join()
        println("count : $count")
    }
}

synchronized는 lock을 통해서 한 번에 단 하나의 쓰레드만 내부 블럭을 실행하도록 합니다.

하지만 synchronized는 블럭 내부에서 중단 함수를 콜 할 수 없고 그로 인해 다른 쓰레드가 블로킹이 되버릴 수 있기 때문에 효율적이지 못 하다고 합니다.

중단 함수를 콜 할 수 없는 이유는 synchronized자바의 동기화 메커니즘에서 유래되었으며, 코틀린에서도 동일하게 스레드 레벨의 동기화를 제공하기때문에 코루틴 작업 단위까지 다룰 수 없다고 합니다.

Aotomic

Atomic 객체는 연산을 할 때 여러 쓰레드가 충돌해도 원자성을 보장해줄 수 있음 즉, 한 번에 완전히 실행되거나 전혀 실행되지 않는 성질을 가진 연산을 말합니다.

Atomic 원리는 내부적으로 Compare and Swap을 사용. 연산을 할 때 예상값과 동일한 경우에만 값을 업데이트, 즉 경합이 일어나서 예상값과 달라지면 업데이트를 취소하고 다시 예상값을 산정해서 업데이트 시도

CoroutineScope에서 실행될 수 있고, 쓰레드간 블로킹이 일어나지 않기 때문에 간단한 공유자원의 경우 권장하지만 여러 라인을 블럭단위로 막아주는 것은 아니기 때문에 복잡한 로직은 원자성을 보조 못 함


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.EmptyCoroutineContext

fun main() {

    val scope = CoroutineScope(EmptyCoroutineContext)
    var count = AtomicInteger(0)

    runBlocking {

        val job = scope.launch {
            repeat(1000) {
                launch {
                    repeat(1000) {
                            count.incrementAndGet()
                    }
                }
            }
        }

        job.join()
        println("count : $count")
    }
}

SingleThreadDispatcher

SingleThreadDispatcher는 Kotlin 코루틴에서 사용하는 디스패처 유형 중 하나로, 모든 코루틴 작업을 단일 스레드에서 실행하도록 강제하는 디스패처


import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {

    val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    val scope = CoroutineScope(singleDispatcher)
    var count = 0

    runBlocking {

        val job = scope.launch {
            repeat(1000) {
                launch {
                    repeat(1000) {
                            count++
                    }
                }
            }
        }

        job.join()
        println("count : $count")
    }
}

이 방식은 여러 쓰레드를 쓰지 못하기 때문에 성능적인 한계가 있음

Mutex

Mutex를 사용하면 구간에 대해서 다른 쓰레드의 접근을 막을 수 있음 synchronized와 비슷하게 withLock 을 사용 할 수 있음

아까 syncronized 내부에서는 중단 함수 사용이 불가능해서 스레드가 무조건 블로킹된다고 했습니다. mutex의 syncronized에 비해 큰 장점은 경합이 발생한 경우 쓰레드를 블로킹 하는 것이 아니라 중단 시키는 것


import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.EmptyCoroutineContext

fun main() {

    val mutex = Mutex()
    val scope = CoroutineScope(EmptyCoroutineContext)
    var count = 0

    runBlocking {

        val job = scope.launch {
            repeat(1000) {
                launch {
                    repeat(1000) {
                        mutex.withLock {
                            count++
                        }
                    }
                }
            }
        }

        job.join()
        println("count : $count")
    }
}

Semaphore

Mutex와 비슷하게 Semaphore를 사용하면 구간에 대해서 다른 쓰레드의 접근을 막을 수 있습니다.

차이점으로는 접근 쓰레드를 여러개로 설정 할 수 있다는 것입니다.

세마포어 생성자에 숫자로 접근 스레드 숫자를 명시가능 합니다.


import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import kotlin.coroutines.EmptyCoroutineContext

fun main() {

    val mutex = Semaphore(1)
    val scope = CoroutineScope(EmptyCoroutineContext)
    var count = 0

    runBlocking {

        val job = scope.launch {
            repeat(1000) {
                launch {
                    repeat(1000) {
                        mutex.withPermit {
                            count++
                        }
                    }
                }
            }
        }

        job.join()
        println("count : $count")
    }
}

일정 수의 코루틴만이 특정 자원에 접근하도록 제한하는 동기화 도구이다.

동시에 많은 액세스가 가능하지만 제한된 자원을 관리하는데 유용합니다.

Channel

코틀린에서 Channel코루틴 간의 통신을 위해 사용되는 개념입니다

Channel은 코루틴이 서로 데이터를 안전하게 전달할 수 있도록 도와주는 도구로, Channel 자체는 Queue와 비슷하게 동작합니다.

다만 Channel은 Element의 삽입과 추출이 Suspending Function으로서 여러 코루틴이 한 채널에서 쓰레드를 블로킹시키지 않고 안전하게 하나의 Element를 넣거나 가져올 수 있음

이를 이용해 여러 코루틴에서 데이터를 안전하고 효율적으로 주고 받고 싶을 때 사용
send를 통해 삽입하고, receive를 통해 추출

특징

생산자-소비자 패턴: Channel을 통해 하나의 코루틴(생산자)이 데이터를 보내고, 다른 코루틴(소비자)이 그 데이터를 받아 처리할 수 있습니다.

Channel의 경우 병목 현상이 일어나도 쓰레드 블로킹이 아닌 중단이 되는 장점이 있음


import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() {

    runBlocking {
        val channel = Channel<Int>()

        launch {
            for (i in 0 .. 10) {
                channel.send(i)
                delay(100)
            }
            channel.close()
        }

        launch {
            for (i in 0 .. 10) {
                println(channel.receive())
            }
        }

    }

}

• 버퍼링과 버퍼 없음: Channel은 버퍼링될 수 있으며, 버퍼의 크기에 따라 송신 코루틴의 블로킹 여부가 결정됩니다. 버퍼가 가득 찬 경우, 추가 데이터를 보낼 때 송신 코루틴이 일시 중지될 수 있습니다.

채널의 데이터 한계는 Channel() 생산자 인자로 capacity를 설정


import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() {

    runBlocking {
        val channel = Channel<Int>(2)

        launch {
            for (i in 0 .. 10) {
                channel.send(i)
                println("send $i")
            }
            channel.close()
        }

        launch {
            for (i in 0 .. 10) {
                delay(500)
                println(channel.receive())
            }
        }

    }

}


result

send 0
send 1
0
send 2
1
send 3
2
send 4
3
send 5
4
send 6
5
send 7
6
send 8
7
send 9
8
send 10
9
10

Flow

Flow란?
→ 리액티브 프로그래밍을 지원하기 위한 API로서, 데이터 스트림을 의미함

Flow는 Kotlin에서 비동기 스트림을 다루기 위한 개념으로, Coroutine을 기반으로 동작합니다

Reactive Programming?
연속된 데이터의 변화를 관찰(Observer Pattern)하고
변화된 데이터를 가공(Functional Programming)해서 지속적으로 전파

Observer 패턴

Observer 패턴은 프로그래밍에서 널리 사용되는 디자인 패턴 중 하나입니다

이 패턴의 핵심 목적은 하나의 객체(주제 또는 발행자)가 상태 변경을 여러 다른 객체(옵저버)에게 자동으로 알리고, 그 객체들이 그 변경을 감지하여 적절한 동작을 수행할 수 있도록 하는 것입니다.

데이터가 발행될 때 CallBack으로 업데이트

데이터 변화를 스트림으로 해석
스트림의 데이터가 변할 때마다 콜백 실행

flow를 사용하면 데이터 생산자와 소비자 사이의 결합도를 낮추어, 각각의 컴포넌트가 독립적으로 개발하고 테스트될 수 있습니다. 즉, 데이터 생산자는 소비자가 어떻게 소비하는지 알 필요가 없고, 소비자는 데이터 생산자가 어디서, 어떻게 데이터를 생산하는지 알 필요가 없습니다.

flow 생성 방법

  1. flow 빌더

가장 기본적인 방법으로, flow 블록 내부에서 emit 함수를 사용하여 값들을 방출


import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.collect

fun produceNumbers() = flow {
    val numbers = listOf(1, 2, 3)
    for (number in numbers) {
        emit(number)  // 값 방출
    }
}

fun main() = runBlocking {
    produceNumbers().collect { number ->
        println(number)  // 수집 및 출력
    }
}

2. flowOf

정적인 값들 또는 이미 결정된 값들의 시퀀스를 생성할 때 flowOf를 사용할 수 있습니다.

`` kotlin

import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.collect

fun main() = runBlocking {
val numberFlow = flowOf(1, 2, 3)
numberFlow.collect { number ->
println(number)
}
}


**3. asFlow 확장 함수**

**기존 컬렉션 또는 시퀀스**에서 **`Flow`**를 생성할 때 사용할 수 있습니다.

``` kotlin

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.collect

fun main() = runBlocking {
    val numbers = listOf(1, 2, 3).asFlow()
    numbers.collect { number ->
        println(number)
    }
}

StateFlow

StateFlow는 Kotlin의 코루틴 라이브러리인 kotlinx.coroutines 내에 있는 특수한 유형의 Flow로, 상태 관리를 위해 설계되었습니다.

Flow + StateHolder

현재 상태 보존 : Flow와 같이 데이터 스트림인데, 마지막 상태 값을 저장하고 있음
변경이 일어난 데이터만 방출 : 이벤트가 발생해도 데이터가 그대로면 방출 안 함

주로 뷰의 상태를 나타내는데 사용

위와 같은 특성으로 인해 뷰가 재생성되도 마지막 상태를 읽고 렌더링 가능하며, 변경이 되었을 때만 새로 그릴 수 있게 됨

MutableStateFlow(초기값)으로 생성
asStateFlow로 불변성 부여
update로 값 갱신
collectAsState로 컴포즈 상태로 변경

Flow vs StateFlow - Cold Stream vs Hot Stream

데이터를 발행하는 방식에 따라 스트림은 Cold, Hot 두 가지 형태로 나뉨

  • Cold Stream
    느리게 데이터를 발행. 굳이 요청을 하지 않으면 발행 시작 안 함
    데이터를 발행하는 방식을 정의하고, 실제 구독자가 나타나면 데이터를 생성해서 전달
  • Hot Stream
    빠르게 데이터를 발행. 누가 요청을 하지 않아도 발행 시작
    데이터가 발행되는 즉시 이벤트를 방출하고, 자신을 구독하는 곳에 모두 브로드캐스팅

Flow - Cold Stream


import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flow

fun main() {
    runBlocking {
        val flow = flow {
            (1..4).forEach {
                delay(100)
                emit(it)
            }
        }
        
        launch {
            flow.collect {
                println("1st: $it")
            }
        }

        launch {
            delay(1000)
            flow.collect {
                println("2nd: $it")
            }
        }
    }
}


result

1st: 1
1st: 2
1st: 3
1st: 4
2nd: 1
2nd: 2
2nd: 3
2nd: 4import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flow

fun main() {
    runBlocking {
        val flow = flow {
            (1..4).forEach {
                delay(100)
                emit(it)
            }
        }
        
        launch {
            flow.collect {
                println("1st: $it")
            }
        }

        launch {
            delay(1000)
            flow.collect {
                println("2nd: $it")
            }
        }
    }
}


result

1st: 1
1st: 2
1st: 3
1st: 4
2nd: 1
2nd: 2
2nd: 3
2nd: 4

StateFlow: UI 상태 관리와 같이 앱 전반에 걸쳐 일관된 상태를 유지해야 하는 경우에 적합합니다.
재구성에서도 최신 상태를 유지하기에 ui 상태 관리에 적합합니다.

stateIn 연산자는 Kotlin에서 FlowStateFlow로 변환할 때 사용됩니다.

stateIn 연산자는 다음과 같은 주요 매개변수를 받습니다:

  • scope: StateFlow가 작동할 코루틴 스코프를 지정합니다. 이 스코프 안에서 StateFlow의 수명이 관리됩니다.

Flow를 계속 관찰 할 코루틴이 필요, 해당 코루틴을 수행할 Scope을 넘겨줘야 함

  • started: SharingStarted 설정을 통해 StateFlow어떻게 시작될지를 결정합니다. 예를 들어, SharingStarted.Lazily구독자가 처음으로 구독을 시작할 때 Flow가 활성화되도록 하며, SharingStarted.Eagerly즉시 Flow를 시작합니다.
  • initialValue: StateFlow시작할 때 사용할 초기 값입니다



import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        val numbersFlow = flowOf(1, 2, 3, 4).onEach { delay(1000) } 

        val stateFlow: StateFlow<Int> = numbersFlow
            .stateIn(
                scope = this, 
                started = SharingStarted.Eagerly,
                initialValue = 0
            )

        launch {
            delay(2500) 
            stateFlow.collect { value ->
                println("Received: $value")
            }
        }

        launch {
            delay(4000)
            stateFlow.collect { value ->
                println("Received2: $value")
            }
        }

    }
}

result

Received: 2
Received: 3
Received2: 3
Received: 4
Received2: 4


import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        val numbersFlow = flowOf(1, 2, 3, 4).onEach { delay(1000) }

        val stateFlow: StateFlow<Int> = numbersFlow
            .stateIn(
                scope = this,
                started = SharingStarted.WhileSubscribed(), 
                initialValue = 0 
            )

        launch {
            delay(2500) 
            stateFlow.collect { value ->
                println("Received: $value")
            }
        }

        launch {
            delay(4000) 
            stateFlow.collect { value ->
                println("Received2: $value")
            }
        }

    }
}

result

Received: 0
Received: 1
Received2: 1
Received: 2
Received2: 2
Received: 3
Received2: 3
Received: 4
Received2: 4

sharedFlow

sharedFlow는 특이하게 내부적으로 buffer로 과거의 데이터를 저장할 수 있고 collect 시 replay 설정 값만큼 이전 데이터를 전달 받을 수 있습니다.


import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        val sharedFlow = MutableSharedFlow<Int>(replay = 3)

        launch {
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            sharedFlow.emit(3)
        }

        launch {
            delay(1000) 
            sharedFlow.collect { value ->
                println("Subscriber 1: $value")
            }
        }

        launch {
            delay(2500) 
            sharedFlow.collect { value ->
                println("Subscriber 2: $value")
            }
        }
    }
}

또한 stateFlow와 달리 기본 값을 가질 수 없고 중복 값에 대해서 데이터 발행의 skip을 처리하지 않습니다.

profile
Android Developer

0개의 댓글