3. RxJava의 매커니즘(1)

안석주·2021년 11월 27일
0

RxJava

목록 보기
5/10

서론

벌써 3장이다! 6장까지 있으니, 꼭 끝내야 한다..다짐한다....
이번 장에서는 RxJava의 아키텍처와 특수기능을 살펴봅니다.

3.1 Rxjava와 디자인 패턴

RxJava는 옵저버 패턴과 이터레이터 패턴에 영향을 받았습니다!!! 두개를 알아봅시다..

3.1.1 옵저버 패턴

RxJava는 옵저버 패턴을 확장한 구조로, 관찰 대상 객체의 상태에 변화가 발생하면 해당 객체를 관찰하는 객체가 변화에 따른 처리 작업을 하는 디자인 패턴입니다.

옵저버 패턴에서는 관찰 대상이 되는 Subject에 이를 관찰하는 Observer를 등록합니다. 그리고 Subject의 상태가 변하면 Subject는 등록된 모든 Observer에 변화가 발생한 사실을 통지합니다. Observer는 Subject로부터 통지를 받은 후에 변화에 따른 적절한 처리를 합니다.

즉, 1. Subject에 Observer를 등록하고, 2. 상태가 변경되면 모든 등록된 Observer들에게 변화가 발생한 사실을 통지합니다. 3. 후에 Observer는 변화에 따른 처리를 합니다.

클래스설명
Subject관찰 대상을 나타내는 클래스. 클래스에 Observer를 추가, 삭제할 수 있고 상태 변화가 발생 시 통지 처리 기능이 있다.
Observer변화가 발생했을 때 이를 처리하는 메소드가 있는 인터페이스
ConcreteSubjectSubject를 상속한 클래스, 실제 변화가 이 클래스에서 일어나면 Subject의 통지 처리 메소드를 호출해 등록된 Observer에 통지한다
ConcreteObserverObserver의 구현 클래스로, 통지가 발생했을 때 처리할 내용을 구현한다

Subject의 메소드에 대해서 살펴보겠습니다.

메소드설명
addObserver(Observer)변화를 관찰하는 Observer 등록
deleteObserver(Observer)등록된 Observer 리스트에서 삭제
notifyObservers()변화가 발생하면 등록된 Observer에 통지한다(실제로는 등록된 Observer의 update 메소드를 호출)

마지막으로 Observer의 메소드를 보겠습니다.

메소드설명
update변화가 발생했다는 통지를 받으면 처리를 수행한다. 실제 처리 내용은 Observer 인터페이스의 구현 클래스에 구현되어 있다.

옵저버 패턴의 가장 중요한 점은 관찰 대상인 Subject에 상태 변화가 발생했을 때 Subject 스스로 자신에게 변화가 발생했다고 Observer에 통지한다는 것입니다. 옵저버 패턴을 사용하지 않는다면, 객체가 변화가 있는지 수시로 확인을 해야합니다. 이러한 방법은 Subject(관찰 대상)이 오랫동안 바뀌지 않는다면 매우 불필요한 작업입니다. 이를 방지하고자 반대로 오랫동안 관찰하지 않는다면, 변경 후 한참 뒤에 변화를 감지하는 문제가 발생합니다.

또한 옵저버 패턴의 또다른 특징은 관찰 대상인 Subject와 관찰하는 Observer가 분리됐다는 점입니다. 이는 Subject의 상태와 상태 변화로 발생하는 처리 작업을 분리할 수 있으며, 이에 따라 Subject는 자신의 상태가 변경될 때 어떤 객체가 무엇을 하는지 구체적으로 알 필요가 없습니다. Subject는 등록된 Observer에 변화가 발생했다는 것만 통지하면 되니, 새로운 영향을 받는 객체가 증가했다고 해도 이 객체를 Observer로 등록하기만 하면 문제 없습니다.

