1. RxJava의 기본(1)

안석주·2021년 11월 7일
0

RxJava

목록 보기
1/10

서론

이번에 RxJava를 공부하면서, 책 한 챕터씩 공부하고, 제대로 모르는 것에 대해 정리하면서 파악해보도록 해보겠습니다!
공부할 책은 RxJava 리액티브 프로그래밍이란 책이며, 총 6장으로 구성되어 있습니다! 이번 챕터는 1장이며 RxJava의 기본에 대해서 정리합니다. 후에 2,3장에서 람다 함수나 비동기 처리, 옵저버 패턴 등을 같이 정리해보겠습니다. 이 책에서의 RxJava는 RxJava2.x 버전이며, 예제가 모두 Java 또는, Kotlin으로 예제를 실습하도록 해보겠습니다.

1. RxJava의 기본

1.1 RxJava와 리액티브 프로그래밍?

RxJava는 이벤트 처리와 같은 비동기 처리에 최적화 됐으며, 2.0 버전부터 Reactive Streams 사양을 구현합니다. 이는 어떤 라이브러리나 프레임워크라도 데이터 스트림을 비동기로 처리하는 공통 메커니즘을 인터페이스로 제공합니다. 또한 RxJava는 안드로이드 2.3 버전 이상을 지원하니 참고해야 합니다.

리액티브 프로그래밍이란 데이터가 통지될 때마다 관련 프로그램이 반응해 데이터를 처리하는 프로그래밍 방식입니다. 예를 들어 GPS 위치 정보가 변경될 때는 데이터를 전송하다가, 이동을 멈췄을 때 데이터 전송을 멈추는 것처럼, 데이터를 한번에 보내는 것이 아닌, 각각의 데이터가 생성될 때마다 순서대로 보냅니다. 이러한 데이터의 흐름을 데이터 스트림이라고 합니다. 이는 List같은 컬렉션과는 다르게 앞으로 발생할 가능성이 있는 데이터까지 포함하는 집합체입니다.

예를들어 부가가치세 세율을 계산하는 앱이 있다고 가정합니다.
리액티브 프로그래밍이 아니라면
1. 상품 가격을 입력한다.
2. 세금 계산이라는 버튼을 누른다
3. 계산된 결과를 표시한다
라는 세가지 순서가 존재하고, 상품가격이 변경되어도 세금 계산 버튼을 누르지 않을 경우 계산이 되지 않게됩니다.

그런데 리액티브 프로그래밍에서는
1. 상품가격을 입력한다
2. 입력한 내용을 통지한다
3. 통지를 받아 계산하고 결과를 표시한다.

세금 계산이라는 버튼을 만들지 않아도 자동 계산이 되며, 상품 가격이 변경될 때마다 부가가치세가 자동으로 변경이 됩니다.

하지만 위의 행위들은 가격 변동을 감지하는 리스너를 이용해 자동으로 부가가치세를 계산하는 프로그램과 어떤 차이가 있는지 의문이 생길 수 있습니다. 이때 리액티브 프로그래밍인지는 무엇이 데이터 처리를 수행하는가로 가릴 수 있습니다.

위의 말대로 상품가격이 변경될 때 리스너가 반응하면서 상품가격에 해당하는 부가가치세를 다시 계산해 표시하는 것은 리액티브 프로그래밍이라고 할 수 없습니다.(상품가격이 부가가치세 금액을 바꾼다고 생각)

하지만 리스너가 반응하면서 부가가치세 항목에 새로운 데이터가 전달되고, 부가가치세 항목에서 계산 프로그램을 실행해 결과를 부가가치세로 표시한다고 생각하면 이는 리액티브 프로그래밍이라고 할 수 있습니다.(상품가격 데이터를 전달만 하고, 부가가치세 항목이 새로운 데이터를 받아 부가가치세 값을 변경한다고 생각한다)

이는 이러한 작은 프로그램에서는 코드만 봐서는 같은 결과를 예상할 수 있습니다. 그러나 어떤 프로그램이 어떤 일에 책임이 있는지는 확연한 차이가 납니다. 처리가 복잡해져 상품가격에 따른 변경사항의 수가 늘어난다면, 어떤 프로그램이 무엇을 처리해야할지에 대한 비중이 바뀌고, 코드도 바뀌어야 합니다.

