코틀린 코루틴 3부 -1

벼리·2025년 11월 16일

코루틴

목록 보기
3/4

채널

채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있습니다.

Channel은 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스입니다.

  • SendChannel은 원소를 보내거나 채널을 닫는 용도로 사용됩니다.
  • ReceivewChannel은 원소를 받을 때 사용됩니다.
interface SendChannel<in E> {
	suspend fun send(element: E)
	fun close(): Boolean
	// ...
}
interface ReceiveChannel<out E> {
	suspend fun receive(): E
	fun cancel(cause: CancellationException? = null)
	// ...
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

두 인터페이스는 구분되어 있으며, 채널의 진입점을 제한하기 위해 ReceiveChannel이나 SendChannel 중 하나만 노출시키는 것도 가능합니다.

sendreceive 모두 중단 함수라는 것을 확인할 수 있습니다. 원소를 보내고 받는 함수가 중단 함수인 것은 필수적인 특징입니다.

  • receive를 호출했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단됩니다.
  • 반면 send는 채널의 용량이 다 찼을 때 중단됩니다. 대부분의 채널은 용량이 제한되어 있다는 것을 나중에 확인할 수 있습니다.

채널은 송신자와 수신자의 수에 제한이 없습니다. 하지만 채널의 양쪽 끝에 각 하나의 코루틴만 있는 경우가 일반적입니다.

다음은 채널의 예시입니다.

suspend fun main(): Unit = coroutineScope {
	val channel = Channel<Int>()
	launch {
		repeat(5) { index ->
			println("Producing next one")
			delay(1000)
			channel.send(index * 2)
		}
		channel.close()
	}

	launch {
		for (element in channel) {
			println(element)
		}
   
		// 또는
		// channel.consumeEach { element ->
		//  println(element)
		// }
    }
	
}
		

채널에서 수신하는 방법

송신자 코루틴에서 channel.send를 통해 원소를 보낼 수 있습니다. 그리고 이 원소를 받는 방법에는 여러가지가 있습니다.

  • channel.receive
    • channel.receive()는 보내는 원소의 개수를 알아야 한다는 단점이 있습니다.
  • for 루프 혹은 consumeEach
    • channel.receive와는 달리, 송신자가 원소를 얼마나 보내는지 알 필요가 없습니다.
    • 채널이 닫힐 때까지 원소를 받을 수 있습니다.
    • comsumeEachfor 루프와는 달리, 모든 원소를 가지고 온 다음 채널이 닫힌 뒤 채널을 취소한다는 차이점이 있습니다.
    • 예외 같은 문제가 발생했을 때 채널을 닫는 것을 깜빡하기 쉽다는 단점 존재
    • 예외로 인해 코루틴이 원소를 보내는 것을 중단하면, 다른 코루틴은 원소를 영원히 기다려야함
  • produce
    • ReceiveChannel을 반환하는 코루틴 빌더

    • produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이(끝나거나, 중단되거나, 취소되거나) 채널을 닫습니다.

    • 따라서 close를 반드시 호출합니다.

    • suspend fun main(): Unit = coroutineScope {
      	val channel = produce {
      		repeat(5) { index ->
      			println("Producing next one")
      			delay(1000)
      			send(index * 2)
      		}
      	}
      	
      	for (element in channel) {
      		println(element)
      	}
      }

채널 타입

설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있습니다.