RxJava에서는 생산자와 소비자의 관계에서 이 옵저버 패턴이 적용됐는데, Subject를 생산자로, Observer를 소비자로 볼 수 있습니다. 이벤트나 상태 변화 또한 데이터 스트림으로 표현해 각각의 이벤트와 상태 변화를 시간과 함께 흘러가는 데이터로 다룹니다. 이렇게 생산자가 일으킨 상태변화를 데이터로 다루어 소비자에게 통지하면, 변화가 발생시에 신속하게 처리작업을 할 수 있습니다.

추가로, RxJava에서는 상태 변화를 알리는 통지 외에도 처리를 시작할 준비가 됐음을 알리는 통지와 모든 처리가 끝났음을 알리는 완료 통지, 에러가 발생했다고 알리는 에러 통지가 가능합니다.

3.1.2 이터레이터 패턴

RxJava의 실제 구현은 이터레이터 패턴과는 완전 다른 구조지만, 개념에는 이터레이터 패턴이 많은 영향을 주었습니다! 이는 데이터 집합체에서 순서대로 데이터를 꺼내기 위한 패턴입니다.

먼저 이터레이터 패턴은 데이터 집합체(List 같은)에서 데이터를 꺼내는 Iterator를 생성하고, 이 Iterator로 데이터를 하나씩 순서대로 얻을 수 있게 합니다. 이때 데이터 집합체가 어떤 형태로 데이터를 가지고 있는지는 Iterator를 사용하는 측에서 알 필요가 없습니다! 단순히 다음 데이터가 있는지 hasNext() 메소드를 호출해 확인하고, 있다면 next() 메소드를 통해 다음 데이터를 얻어 처리 작업을 하면 됩니다. 위와 같이 인터페이스와 이를 구현한 클래스를 살펴봅시다!

클래스설명
Aggregate데이터를 담고 있는 집합체를 나타내는 인터페이스
Iterator데이터를 순서대로 받을 수 있게 하는 인터페이스
ConcreteAggregate실제 데이터를 넣어 두는 데이터 집합체를 구현한 클래스
ConcreteIterator실제로 데이터를 받을 수 잇게 Iterator를 구현한 클래스

Aggregate의 메소드

메소드설명
iterator()Iterator를 생성한다

Iterator의 메소드

메소드설명
hasNext()true를 반환하면 next 메소드로 받을 수 있는 데이터가 있음을 나타낸다.

이제 실제 예제를 통해 봅시다!!!

fun main() {
    val list = listOf("a", "b", "c", "d", "e", "f", "g")
    val iterator = list.iterator()

    while (iterator.hasNext()) {
        print(iterator.next())
    }
}
-> abcdefg

위와 같이 next() 메소드를 호출하면, 현재 Iterator가 가리키는 인덱스에 있는 데이터를 얻고 인덱스는 다음으로 이동하게 합니다. 또한 다음 데이터가 있는지 hasNext() 메소드를 이용해 확인합니다. 이 두 메소드를 이용해 데이터 집합체에서 순서대로 모든 데이터를 얻을 수 있습니다.

그런데, RxJava에서는 생산자 자체가 데이터 집합체이므로 데이터를 순서대로 공급하는 역할(Iterator)을 하기도 합니다. 단, RxJava는 이터레이터 패턴처럼 소비자가 데이터를 가져가는 형태가 아니라, 소비자에게 데이터를 통지하는 방식입니다. 이러한 차이가 있지만, 데이터를 하나식 순서대로 처리하는 매커니즘이라는 점에서 공통점이 있습니다.

이처럼 순서대로 통지받는 것은 소비자 내부에서 상태를 다루기 쉽다는 이점이 있습니다. 예를 들어 데이터를 받을 때 처리가 순서대로 실해되지 않고 여러 데이터를 동시에 받아 동시에 처리한다면, 의도한 결과를 얻을 수 없을 수도 있습니다. (물론 RxJava의 규칙을 잘만 따른다면 - 데이터를 순서대로 통지한다 - 큰 문제는 없을 겁니다!!)

