-채널은 일종의 파이프 라인 같은 것으로 보면 된다. 한쪽에서 데이터를 넣으면 다른 한쪽에서 데이터를 받는것을 의미한다. 채널은 두개 이상의 코루틴에서 안정적이고 효율적으로 데이터를 교환 할 수 있는 방법이야.
-Channel<데이터 타입>(버퍼사이즈 : Int , 버퍼오버플로우_정책) 이렇게 선언해주면 된다.
-버퍼사이즈 기본값 = RENDEZVOUS , 버퍼오버플로우정책기본값 = BufferOverflow.SUSPEND
-채널을 통해서 데이터를 주고 받을때는 각각 channel.send()와 channel.receive()를 사용하며, send()와 receive()는 모두 suspend function이다.
-send||receive가 suspension point이고 서로에게 의존적이기 때문에 같은 코루틴에서 사용하는 것은 위험 할 수 있다. send와 receive가 반대쪽이 준비가 되지 않으면 잠드는 suspend function이기 때문에 같은 코루틴에서 사용하면 해당 코루틴이 영원히 잠에 들게 된다.
-for 문을 사용해서 channel에 있는 데이터를 순차적으로 받아올 수 있다. 또한 채털은 FirstInFirstOut 방식으로 동작하기 때문에 , 보낸 순서대로 메시지를 받아 올 수 있다.
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..10) {
channel.send(x) // 수신측이 없다면, 잠이 들었다가 받은 이후에 깨어나서 다음 데이터를 보낸다.
}
}
repeat(10) {
println(channel.receive()) // 채널에 데이터가 없다면, 잠이 들었다가 채널에 데이터가 들어온 이후에 깨어나서 수행한다.
}
println("완료")
}
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..10) {
channel.send(x)
}
repeat(10) {
println(channel.receive())
}
println("완료")
}
}
-채널의 송신측에서 더 이상 보낼 것이 없다면 close 메서드를 사용해서 채널을 닫을 수 있다. 채널이 닫히면 receive 메서드를 사용하지 않고 for in 문을 사용해서 채널에서 보낸 값을 받아 올 수 있다.
fun main() = runBlocking {
val channel = Channel<Int>(5)
launch {
for (x in 1..10) {
channel.send(x)
}
channel.close()
}
// repeat(10) {
// println(channel.receive())
// }
for (x in channel) {
println(x)
}
println("완료")
}
-생산자 소비자는 일반적인 패턴이기 때문에 채널을 이용해서 한 쪽에서 데이터를 만들어 보내고 다른 쪽에서 받는 것을 도와주는 확장함수인 produce가 있다.
-produce 확장 함수(CoroutineScope.produce)를 사용하게 되면 , ProducerScope가 만들어지게 되고 'CoroutineScope 인터페이스'와 'SendChannel 인터페이스'를 함께 상속 받는다. 그 결과 'CoroutineContext와 Channel'의 몇가지 인터페이스를 사용 할 수 있게 도와주는 특이한 Scope이다.
-조금 쉽게 말하면 CoroutineScope.produce를 사용하면 ProducerScope 코드블락과 ProducerCoroutine을 얻게 된다. ProducerScope 블락에서는 this.send와 this.coroutineContext도 사용이 가능하다.
-주의!!produce{}의 반환 타입은 ReceiveChannel이다.
-consumeEach 확장 함수(ReceiveChannel.consumeEach)를 사용하면 produce를 사용해서 만든 SendChannel에서 보내는 데이터들을 it으로 받아온다. 받아온 it를 활용해서 람다 확장 함수에서 활용하면 된다.
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
val oneToTen = produce { // produce에서 ProducerScope(= CoroutineScope + SendChannel)를 만들어준다.
// 즉, this.send도 가능하고 , this.coroutineContext도 가능하다.
for (x in 1..7) {
this.send(x) // channel.send와 같은거야.
}
}
oneToTen.consumeEach {
println(it)
}
println("완료")
}
-파이프라인은 하나의 코루틴(프로듀서)이 데이터 스트릠을 생성하고 , 다른 코루틴(소비자)가 받아서 처리하거나 변형해서 새로운 스트림을 생성하는 패턴을 말한다.
-이러한 패턴을 활용하면 두가지 이상의 채널을 연결해서 사용 할 수 있다. 즉, ChannelPipeLine은 두가지 이상의 채널을 연속으로 붙혀서 사용하는 것을 말한다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers() = produce<Int> {
// CoroutineScope의 확장함수로 만들었으므로 , produce를 만들때 별도의 코루틴 없이 사용 할 수 있다. (코드를 한 번만 사용하고자 할 때 유용하다.)
var x = 1
while (true) {
this.send(x++)
}
}
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceStringNumbers(numbers : ReceiveChannel<Int>) : ReceiveChannel<String> = produce {
for (i in numbers) {
send("${i}!")
}
}
fun main() = runBlocking {
val numbers = produceNumbers()
>'numbers'는 [receive 채널: receive 메서드 이용 가능][send 채널 아니야.: send 메서드 이용 불가능]
> 따라서 'numbers'는 값을 확인 할 수 만 있고 , 따로 값을 보낼 수 는 없다.
> cf. channel은 receive 채널과 send 채널 둘 다 사용 가능.
val stringNumbers = produceStringNumbers(numbers)
repeat(5) {
println(stringNumbers.receive())
}
>위의 두개의 채널 모두 close가 없기 때문에 for 문을 사용 할 수 없다.
>그래서 명시적으로 횟수를 정해두고, receive 함수를 명시적으로 호출을 해야한다.
println("완료")
coroutineContext.cancelChildren()
>명시적으로 CoroutineContext에게 취소를 명령해서 produceNumbers()와 produceStringNumbers()를 취소를 시킨다.
}
fun CoroutineScope.produceNumbers2() = produce<Int> {// 1, 2, 3, ... , 반환하는 것은 리시브 채널
var x = 1
while (true) {
send(x++)
}
}
fun CoroutineScope.filterOdd(numbers : ReceiveChannel<Int>) : ReceiveChannel<String> = produce {
for (i in numbers) {
if (i % 2 == 1) {
send("${i}!")
}
}
}
fun main() = runBlocking {
val numbers = produceNumbers2() // 리시브 채널이야 : send 불가.
val oddNumbers = filterOdd(numbers)
repeat(10) {
println(oddNumbers.receive())
}
coroutineContext.cancelChildren()
}
-여러 코루틴이 동시에 하나의 채널을 구독하는 것을 말한다.(1:N=소비자:생산자)
-FanOut 시 장점
-병렬 처리 : 각각의 코루틴이 독립적으로 채널을 통해 데이터를 소비하므로 병렬적으로 실행이 가능하다. (다른 스레드에 속해있는 코루틴이라면 동시적으로 실행이 가능하다.)
-필요에 따라 새로운 코루틴을 추가시켜 작업을 확장 할 수 있다.
-각각의 코루틴이 다른 동작을 한다면 모듈화 시킬 수 있다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) {
send(x++)
delay(100L)
}
}
fun CoroutineScope.processNumber(id : Int , channel : ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("${id}가 ${it}을 받았습니다.")
}
}
fun main() = runBlocking {
val producer = produceNumbers()
repeat(5) {
processNumber(it, producer) // 5개의 코루틴에서 producer 채널을 구독한다.
}
delay(1000L)
producer.cancel()
}
-여러 채널을 하나의 코루틴이 구독하는 것을 말한다.(N:1=생산자:소비자)
-FanIn의 장점
-여러개의 채널에서 나오는 데이터를 효율적으로 병합 할 수 있다.
suspend fun produceNumbers(channel : SendChannel<Int> , from : Int , interval : Long) {
var x = from
while (true) {
channel.send(x)
x += 2
delay(interval)
}
}
fun CoroutineScope.processNumber(channel: ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("${it}을 받았습니다.")
}
}
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
produceNumbers(channel , 1 , 100L)
}
launch {
produceNumbers(channel , 2 , 150L)
}
processNumber(channel)
delay(1000L)
coroutineContext.cancelChildren()
}
-Channel + select : 먼저 끝난 요청을 처리하고자 할 때 사용한다.
-(cf)Channel에 대해서만 onReceiver를 사용하는 것 이외에도 [Job:onJoin] , [Deffered:onAwait] , [SendChannel:onSend] , [ReceiveChannel:onReceive, onReceiveCatching] , [delay:onTimeout] 이렇게 사용 할 수 있다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.sayFast() = produce<String> {
while (true) {
delay(100L)
send("패스트")
}
}
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.sayCampus() = produce<String> {
while (true) {
delay(150L)
send("캠퍼스")
}
}
fun main() = runBlocking {
val fasts = sayFast()
val campus = sayCampus()
repeat(5) {
select {// 먼저 끝내는 애만 듣겠다.
fasts.onReceive {
println("fast: $it")
}
campus.onReceive {
println("campus: $it")
}
}
}
coroutineContext.cancelChildren()
}
-채널 버퍼링 : 기본적으로 채널은 send를 하면 receive를 할 때까지 잠이 들고 , receive를 하면 send를 할 때까지 잠에 든다. 그러나 버퍼링을 하면 자유롭게 보낼 수 있고 자유롭게 받을 수 있고 , 중단되지 않으면서 채널을 사용 할 수 있다.
-채널 생성자는 인자로 버퍼의 사이즈를 받을 수 있다. 그럼 채널이 한번에 '버퍼사이즈'만큼 데이터를 저장 할 수 있다는거야. 즉, 채널이 수신자가 데이터를 가져가기 전까지 최대 '버퍼사이즈' 만큼의 데이터를 버퍼에 보관 할 수 있다는 의미이다.
-그러면 송신측은 채널의 버퍼가 버퍼사이즈 만큼 가득 찰 때까지 데이터를 계속해서 'send'할 수 있으며 , 버퍼가 가득 차면 송신측은 버퍼에 여유가 생길때까지 대기한다.
-수신측은 버퍼가 버퍼사이즈 만큼 가득차야지만 데이터를 가져오는 것은 아니야 데이터가 가득차지 않아도 가져 올 수 있어.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(10)
launch {
for (x in 1..20) {
println("${x} 전송중")
channel.send(x) // 받든 안 받든 채널로 계속 보낸다.
}
channel.close()
}
for (x in channel) {
println("${x} 수신")
delay(100L)
}
println("완료")
}
-버퍼의 사이즈를 랑데뷰(Channel.RENDEZVOUS)로 지정 할 수 있다. : 실제로는 0을 지정하는거야. -> 버퍼가 없는 것을 '랑데뷰'라고 부른다.
-랑데뷰 이외에도 UNLIMITED : 무제한 설정(실제로는 에러발생) , CONFLATED : 처리하지 못한 오래된 값이 버림. , BUFFERED : 64개의 버퍼 (오버하면 suspend발생)
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(Channel.RENDEZVOUS)
launch {
for (x in 1..20) {
println("${x} 전송중")
channel.send(x)
}
channel.close()
}
for (x in channel) {
println("${x} 수신")
delay(100L)
}
println("완료")
}
-버퍼의 사이즈가 오버 되었을 때 어떻게 처리 할 것인가에 대한 정책을 의미한다.
-[DROP_OLDEST : 오래된 것 버림] , [SUSPEND : 송신측이 잠 들었다가 버퍼에 여유가 생기면 깨어나] , [DROP_LATEST : 처리하지 못한 새로운 데이터를 지워]
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(2, BufferOverflow.DROP_LATEST)
launch {
for (x in 1..50) {
channel.send(x)
}
channel.close()
}
delay(500L)
for (x in channel) {
println("${x} 수신")
delay(100L)
}
println("완료")
}