Kotlin에서 리액티브 프로그래밍 연습하기

KIYOUNG KWON·2022년 4월 7일
0

리액티브 프로그래밍

  • 선언적 프로그래밍
  • 변화의 전파와 데이터 흐름

리액티브 프로그래밍은 보통 위 2가지 키워드로 설명되는데 개인적으로 이해한 내용을 정말 단순하게 풀어써보면 for문과 같은 반복문을 사용해서 명령형으로 데이터를 처리하던 것을 함수형 프로그래밍을 활용하여 선언적으로 데이터의 처리를 수행한다고 보면 될 것 같다.(함수형 프로그래밍이 선언적 프로그래밍의 하위 카테고리라 카더라..)

구현체들의 경우에는 다음의 기술(혹은 지식?)을 사용하여 구현이 된다.

  • 함수형 프로그래밍(Stream)
  • 옵저버 패턴(Publisher, Subscriber)
  • 비동기(push) 이벤트

Java의 경우 이를 구현하기 위한 인터페이스로 Flow라는 것을 제공한다.

Java Flow

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Java의 Flow API는 위 4가지 인터페이스를 제공한다.

해상도가 좀 많이 구리긴 한데.. 내가 봤을 때 Flow API의 구조를 가장 잘 표현했다고 생각하는 그림이다. Subscriber가 Publisher의 subscribe()로 구독을 요청하면 subscription을 제공하고 이 subscription의 request()를 통해 데이터를 소비한다. 사실 Flow API보다 RXJava가 더 먼저 나왔다고 한다. 그래서 과거 버전의 RXJava에서 사용하던 Observable(Publisher)에는 subscriber입장에서 사용할 request() 메소드의 역할을 하는 기능이 없다.

해당 내용은 링크에 더 잘 정리되어있다.

Flow는 어디까지나 인터페이스 일 뿐 구현체는 아니다.(물론 인터페이스를 상속받아 사용하면 되긴함) 대표적인 구현체로 Akka, Rxjava가 있는데 여기선 RXjava를 kotlin의 확장메소드를 활용해 좀더 확장성이 좋은 RXKotlin을 활용하여 작업을 해보려고 한다.

RXKotlin

RXJava와 사실 기능적으론 큰차이가 없다. 확장함수같이 Kotlin이 언어레벨에서 지원하는 기능을 활용하여 편의성을 증가시킨 것이 주된 기능으로 보인다. github에 아주 간단한 예제로 쉽게 나와있다.

import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.kotlin.toObservable

fun main() {

    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    list.toObservable() // iterable에 대한 확장함수로 간편하게 observable로 변환
            .filter { it.length >= 5 }
            .subscribeBy(  // 명시적인 매개변수를 지원
                    onNext = { println(it) },
                    onError =  { it.printStackTrace() },
                    onComplete = { println("Done!") }
            )

}

실제적용

이전에 진행했던 redis stream의 데이터를 읽어 telegram으로 전달하던 코드에 RxKotlin을 적용해보려고 한다.

//기존코드
while (true) {
        val messages: MutableList<StreamMessage<String, String>> = syncCommands.xreadgroup(
            Consumer.from("bot-group", "consumer_1"),
            XReadArgs.StreamOffset.lastConsumed("bot-stream")
        )
        if (messages.isNotEmpty()) {
            for (message in messages) {
                syncCommands.xack("bot-stream", "application_1", message.id)

                val birthdayBotPacket = RedisBirthdayBotPacket(
                    name = message.body["name"]!!,
                    token = message.body["token"]!!,
                    birthdayDateTime = LocalDateTime.parse(message.body["birthdayDateTime"])
                )
                println(birthdayBotPacket)


                client.request<HttpResponse>("https://api.telegram.org/bot${birthdayBotPacket.token}/sendmessage?") {
                    method = HttpMethod.Get
                    parameter("text", birthdayBotPacket.name)
                    parameter("chat_id", birthdayBotPacket.chatId)
                }
            }
        }
    }

stream에 메시지가 있는지 확인하면서 있으면 telegram api로 메시지를 전달하던 코드다.

//rx적용
while (true) {
        Observable.fromCallable {
            syncCommands.xreadgroup(
                Consumer.from("bot-group", "consumer_1"),
                XReadArgs.StreamOffset.lastConsumed("bot-stream")
            ) }
            .filter { it.isNotEmpty() }
            .observeOn(Schedulers.io())
            .subscribeBy(
                onNext = { list ->
                    list.map {
                            syncCommands.xack("bot-stream", "application_1", it.id) // 결과를 확인안해도 될 것 같아 그냥 호출만 함
                            sendTelegramMessage(client, it)
                                .subscribeBy(
                                    onNext = { response -> println(response) },
                                    onError = { throwable -> throwable.printStackTrace() }
                                )
                    }
                },
                onError = { it.printStackTrace() }
            )
    }
 
 
 //api호출하는 부분은 함수로 뺌
fun sendTelegramMessage(
    client: HttpClient,
    message: StreamMessage<String, String>
): Observable<HttpResponse> {
    return Observable.fromCallable {
        val birthdayBotPacket = RedisBirthdayBotPacket(
            name = message.body["name"]!!,
            token = message.body["token"]!!,
            birthdayDateTime = LocalDateTime.parse(message.body["birthdayDateTime"])
        )

        runBlocking {
            client.request<HttpResponse>("https://api.telegram.org/bot${birthdayBotPacket.token}/sendmessage?") {
                method = HttpMethod.Get
                parameter("text", birthdayBotPacket.name)
                parameter("chat_id", birthdayBotPacket.chatId)
            }
        }
    }
}

음... 사실 redisClient나 httpClient에서 reactive를 지원하는 라이브러리를 사용하면 훨씬 깔끔해질 것이지만 일단 기존코드에 rxkotlin만 적용해 보았다. 여기서 redis stream을 읽어오는 부분은 main thread로 처리하고 telegram으로 보내는 부분만 io thread에서 처리하도록 observeOn(Schedulers.io())를 호출했다.

만약 여기서 역압력에 대한 기능을 원하면 Observable이 아닌 Flowable(RxJava의 Flow API 적용 버전)을 사용하면 될 것 이다. 하지만 이 예제의 경우 Main Thread에서 순차적으로 Redis에서 Data를 읽기 때문에 역압력이 딱히 필요는 없어보여 그냥 Observable로 처리를 하였다.

아직 RxKotlin의 세부적인 기능에 대한 파악이 부족해서 최적의 방법으로 했다는 확신이 없다. 좀 더 공부해보고 더 좋은방법이 있다면 개선을 해봐야 겠다.

0개의 댓글