그러면 앞의 Iterable 예제와 마찬가지로 리스트에 담긴 객체를 순서대로 출력해봅니다!

val list = listOf("a", "b", "c", "d", "e", "f", "g")
    Flowable.fromIterable(list)
        .subscribe { println(it) }
-> abcdefg

일반적인 이터레이터 패턴은 데이터가 모두 모일 때까지 처리를 시작할 수 없지만, RxJava에서의 이터레이터 패턴은 모든 데이터가 만들어지지 않은 상태에서도 생성된 데이터를 순서대로 처리할 수 있습니다!

그 예로, 일반적인 이터레이터 패턴에서는 hasNext() 시에 false가 나온다면 데이터 처리작업이 종료됩니다. 그러나 RxJava에서는 데이터를 생성하는 시점에 통지하므로, 데이터가 아직 만들어지지 않았다면 데이터가 생성될 때까지 기다리기만 하면 됩니다. RxJava에서 데이터의 통지가 끝나는 시점은 완료 또는 에러 통지를 받았을 때 뿐입니다!!! 이를 이용한다면 끝없이 계속 생상되는 데이터도 다룰 수 있고, 언제 생성될지 알 수 없는 이벤트도 처리할 수 있다는 뜻입니다.

3.2 비동기 처리

3.2.1

이전에 알아봤던 비동기 처리를 RxJava에서는 어떤 식으로 하는지 알아보겠습니다.

RxJava에서는 비동기 처리를 수행하는 데 필요한 API를 제공하므로, 기존에 구축한 비즈니스 로직에 영향을 주지 않고도 생산자나 소비자의 작업을 비동기로 처리하게 교체할 수 있습니다.
RxJava를 사용할 때 특별한 처리를 해주지 않는다면 생산자의 처리 작업을 실행하는 쓰레드에서 각 연산자의 처리 작업과 소비자의 처리 작업이 실행됩니다. 생산자가 메인 쓰레드에서 처리 작업을 하고 있다면 연산자와 소비자도 메인 쓰레드에서 실행된다는 뜻입니다!

하지만 개발자가 직접 비동기 처리를 하도록 설정하면 생산자와 연산자, 소비자가 처리작업을 실행할 쓰레드를 분리할 수 있습니다!

예를 들어 안드로이드에서, 서버에서 데이터를 받아와 TextView에 표시할 때, 서버에서 데이터를 받아오는 작업은 백그라운드 쓰레드에서, 텍스트를 표시하는 작업은 UI 쓰레드에서 실행할 수 있도록 바꾸어줄 수 있습니다!

또한, 같은 쓰레드에서 생산자와 소비자를 처리해줄 시 데이터를 통지하는 측은 데이터를 받아 처리하는 측의 처리 속도에 영향을 받습니다. 특히 데이터를 받는 측의 처리 속도가 느리다면, 통지하는 측의 속도에도 영향을 미칩니다.

Flowable.interval(1000L, TimeUnit.MILLISECONDS)
        .doOnNext { println("emit: ${System.currentTimeMillis()} 밀리초 : $it") }
        .subscribe { Thread.sleep(2000L) }

    Thread.sleep(5000L)

emit: 1637910900837 밀리초 : 0
emit: 1637910902843 밀리초 : 1

예를 들어, 1000밀리초마다 데이터를 통지하는 Flowable을 interval 메소드로 실행한다 해도 통지된 데이터를 받는 측의 처리 작업이 1000밀리초보다 길어지면 Flowable은 1000밀리초마다 데이터를 통지할 수 없습니다! (위의 예제에서 interval 1000L로 데이터 생산, 소비에서 2000L 걸려 결과에도 2000L씩 차이)

