Kotlin Coroutine Deep Dive

Park Jae Hong·2023년 4월 12일
0

Coroutine 이란 ?

: Coroutine에서 co는 함께/동시에 라는 의미가 있다. 고로, Coroutine이란 동시에 실행되는 비동기적 으로 실행되는 코드라고 할 수 있다.


Coroutine은 왜 나왔을까 ?

: Coroutine을 알기 전에 Process 와 Thread 라는 개념을 알아야 한다.

❗️ Process 란 ?

: Process는 컴퓨터에서 연속적으로 실행되고 있는 프로그램을 의미한다. 또한, 프로그램이 각 할당된 Heap 메모리에 적재되어 실행되고 있는 인스턴스를 의미하기도 한다.

특징

  • 서로 완벽한 독립 공간을 가지고 있어, 하나의 Process 종료되어도 다른 Process 에는 영향을 주지 않는다.

  • 하나의 프로그램에 여러 Process를 생성할 수 있다.

  • 프로세스는 병행 실행이 가능하다.
    단, 병행 실행하기 위해선 프로세스간의 통신이 필요하다.

  • 독립적으로 실행되기 때문에 동기화를 하지 않아도 된다.

❗️ Thread 란 ?

: Thread는 하나의 Process에 실행되는 여러 흐름의 단위이다.Process내의 별도의 영역에 저장되고 Process와 다르게 Heap 영역이 아닌 Stack 영역에 할당된다.

특징

  • Thread는 본질적으로 Process에 속해있기 때문에 Thread 간의 자원 공유가 가능하다.

  • 자원을 공유하면서 사용하므로 통신 비용이 절감하고 메모리가 효율적이다.

  • Process 비해 Context Swithing 비용이 적다.

  • 단, 자원을 공유하기 때문에 공유자원을 관리해줘야 한다.(동기화 작업)
    (만약 서로 다른 Thread 가 동시에 공유 자원에 접근하면 교착 상태(Deadlock) 에 빠질 수 있다.)

❗️ Coroutine

그렇다면 Coroutine은 왜 나왔을까 ?
: 사실 Thread 와 Coroutine 의 개념이나 동작 방식은 유사하다.
(Light-Weight Thread라 불릴만큼 동작하는 결과는 Thread와 유사
차이점은 Thread는 Process에 종속되지만 Coroutine는 Thread에 종속되지 않는다.)
하지만 Thread의 경우에는 각각의 Thread 마다 메모리에 직접 할당하기 때문에 많은 비용이 발생하게 되는데,
Coroutine의 경우는 Object를 Thread를 할당 함으로써 처리 비용, Context Switching 비용 등을 최대한으로 줄이기 위해 고안된 기술이다.

결과적으로, Process 에서 Thread가 생긴 개념처럼 제한된 메모리를 더욱 효율적으로 관리하기 위해서 생겨난 개념이라고 생각하면 좋을것이다.

Kotlin Coroutine 내부 살펴보기

suspend function

: Kotlin Coroutine의 suspend function 인 CPS(Continuation Passing Style: 연속체를 전달하는 방식) 기반으로 작동하게 된다. 이때 Continuation(연속체) 이라는 개념이 중요하다.
Continuation 내부를 살펴보면 아래 처럼 구현되어 있다.

