educative - kotlin - 14

Sung Jun Jin·2021년 5월 6일
0
post-custom-banner

Coroutines and Concurrency

Coroutine은 동시성 프로그래밍을 가능하도록 만든 개념이다. 스레드와 기능적으로 같지만, 좀 더 가볍고 유연한 백그라운드 스레드에서 코드를 처리할 때 사용하는 하나의 방법이다.

Concurrency(동시성)는 하나의 작업을 일정량 처리하고 다음 작업을 처리하는 여러 개의 스레드가 번갈아가면서 실행되는 방식이다. 각 스레드들이 병렬적으로 실행되는 것처럼 보이지만 사실은 번갈아가면서 조금씩 실행되고 있는 것이다.

Parallel vs. concurrent

병렬성 처리와 동시성 처리의 차이점에 대한 좋은 그림이다.

여기서 피자를 먹으면서 귀로 상대방의 목소리를 듣는것은 병렬성 처리에 해당된다. 반대로 피자를 먹다가 말을 하는 것은 동시성 처리에 해당된다.

Coroutines as cooperating functions

서브루틴이란 caller에게 리턴하기 전에 완료되는 방식이다. 호출중간에 상태값을 가지지 않는 특성이 있다. 일반적인 목적의 프로그래밍에서 서브루틴은 코루틴보다 더 흔하게 사용된다. 서브루틴은 단 하나의 entry point를 가지는 반면 코루틴은 여러개의 entry point를 가지고 있다. 이 특성을 가지고 우리는 하나의 Producer, Consumer 관계의 cooperating function을 만들 수 있다.

Producer와 Consumer가 코루틴을 통해 작업을 실행하면서 상태를 공유하고 있다.

Starting with sequential execution

먼저 코틀린에서 코드를 실행할때 순차적으로 실행될 것인지, 병렬로 실행된 것일지를 지정하는 방법부터 알아보자. 첫번째로 순차적으로 함수 호출을 실행하는 부분의 예시다.

fun task1() {
  println("start task1 in Thread ${Thread.currentThread()}")
  println("end task1 in Thread ${Thread.currentThread()}")
}

fun task2() {
  println("start task2 in Thread ${Thread.currentThread()}")
  println("end task2 in Thread ${Thread.currentThread()}")
}

println("start")

run {
  task1()
  task2() 
  
  println("called task1 and task2 from ${Thread.currentThread()}")
}

println("done")

task1()task2() 함수는 각각 스레드의 실행정보를 출력하는 함수이다.run() 함수는 Any 클래스의 람다 표현식을 인자로 받는 확장함수이다.

실행결과는 당연히 task1이 완료되고 순차적으로 task2가 실행되는 모습을 확인할 수 있다.

start
start task1 in Thread Thread[main,5,main]
end task1 in Thread Thread[main,5,main]
start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
called task1 and task2 from Thread[main,5,main]
done

Creating a coroutine

다시 위에있는 task1()task2() 함수를 동시성(concurrently)으로 실행되게 바꿔보자.먼저 kotlinx.coroutines 패키지를 import하고 run()함수를 runBlocking() 함수로 대체했다. runBlocking() 함수는 kotlinx.coroutines 안에 있고 람다를 인자로 받고 동시성으로 실행되게 한다.

import kotlinx.coroutines.*

fun task1() {
  println("start task1 in Thread ${Thread.currentThread()}")
  println("end task1 in Thread ${Thread.currentThread()}")
}

fun task2() {
  println("start task2 in Thread ${Thread.currentThread()}")
  println("end task2 in Thread ${Thread.currentThread()}")
}

println("start")

runBlocking {
  task1()
  task2() 
  
  println("called task1 and task2 from ${Thread.currentThread()}")
}

println("done")

실행결과는 위에 결과랑 똑같다. 코루틴은 동시에 코드를 실행시키고 메인 스레드에서 실행되는 람다가runBlocking() 호출 전후에 실행되기 때문이다.

start
start task1 in Thread Thread[main,5,main]
end task1 in Thread Thread[main,5,main]
start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
called task1 and task2 from Thread[main,5,main]
done

Launching a task

이번엔 task1()task2()의 함수가 launch() 함수에 의해 각각 다른 코루틴에서 실행되게 해보자. launch() 함수는 새로운 코루틴을 인자로 받아 실행시켜준다.

runBlocking {
  launch { task1() }
  launch { task2() }
  println("called task1 and task2 from ${Thread.currentThread()}") 
}
println("done")

실행결과가 살짝 달라졌다. 시작 메세지와 함께 task1, task2가 수행되고 마지막 종료 메세지가 나온다. 모든 코드는 메인 스레드에서 실행되지만 마지막 출력 함수가 task1, task2 이전에 실행되는 모습을 확인할 수 있다.