이처럼 같은 쓰레드에서 처리할 경우 생산자, 소비자 모두 고려해야 합니다.
하지만 RxJava가 제공하는 interval 메소드 처럼 시간을 다루는 메소드가 생성한 Flowable/Observable은 데이터를 받는 측의 처리 속도가 어느정도 느려도 데이터를 통지하는 측의 속도보다 느리지 않으면 시간 간격을 조정해 적절한 시점에 데이터를 통지할 수 있습니다! 아래와 같이요!!

Flowable.interval(1000L, TimeUnit.MILLISECONDS)
        .doOnNext { println("emit: ${System.currentTimeMillis()} 밀리초 : $it") }
        .subscribe { Thread.sleep(500L) }

    Thread.sleep(5000L)

emit: 1637911292108 밀리초 : 0
emit: 1637911293118 밀리초 : 1
emit: 1637911294106 밀리초 : 2
emit: 1637911295108 밀리초 : 3
emit: 1637911296105 밀리초 : 4

모두 1000L씩 차이가 납니다!
하지만 이런 결과는 RxJava가 제공하는 클래스 내부에서 해당 기능을 지원하기 때문에 가능한 것으로, 개발자가 직접 create 메소드 등으로 통지처리를 구현할 때는 데이터를 받는 측의 처리 속도에 영향ㅇ르 받지 않게 해야합니다!

또한 Flowable/Observable을 생성하는 메소드가 무엇인지에 따라 수행될 쓰레드도 달라지는데, just나 from 메소드처럼 이미 생성된 데이터를 통지한다면 메인 쓰레드, timer나 interval처럼 시간과 관련된 처리 작업은 메인 쓰레드와 다른 쓰레드에서 작동합니다.

만약 메인 쓰레드에서 처리 작업을 수행하는 Flowable을 just 메소드로 생성하면 이 Flowable이 처리 작업을 완료할 때까지 다음 처리 작업을 수행하지 않게 됩니다.

아래 예제는 구독 전에 start를 출력, 구독 끝나면 end를 출력하는데 메인 쓰레드에서 작동하기 때문에 모든 데이터 통지가 끝난 뒤 end가 출력됩니다.

println("start")

    Flowable.just(1, 2, 3)
        .subscribe(object : ResourceSubscriber<Int>() {
            override fun onNext(t: Int) {
                println("${Thread.currentThread()} : $t")
            }

            override fun onError(t: Throwable?) {
                println(t?.message)
            }

            override fun onComplete() {
                println("${Thread.currentThread()} : 완료")
            }
        })

println("end")
    
=> start
Thread[main,5,main] : 1
Thread[main,5,main] : 2
Thread[main,5,main] : 3
Thread[main,5,main] : 완료
end    

그렇다면 이번 예제에서는 interval로 생성해보겠습니다.

println("start")

    Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .subscribe(object : ResourceSubscriber<Long>() {
            override fun onNext(t: Long) {
                println("${Thread.currentThread()} : $t")
            }

            override fun onError(t: Throwable?) {
                println(t?.message)
            }

            override fun onComplete() {
                println("${Thread.currentThread()} : 완료")
            }
        })

println("end")

Thread.sleep(1000L)

=>start
end
Thread[RxComputationThreadPool-1,5,main] : 0
Thread[RxComputationThreadPool-1,5,main] : 1
Thread[RxComputationThreadPool-1,5,main] : 2

메인 쓰레드와 다른 쓰레드에서 작동하기 때문에 Flowable의 처리 작업을 호출한 뒤 바로 다음 문장(여기서는 end)가 실행됩니다!

RxJava는 구조상 데이터를 통지하는 측과 받는 측의 처리 작업이 논리적으로 분리돼어 있어 상호 독립적으로 수행하게 됩니다. 이는 통지하는 측은 생산과 통지만 책임 지고, 처리 측은 데이터를 받아 처리하는 것만 책임지기 때문에 통지된 데이터가 어떻게 만들어졌는지 신경쓰지 않아도 됩니다.

