[RxJava] RxJava 프로그래밍 - Observable 클래스

Happy Jiwon·2023년 4월 26일
1

Study

목록 보기
4/5
post-thumbnail

이전 포스팅 에서 리액티브 프로그래밍에 대해 소개하였는데, RxJava 프로그래밍은 Observable에서 시작해 Observable로 끝난다고 해도 과언이 아닐 정도로 중요한 개념이다.
오늘은 Observable 클래스와 그의 파생 클래스에 대해 알아보자


Observable 클래스

Observable은 Observer 패턴을 구현한다.

🖐🏻 Observer 패턴이란?
객체의 상태 변화를 관찰하는 관찰자(Observer) 목록을 객체에 등록한다. 그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵서버에게 변화를 알려준다.

사용자가 버튼을 누르면 미리 등록해 둔 onClick() 메서드를 호출해 원하는 처리를 하는 것이 Observer 패턴의 대표적인 예이다.

RxJava의 Observable은 3가지 알림을 구독자에게 전달해준다.

  • onNex: Observable이 데이터 발행을 알린다. (기존의 옵서버 패턴과 동일)
  • onComplete: 모든 데이터의 발행을 완료했음을 알린다. (단 한번단 발생하며, 발생한 후에는 onNext 이벤트가 발생하면 안된다.)
  • onError: Obervable 에서 어떤 이유로 에러가 발생했음을 알린다. (onError가 발생하면 onNext, onComplete 이벤트는 발생하지 않는다. 즉, 실행 종료)

Observable 함수 알아보기

< 단일 데이터 >

just() 함수

데이터를 발행하는 가장 쉬운 방법은 기존의 자료구조를 사용하는 것이다. just() 함수는 인자로 넣은 데이터를 차례로 발행하고 Observable을 생성한다. 한 개의 값부터 같은 타입의 여러 개의 값(최대 10개)을 넣을 수 있다.
(단, 실제 데이터의 발행은 subscribe() 함수를 호출해야 시작한다.)

여러 개의 인자 값을 넣은 just() 함수의 코드는 어떻게 될까?

Observable.just(1, 2, 3, 4, 5, 6)
		.subscribe(System.out::println);

/**
* print-> 1
*         2
*         3
*         4
*         5
*         6
*/

위와 같이 인자로 넣은 1~6을 변경 없이 그대로 발행한다.


subscribe() 함수와 Disposable 객체
RxJava에서 동작시키기 원하는 것을 사전에 정의한 다음 실행되는 시점을 조절할 수 있다. 이때 사용하는 것이 subscribe() 함수이다.

subscribe() 함수의 주요 원형을 살펴보자

1. Disposable subscribe()
2. Disposable subscribe(Consumer<? super T> onNext)
2. Disposable subscribe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError)
4. Disposable subscribe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError, Action onComplete)

각 원형의 의미는 아래와 같다.

1) 인자가 없는 subscribe() 함수
onNext와 onComplete 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException을 던진다.(throw)
따라서 Observable로 작성한 코드를 테스트하거나 디버깅할 때 활용한다.

2) 인자가 1개 있는 오버로딩
onNext 이베트 처리
만약 onError 이벤트가 발생하면 OnErrorNotImplementedException을 던진다.

3) 인자가 2개인 함수
onNext와 onError 이벤트 처리

4) 인자가 3개인 함수
onNext, onError, onComplete 이벤트 모두 처리 가능

위 함수 원형은 모두 Disposable 인터페이스의 객체를 리턴한다.
Disposable은 RxJava 1.x의 subscription 객체에 해당하고 아래 2개 함수만 존재한다.

1. void dispose()
2. boolean isDisposed()

💡Disposable 인터페이스 함수
dispose()는 Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수이다. Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독과 관계를 끊는다.
[isDisposed() 함수 활용 예]