  • 무제한(Unlimited)
    • 제한이 없는 용량 버퍼를 가진 Channel.UNLIMITED로 설정된 채널로, send가 중단되지 않습니다.
  • 버퍼(Buffered)
    • 특정 용량 크기 또는 Channel.BUFFERED로 설정된 채널
    • Channel.BUFFERED는 기본값이 64이며, JVM의 defaultBuffer를 설정하면 오버라이딩 가능
  • 랑데뷰(Rendesvous)
    • 용량이 0이거나 Channel.RENDEZVOUS(용량이 0)인 채널
    • 송신자와 수신자가 만날 때만 원소를 교환
  • 융합(Conflated)
    • 버퍼 크기가 1인 Channel.CONFLATED를 가진 채널로, 새로운 원소가 이전 원소를 대체합니다.

채널이 가진 용량을 실제 예를 보면서 확인할 수 있습니다.

다음은 용량이 무제한인 채널의 예시입니다. 용량이 무제한이면, 채널은 모든 원소를 받고, 수신자가 하나씩 가져갑니다.

suspend fun main(): Unit = coroutineScope {
	val channel = produce(capacity = Channel.UNLIMITED) {
		repeat(5) { index ->
			send(index * 2)
			delay(100)
			println("Sent")
		}
	}
	
	delay(1000)
	for (element in channel) {
		println(element)
		delay(000)
	}
// Sent
// (0.1 초 후)
// Sent
// (0.1 초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1 초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 0
// (1초 후)
// 2
// (1초 후)
// 4
// (1초 후)
// 6
// (1초 후》
// 8
// (1초 후)

정해진 크기의 용량을 가지고 있다면, 버퍼가 가득 찰 때까지 원소가 생성되고, 이후 생성자는 수신자가 원소를 소비할 때까지 기다리고 시작합니다.

suspend fun main(): Unit = coroutineScope {
	val channel = produce(capacity = 3) {
		repeat(5) { index ->
			send(index * 2)
			delay(100)
			println("Sent")
		}
	}
	delay(1000)
	for (element in channel) {
		println(element)
		delay(1000)
	}
}
// Sent
// (0.1초 후)
// Sent
// (0.1 초 후)
// Sent
// (1 - 2 * 0.1 = 0.8초 후)
// 0
// Sent
// (1초 후)
// 2
// Sent
// (1초 후)
// 4
// (1초 후)
// 6
// (1초 후)
// 8
// (1초 후)

기본(또는 Channel.RENDEZVOUS) 용량을 가진 채널의 경우 송신자는 항상 수신자를 기다립니다.

suspend fun main(): Unit = coroutineScope {
	val channel = produce {
	// 또는 produce(capacity = Channel.RHMDEZVOUS) {
		repeat(5) { index ->
			send(index * 2)
			delay(100)
			println("Sent")
		}
	}
	delay(1000)
	for (element in channel) {
		println(element)
		delay(1000)
	}
}
// 0
// Sent
// (1 초 후)
// 2
// Sent
// (1 초 후)
// 4
// Sent
// (1초 후)
// 6
// Sent
// (1초 후)
// 8
// Sent
// (1초 후)

마지막으로, Channe.CONFLATED 용량을 사용하면 이전 원소를 더 이상 저장하지 않습니다. 새로운 원소가 이전 원소를 대체하며, 최근 원소만 받을 수 있게 되므로 먼저 보내진 원소가 유실됩니다.

suspend fun main(): Unit = coroutineScope {
	val channel = produce(capacity = Channel.CONFLATED) {
		repeat(5) { index ->
			send(index*2)
			delay(100)
			printlnC'Sent")
		}
	}
	delay(1000)
	for (element in channel) {
		printIn(element)
		delay(1000)
	}
}

// Sent
// (0.1 초 후)
// Sent
// (0.1 초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1 초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 8

버퍼 오버플로일때

채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때(onBufferOverflow 파라미터)의 행동을 정의할 수 있습니다.

다음은 오버플로와 관련된 옵션입니다.

  • SUSPEND(기본 옵션): 버퍼가 가득 찼을 때, send 메서드가 중단됩니다.
  • DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거됩니다.
  • DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근 원소가 제거됩니다.

채널 용량 중 Channel.CONFLATED는 용량을 1로 설정하고 onBufferOverflowDROP_OLDEST로 설정한 것임을 알 수 있습니다.

다음은 오버플로를 설정한 예시입니다.

suspend fun main(): Unit = coroutineScope {
	val channel = Channel<Int>(
		capacity = 2,
		onBufferOverflow = BufferOverflow.DROP_OLDEST
	)

	launch {
		repeat(5) { index ->
			channel.send(index * 2)
			delay(100)
			println("Sent")
		}
		channel.close()
	}
	delay(1000)
	for (element in channel) {
		println(element)
		delay(1000)
	}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1 초 후)
// Sent
// (0.1 초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 6
// (1초 후)
// 8

전달되지 않는 원소 핸들러

Channel 함수에서 반드시 알아야할 또 다른 파라미터는 onUndeliveredElement입니다.

원소가 어떤 이유로 처리되지 않을 때 호출됩니다. 대부분 채널이 닫히거나 취소되었음을 의미하지만, send, receive, receiveOrNull 또는 hasNext가 에러를 던질 때 발생할 수도 있습니다. 주로 채널에서 보낸 자원을 닫을 때 사용합니다.

val channel = Channel<Resource>(capacity) { resource ->
	resource.close()
}
// 또는
// val channel = Channel<Resource>(
//     capacity,
//     onUndeliveredElement = { resource ->
//       resource.close()
//     }
// )

// 생성자 코드
val resourceToSend = openResource()
channel.send(resourceToSend)

// 소비자 코드
val resourceReceived = channel.receive()

try {
	// 수신한 자원으로 작업합니다
} finally {
	resourceReceived.close()
}

팬아웃(Fan-out)

여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있습니다. 하지만 원소를 적절하게 처리하려면 반드시 for 루프를 사용해야 합니다.

consumeEach는 여러개의 코루틴에서 사용하기에는 적합하지 않습니다.

fun Coroutinescope.produceNumbers() = produce {
	repeat(10) {
		delay(100)
		send(it)
	}
}

fun Coroutinescope.launchProcessor(
	id: Int,
	channel: ReceiveChannel<Int>
) = launch {
	for (msg in channel) {
		println("#$id received $msg")
	}
}

suspend fun main(): Unit = coroutineScope {
	val channel = produceNumbers()
	repeat(3) { id ->
		delay(10)
		launchProcessor(id, channel)
	}
}
// #0 received 0
// #1 received 1
// #2 received 2
// #0 received 3
// #1 received 4
// #2 received 5
// #0 received 6
// ...

원소는 공평하게 분배됩니다. 채널은 원소를 기다리는 코루틴들을 FIFO 큐로 가지고 있습니다.

팬인(Fan-in)

여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있습니다. 다음 예제를 보면 두 개의 코루틴이 같은 채널로 원소를 보내고 있습니다.

suspend fun sendString(
	channel: SendChannel<String>,
	text: String,
	time: Long
) {
	while (true) {
		delay(time)
		channel.send(text)
	}
}
fun main() = runBlocking {
	val channel = Channel<String>()
	launch { sendString(channel,"foo",200L) }
	launch { sendString(channel,"BAR!", 500L) }
	repeat(50) {
		println(channel.receive())
	}
	coroutinecontext.cancelchildren()	
}
// (200밀리초후)
// foo
// (200밀리초후)
// foo
// (100밀리초후)
// BAR!
// (100밀리초후)
// foo
// (200밀리초후)
// ...

다수의 채널을 하나의 채널로 합쳐야하는 경우, fanIn 함수를 사용할 수 있습니다.

fun <T> Coroutinescope.fanIn(
	channels: List<ReceiveChannel<T>
): ReceiveChannel<T> = produce {
	for (channel in channels) {
		launch {
			for (elem in channel) {
				send(elem)
			}
		}
	}
}

파이프라인

한 채널로부터 받은 원소를 다른 채널로 전송하는 경우가 있습니다. 이를 파이프라인이라고 부릅니다.

// 1부터 3까지의 수롤 가진 채널
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
	produce {
		repeat(3) { num ->
			send(num + 1)
		}
	}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
	produce {
		for (num in numbers) {
			send(num * num)
		}
	}
	
suspend fun main() = coroutineScope {
	val numbers = numbers()
	val squared = square(numbers)
	for (num in squared) {
			println(num)
	}
}
// 1
// 4
// 9

셀렉트

코루틴은 가장 먼저 완료되는 코루틴의 결과를 기다리는 select 함수를 제공합니다. 또한 여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 먼저 확인하여 데이터를 보내거나, 이용 가능한 원소가 있는 채널로부터 데이터를 받을 수 있는지 여부도 확인할 수 있습니다. 코루틴 사이에 경합을 일으키거나, 여러 개의 데이터 소스로부터 나오는 결과값을 합칠 수도 있습니다. 실제 적용 사례를 통해 확인할 수 있습니다.

지연되는 값 선택하기

여러 개의 소스에서 데이터를 요청한 뒤, 가장 빠른 응답만 얻는 경우를 생각해봅시다. 가장 쉬운 방법은 요청을 여러 개의 비동기 프로세스로 시작한 뒤, select 함수를 표현식으로 사용하고 표현식 내부에서 값을 기다리는 것입니다.

다음 예제에서 비동기 결과값 하나만 반환하는 걸 볼 수 있는데, select 표현식이 하나의 비동기 작업이 완료됨과 동시에 끝나게 되어 결과값을 반환한다는 것을 알 수 있습니다.

suspend fun requestData1(): String {
	delay(100_000)
	return "Data1"
}

suspend fun requestData2(): String {
	delay(1000)
	return "Data2"
}

val scope = CoroutineScope(SupervisorJob())

suspend fun askMultipleForData(): String {
	val defDatal = scope.async { requestData1() }
	val defData2 = scope.async { requestData2() }
	return select {
		defDatal.onAwait { it }
		defData2.onAwait { it }
	}
}

suspend fun main(): Unit = coroutineScope {
	println(askMultipleForData())
}
// (1 초 후)
// Data2

위 예제를 보면 외부의 스코프로부터 async가 시작됩니다. 따라서 askMultipleForData를 시작하는 코루틴을 취소하면, 외부의 스코프인 비동기 태스크는 취소되지 않습니다. coroutineScope를 사용하면 자식 코루틴도 기다리게 되며, 1초가 아닌 100초 뒤에 Data2를 결과로 받을 수 있습니다.

asyncselect를 사용하면 코루틴끼 리 경합하는 상황을 쉽게 구현할 수 있지
만, 스코프를 명시적으로 취소해야 합니다. select가 값을 생성하고 나서 also를 호출한 뒤 다른 코루틴을 취소할 수 있습니다.

suspend fun askMultipleForData(): String = coroutineScope {
	select<String> {
		async { requestData1() }.onAwait { it }
		async { requestData2() }.onAwait { it }
	}.also { coroutineContext.cancelChildren() }
}

// (1 초 후)
// Data2
suspend fun main(): Unit = coroutineScope {
	println(askMultipleForData())
}
// (1 초 후)
// Data2

위 해결책은 약간 복잡하기 때문에, 많은 개발자들은 헬퍼 함수를 정의하거나 raceOf 함수를 지원하는 외부 라이브러리를 사용합니다.

채널에서 값 선택하기

select 함수는 채널에서도 사용할 수 있습니다. 셀렉트 표현식에서 사용하는 주요 함수는 다음과 같습니다.

  • onReceive
    • 채널이 값을 가지고 있을 때 선택됩니다. 값을 받은 뒤 람다식의 인자로 사용합니다.
    • onReceive가 선택되었을 때, select는 람다식의 결과값을 반환합니다.
  • onReceiveCatching
    • 채널이 값을 가지고 있거나 닫혔을 때 선택됩니다.
    • 값을 나타내거나 채널이 닫혔다는 것을 알려주는 ChannelResult를 받으며, 이 값을 람다식의 인자로 사용합니다. onReceiveCatching이 선택되었을 때, select는 람다식의 결과값을 반환합니다.
  • onSend
    • 채널의 버퍼에 공간이 있을 때 선택됩니다.
    • 채널에 값을 보낸 뒤, 채널의 참조값으로 람다식을 수행합니다. onSend가 선택되었을 때 select는 Unit을 반환합니다.

다음은 select 표현식의 예시 코드입니다.

fun main(): Unit = runBlocking {
	val cl = Channel<Char>(capacity = 2)
	val c2 = Channel<Char>(capacity = 2)
	// 값을 보냅니다

	launch {
		for (c in 'A'..'H') {
			delay(400)
			select<Unit> {
				cl.onSend(c) { printInC'Sent $c to 1") }
				c2.onSend(c) { printIn("Sent $c to 2") }
			}
		}
	}
	
	// 값을 받습니다
	launch {
		while (true) {
			delay(1000)
			val c = select<String> {
				cl.onReceive { "$it from 1" }
				c2.onReceive { "$it from 2" }
			}
			println("Received $c")
		}
	}
}
// Sent A to 1
// Sent B to 1
// Received A from 1
// Sent C to 1
// Sent D to 2
// Received B from 1
// Sent E to 1
// Sent F to 2
// Received C from 1
// Sent G to 1
// Received E from 1
// Sent H to 1
// Received G from 1
// Received H from 1
// Received D from 2
// Received F from 2

핫 데이터 소스와 콜드 데이터 소스

코틀린 코루틴은 처음에는 채널만 가지고 있었지만, 코루틴 개발자들은 이것만으로는 부족하다고 느꼈습니다. 채널은 핫 스트림으로 가지만 콜드 스트림이 필요할 때가 있습니다.

List와 Set과 같은 컬렉션은 이며, Sequence와 자바의 Stream은 콜드입니다.

Channel은 이지만, Flow와 RxJava 스트림은 콜드입니다.

콜드
컬렉션(List, Set)Sequence, Stream
ChannelFlow, RxJava 스트림

핫 vs 콜드

핫 데이터 스트림은 데이터를 소비하는 것과 무관하게 원소를 생성하지만, 콜드 데이터 스트림은 요청이 있을 때만 작업을 수행하며 아무것도 저장하지 않습니다.

핫 데이터 스트림의 빌더와 연산은 즉각 실행됩니다. 콜드 데이터 스트림에서는 원소가 필요할 때까지 실행되지 않습니다.

fun main() {
	val l1 = buitdList {
		repeat(3) {
			add("User$it")
			println("L: Added User")
		}
	}
	val l2 = l1.map {
		println("L: Processing")
		"Processed $it"
	}

	val s = sequence {
		repeat(3) {
			yield("User$it")
			println("Added User")
		}
	}
	val s2 = s.map {
		println("S: Processing")
		"Processed $it"
	}
}
// L: Added User
// L: Added User
// L: Added User
// L: Processing
// L: Processing
// L: Processing	

그 결과 Sequence, Stream 또는 Flow와 같은 콜드 스트림은

  • 무한할 수 있습니다.
  • 최소한의 연산만 수행합니다.
  • 중간에 생성되는 값들을 보관할 필요가 없기 때문에 메모리를 적게 사용합니다.

Sequence는 원소를 지연 처리하기 때문에 더 적은 연산을 수행합니다. 작동하는 방식은 아주 간단합니다. 중간 연산은 이전에 만든 시퀀스에 새로운 연산을 첨가할 뿐입니다. 최종 연산이 모든 작업을 실행합니다.

리스트의 원소는 컬렉션이지만, 시퀀스는 원소를 어떻게 계산할지 정의한 것에 불과합니다.

핫 데이터 스트림은

  • 항상 사용 가능한 상태입니다. 각 연산이 최종 연산이 될 수 있습니다.
  • 여러 번 사용되었을 때 매번 결과를 다시 계산할 필요가 없습니다.

그렇기 때문에, 밑의 예제에서 find 연산 시, 결과를 다시 계산할 필요가 없기 때문에 16이 한 번만 실행되는 것을 확인할 수 있습니다.

fun m(i: Int): Int {
	print("m$i ")
	return i * i
}

fun main() {
	val i = listOf(lr 2, 3, 4f 5, 6, 7, 8, 9, 10)
		.map { m(it) } // ml m2 m3 m4 m5 m6 m7 m8 m9 ml0
	println(l) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
	println(l.find { it > 10 }) // 16
	println(l.find { it > 10 }) // 16

	println(l.find { it > 10 }) // 16
	val s = sequenceOf(l, 2, 3, 4, 5, 6, 7, 8, 9, 10)
		.map { m(it) }
		
	println(s.toList())
	// [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
	println(s.find { it > 10 }) // ml m2 m3 m4 16
	println(s.find { it > 10 }) // ml m2 m3 m4 16
	println(s.find { it > 10 }) // ml m2 m3 m4 16
}

자바의 Stream은 코틀린의 Sequence와 비슷한 특징을 가지고 있습니다. 둘 모두 콜드 데이터 스트림입니다.

핫 채널, 콜드 플로우

플로우를 생성하는 가장 일반적은 방법은 produce 함수와 비슷한 형태의 빌더를 사용하는 것입니다.

val channel = produce {
	while (true) {
		val x = computeNextValue()
		send(x)
	}
}

val flow = flow {
	while (true) {
		val x = computeNextValue()
		emit(x)
	}
}

두 빌더는 개념적으로 동일하지만, 수신자 여부에 따라 다르게 동작합니다.

채널

채널은 핫이라 값을 곧바로 계산합니다. 별도의 코루틴에서 계산을 수행하기 때문에, prodceCoroutineScope의 확장 함수로 정의되어 있는 코루틴 빌더가 되어야 합니다.

private fun CoroutineScope.makeChannel() = produce {
	printIn("Channel started")
	for (i in 1..3) {
		delay(1000)
		send(i)
	}
	
suspend fun main() = coroutineScope {
	val channel = makeChannel()
	delay(1000)
	println("Calling channel...
	for (value in channel) {
		println(value)
	}
	println("Consuming again..
	for (value in channel) {
		println(value)
	}
}
// Channel started
// (1 초 후)
// Calling channel...
// 1
// (1 초 후)
// 2
// (1 초 후)
// 3
// Consuming again...

위의 예제에서는 계산은 곧바로 시작되지만, 랑데뷰 채널이라 데이터 생성이 중단되어 수신자가 준비될 때까지 재게되지 않습니다. 이때, 수신자가 없을 때 데이터 생성을 중단하는 것과 요청할 때 데이터를 생성하는 것의 차이를 알아야 합니다.

채널은 핫 데이터 스트림이기에 값이 소비되는 것과 상관없이, 값을 생성한 뒤에 저장합니다. 수신자가 얼마나 많은지는 신경쓰지 않습니다.

플로우

플로우는 콜드 데이터 스트림이기 때문에 값이 필요할 때만 생성합니다. 따라서 flow는 빌더가 아니며 어떤 처리도 하지 않습니다. flow는 최종 연산이 호출될 때 원소가 어떻게 생성되어야 하는지 정의하는 것에 불과합니다. 그래서 flow 빌더는 CoroutineScope가 필요하지 않습니다.

flow 빌더는 호출한 최종 연산의 스코프에서 실행됩니다.

coroutineScope와 다른 코루틴 스코프 함수처럼 중단 함수의 컨티뉴에이션 객체로부터 스코프를 가지고 옵니다. 플로우의 각 최종 연산은 처음부터 데이터를 처리하기 시작합니다.

플로우란 무엇인가?

플로우는 비동기적으로 계산해야할 값의 스트림을 나타냅니다.

Flow 인터페이스 자체는 떠다니는 원소들을 모으는 역할을 하며, 플로우의 끝에 도달할 때까지 각 값을 처리하는 것을 의미합니다.

interface Flow<out T> {
	suspend fun collect(collector: FlowCollector<T>)
}

flow의 유일한 멤버 함수는 collect 입니다. 다른 함수들은 확장 함수로 정의되어 있습니다. iterator만 멤버 함수로 가지고 있는 Iterable 또는 Sequence와 비슷하다고 볼 수 있습니다.

플로우와 값들을 나타내는 다른 방법들의 비교

플로우에 대해 더 이해하기 위해 예시를 들어보겠습니다.

여러 개의 값을 반환하는 함수가 필요하다고 한번에 모든 값을 만들 때는 ListSet과 같은 컬렉션을 사용합니다.

fun allUsers(): List<User> =
	api.getAllUsers().map { it.toUser() }

명심해야 할 점은 ListSet이 모든 원소의 계산이 완료된 컬렉션이라는 점입니다. 값들을 계산하는 과정에 시간이 걸리기 때문에, 원소들이 채워질 때까지 모든 값이 생성되길 기다려야 합니다.

fun getList(): List<Int> = List(3) {
	Thread.sleep(1000)
	"User$it"
}

fun main() {
	val list = getList()
	println("Function started")
	list.forEach { println(it) }
}
// (3초 후)
// Function started
// User0
// User1
// User2

원소를 하나씩 계산할 때는, 원소가 나오자마자 얻을 수 있는 것이 낫습니다. Sequence 사용하는 것이 한 가지 방법입니다.

fun getSequenceO: Sequence<String> = sequence {
	repeat(3) {
		Thread.sleep(1000)
		yield("User$itn)
	}
}

fun main() {
	val list = getSequence()
	println("Function started")
	list.forEach { println(it) }
}
// Function started
// (1초 후》
// User0
// (1 초 후》
// Userl
// (1 초 후》
// User2
	

시퀀스는 CPU 집약적인 연산 또는 블로킹 연산일 때 필요할 때마다 값을 계산하는 플로우를 나타내기에 적절합니다. 시퀀스의 최종 연산은 중단 함수가 아니기 때문에, 시퀀스 빌더 내부에 중단점이 있다면 값을 기다리는 스레드가 블로킹됩니다. 따라서 sequence 빌더의 스코프에서는 Sequencescope의 리시버에서 호출되는 함수(yield와 yieldAll) 외에 다른 중단 함수를 사용할 수 없습니다.

시퀀스를 잘못 사용하면 안 되기 때문에 중단 함수를 사용할 수 없다는 제약 사항이 도입되었습니다. 누군가 HTTP의 엔드포인트로부터 시퀀스를 사용해 페이징 기법으로 빈 페이지를 받을 때까지 모든 사용자의 리스트를 얻는 경우를 생각해 봅시다. Sequence의 iterator가 중단 함수가 아니기 때문에, 시퀀스의 원소를 소비할 때 블로킹이 되는 것은 문제가 됩니다.

// 이렇게 구현하면 안 됩니다
fun allUsersSequence(
	api: UserApi
): Sequence<User> = sequence {
	var page = 0
	do {
			// 중단 함수이므로컴파일 에러가 발생합니다
			val users = api. takePage(page++) 
			yieldAll(users)
	} while (!users.isNullOrEmpty())
}

시퀀스는 위와 같은 상황에서 사용하기에는 적합하지 않습니다. 데이터 소스의 개수가 많거나 원소가 무거운 경우, 원소를 필요할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황에서 시퀀스가 정확히 들어맞습니다.

val fibonacci: Sequence<BigInteger> = sequence {
	var first = 0.toBiglnteger()
	var second = 1.toBiglnteger()
	while (true) {
		yield(first)
		val temp = first
		first += second
		second = temp
	}
}

fun countCharactersInFile(path: String): Int =
	File(path).useLines { lines ->
		lines.sumBy { it.length }
	}

스레드 블로킹이 매우 위험하고 예기치 않은 상황을 유발할 수 있다고 느꼈을 것입니다.

다음 예제를 통해 확인해 봅시다. Sequence를 사용했기 때문에 forEach가 블로킹 연산이 됩니다. 따라서 같은 스레드에서 launch로 시작된 코루틴이 대기하게 되면, 하나의 코루틴이 다른 코루틴을 블로킹하게 됩니다.

fun getSequence(): Sequence<String> = sequence {
	repeat(3) {
		Thread.sleep(1000)
		// 여기에 delay(1000)이 있는 것과 같은 결과입니다
		yield("User$it")
	}
}

suspend fun main() {
	withContext(newSingleThreadContext("main")) {
		launch {
			repeat(3) {
				delay(100)
				println("Processing on coroutine")
			}
		}
		val list = getSequence()
		list.forEach { println(it) }
	}
}
// (1초 후)
// User0
// (1초 후)
// User1
// (1초 후)
// User2
// Processing on coroutine
// (0.1초 후)
// Processing on coroutine
// (0.1초 후)
// Processing on coroutine

이런 상황에서 Sequence 대신 Flow를 사용해야 합니다. 플로우를 사용하면 코루틴이 연산을 수행하는 데 필요한 기능을 전부 사용할 수 있습니다. 플로우의 빌더와 연산은 중단 함수이며 구조화된 동시성과 적절한 예외 처리를 지원합니다.

이후에 이어지는 이러한 기능을 살펴보기로 하고, 지금은 플로우가 앞에서 언급한 문제를 어떻게 풀 수 있는지 보겠습니다.

fun getFlow(): Flow<String> = flow {
	repeat(3) {
		delay(1000)
		emit("User$it")
	}
}
suspend fun main() {
	withContext (newSingleThreadContext ("main")) {
		launch {
			repeat(3) {
				delay(100)
				printIn("Processing on coroutine")
			}
		}
			
		val list = getFlow()
		list.collect { printIn(it) }
	}
}
// (0.1초 후)
// Processing on coroutine
// (0.1초 후)
// Processing on coroutine
// (0.1초 후)
// Processing on coroutine
// (1 - 3 * 0.1 = 0.7초 후)
// User0
// (1 초 후)
// User1
// (1 초 후)
// User2

플로우는 코루틴을 사용해야 하는 데이터 스트림으로 사용되어야 합니다. 예를 들면, API 페이지에서 페이지별로 사용자를 얻은 뒤 사용자 스트림을 사용자 스트림을 만드는 데 사용될 수 있습니다. 플로우 함수를 호출함으로써 다음 페이지가 나오자마자 처리할 수 있으며, 얼마나 많은 페이지를 얻어와야 하는지 정할 수 있습니다.

fun allUsersFlow(
	api: UserApi
): Flow<User> = flow {
	var page = 0
	do {
		val users = api.takePage(page++) // 중단 함수
		emitAll(users)
	} while (!users.isNullOrEmpty())
}

플로우의 특징

collect와 같은 플로우의 최종 연산은 스레드를 블로킹하는 대신 코루틴을 중단시킵니다. 플로우는 코루틴 컨텍스트를 활용하고 예외를 처리하는 등의 코루틴 기능도 제공합니다. 플로우 처리는 취소 가능하며, 구조화된 동시성을 기본적으로 갖추고 있습니다.

flow 빌더는 중단 함수가 아니며 어떠한 스코프도 필요로 하지 않습니다. 플로우의 최종 연산은 중단 가능하며, 연산이 실행될 때 부모 코루틴과의 관계가 정립됩니다.

다음 예제는 CoroutineName 컨텍스트가 collect에서 flow 빌더의 람다 표현식으로 전달되는 것을 보여줍니다. launch를 취소하면 플로우 처리도 적절하게 취소된다는 것 또한 확인할 수 있습니다.

// 플로우 빌더는 중단 함수가 아니기 때문에
// CoroutineScope 가 필요하지 않습니다

fun usersFlow(): Flow<String> = flow {
	repeat(3) {
		delay(1000)
		val ctx = currentCoroutineContext()
		val name = ctx[CoroutineName]?.name
		emit("User$it in $name")
	}
}

suspend fun main() {
	val users = usersFlow()
	withContext(CoroutineName("Name")) {
		val job = launch {
			// collect는 중단 함수입니다
			users.collect { println(it) }
		}
		launch {
			delay(2100)
			println("I got enough")
			job.cancel()
		}
	}
}

// (1초 후)
// User0 in Name
// (1초 후)
// User1 in Name
// (0.1초 후)
// I got enough

플로우 명명법

모든 플로우는 몇 가지 요소로 구성됩니다.

  • 플로우는 어딘가에서 시작되어야 합니다. 플로우 빌더, 다른 객체에서의 변환, 또는 헬퍼 함수로부터 시작됩니다.
  • 플로우의 마지막 연산은 최종 연산이라 불리며, 중단 가능하거나 스코프를 필요로 하는 유일한 연산이라는 점에서 아주 중요합니다. 최종 연산은 주로 람다 표현식을 가진 또는 가지지 않는 collect가 됩니다. (하지만 다른 최종 연산 또한 존재합니다.)
  • 시작 연산과 최종 연산 사이에 플로우를 변경하는 중간 연산을 가질 수 있습니다.

실제 사용 예

데이터베이스에서의 변경이나 UI 위젯의 변화 또는 센서와 같이 이벤트를 감지해야 할 필요가 있다면, 감지하는 모듈 각각이 이벤트를 받게 됩니다. 감지하는 모듈이 없다면 이벤트를 받는 것을 멈춰야 합니다.

플로우가 사용되는 전형적인 예시는 다음과 같습니다.

  • 웹소켓이나 RSocket 알림과 같이 서버가 보낸 이벤트를 통해 전달된 메시지를 받는 경우
  • 텍스트 입력 또는 클릭과 같은 사용자 액션이 감지된 경우
  • 데이터베이스의 변경을 감지하는 경우
profile
코딩일기

0개의 댓글