이처럼 통지와 처리측의 책임 범위가 분리된 점은 각각의 처리 작업을 다른 쓰레드에서 실행하기 쉽게 합니다. 즉 데이터를 통지하는 측은 데이터를 통지한 뒤 바로 자신의 작업으로 돌아와도 받는 측의 처리 작업에는 논리적으로 영향을 미치지 않으며, 데이터를 통지하는 측의 처리 작업에 의해 데이터를 받는 측의 행동이 바뀌는 일도 발생하지 않습니다.

RxJava에서 이를 이용하는 방법은 subscribeOn() 메소드, observeOn() 메소드가 있습니다.
subscribeOn 메소드는 생산자의 처리 작업을 실행하는 메소드이고, observeOn은 데이터를 받는 측의 쓰레드를 설정할 수 있습니다. 그리고 이 쓰레드 종류를 용도에 따라 스케줄러를 제공합니다.

스케줄러

메소드반환하는 스케줄러
computation연산 처리를 할 때 사용하는 스케줄러, 논리 프로세서 수 만큼 쓰레드를 캐시한다. I/O처리 작업에선 사용 불가능
ioI/O 처리 작업을 할 때 사용하는 스케쥴러, 쓰레드 풀에서 쓰레드를 가져오며 필요에 따라 새 쓰레드를 만든다.
single싱글 쓰레드에서 처리작업을 할 때 사용하는 스케쥴러
newThread매번 새로운 쓰레드를 생성하는 스케줄러
from(Executor executor)지정한 Executor가 생성한 쓰레드에서 처리 작업을 수행하는 스케줄러
trampoline현재 쓰레드의 큐에 처리 작업을 넣는 스케줄러, 이미 다른 처리 작업이 큐에 있다면 큐에 들어있는 작업의 처리가 끝난 뒤 새로 등록한 처리 작업을 수행

이중 computation과 io는 거의 같은 역할을 하며 호출할 때 쓰레드 풀에서 서로 다른 쓰레드를 가져옵니다. 다만 연산 용도, io 용도로 사용된다는 것만 다릅니다.

보통 우리 안드로이드에서는 메인 쓰레드와 다른 쓰레드를 이용하기 위해 io 쓰레드를 자주 사용합니다!

subscribeOn

subscribeOn() 메소드는 생성자의 처리 작업을 어떤 스케줄러에서 실행할지를 설정하는 메소드 입니다. 여기서 말하는 생산자란 최초에 데이터를 생성하는 Flowable/Observable 입니다.
subscribeOn 메소드는 생산자가 처리작업을 할 스케줄러를 설정할 때 사용하므로, 최초 1회만 설정 가능합니다. 그래서 뒤에 설정한 subscribeOn은 모두 무시됩니다.

Flowable.just(1,2,3,4,5)
        .subscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.io())
        .subscribeOn(Schedulers.single())
        .subscribe {
            println("${Thread.currentThread()} : $it")
        }

    Thread.sleep(500L)
=>Thread[RxComputationThreadPool-1,5,main] : 1
Thread[RxComputationThreadPool-1,5,main] : 2
Thread[RxComputationThreadPool-1,5,main] : 3
Thread[RxComputationThreadPool-1,5,main] : 4
Thread[RxComputationThreadPool-1,5,main] : 5

모두 computation 쓰레드에서 실행된 것을 볼 수 있습니다!

이처럼 subscribeOn을 많이 지정하는 것은 혼란의 원인이 되니 지양해야 합니다. 또한 interval같은 메소드로 생성한 생산자는 이미 스케줄러가 자동으로 지정될 때는 개발자가 나중에 subscribeOn을 해주어도 반영되지 않습니다. 이런 경우 생산자를 생성하는 메소드의 인자로 스케줄러를 입력해준다면 원하는 쓰레드에서 실행 또한 가능합니다.

Flowable.interval(300L, TimeUnit.MILLISECONDS, Schedulers.io())