Observable<String> source = Observable.just("RED", "GREEN", "YELLOW");
Disposable d = source.subscribe(
	v -> System.out.println("onNext() : value : " + v),
    err -> System.err.println("onError() : err : " + err.getMessage()),
    () -> System.out.println("onComplete()");
);
System.out.println("isDisposed() : " + d.isDisposed());

위 코드의 실행 결과

onNext() : value : RED
onNext() : value : GREEN
onNext() : value : YELLOW
onComplete()
isDisposed() : true

create() 함수
위에서 설명한 just() 함수는 onNext(), onError(), onComplete 등의 이벤트가 자동으로 발생하지만 onCreate() 함수는 직접 호출해야 한다.
즉, 데이터를 발행하기 위해서 onNext(), onComplete() 함수를 반드시 호출해야 한다.

create() 함수를 활용해 데이터를 발행하는 방법에 대해 알아보자.

Observable<Integer> source = Observable.create(
	(ObservableEmitter<Integer> emitter) -> {
    	emitter.onNext(10);
        emitter.onNext(20);
        emitter.onNext(30);
        emitter.onComplete();
    });
source.subscribe(System.out::Println);

Observable 클래스 타입인 source 변수에서는 onNext() 함수를 호출해 차례로 10, 20, 30 데이터를 발행한 후 onComplete() 함수를 호출해 데이터 발행을 완료한다.
(subscribe() 함수를 호출하지 않으면 아무것도 출력되지 않는다.)

💡 Observable.create()를 사용할 때 주의할 점
create() 함수는 RxJava에 익숙한 사용자만 활용하도록 권고하는데.... 그 이유는 굳이 create() 를 사용하지 않고 다른 팩토리 함수를 활용해 같은 효과를 낼 수 있기 때문이다.
만약 그래도 사용해야 한다면 아래 사항을 확인해야 한다.

1. Observable이 구독 해지(dispose)되었을 때 등록된 콜백을 모두 해제해야 한다. 그렇지 않으면 memory leak 발생
2. 구독자가 구독하는 동안에만 onNext, onComplete 이벤트를 호출해야 한다.
3. 에러가 발생했을 때는 오직 onError 이벤트로만 전달해야 한다.
4. 배압(back pressure)을 직접 처리해야 한다.

< 복수 데이터 >

fromArray() 함수
지금까지 just()나 create()로 단일 데이터를 다뤄보았다. 그렇다면 단일데이터가 아닐 때는 어떻게 해야될까?

바로 fromXXX()계열 함수를 사용한다.

🖐🏻, 잠깐
RxJava 1.x에서는 from() 과 fromCallable() 함수만 사용했는데, 함수를 배열, 반복자, 비동기 계산 등 모두 사용하다보니 모호함이 생겼다고한다.
따라서 RxJava 2 에서는 from() 함수를 세분화하였다.

fromArray()함수를 활용해 배열에 들어있는 데이터를 처리하는 방법에 대해 알아보자

// fromArray()
Integer[] arr = {100, 200, 300};
Observable<Integer> source = Observable.fromArray(arr);

source.subscribe(System.out::println);
/**
* print-> 100
*         200
*         300
* 참고로 숫자뿐만 아니라 사용자 정의 클래스 객체도 넣을 수 있음
*/

🖐🏻 int[] 배열은 어떻게 처리할까?
실제 코드를 작성할 때 int[] 배열을 사용하며 Integer[]로 코딩하지는 않는다. 그럼 fromArray() 함수에 int[] 배열을 그대로 넣으면 어떤 결과가 나오는지 예상해보자

// int 배열 처리시
int[] intArray = {1, 2, 3};
Observable.fromArray(intArray).subscribe(System.out::println);
/**
* print-> [I@5c86dbc5
*/

위처럼 int[]로 사용했을 때 I@5c86dbc5라는 실행결과가 출력된다. 즉 RxJava에서 int 배열을 인식시키려면 Integer[]로 변환해야하는데 StreamAPI에서는 다음과 같은 방법을 제공한다.

