RxJava. Observable 클래스

박채빈·2022년 1월 4일
0

RxJava

목록 보기
2/3

Observable 클래스

Observable클래스는 옵저버 패턴을 구현한다.
라이프 사이클은 존재하지 않으며, 보통 단일 함수를 통해 변화만을 알린다.
Observable 클래스에는 Observable을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다.
Observable을 생성할 때 직접 인스턴스를 만들지 않고 정적 팩토리 함수를 호출한다.

  • RxJava의 Observable 알림
    • onNext : Observable이 데이터의 발행을 알린다. 기존 옵저버 패턴과 동일.
    • onComplete : 모든 데이터의 발행을 완료했음을 알린다. 단 한번만 발생하며, 이후 onNext가 발생하서는 안된다.
    • onError : Observable에 어떤 이유로 에러가 발생했음을 알린다. 이벤트 발생 이후 onNext, onComplete 이벤트가 발생하지 않는다.

just() 함수

just() 함수는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성한다.
한 개의 값을 넣을 수도 있고 인자로 여러 개의 값을 넣을 수도 있다. (최대 10개)
단, 타입은 모두 같아야 한다.

subscribe() 함수와 Disposable 객체

RxJava는 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절하기 위해 subscribe() 함수를 사용한다.
Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.

RxJava는 선언형 프로그래밍을 지향한다.

  • 어떤 방법으로 동작하는지가 아니라, 프로그래밍할 대상이 무엇인지 알려주는 것.
  • 명령형 언어에서는 실행할 알고리즘과 동작을 구체적으로 명시하지만, 선언형은 목표를 명시할 뿐 알고리즘을 명시하지 않는다.

subscribe() 함수는 인자를 3개까지 가질 수 있다.

  • 인자가 없음 : onNext와 onComplete 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException을 던진다. 코드 테스트, 디버깅에 사용한다.
  • 인자가 1개 : onNext 이벤트를 처리한다.
  • 인자가 2개 : onNext, onError 이벤트를 처리한다.
  • 인자가 3개 : onNext, onError, onComplete 이벤트를 모두 처리할 수 있다.

subscribe() 함수는 Disposable 인터페이스의 객체를 리턴한다.
Disposabled은 아래 2개의 함수만을 가진다.

void dispose() // Observable에게 더이상 데이터를 발행하지 않도록 구독을 해지하는 함수
boolean isDisposed()

Disposable 인터페이스 함수 예제

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;

public class ObservableNotifications {
    public static void main(String[] args) {
        Observable<String> source = Observable.just("RED", "GREEN", "YELLOW");
        Disposable d = source.subscribe(
                v -> System.out.println("OnNext : value : " + v),
                err -> System.err.println("OnError : err : " + err.getMessage()),
                () -> System.out.println("OnComplete")
        );

        System.out.println("isDisposed : " + d.isDisposed());
    }
}

실행 결과

OnNext : value : RED
OnNext : value : GREEN
OnNext : value : YELLOW
OnComplete
isDisposed : true

create() 함수

just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지마 create() 함수는 onNext, onComplete, onError같은 알림을 개발자가 직접 호출해야 한다.

create() 함수를 활용하여 데이터를 발행하는 방법

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;

public class ObservableCreateExample {
    public static void main(String[] args) {
        Observable<Integer> source = Observable.create(
                (ObservableEmitter<Integer> emitter) -> {
                    emitter.onNext(100);
                    emitter.onNext(200);
                    emitter.onNext(300);
                    emitter.onNext(400);
                    emitter.onComplete();
                }
        );
        // subscribe() 함수를 호출하지 않으면 아무것도 출력되지 않는다.       
        source.subscribe(System.out::println).dispose();
        
        // 아래 1, 2는 같은 의미이다.
        source.subscribe(data -> System.out.println("Result : " + data)).dispose(); // 1
        source.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                System.out.println("Result : " + integer);
            }
        }).dispose(); // 2
    }
}

실행결과

100
200
300
400

Observable.create() 사용 시 주의사항.

  1. Observable이 구독 해지되었을 때 등록된 콜백을 모두 해제해야 한다. 그렇지 않으면 잠재적으로 메모리 누수가 발생한다.
  2. 구독자가 구독하는 동안에만 onNext와 onComplete 이벤트를 호출해야 한다.
  3. 에러가 발생했을 때는 반드시 onError 이벤트로만 에러를 전달해야 한다.
  4. Back pressure를 직접 처리해야 한다.

fromArray() 함수

  • just(), create()와 달리 단일 데이터가 아닐 경우 fromXXX() 계열 함수를 사용한다.

fromArray 사용법

import io.reactivex.rxjava3.core.Observable;

public class ObservableFromArray {
    public static void main(String[] args) {
        Integer[] arr = {100, 200, 300};
        Observable<Integer> source = Observable.fromArray(arr);
        source.subscribe(System.out::println).dispose();
    }
}

int[] 배열 처리방법

Integer[]와 같은 래퍼타입이 아닌 int[] 배열인 경우, Integer[]로 변환해야 한다.

import io.reactivex.rxjava3.core.Observable;
import java.util.stream.IntStream;