observeOn

subscribeOn과 반대로 observeOn은 소비자의, 데이터를 받는 측의 처리작업을 어떤 스케줄러에서 실행할지를 설정하는 메소드로, 연산자 마다 서로 다른 스케줄러를 지정할 수 있습니다.

observeOn은 인자로 delayError과 bufferSize 또한 받을 수 있습니다!

인자번호인자타입설명
1Scheduler쓰레드를 관리하는 스케줄러 클래스
2booleantrue일 때 에러가 발생해도 즉시 통지하지 않고, 버퍼에 담긴 데이터를 모두 통지한 뒤 에러통지. false 경우 에러 발생시 바로통지한다! 기본값은 false
3int통지를 기다리는 데이터를 버퍼에 담는 크기로, 기본 값 128

RxJava에서 배압을 적용할 때 세 번째 인자가 중요한데, 안드로이드에서는 보통 많이 사용치 않으니 알아만 두면 좋을 듯 하다!

안드로이드에서 subscribeOn(Schedulers.io())를 가장 많이 사용해, 서버와 통신을 하는 것 같다!!!

3.2.2 연산자 내에서 생성되는 비동기 Flowable/Observable

flatMap() 메소드는 연산자 내부에서 Flowable/Observable을 생성하고, 이를 시작한 뒤 데이터를 통지하는 메소드가 입니다. 이때 생성한 Flowable/Observable을 별도의 쓰레드에서 실행하면 데이터를 받아 생성한 Flowable/Observable이 시작될 때까지는 flatMap 메소드가 데이터를 받은 순서대로 실행되지만, 일단 Flowable/Observable이 시작되면 그 뒤로는 각자 다른 쓰레드에서 처리 작업을 수행합니다. 즉, 사용하는 메소드에 따라 여러 Flowable/Observable을 서로 다른 쓰레드에서 동시에 실행한다는 뜻입니다. 그러므로 메소드에 따라서 데이터를 받은 순서대로 Flowable/Observable을 생성하더라도 데이터를 받은 순서대로 통지하는 것은 보장하지 않습니다. 말로는 이해가 어렵지만, 뒤의 예제들을 보면서 이해해보겠습니다!

flatMap() 메소드

flatMap() 메소드는 데이터를 받으면 새로운 Flowable/Observable을 생성하고, 이를 실행해 여기에서 통지되는 데이터를 메소드의 결과물로 통지하는 연산자입니다. 이때 데이터가 연속적으로 들어오고 이를 통해 생성되는 Flowable/Observable이 별도의 쓰레드에서 처리 작업을 하는 상황을 생각해봅시다! 이때 생성된 Flowable/Observable은 동시에 실행돼 제각각 데이터를 통지하고, 이렇게 통지된 데이터는 메소드의 실행 결과로 통지됩니다. 그러므로 최종으로 통지되는 데이터의 순서는 다를 수 있습니다!

예를 들어 A,B,C를 통지하는 Flowable을 just 메소드로 생성한다고 합니다. Flowable이 통지한 데이터를 받아 flatMap() 메소드 내부에서는 Flowable을 delay 메소드로 생성하고, 이 Flowable로 각 데이터를 1000밀리초씩 늦게 통지하려고 합니다. 이 예제에서는 순서대로 데이터를 받아 Flowable을 생성하고, 처리를 시작할 때까지는 거의 시간을 들이지 않고 실행할 수 있습니다. 즉, 거의 시간 차 없이 각각의 쓰레드에서 처리 작업을 하는 여러 Flowable을 실행하게 됩니다. 하지만 자바에서는 CPU 부하 등의 영향으로 정확한 시간 내에 처리 작업을 할 수 없습니다. 그러므로 쓰레드에 따라서는 처리속도가 느정지기도 합니다. 이렇게 되면 처리속도가 느린 쓰레드보다 다른 쓰레드의 데이터가 먼저 통지되므로 데이터 받은 순서와 통지 순서가 다를 수 있습니다.