// Stream API 사용 방법
Observable.fromArray(IntStream.of(intArray).boxed().toArray(Integer[]::new)).subscribe(System.out::println);
boxed() 메서드는 int[] 배열 각각의 요소를 Integer[] 배열의 스트림으로 만든다.

fromIterable() 함수
Observable을 만드는 다른 방법은 Iterable 인터페이스를 구현한 클래스에서 객체를 생성하는 것이다.
Iterable 인터페이스는 반복자를 반환하고, 이터레이터 패턴을 구현한 것으로 다음에 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는 장점이 있다.

Iterator 인터페이스에는 hasNext(), next() 메서드가 있다.

public interface Iterator<E> {
	boolean hasNext();
    E next();
}

위의 Iterable<E> 인터페이스를 구현하는 대표적인 클래스는 ArrayList, ArrayBlockingQueue, HashSet, LinkedList, Stack, TreeSet, Vector 등이 있다.

여기에선 List, Set, BlockingQueue 인터페이스 객체의 Observable을 만드는 방법에 대해 소개하겠다.

1) fromIterable() 에 List 사용

List<String> names = new ArrayList<>();
names.add("jiwon");
names.add("shin");
Observable<String> nameSource = Observable.fromIterable(names);
nameSource.subscribe(System.out::println);

2) fromIterable() 에 Set 사용

Set<String> cities = new HashSet<>();
cities.add("서울");
cities.add("송파구");
Observable<String> citySource = Observable.fromIterable(cities);
citySource.subscribe(System.out::println);

3) fromIterable() 에 BlockingQueue 사용

BlockingQueue<Order> orderQueue = new ArrayBlockingQueue<>(100);
orderQueue.add(new Order("ord-1"));
orderQueue.add(new Order("ord-2"));
orderQueue.add(new Order("ord-3"));
Observable<Order> orderSource = Observable.fromIterable(orderQueue);
orderSource.subscribe(System.out::println);

RxJava는 비동기 프로그래밍을 하기 위한 라이브러리로 지금까지 기본적인 자료구조로 Observable을 생성하는 부분을 살펴봤다. 이번에는 기존 자바에서 제공하는 비동기 클래스나 인터페이스와의 연동을 소개해보겠다.

fromCallable() 함수

Callable<String> callable = () -> {
	Thread.sleep(1000);
	return "Hello Callable";
};
Observable<String> callableSource = Observable.fromCallable(callable);
callableSource.subscribe(System.out::println);

위 코드는 람다 표현식으로 나타낸 Callable 객체와 fromCallable() 함수를 이용해 Observable 생성하는 방법이다.

call() 함수에는 인자가 없으므로 () -> {} 로 나타내었고, sleep을 사용해 1초간 쉬었다 'Hello Callable' 문자열을 반환하라! 라는 의미이다.


fromFuture() 함수
Future 인터페이스도 동시성 API로 비동기 계산의 결과를 구할 때 사용한다. 보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환한다.

Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
	Thread.sleep(1000);
	return "Hello Future";
});

Observable<String> futureSource = Observable.fromFuture(future);
futureSource.subscribe(System.out::println);

Executors 클래스는 단일 스레드 실행자 (SingleThreadExecutor)뿐만 아니라 다양한 스레드풀을 지원한다. 하지만 RxJava에서 제공하는 스케줄러를 활용하도록 권장한다.


fromPubilsher() 함수
Pubilsher는 자바 9의 표준인 Flow API의 일부이다.

import org.reactivestreams.Publisher;

Publisher<String> publisher = s -> {
	s.onNext("Observable.formPublisher");
	s.onComplete();
};
Observable<String> publisherSource = Observable.fromPublisher(publisher);
publisherSource.subscribe(System.out::println);

import 부분을 보면 기존의 Observable의 io.reactivex 패키지와는 달리 org.reactivestreams 패키지를 사용한다.
또한 Observable.create() 와 마찬가지로 onNext(), onComplete() 함수를 호출할 수 있다.

profile
공부가 조은 안드로이드 개발자

0개의 댓글