public interface Continuation<in T> {
   
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

설명을 보면, "성공 또는 실패 결과를 마지막 중단 지점의 반환 값으로 전달하는 해당 코루틴의 실행을 재개합니다." 라고 되어있다.
즉, suspend function 은 Continuation에 실행 상태와 시점을 받은 정보를 통해 Coroutine 내에서 ContextSwithing 을 하면서 동시성 처리가 가능하게 되는 것이다.

❗️ Continuation의 무슨 정보를 가지고 어떻게 동작하는 것인가 ? (State Machine)

간단하게 설명하면, suspend 로 작성한 함수는 label 을 가지게 된다.
그럼 해당 함수와 label을 Continuation을 통해서 함수를 처리 후에 label을 1 증가 시켜
다시 보냈던 suspend 함수로 실행 결과와 증가 시킨 label을 보내서 다음 suspend를 실행할 수 있게 해준다.
이때, 라벨링이 될때 suspend 함수 기반으로 라벨링이 되기 때문에 일반 함수를 같이 사용하면 다음 suspend 함수나 return 이 나오기 전까지 같이 실행된다.


글로 이해하면 어렵기 때문에 코드로 살펴 보면,
suspend 키워드가 붙은 함수는 내부의 suspend 함수 기준으로 라벨링이 된다.

suspend fun getUser(): User {
	// label 0
	val userName = getName() <- suspend 
    
    // label 1
    val profile = getProfile() <- suspend 
    val userId = getId(profile)
    
    // label 2
    return User(userId,userName)
}

그럼 컴파일 시점에 라벨을 기준으로 when으로 바뀌게 된다.

	when(label) {
    	0 -> {
        	val userName = getName() <- suspend 
        }
        
        1 -> {
        	val profile = getProfile() <- suspend 
    		val userId = getId(profile)
        }
        2 -> {
        	return User(userId,userName)
        }
    }

그렇다면, 함수 내부의 변수들은 어떻게 관리되는 것일까 ?
컴파일 과정을 간단하게 표현하면,

해당 Suspend 함수 parameter에 Continuation이 생성되고, 내부에 는 "함수명 + StateMachine" 클래스가 생성 돼, 해당 변수를 null로 초기화 해서 관리되고 있다.

fun getUser(completion: Continuation<Any?>): User {
	class GetUserStateMachine(
    completionL: Continuation<Any?>
    ): CoroutineImpl(completion) {
    	var userName: String? = null
        var profile: Profile? = null
        var userId: String? = null
        
        var result: Any? = null
        var label: Int = 0
        
        override fun invokeSuspend(result: Any?) {
        	this.result = result
            getUser(this)
        }
    }
}

이렇게 생성된 StateMachine 은 continuation이 라는 변수로 GetUserStateMachine 을 타입캐스팅 해서 사용되고 있다.

fun getUser(completion: Continuation<Any?>): User {
	val continuation = completion as? GetUserStateMachine ?: GetUserStateMachine(completion)
	when(continuation.label) {
    	0 -> {
        	val userName = getName() 
            continuation.label = 1
            getUser(continuation)
        }
        
        1 -> {
        	val profile = getProfile()
    		val userId = getId(profile)
            continuation.label = 2
            getUser(continuation)
        }
        2 -> {
        	User(userId,userName)
        }
    }
}

Kotlin Coroutine 사용방법

: Coroutine의 사용은 크게 반환값이 없는 객체(Job)와 반환값이 있는 객체(Deferred) 두가지로 나뉜다. 반환값이 없는 경우 launch, 반환값이 있는 경우 asyuc 를 사용하면 된다.

launch

: launch 는 연산이 실패한 경우에만 통보를 받는 구조이다.
예를 들면, 아래처럼 코드를 작성할 경우 실행은 중단되지 않고 Exception 값만 통보 받는 것이다.

runBlocking {
  val task = GlobalScope.launch {
    generateException()
  }

  task.join()
}

fun generateException() {
  throw Exception("Can't do")
}

또한, launch는 Job 객체를 반환한다.

Job

: Fire and Forget(실행 후 무시) 구조로 설계되어 있는데,
한번 시작된 작업은 예외가 발생하지 않는 한 대기하지 않고 Job 내부에서 발생하는 예외는 Job을 생성한 곳까지 전파되기 때문에, 완료되기를 기다리지 않아도 발생한다.

  • 생성
    : launch 나 job()을 통해 생성할 수 있고, 기본적으로 Job()은 생성될 때 자동으로 시작한다. 자동으로 시작을 원하지 않을 때는 CoroutineStart.LAZY를 사용하면 된다.

