reactiveX website title

ReactiveX?

최근 Rx에 대한 관심이 뜨겁다. Reactive Programming 이라는 말을 만들어 낼 정도로 뜨거운 개념인데, 이번에 코틀린으로 뭔가 해 볼 생각이 들어서 이왕 하는 김에 RxKotlin을 좀 써보기로 했다. 조금 써본 경험은 '이거 대단하다'... Java8에서 lambda stream으로 무언가 작업을 할 때 예외 처리나 비동기 처리를 하는 것이 굉장히 불편하고 짜증나는 경험이었는데, Rx는 functional 하게 lambda를 다루면서도 예외 처리를 굉장히 깔끔하게 할 수 있다.

개념

이 한 장의 그림이 모든 걸 설명한다.

psuedo rx transforming

모든 데이터는 일련의 흐름으로 나타나고, 이 일련의 데이터 스트림을 받아 동기 혹은 비동기적으로 작업을 처리하는 Observer(관측자) 를 세우는 형태의 프로그래밍 방식이다. 무슨 말인지 잘 이해가 안 간다면 몇 가지 이미지와 함께 이해를 해 보자.

map rx transforming

가장 처음으로 볼 것은 map 이다. map은 기존의 functional programming 에 익숙하지 않더라도 java8이나 여타 다른 언어들의 lambda 식에서 많이 봐서 익숙하리라 생각한다. stream에 대해 map을 수행하면 각 스트림의 원소에 대해 값으로 넘긴 function을 수행한다. 이 개념을 대입해서 그림에 있는 각 도형/색깔의 의미와 가운데에 있는 함수의 의미에 익숙해지면 다음 그림으로 넘어가자.

flatmap fx transforming

flatmap은 하나의 값이 여러 값으로 분리가 될 때 사용한다. 이 때 분리가 되는 순서는 비동기적이다. 즉. 그림에서 보듯이 초록색이 flat 되는 동안 파란색 원도 flat 될 수 있다.

switchmap fx transforming

switchMap은 기존의 flatMap과 동일하지만 중간에 데이터가 들어 올 경우 기존의 작업을 새로운 작업으로 덮어 씌워 진행한다.

좀 더 자세히

위의 흐름을 제어하기 위해 Rx는 여러 장치들이 마련되어 있는데. 데이터의 흐름은 Observable 이라는 형태로 나타나고. 이를 Observer에 등록하는 방법으로 진행된다.

Observable

간단한 코드를 보자.

Observable.just("Alpha","Beta","Gamma","Delta", "Epsilon")
    .map { it.length }
    .subscribe(::println)

.just로 데이터의 흐름을 만들고, 각각의 흐름을 .map을 이용해 데이터를 변환한다. 그 후에 가서야 .subscribe로 구독 등록을 한 Observer가 각각의 데이터에 대해 작업을 수행한다. 데이터의 흐름은 같은 형태로만 이루어 져야 하고, list 같은 자료구조로부터도 Observable 한 타입으로 쉽게 변환이 가능하다.

val list = listOf(1,2,3,4,5)
val source = list.toObservable()

물론 반대도 쉽게 가능하다.

val source = Observable.range(1,5)
val list = source.toList()

Observer

Observer는 다음과 같은 구성으로 이루어져 있다.

onNext
onError
onComplete

단일 데이터만을 가지고 있는 Observable을 받는 Observer는 다음과 같은 구성으로 이루어져 있다.

onSuccess
onError

각 요소에 function을 넘기면 타이밍에 맞춰 실행된다. onNext는 각 데이터들을 받아 오고, 각 데이터들에 대해 수행할 function을 넘기면 된다. onError는 에러가 발생하면 실행할 메서드를 넘기면 된다. onComplete는 완료되면 실행할 메서드를 넘기면 된다. 코드로는 다음과 같다.

Observable.range(1,5)
    .subscribeBy(
        onNext = ::println,
        onError = { it.printStackTrace() },
        onComplete = { println("Done!") }
    )

Override도 가능하다.

val subscriber = object: ResourceObserver<Int>() {
    override fun onComplete() = println("Done!")

    override fun onNext(i: Int) = println(i)

    override fun onError(e: Throwable) = e.printStackTrace()
}

Observable.range(1,5)
    .subscribe(subscriber)

마치며

이런 비동기적인 작업을 쉽게 처리할 수 있고, 그러면서도 에러 처리도 간편하게 할 수 있어서 개인적인 첫 인상은 너무나도 좋았다. Java8에서 lambda를 쓰면서 error 처리가 매우 어려운 바람에 쓰기가 매우 까다로웠는데, 이 부분을 시원하게 긁어주는 기분이었다. 이런 장점은 특히나 GUI같이 UI쓰레드와 백그라운드 작업이 동시에 비동기적으로 작동해야 하는 개발에서 빛을 발하는데, 시리즈로 rxKotlin + TornadoFX(JavaFx with Kotlin) 을 이용한 GUI 개발 방법을 적어보자 한다.

레퍼런스

http://reactivex.io/documentation/operators