start
called task1 and task2 from Thread[main,5,main]
start task1 in Thread Thread[main,5,main]
end task1 in Thread Thread[main,5,main]
start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
done

Interleaving calls with suspension points

Suspension point랑 함수가 실행하는 중간에 작업을 잠시 중단(suspend)하고고 다른 task를 수행하는 지점을 뜻한다. kotlinx.coroutine 라이브러리의 delay()yield() 함수로 구현할 수 있다. delay()함수는 실행중인 작업을 잠깐 멈추고 yield()는 명시적으로 작업을 멈추는것이 아니라는 차이점이 있다.

suspend 키워드를 사용하는 예시이다.

import kotlinx.coroutines.*

suspend fun task1() {
  println("start task1 in Thread ${Thread.currentThread()}")
  yield()
  println("end task1 in Thread ${Thread.currentThread()}")
}

suspend fun task2() {
  println("start task2 in Thread ${Thread.currentThread()}")
  yield()
  println("end task2 in Thread ${Thread.currentThread()}")
}

println("start")

runBlocking {
  launch { task1() }
  launch { task2() }
  
  println("called task1 and task2 from ${Thread.currentThread()}")
}

println("done")

실행결과 확실한 동시성 방식으로 코드가 돌아가는것을 확인할 수 있다.

start
called task1 and task2 from Thread[main,5,main]
start task1 in Thread Thread[main,5,main]
start task2 in Thread Thread[main,5,main]
end task1 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
done

Explicitly setting a context

launch()runBlocking() 함수에 CoroutineContext를 넘겨줄 수 있다. 여기서 CoroutineContext의 인자인 Dispatchers.Default는 스레드에서 실행되는 코루틴에게 DefaultDispatch pool에서 실행되도록 지시하는 역할을 한다.

Dispatchers.IO의 값들은 IO 작업이 집중된 코루틴 풀에서 사용할 수 있다.
Dispatchers.Main은 Swing UI를 사용하는 안드로이드 디바이스에서 사용할 수 있다.

runBlocking {
  launch(Dispatchers.Default) { task1() } 
  launch { task2() }
  println("called task1 and task2 from ${Thread.currentThread()}") 
}

위 코드에서 실행하면 task1()은 다른 스레드에서 실행될 것이다. 즉, 병렬로 처리된다.
실행결과

start
start task1 in Thread Thread[DefaultDispatcher-worker-1,5,main] end task1 in Thread Thread[DefaultDispatcher-worker-2,5,main] called task1 and task2 from Thread[main,5,main]
start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
done

Running in a custom pool

이번엔 커스텀 pool에서 작업이 돌아가게 만들어보자. 먼저 thread executor를 생성하는 방법은 다음과 같다.

  1. java.util.concurrent 패키지에서 Executors API를 사용한다.
  2. 확장함수인 asCoroutineDispatcher()를 사용해 CoroutineContext를 가져온다.
// single.kts
import kotlinx.coroutines.*
import java.util.concurrent.Executors
//...task1 and task2 function definitions as before...

Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { context ->
  println("start")

  runBlocking {
    launch(context) { task1() }
    launch { task2() }

    println("called task1 and task2 from ${Thread.currentThread()}")
  }

  println("done")  
}

실행결과

start
start task1 in Thread Thread[pool-1-thread-1,5,main] 
end task1 in Thread Thread[pool-1-thread-1,5,main] 
called task1 and task2 from Thread[main,5,main] 
start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
done