리액티브 프로그래밍에서 데이터를 생산하는 측(상품가격)은 데이터를 전달하는 것까지만 책임을 집니다. 그러므로 데이터를 생산하는 측은 데이터를 소비하는 측(부가가치세 계산)이 전달받은 데이터로 무엇을하는지 몰라도 됩니다! 또한 데이터를 생산하는 측은 소비하는 측에서 무엇을 하든지 관계가 없기 때문에 소비측의 처리를 기다릴 필요가 없습니다. 이 때문에 데이터 통지 후 데이터를 소비하는 측에서 데이터를 처리하는 도중이라도 데이터를 생산하는 측은 바로 다음 데이터를 처리할 수 있습니다. 그러므로 데이터를 통지한 후 데이터를 소비하는 측에서 데이터를 처리하는 도중이라도, 데이터를 생산하는 측은 바로 다음 데이터를 처리할 수 있습니다. 이처럼 비동기 처리를 쉽게 구현할 수 있습니다.

1.2 Reactive Streams

Reactive Streams란 라이브러리나 프레임워크에 상관없이 데이터 스트림을 비동기로 다룰 수 있는 공통 메커니즘으로, 이 메커니즘을 편리하게 사용할 수 있는 인터페이스를 제공합니다! 이 인터페이스의 구현은 각 라이브러리와 프레임워크에서 해야합니다.

Reactive Streams는 데이터를 만들어서 통지하는 Publisher와 통지된 데이터를 받아 처리하는 Subscriber로 구성됩니다. Subscriber가 Publisher를 구독하면 Publisher가 통지한 데이터를 Subscriber가 받을 수 있습니다!

  • Publisher : 데이터를 통지하는 생산자
  • Subscriber : 데이터를 받아 처리하는 소비자


(발그림 죄송합니다..)
순서대로 알아보면, 가장 먼저 Publisher는 통지 준비가 끝나면 이를 Subscriber에 통지(onSubscribe)합니다. 해당 통지를 받은 Subscriber는 받고자 하는 데이터 개수를 요청(request)합니다. 이때 Subscriber가 자신이 통지 받을 데이터 개수를 요청(request)하지 않으면 Publisher는 통지해야 할 데이터 개수 요청을 기다리기 때문에, 통지를 시작할 수 없습니다. 그다음 Publisher는 데이터를 만들어 Subscriber에 통지(onNext) 합니다. 이 데이터를 받은 Subscriber는 받은 데이터를 사용해 처리 작업을 수행합니다. Publisher는 요청받은 만큼의 데이터를 통지한 뒤 Subscriber로부터 다음 요청이 올 때까지 데이터 통지를 중단합니다. 이 요청을 보내지 않으면 Publisher는 요청 대기 상태가 돼 Subscriber에 데이터를 통지할 수 없습니다.

Publisher는 Subscriber에 모든 데이터를 통지하고 마지막으로 데이터 전송이 완료돼 정상 종료됐다고 통지(onComplete)합니다. 완료 통지를 하고 나면 Publisher는 이 구독 건에 대해 어떤 통지도 하지 않습니다! 또한 Publisher는 처리 도중에 에러가 발생하면 Subscriber에 발생한 에러 객체와 함께 에러를 통지(onError)합니다.

Subscriber가 Publisher에 통지받을 데이터 개수를 요청하는 이유는, 예를들어 Publisher와 Subscriber의 처리가 각각 다른 스레드에서 진행되는데, Publisher의 통지 속도가 빠르면 Subscriber가 소화할 수 없을 만큼 많은 처리 대기 데이터가 쌓입니다. 이를 막기 위해서 Publisher가 통지할 데이터 개수를 Subscriber가 처리할 수 있을 만큼으로 제어하기 위한 수단입니다.

이를 정리해보면 아래와 같습니다.

  • onSubscribe : 데이터 통지가 준비됐음을 통지
  • onNext : 데이터 통지
  • onError : 에러 통지
  • onComplete : 완료 통지

아래는 Reactive Streams에서 제공하는 인터페이스입니다.

  • Publisher : 데이터를 생성하고, 통지하는 인터페이스
  • Subscriber : 통지된 데이터를 전달받아 처리하는 인터페이스
  • Subscription : 데이터 개수를 요청하고 구독을 해지하는 인터페이스
  • Processor : Publisher와 Subscriber의 기능이 모두 있는 인터페이스

각각을 살펴보면, Publisher는 아래와 같이 선언되어 있습니다.

public interface Publisher<T> {
    public void subscribe(Subscriber <? super T> subscriber);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    
    public void onNext(T item);
    
    public void onError(Throwable error);
    
    public void onComplete();
}

public interface Subscription {
    public void request(long num);
    
    public void cancel();
}

public abstract interface Processer<T, R> extends Subscriber<T>, Publisher<R> {}

이를 통해 publisher 객체를 생성해, subscribe함수를 이용해 Subscriber 인터페이스를 구현해줄 수 있다는 것, 그리고 Subscriber 인터페이스 구현과정에서 onSubscribe 함수에서 다시 request를 통해 데이터의 개수를 요청해주거나, 구독 해지를 해줄 수 있다는 것을 알 수 있습니다!

이를 코드로 보면 다음과 같습니다.

