[Reactive] Observable

임동현·2022년 5월 1일
0

Observable

Observable 은 데이터의 흐름을 관장하는 클래스로 데이터의 흐름에 맞게 알림을 보내 옵저버가 데이터 처리를 할 수 있도록 만드는 역할을 수행합니다. 따라서 옵저버블은 일련의 연산자를 거친 아이템을 최종 옵저버로 내보내는 방출 기반의 조합 가능한 lterator 라고 할 수 있습니다.
이 부분을 조금 더 자세히 설명하면 아래와 같습니다

  • 옵저버는 옵저버블을 구독합니다.
  • 옵저버블이 그 내부의 아이템들을 방출하기 시작합니다.
  • 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응합니다.

옵저버블이 onNext, onComplete,onError 같은 이벤트 함수를 통해 방출을 제어합니다.
이 함수들이 호출되는 시점은 옵저버가 구독을 시작한 시점입니다.

-onNext : 옵저버블은 모든 아이템을 하나씩 이 함수에 전달합니다.
-onComplete: 모든 아이템이 onNext 함수를 통과하면 옵저버블은 onComplete 함수를 호출합니다.
-onError:옵저버블에서 에러가 발생하면 onError 함수가 호출돼 정의된 대로 에러를 처리합니다.

이 내용을 다이어그램으로 표한하면 아래와 같습니다.

위 내용을 예제로 살펴보면 아래와 같습니다.

// 리스트 이터레이터를 Observable 객체로 만들어줍니다.

var list : List<Any> = listof("one","Two","Three","Four","Five")
var observable: Observable<String> = list.toObservable()

// Observable에서 방출하는 데이터를 수신할 Observer 객체를 생성합니다.
val observer: Observer<String> = object: Observer<String>{
    override fun onComplete(){
        println("All Complete")
    }

    override fun onNext(item: Any){
        println("Next $item")
    }

    override fun onError(e: Throwable){
        println("Error Occured $e")
    }

    override fun onSubscribe(e: Disposable){
        println("New Subscription”)
    }
}

// 마지막으로 옵저버블에 옵저버를 연결하면서 동시성을 관리하는 스케줄러를 정의합니다.
observable
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)

위 코드의 실행하면 옵저버가 옵저버블 구독을 시작하면 구독이 시작되었음을 옵저버에 알려주는 onSubscribe() 함수가 호출되고 이후 방출이 되면 onNext() 함수가 호출되고 모든 아이템방출이 완료되면 onComplete()함수가 호출됩니다 . 실행결과는 아래와 같습니다.

New Subscription
Next One
Next Two
Next Three
Next Four
Next Five
All Complete

Creating Observable

Observable 객체를 생성하는 방법은 create,from,just 등 다양한 방법이 있습니다.
이번 절에서는 옵저버블 객체를 생성하는 다양한 방법에 대해 설명하도록 하겠습니다.

create()

Create() 는 직접적인 코드 구현을 통해 옵저버 메서드를 호출하여 옵저버블을 생성합니다.
create() 함수는 사용자가 지정한 데이터구조를 사용하거나 내보내는 값을 제어하려고 할 때 유용합니다.

Create() 함수를 사용해 옵저버블 객체를 생성하는 예제 코드는 아래와 같습니다.

fun funExam(){
    val observer: Observer<String> = object: Observer<String>{
        override fun onComplete(){
            println("All Complete")
        }

        override fun onNext(item: String){
            println("Next $item")
        }

        override fun onError(e: Throwable){
            println("Error Occured $e")
        }

        override fun onSubscribe(e: Disposable){
            println("New Subscription")
        }
    }
    
    val observable: Observable<String> = Observable.create<String>{
        it.onNext(“One”)
        it.onNext(“Two”)
        it.onNext(“Three”)
        it.onNext(“Four”)
        it.onNext(“Five”)
        it.onComplete()
    }
    observable.subscribe(observer)
}

출력 결과는 이전과 동일 하지만 이전 코드와의 차이점을 살펴보면 이전코드에서는 toObservable() 함수를 사용해 list 의 이터레이터를 옵저버블 객체로 변환해주었다면 , 위 코드에서 Observable class 에서 지원하는 create() 함수를 사용해 직접 옵저버블 객체를 생성하고 있습니다. create() 함수의 람다식 내부에서 onNext 함수를 사용해 옵저버에 방출할 아이템을 추가해주고 마지막에 onComplete()함수를 호출해 방출을 완료합니다.

즉 , create() 함수의 람다식 내부에서 ObservableEmitter의 onNext () 함수에 방출할 아이템을 추가하면 옵저버의 onNext() 함수에 아이템이 방출됩니다. 또한 onComplete() 함수 호출후에는 어떠한 방출도 옵저버에 전달되지 않습니다.

위 예제코드의 onComplete() 함수 다음에 onNext() 함수를 사용해 방출을 추가해주어도 옵저버에는 방출되지 않습니다.

val observable: Observable<String> = Observable.create<String>{
        it.onNext(“One”)
        it.onNext(“Two”)
        it.onNext(“Three”)
        it.onNext(“Four”)
        it.onNext(“Five”)
        it.onComplete()
        it.onNext(“Six”)
    }
observable.subscribe(observer)

.
from()
from 함수는 create()함수에 비해 상대적으로 간단히 옵저버블 객체를 만들수 있는 방식으로 배열에 포함된 요소를 하나씩 순서대로 방출하는 업저버블 객체를 생성합니다.

from() 계열 함수는 다음과 같습니다.

  • fromArray()

  • fromIterable()

  • fromCallable()

  • fromFuture()

  • fromPublisher()

from()의 동작 매커니즘을 그림으로 살펴보면 아래와 같습니다

fromArray()

Array를 Observable객체를 생성하는 것으로 fromArray() 함수를 통해 배열을 옵저버블 객체로 변환합니다.

fromIterable()

Iterable 인터페이스를 구현한 클래스에서 Observable객체를 생성하는 것으로 Iterable 인터페이스는 반복자(iterator)를 반환합니다.

fromCallable()

Java 5에 추가된 동시성 API인 Callable인터페이스로, 비동기 실행 후 결과를 반환하는 call()메소드를 정의합니다.

Runnable 인터페이스처럼 메소드가 하나이고 인자가 없다는 점에서 비슷하지만, 실행 결과를 리턴한다는 점에서 차이가 있습니다.

또한 Executor 인터페이스의 인자로 활용되기 때문에 잠재적으로 다른 스레드에서 실행되는 것을 의미하기도 합니다.

fromFuture()

Future 인터페이스 역시 자바 5에서 추가된 동시성 API로 비동기 계산의 결과를 구할 때 사용합니다.

보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future객체를 반환합니다.

get()메소드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올 때 까지 블로킹 됩니다.

fromPublisher()

Publisher는 자바 9 의 표준인 Flow API의 일부로 Observable.create()와 마찬가지로 onNext()와 onComplete()함수를 호출할 수 있습니다.

profile
프론트엔드 공부중

0개의 댓글