Flowable.just("A", "B", "C")
        .flatMap {
            return@flatMap Flowable.just(it).delay(1000L, TimeUnit.MILLISECONDS)
        }.subscribe {
            println("${Thread.currentThread()} $it")
        }
Thread.sleep(2000L)
=> Thread[RxComputationThreadPool-3,5,main] C
Thread[RxComputationThreadPool-1,5,main] A
Thread[RxComputationThreadPool-1,5,main] B

받은 데이터(A,B,C)로 새로운 Flowable을 만들며, 1000밀리초 늦게 데이터를 통지하는 Flowable을 생성한다!
이처럼 실행 순서와 상관 없이 처리 성능이 중요한 경우 flatMap 메소드를 사용하지만, 데이터 순서가 중요하다면 concatMap을 이용합니다!

concatMap() 메소드

concatMap() 메소드는 받은 데이터로 메소드 내부에 Flowable/Observable을 생성하고, 이 Flowable/Observable을 하나씩 순서대로 실행해 통지된 데이터를 그 결과물로 통지하는 연산자입니다. 이 과정에서 생성되는 Flowable/Observable은 각각 다른 쓰레드에서 처리해도, 이에 영향을 받지 않고 새로 생성한 Flowable/Observable의 처리 데이터를 받은 순서대로 통지합니다.

예를들어 'A,B,C'를 통지하는 Flowable이 있다고 합시다. concatMap() 메소드에서 delay 메소드를 이용해 1000밀리초 늦게 통지하려 합니다. 이때 데이터를 받으면 Flowable을 생성해 처리를 시작하고, 이 Flowable이 처리 작업을 끝낼 때까지 다른 Flowable을 생성하지 않습니다. 그러므로 Flowable이 서로 다른 쓰레드에서 실행되도 상관 없이 순서대로 출력합니다.

Flowable.just("A", "B", "C")
        .concatMap {
            return@concatMap Flowable.just(it).delay(1000L, TimeUnit.MILLISECONDS)
        }.subscribe {
            println("${Thread.currentThread()} data=$it, time=${
                LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"))
            }")
        }
Thread.sleep(4000L)
=> Thread[RxComputationThreadPool-1,5,main] data=A, time=06.851
Thread[RxComputationThreadPool-2,5,main] data=B, time=07.878
Thread[RxComputationThreadPool-3,5,main] data=C, time=08.882

쓰레드 이름과 데이터, 그리고 시간을 함께 출력합니다.
모두 1000밀리초 씩 뒤에 출력됐으며, 데이터 또한 원본 데이터 순서대로 출력됩니다.
성능 보다는 순서가 중요한 경우 사용합니다.

concatMapEager() 메소드

concatMapEager() 메소드는 데이터를 받으면 새로운 Flowable/Observable을 생성하고, 이를 즉시 실행해 그 결과로 받은 데이터를 원본 데이터 순서대로 통지하는 연산자입니다. 이때 생성한 Flowable/Observable이 서로 다른 쓰레드에서 실행된다면 생성한 Flowable/Observable은 flatMap() 메소드 처럼 동시에 실행됩니다. 하지만 결과로 통지하는 데이터는 concatMap() 메소드와 같이 원본 순서대로 통지됩니다.

이는 'A,B,C'라는 데이터가 있다면, flatMap() 메소드처럼 동시에 실행합니다. 그러나 무작위로 나온 결과값을 버퍼에 잠시 저장 후 생성된 순서대로 통지합니다.

Flowable.just("A", "B", "C")
        .concatMapEager {
            return@concatMapEager Flowable.just(it).delay(1000L, TimeUnit.MILLISECONDS)
        }.subscribe {
            println("${Thread.currentThread()} data=$it, time=${
                LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"))
            }")
        }