val publisher = Flowable.just(1, 2, 3, 4, 5, 6)
        .filter { data -> data % 2 == 0 }
        .map { data -> data * 100 }
        .subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription
            override fun onSubscribe(s: Subscription?) {
                if (s != null) {
                    subscription = s
                    subscription.request(1L)
                }
            }

            override fun onNext(t: Int?) {
                subscription.request(1L)
            }

            override fun onError(t: Throwable?) {
                TODO("Not yet implemented")
            }

            override fun onComplete() {
                subscription.cancel()
            }
        })

Flowable은 뒤에 나오니 신경쓰지 않으셔도 됩니다! 다만 Publisher를 구현한 클래스라고만 이해만 해두면될 것 같습니다. 또한 just는 데이터를 왼쪽부터 순서대로 통지합니다!
여기서 몇가지 알아둘 것이 있는데, Subscription은 구독을 취소하거나(cancel) 데이터를 개수만큼 요청할 때 사용됩니다(request). 인자로 요청 할 데이터 개수를 적어줍니다(Long). 그리고 Subscription을 onNext에서도 사용하고 싶다면 onSubscribe에서 받은 Subscription이 Subscriber 내부에 있어야 합니다!

이 예제는 첫 구독시에 데이터를 한개씩 요청하며, 후에 데이터를 1개씩 추가 요청하고, 데이터 통지 완료시에 구독을 취소합니다.

이러한 Reactive Streams에도 규칙이 있습니다.

  1. 구독 시작 통지(onSubscribe)는 해당 구독에서 한 번만 발생한다.
  2. 통지는 순차적으로 이루어진다.
  3. null을 통지하지 않는다.
  4. Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해 종료한다.

이를 하나씩 보면, Reactive Streams에서 구독 시작 통지는 해당 구독에서 한 번만 수행됩니다. 따라서 onSubscribe()는 구독을 시작해 통지 준비가 끝났을 때 한 번만 실행됩니다. 단, 처리가 종료된 후 같은 Publisher와 Subscriber로 subscirbe() 메소드를 호출하면, 다시 onSubscribe() 메소드가 실행되기는 합니다. 이는 처리가 끝난 뒤 다시 subscribe() 메소드를 호출하면 새로운 구독을 시작한다고 생각하기 때문입니다. 그러나 같은 인스턴스를 사용하면, 내부의 관리상태를 초기화 하지 않을 시 의도하지 않는 결과를 얻을 수 있으니 조심해야 합니다.(그냥 한번만 사용하길 권장!!)

또한 데이터는 동시에 통지되면 안되고, 꼭 순차적으로 통지되도록 해야합니다.

그리고 null을 통지할 수 없습니다. RxJava 1.x버전에서는 가능하니, 2.x 버전으로 전환할 때 알아두어야 합니다.

추가적으로 Subscription에 대한 규칙이 있는데, 데이터 개수 요청에 Long.MAX_VALUE를 적용해주면 데이터 개수에 의한 통지 제한은 없어집니다. 또한 Subscription의 메소드는 동기화된 상태로 호출해야 합니다.

Subscription의 request에서 Long.MAX_VALUE를 데이터 개수요청으로 지정하면 통지할 데이터 개수의 제한이 없기 때문에 이 요청을 전송한 후에 대이터 개수 요청(request)을 보내지 않아도 데이터 통지를 계속해서 받을 수 있습니다.

또한 요청받은 데이터 개수가 남은 상태에서 추가로 데이터 개수를 요청받으면 새로 요청받은 데이터 개수가 기존 데이터 개수에 추가된다는 점을 주의해야 합니다. 즉, 데이터 개수 요청을 받을 때마다 기존 개수에 더해져 통지 가능한 데이터 개수가 증가합니다. 그래서 이 더해진 결과가 Long.MAX_VALUE에 도달하면 통지 가능한 데이터 개수 제한이 없어집니다.
(그런데 찾아보니 MAX_VALUE가 9223372036854775807L로 선언되어있다... 더해서 저 값이 나오긴 할까 궁금하다... 대규모 서비스라면 가능할 것 같기도 하다)

마지막으로 Subscription의 메소드는 동기화된 상태로 호출해야 합니다. 즉, Subscription의 메소드를 동시에 호출해서는 안됩니다.

RxJava를 사용할 때는 각 통지 메소드와 Subscription의 메소드를 호출할 때 동기화가 이뤄지므로 처리 자체가 쓰레드 안전한지를 특히 신경써야 합니다. 안그럴 경우 정확한 통지가 이루어지지 않을 수 있음!

다음 포스트에는 RxJava의 기본구조와 Flowable, Observable, Subscriber, Observer에 대해서, Disposable 등에 대해서 정리해보겠습니다.

profile
뜻을 알고 코딩하기

0개의 댓글