1개의 thread pool 대신 다수의 thread를 사용하고 싶다면 다음과 같이 코드를 바꿔주면 된다

Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) 
  .asCoroutineDispatcher().use { context ->

Switching threads after suspension points

다른 suspension point로 전환하고 싶으면 CoroutineContext 인자와 CoroutineStart 인자를 사용할 수 있다.

launch() 함수의 두번째 인자를 CoroutineStart의 DEFAULT 인자로 지정해준다. 'start() 함수가 실행되기 전까지 기다리고 싶으면 LAZY,
_ATOMIC은 취소가 불가능한 상태,
suspension point 이후 다른 스레드의 전환은 UNDISPATCHED를 사용해준다.

import kotlinx.coroutines.*
import java.util.concurrent.Executors

suspend fun task1() {
  println("start task1 in Thread ${Thread.currentThread()}")
  yield()
  println("end task1 in Thread ${Thread.currentThread()}")
}
suspend fun task2() {
  println("start task2 in Thread ${Thread.currentThread()}")
  yield()
  println("end task2 in Thread ${Thread.currentThread()}")
}

Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
  .asCoroutineDispatcher().use { context ->
  println("start")

  runBlocking {
    @UseExperimental(ExperimentalCoroutinesApi::class)
    launch(context = context, start = CoroutineStart.UNDISPATCHED) { task1() }
    launch { task2() }

    println("called task1 and task2 from ${Thread.currentThread()}")
  }

  println("done")

CoroutineStart.UNDISPATCHED@UseExperimental 어노테이션과 함께 사용해준다.

실행결과. task1()가 pool-1's 스레드 대신 메인 스레드에서 실행됬다. yield()가 있는 suspension point 도달 후, pool-1 스레드로 옮겨 실행됐다.

start
start task1 in Thread Thread[main,5,main]
end task1 in Thread Thread[pool-1-thread-1,5,main] called task1 and task2 from Thread[main,5,main] start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]
done

Changing the coroutine context

withContext() 함수를 사용하면 코루틴 실행 중간에 다른 context로 바꿀 수 있다.

runBlocking { 
  println("starting in Thread ${Thread.currentThread()}")
  withContext(Dispatchers.Default) { task1() }
  
  launch { task2() }
  
  println("ending in Thread ${Thread.currentThread()}")
}

실행결과

starting in Thread Thread[main,5,main]
start task1 in Thread Thread[DefaultDispatcher-worker-1,5,main] 
end task1 in Thread Thread[DefaultDispatcher-worker-1,5,main] 
ending in Thread Thread[main,5,main]
start task2 in Thread Thread[main,5,main]
end task2 in Thread Thread[main,5,main]

Running in debug mode

코틀린 파일을 실행할때 -Dkotlinx.coroutines.debug 옵션을 주면 command-line에서 디버깅 옵션으로 실행된다.

kotlinc-jvm -Dkotlinx.coroutines.debug \
-classpath /opt/kotlin/kotlinx-coroutines-core-1.2.2.jar \-script withcontext.kts

실행결과 조금 더 디테일한 결과를 확인할 수 있다. 각각의 코루틴에 대해 식별자(identifier)를 지정해준다. 용이할수 있지만 디버깅 관점에서는 식별자에 숫자보다는 논리적인 이름(searching for meaning of life)을 코루틴에 달아주는게 더 가독성이 좋다.

starting in Thread Thread[main @coroutine#1,5,main]
start task1 in Thread Thread[DefaultDispatcher-worker-1 @coroutine#1,5,main] 
end task1 in Thread Thread[DefaultDispatcher-worker-3 @coroutine#1,5,main] 
ending in Thread Thread[main @coroutine#1,5,main]
start task2 in Thread Thread[main @coroutine#2,5,main]
end task2 in Thread Thread[main @coroutine#2,5,main]

Assigning names to coroutines

따라서 코루틴에 논리적인 이름을 할당해주는 방법은 CoroutineName() 함수를 사용해주면 된다.

runBlocking(CoroutineName("top")) {
  println("running in Thread ${Thread.currentThread()}") 
  withContext(Dispatchers.Default) { task1() }
  launch(Dispatchers.Default + CoroutineName("task runner")) { task2() } 
  println("running in Thread ${Thread.currentThread()}")
}

실행결과 top, task runnner와 같이 코루틴에 이름이 할당된 모습을 확인할 수 있다.

running in Thread Thread[main @top#1,5,main]
start task1 in Thread Thread[DefaultDispatcher-worker-1 @top#1,5,main]
end task1 in Thread Thread[DefaultDispatcher-worker-3 @top#1,5,main]
start task2 in Thread Thread[DefaultDispatcher-worker-3 @task runner#2,5,main] 
end task2 in Thread Thread[DefaultDispatcher-worker-3 @task runner#2,5,main] 
running in Thread Thread[main @top#1,5,main]

async and await

launch() 함수는 코루틴의 중단을 위해 사용되는 Job Oject를 리턴한다. 그러나 launch() 함수를 사용해 시작된 코루틴의 결과를 리턴하는 방법은 없다. launch() 대신 async()를 사용하면 이 문제가 해결된다.

async() 함수는 launch() 함수와 똑같은 인자를 받는다. 다른점은 async() 함수는 await() 함수를 가지고 있는 Deferred<T> 를 리턴한다는 점이다. 이 객체는 코루틴의 상태를 확인하거나 취소를 할 수 있다. await() 함수를 호출하면 코드의 실행흐름을 제어할 수 있다.

Using async & await

다음 코드에서 우리는 실행가능한 비동기 코어의 개수를 가져올 것이다. Dispatchers.Default 인자는 optional하고 생략 시 상속된 Dispatcher를 실행한다.

import kotlinx.coroutines.*

runBlocking {
  val count: Deferred<Int> = async(Dispatchers.Default) { 
    println("fetching in ${Thread.currentThread()}")
    Runtime.getRuntime().availableProcessors()
  }
  
  println("Called the function in ${Thread.currentThread()}")
  
  println("Number of cores is ${count.await()}")
}

Dispatcher의 요청 후 메인 스레드에서 async() 호출 후 print()문을 실행한다. await() 호출은 코루틴의 종료를 기다리고 마지막 출력 구문은 코루틴의 response를 출력할 것이다.

실행결과

Called the function in Thread[main,5,main]
fetching in Thread[DefaultDispatcher-worker-1,5,main] 
Number of cores is 8 // 코어의 갯수

Preserving the state

스레드 간의 상태 보존은 어떻게 이루어질까? 아래 Compute 클래스는 2개의 메소드로 구성되어 있다. input의 제곱값을 리턴해주는 compute1(), compute2() 메소드도 비슷하지만 실행 흐름을 잠깐 멈추고 잠재적으로 스레드를 바꿀수 있다.

import kotlinx.coroutines.*

class Compute {
  fun compute1(n: Long): Long = n * 2
  suspend fun compute2(n: Long): Long {
    val factor = 2
    println("$n received : Thread: ${Thread.currentThread()}")
    delay(n * 1000)
    val result = n * factor
    println("$n, returning $result: Thread: ${Thread.currentThread()}")
    return result
  }
}

main() 함수를 통해 실행시켜보자.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
  val compute = Compute()
  
  launch(Dispatchers.Default) {
    compute.compute2(2)
  }
  launch(Dispatchers.Default) {
    compute.compute2(1)
  }
}

실행결과

2 received : Thread: Thread[DefaultDispatcher-worker-1,5,main]
1 received : Thread: Thread[DefaultDispatcher-worker-2,5,main]
1, returning 2: Thread: Thread[DefaultDispatcher-worker-2,5,main] 
2, returning 4: Thread: Thread[DefaultDispatcher-worker-4,5,main]

What are continuations

코틀린에서 분할된 함수를 호출할때마다 특정 코드를 실행시켜 상태를 유지하는 것을 Continuation이라고 한다. 위 예제의 바이트코드를 살펴보자

public final long compute1(long);
public final java.lang.Object compute2(long,
  kotlin.coroutines.Continuation<? super java.lang.Long>);

소스코드 단에서는 compute1()이나 compute2()나 비슷했지만 바이트 코드 레벨에서는 많은 차이가 있다. compute2()에서는 심지어 파라미터의 개수도 다르다, 그리고 long 타입 대신 Object를 리턴하는 차이도 보인다.

Continuation<?superLong>은 부분 실행된 함수의 결과를 캡슐화하고 호출부로 전달해준다. 컴파일러는
상태를 보존하는 continuation 작업을 알아서 실행해 코드를 짜는 프로그래머는 이를 활용하는데 더 집중할 수 있다.

Using sequence

코틀린에서 제공하는 sequence 라이브러리는 일련의 값들을 생성하는데 사용된다. 여기서 primes() 함수를 만들어 시작 숫자부터 무한대로 Sequence<Int>를 통해 다음 소수를 반환하는 코드를 짜보자.

fun primes(start: Int): Sequence<Int> = sequence {
  println("Starting to look")
  var index = start
  
  while (true) {
    if (index > 1 && (2 until index).none { i -> index % i == 0 }) {
      yield(index)
      println("Generating next after $index")
    }
    
    index++
  }
}

for (prime in primes(start = 17)) {
  println("Received $prime")
  if (prime > 30) break
}

"""
실행결과

Starting to look
Received 17
Generating next after 17
Received 19
Generating next after 19
Received 23
Generating next after 23
Received 29
Generating next after 29
"""

sequence 함수는 다음과 같은 장점을 가지고 있다.

  • 컬렉션을 미리 생성해 놓을 필요가 없다. 즉 얼만큼의 값을 미리 만들어놔야 하는지 알필요가 없다.
  • 시간에 따른 값 생성 비용을 감안할 필요 없이 이미 생성된 값을 사용하면 된다
  • lazy 하게 값이 필요할때마다 연산이 이루어지기 때문에 미리 계산해둘 필요가 없다.

Sequence<T> 와 비슷하게 iterator() 함수를 사용해 비슷한 예제를 만들 수 있다.

Using iterator()

operator fun ClosedRange<String>.iterator(): Iterator<String> = iterator {
  val next = StringBuilder(start)
  val last = endInclusive
  
  while (last >= next.toString() && last.length >= next.length) {
    val result = next.toString()
    
    val lastCharacter = next.last()
    
    if (lastCharacter < Char.MAX_VALUE) {
      next.setCharAt(next.length - 1, lastCharacter + 1)        
    } else {
      next.append(Char.MIN_VALUE)        
    }
    
    yield(result)
  }
}

for (word in "hell".."help") { print("$word, ") }

iterator() 함수는 Iterator<T>를 반환한다. 함수의 인자로 넘겨진 람다 표현식에서 다음 문자열을 생성하고 yield() 함수를 호출한다.

profile
주니어 개발쟈🤦‍♂️
post-custom-banner

0개의 댓글