  • 활성
    : 활성상태에 있는 Job을 다양한 방법으로 실행시킬수 있는데, 일반적으로 start() 나 join() 으로 실행시킨다.
    두 방법의 차이점은 start()의 경우는 Job이 완료될 때 까지 기다리지 않고 join()의 경우는 Job이 완료할 때 까지 대기한다는 차이점이 있다.
    (start()의 경우 suspend 키워드가 필요없지만 join()의 경우에는 함수를 중단하기 때문에 suspend 키워드가 필요하다.

  • 취소 중
    : 취소 요청을 받은 활성 Job은 취소중이라는 스테이징 상태로 들어갈 수 있다.

  • 취소
    : 취소 또는 처리되지않은 예외로 인해 실행이 종료된 Job은 취소됨으로 간주된다.

또한, Job 객체는 한 방향으로만 이동한다는 특징이 있는데,

runBlocking {
  val task = GlobalScope.launch {
  	delay(2000)
  }

  task.join()
  
  //Restart
  job.start()
  job.join()
}

Job은 특정 상태에 도달하면 이전 상태로 되돌아가지 않는다. 위 코드에서 처음 호출한 job.join()이 완료되면 완료됨 상태에 도달했으므로 start()를 호출해도 아무런 변화가 없을 것이다.

코루틴이 무거운 작업을 하고 있을 땐, job.cancel() 이 작동하지 않는다. 이때 주기적으로 yeild() 나 isActive 로 코루틴에게 작업을 취소할 수 있는 여지를 제공한다.

자식이 Exception을 뱉어버리면 전역으로 퍼지게 되어 일을 중단하게 되는데 Exception이 발생하게 될 때 부모에게는 퍼지지 않게 하기위해, SupervisorJob()을 사용하게 된다. 그렇다면, 자식들이 취소되어도 다른 자식들은 이어서 일을 진행할 수 있다.


async

: async는 결과 처리를 목적으로 Coroutine을 시작할 경우 사용된다.
또한, async()는 Deferred 를 반환하게 되는데,
이는 취소 불가능한 Non-Blocking Cancellable Future를 의미한다.

async로 만들어진 Coroutine은 join() 과 await()로 실행할 수 있다.

차이점

  • join()
    : join()으로 실행할 경우 Job으로 실행되기 때문에 연산이 실패했을 경우 실행은 중단되지 않고 실패 정보를 통보 받을 수 있다
  • await()
    : await()으로 실행할 경우 연산이 실패했을 경우 에러가 발생해 실행이 중단된다.

코드로 보면, join의 경우 예외만 통보 받을 뿐 실행이 중단 되지않고, await는 Exception 이 떨어지면서 실행이 실패하는 것을 볼 수 있다.

runBlocking {
  val task = GlobalScope.async {
    generateException()
  }

  task.join()
  task.await()
}

fun generateException() {
  throw Exception("Can't do")
}

Coroutine Context

: Coroutine은 항상 Context 안에서 실행된다. Context는 어떻게 실행되고 동작해야하는지 정의할 수 있게 해주는 요소들의 집합이다. Context 또한 결합될 수 있고 분리하면 제거도 가능하다.

withContext
: 이미 일시 중단 함수 상태일 때 withContext()를 사용해서 Context를 변경할 수 있다.
사실 withContext()는 async 와 동일한 동작을 하는 함수 인데,
차이점은 await() 를 호출할 필요가 없으면 마지막 구문에 해당하는 결과가 리턴될 때까지 기다리기 때문에
프로세스에 Job을 포함시키지 않고도 다른 Context로 전환할 수 있게 해준다.
그래서 에러가 발생했을 경우 실행에는 영향을 주지않고 에러 값을 통보 받을 수 있다.

profile
The people who are crazy enough to think they can change the world are the ones who do. -Steve Jobs-

0개의 댓글