[RxJava] RxJava & Observable_1

Elen li·2021년 8월 16일
0
post-thumbnail

ReactiveX는 Microsoft사 주도 아래 옵서버 패턴, 이터레이터 패턴, 함수형 프로그래밍의 장점과 개념을 접목한 반응형 프로그래밍 기법이다.

RxJava란?

이벤트 처리 및 비동기 처리의 구성에 최적화된 라이브러리입니다. Observable 추상화 및 관련 상위 함수에 중점을 둔 단일 JAR로 가벼운 라이브러리며, Java 이외에도 C++, Kotlin, Swift, JavaScript 등 여러 언어를 지원합니다.

반응형 프로그래밍?

주변 환경과 끊임없이 상호 작용을 하는 프로그래밍입니다.
즉, 프로그램이 주도하는 것이 아닌 환경이 변하면 이벤트를 받아 동작하도록 만드는 프로그래밍 기법을 말합니다.

RxJava의 장점

개발자는 동시성 문제, 다중 이벤트 처리, 백그라운드 스레드 처리 등을 다루면 많은 문제점에 직면하는데, Rx는 이에 대해 범용적이고 확실한 해결책을 제시합니다.


1. Observable

RxJava에서는 Observable을 구독하는 Observer가 존재하고, Observable이 순차적으로 발행하는 데이터에 대해서 반응합니다.

  • onNext()
    : 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터 발행
  • onComplete()
    : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 더는 onNext() 호출이 발생하지 않음을 나타낸다.
  • onError()
    : 오류가 발생했음을 Observer에 전달한다.
    데이터 및 오류 내용을 발행할 때 null을 발행할 수 없음을 주의해야 합니다.

2. 연산자

1) create()

: Observable.create()를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템 발행의 완료 및 오류의 알림을 직접 설정할 수 있다.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    
    // 발행이 완료되면 반드시 onComplete()를 호출해야 한다.
    // onComplete()를 호출 후에는 아이템이 추가로 발행되더라도 구독자는 데이터를 통지받지 못한다.
    emiiter.onComplete();
    
    emitter.onNExt("World");
});

//Consumer를 통해 구독하기
source.subscribe(System.out::println);

//결과
Hello

** 만약 오류가 발행했을 시에는 Emitter를 통해 onError(Throwable)를 호출해야하며, 구독자는 이를 적절히 처리해야만 합니다.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onError(new Throwable());
    emitter.onNext("World");
});
source.subscribe(System.out::println,
	throwable -> System.out.println("Error!!")
);

//결과
Hello
Error!!

<< 주의사항 >>

Observable이 폐기되었을 때 등록된 콜백을 모두 해제하지 않으면 메모리 누수가 발생하고, BackPressure(배압)을 직접 처리해야한다.

2) just()

  • 해당 아이템을 그대로 발행하는 Observable을 생성해줍니다.
  • 인자로 넣은 아이템을 차례로 발행합니다.
  • 1개 ~ 여러개의 아이템을 넣을 수 있습니다.
Observable<String> source = Observable.just("Hello","World");
source.subscribe(System.out::println);

//결과
Hello
World

from 연산자

  1. fromArray()
    : 배열을 ObservableSource로 변환하여 아이템을 순차적으로 발행합니다.
String[] itemArray = new String[]{"A","B","C"};
Observable source = Observable.fromArray(itemArray);
source.subscribe(System.out::println);

//결과
A
B
C
  1. fromIterable()
    : ArrayList, HashSet처럼 Iterable을 구현한 모든 객체를 ObservableSource로 변환하여 아이템을 순차적으로 발행합니다.
ArrayList itemList = new ArrayList();
itemList.add("A");
itemList.add("B");
itemList.add("C");

Observable source = Observable.fromIterable(itemList);
source.subscribe(System.out::println);

//결과
A
B
C
  1. fromFuture()
    : Future 인터페이스는 '비동기적'인 작업의 결과를 구할 때 사용되는데 이 역시도 fromFuture()연산자를 통해 Observable로 변경이 가능합니다.
    : Emitter는 Observable 내부에서 Future.get()메서드를 호출하고, Future의 작업이 끝나기 전까지 스레드는 블로킹됩니다.
    : RxJava에서는 Executor를 직접 다루기보다는 RxJava에서 제공하는 스케줄러를 사용하는 것을 권장합니다.
//Executor Service를 통해 비동기적인 작업이 이루어질 때 사용됩니다.
Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
	Thread.sleep(5000);
   	return "Hello World";
});

Observable source = Observable.fromFuture(future);
source.subscribe(System.out::println);
  1. fromPublisher()
  • Publisher
    : 잠재적인 아이템을 발행을 제공하는 생산자
    : Subscriber로부터 요청을 받아 아이템을 발행합니다.
Publisher<String> publisher = subscriber -> {
	subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onComplete();
};

Observable<String> source = Observable.fromPublisher(publisher);
source.subcribe(System.out::println);

//결과
A
B
C
  1. fromCallable()
  • Callable
    : 비동기적인 실행 결과를 반환하는 인터페이스입니다.
Callable<String> callable = () -> "Hello World";
Observable source = Observable.fromCallable(callable);
source.subscribe(System.out::println);

//결과
Hello World

출처: 아키텍처를 알아야 앱 개발이 보인다.

profile
Android, Flutter 앱 개발자입니다.

0개의 댓글