public class IntegerArray {
    private static Integer[] toIntegerArray(int[] intArray) {
        // boxed() 메서드는 int[] 배열의 각 요소를 Integer로 변환해 Integer[] 배열의 스트림으로 만든다.
        // toArray() 메서드는 이 스트림을 Integer[] 배열로 만든다.
        return IntStream.of(intArray).boxed().toArray(Integer[]::new);
    }

    public static void main(String[] args) {
        int[] intArray = {400, 500, 600};
        Observable<Integer> source = Observable.fromArray(toIntegerArray(intArray));
        source.subscribe(System.out::println).dispose();
    }
}

fromIterable() 함수

Iterable 인터페이스는 반복자(iterator)를 반환한다.
Iterator 인터페이스는 이터레이터 패턴을 구현한 것으로, 다음에 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는다.

List 객체에서 Observable 만드는 방법

List<String> names = new ArrayList<>();
names.add("Jerry");
names.add("William");
names.add("Bob");

Observable<String> source = Observable.fromIterable(names);
source.subscribe(System.out::println).dispose();

Set 인터페이스 객체로 Observable 만드는 방법

Set<String> cities = new HashSet<>();
cities.add("Seoul");
cities.add("London");
cities.add("Paris");

Observable<String> source = Observable.fromIterable(cities);
source.subscribe(System.out::println).dispose();

BlockingQueue 인터페이스 객체로 Observable 만드는 방법

BlockingQueue<Order> orderQueue = new ArrayBlockingQueue<>(100);
orderQueue.add(new Order("ORD-1"));
orderQueue.add(new Order("ORD-2"));
orderQueue.add(new Order("ORD-3"));

Observable<Order> source = Observable.fromIterable(orderQueue);
source.subscribe(order -> System.out.println(order.getId())).dispose();

fromCallable() 함수

  • Callable 인터페이스 : 자바5에서 추가됨. 비동기 실행 후 결과를 반환하는 call() 메서드를 정의
    • Runnable 인터페이스의 run()처럼 메서드가 하나고 인자가 없지만, 실행 결과를 리턴한다는 점에서 차이가 있고, Executor 인터페이스의 인자로 활용되기 때문에 잠재적으로 다른 스레드에서 실행되는 것을 의미한다.

Callable객체와 fromeCallable() 함수를 이용해 Observable을 만드는 방법

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.Callable;

public class ObserverFromCallable {
    // 3초 쉬었다가 "Hello Callable" 반환
    static Callable<String> callable1 = () -> {
        Thread.sleep(3000);
        return "Hello Callable";
    };

    // 람다 표현식 없는 fromCallable 함수
    static Callable<String> callable2 = new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(3000);
            return "Hello Callable";
        }
    };

    public static void main(String[] args) {
        System.out.println("ObserverFromCallable1");
        Observable<String> source1 = Observable.fromCallable(callable1);
        source1.subscribe(System.out::println).dispose();

        System.out.println("ObserverFromCallable2");
        Observable<String> source2 = Observable.fromCallable(callable1);
        source2.subscribe(System.out::println).dispose();
    }
}

fromFuture() 함수

자바5에서 추가된 동시성 API. 비동기 계산의 결과를 구할 때 사용한다.
보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환한다.
get() 메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올 때 까지 블로킹된다.

Future 객체에서 fromFuture() 함수를 사용해 Observer를 생성하는 방법

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ObservableFromFuture {
    // 3초 쉬고 Hello Future 반환
    static Future<String> future = Executors.newSingleThreadExecutor().submit(()->{
        Thread.sleep(3000);
        return "Hello Future";
    });

    public static void main(String[] args) {
        System.out.println("ObserverFromFuture");
        Observable<String> source = Observable.fromFuture(future);
        source.subscribe(System.out::println).dispose();
    }
}
  • Executors 클래스는 단일 스레드 실행자(SingleThreadExecutor) 뿐 아니라 다양한 스레드풀을 지원한다. 하지만 RxJava에서 제공하는 스케줄러를 활용하도록 권장한다.

fromPublisher() 함수

  • Publisher는 자바9 표준 Flow API의 일부이다.

fromPublisher() 함수로 Observable 만드는 방법

import io.reactivex.rxjava3.core.Observable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;


public class ObservableFromPublisher {
    /*
     *  Observable은 io.reactive
     *  Publisher, Subscriber는 org.reactivestreams
     */
    static Publisher<String> publisher1 = (Subscriber<? super String> s) -> {
        s.onNext("Hello Observable.fromPublisher()");
        s.onComplete();
    };

    // 람다 표현식 안쓴거
    static Publisher<String> publisher2 = new Publisher<String>() {
        @Override
        public void subscribe(Subscriber<? super String> s) {
            s.onNext("Hello Observable.fromPublisher()");
            s.onComplete();
        }
    };

    public static void main(String[] args) {
        Observable<String> source1 = Observable.fromPublisher(publisher1);
        source1.subscribe(System.out::println).dispose();

        Observable<String> source2 = Observable.fromPublisher(publisher2);
        source2.subscribe(System.out::println).dispose();
    }
}
profile
안드로이드 개발자

0개의 댓글