Thread.sleep(2000L)
=> Thread[RxComputationThreadPool-2,5,main] data=A, time=32.905
Thread[RxComputationThreadPool-2,5,main] data=B, time=32.911
Thread[RxComputationThreadPool-2,5,main] data=C, time=32.911

결과를 보면 순서도 동일하며, 시간 또한 거의 같습니다!

이처럼 순서와 성능이 모두 중요하다면 concatMapEager()을 사용하는 것이 맞습니다.
하지만 데이터의 사이즈가 커 대량의 데이터가 버퍼에 쌓이면 메모리가 부족할 수 있으니 주의해야 합니다!!

3.2.3 다른 쓰레드간 공유되는 객체

RxJava는 Reactive Streams 규칙과 Observable 규약에 따라 구현하는 한 비동기 처리를 쉽게할 수 있습니다! 하지만 생산자와 소비자 사이가 아니라, 외부 공유 객체를 다룬다면 순차성을 잃을 수 있습니다. 예를 들어 아래 예제에서 다른 쓰레드 처리 작업을 하는 Flowable 두개가 각각 데이터를 통지할 때 각 Flowable을 구독하는 Subscriber는 받은 데이터의 처리 작업을 하면서 내부에서 같은 Counter 객체에 대한 변경 처리작업을 합니다!

class Counter {
    private var count = 0

    fun increment() {
        count++
    }

    fun get(): Int {
        return count
    }
}

val counter = Counter()

Flowable.range(1, 10000)
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe({ data -> counter.increment() },
            { error -> println("${error.printStackTrace()}") },
            { println(counter.get()) }
        )

Flowable.range(1, 10000)
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe({ data -> counter.increment() },
            { error -> println("${error.printStackTrace()}") },
            { println(counter.get()) })

Thread.sleep(1000L)
=> 19607
19607 // 매번 다르게 나옵니다!

10000번씩 2번 했기 때문에 20000을 예상했지만, 답은 더 적게 나옵니다.

Counter가 순차적으로 실행됐다면 get()=20000이 되어야 하지만 동시에 실행되어 기대한 값을 얻지 못했습니다.
이는 각 생산자와 소비자 사이에는 처리 작업을 순차적으로 진행ㅎ지만, 두 소비자가 동시에 공유 객체(Counter)에 접근할 때는 순차적으로 접근하지 않기 때문입니다.

이에 대한 해결책으로 두 Flowable/Observableㅇ르 한 Flowable/Observable로 결합(merge)하고 이를 이용해 새로운 Flowable을 생성하는 merge와 같은 메소드를 이용해 해결할 수 있습니다!
이 Flowable/Observable을 구독하면 다른 쓰레드에 있는 여러 개의 Flowable/Observable이라 해도 데이터를 순차적으로 받을 수 있습니다.

val counter = Counter()

val flowable1 = Flowable.range(1, 10000)
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())


val flowable2 = Flowable.range(1, 10000)
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())

Flowable.merge(flowable1, flowable2)
        .subscribe({ data -> counter.increment() },
            { error -> println("${error.printStackTrace()}") },
            { println(counter.get()) })

Thread.sleep(1000L)
=> 20000

결과 값은 20000이 나왔습니다. 이는 여러개의 Flowable을 하나로 merge하여 Counter 객체에 대한 접근이 순차적으로 처리됐기 때문입니다!
이는 거의 동시에 데이터를 통지해도 merge한 후 받는 측에서는 순차적으로 통지됩니다. 그래서 통지를 받게 되면 이 처리 작업(increment)가 끝날 때 까지 다음 통지의 처리 작업이 이루어지지 않습니다.
그래서, 공유 객체에 대한 순차적 처리를 원한다면 여러 Flowable/Observable을 merge(병합)하는 방법도 생각해 보면 좋을 것 같습니다.

다음 포스트에서는 에러 처리에 대해서 알아보겠습니다!!

profile
뜻을 알고 코딩하기

0개의 댓글