[Android] RxJava와 Reactive Programming

Delight Yoon·2023년 3월 17일
0

Android

목록 보기
15/17

RxJava와 Reactive Programming

잘 사용하지도 않고, 사용해본 적도 없지만 안드로이드 개발자 채용공고에 자격요건에 RxJava와 Reactive Programming에 대한 이해를 가지고 있는 분이라고 적힌 것을 본적이 많다.

그래서 이번 기회에 RxJava에 대해 공부를 해보려고 한다.

📌 Reactive Programming

기존의 알고리즘 문제와 같이 절차를 명시하여 순서대로 실행하는 프로그래밍 방식을 Imperative Programming(명령형 프로그래밍)방식이라고 한다. Reactive Programming이란, 데이터의 흐름을 먼저 정의하고, 데이터가 변경되었을 때, 연관된 작업이 실행된다.

즉, 프로그래머가 어떠한 기능을 직접 정해서 실행하는 것이 아니라 시스템에 이벤트가 발생하였을 때, 처리되는 것이다.

  • Imperative Programming(명령형 프로그래밍) → Pull 방식
    • 데이터를 사용하는 곳인 소비자(Consumer)에서 데이터를 직접 가져와서 사용한다.
  • Reactive Programming → Push 방식
    • 데이터의 변화가 발생하는 곳(이벤트가 발생하는 곳)에서 새로운 데이터를 Consumer에게 전달한다.
    • 이벤트의 발생에 의한 View 갱신 처리 또는 특정 작업
    • 주변 환경과 끊임없이 상호작용, 환경이 변하면 이벤트를 받아 동작함으로써 상호작용

📌 RxJava

  • ReactiveX의 Java언어 라이브러리
  • 2013년 2월 넷플릭스에서 처음 소개
  • 비동기 프로그래밍
  • 함수형 프로그래밍
  • RxJava2 에서는 Maybe나 Flowable 같은 새로운 클래스가 추가되었고, 비동기 테스팅이 더 자연스러워짐
  • 이벤트에 소비자가 비동기로 반응하여 처리한다.
    • 이벤트 → 스크린 터치, 마우스 클릭, 키 입력, 서버의 비동기 응답

공식 사이트 설명

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

  • ReactiveX는 관찰가능한 절차를 통해 비동기, 이벤트 기반 프로그램을 구성하기 위한 라이브러리이다.
  • Observer Pattern을 확장하며, Sequence를 조합할 수 있는 연산자를 지원한다.
  • low-level Thread, 동기화, Thread 안전성, non-blocking I/O에 관한 우려를 줄인다.

전통적인 스레드 기반의 프로그래밍을 하는 자바와는 접근방식이 다르다. 여러 스레드를 사용하는 경우 예상치 못한 문제가 발생할 수도 있다.

RxJava는 함수형 프로그래밍 방식을 도입하여, 순수 함수를 지향하기 때문에 Thread-Safe하며, Side Effect를 감소시킬 수 있다.

예제

import io.reactivex.rxjava3.core.Observable;

public class RxExample {
    public static void main(String[] args){
        Observable.just(1, 2, 3)
                .map(x -> x*10)
                .subscribe(System.out::println);
  }
}
  • Observable : ReactiveX의 핵심 요소이자 데이터 흐름에 맞게 Consumer에게 알림을 보내는 Class
    • RxJava의 가장 핵심적인 요소, Observable은 데이터 흐름에 맞게 알림을 보내 Observer가 데이터를 사용(subscribe)할 수 있도록 한다.
  • just() : 가장 간단한 Observable 생성 방식이다. (생성 연산자라고도 한다)
  • fromXXX() : 여러 데이터를 다뤄야 하는 경우 사용, 특정 데이터 타입의 데이터 XXX를 Observable로 변환해주는 메서드
  • map() : RxJava의 연산자이다. 데이터를 원하는 형태로 바꿀 수 있다.
  • subscribe() : Observable은 구독(subscribe)을 해야 데이터가 발행된다. 따라서 Observable을 구독하여 데이터를 발행 후, 수신한 데이터를 원하는 방식으로 사용(System.out::println)한다.


1. Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
2. 구독하고 있는 Observer들은 Observable이 데이터를 발행할 때마다, 알림을 받는다.
3. 알림을 받고 데이터를 수신한 Observer는 데이터를 가지고 어떠한 일을 한다.

Observable이 데이터를 발행하고 알림(Event)을 보내면, Observer는 Observable을 구독(subscribe)해 데이터를 소비(consume)한다.

Observer의 역할 → Consumer(소비자), Subscriber(구독자), Watcher(감시자)

Observable의 데이터 발행

public interface Emitter<@NonNull T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}
  • onNext : 데이터의 발행을 알림
  • onComplete : 모든 데이터의 발행이 완료되었음을 알림, 딱 한 번만 발생하며 이후에 onNext가 발생하면 안됨
  • onError : 오류가 발생했음을 알림, 이후에 onNext와 onComplete가 발생하지 않음

Observable의 데이터를 구독

구독이란, 수신할 데이터를 가지고 할 행동을 정의한다. Observer는 subscribe() 메소드에서 수신한 각각의 알림에 대한 실행할 내용을 정의한다.'

public final Disposable subscribe()
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete)
public final void subscribe(@NonNull Observer<? super T> observer)

Disposable class는 구독의 정상적인 해지를 돕는다.
isDisposed()를 통해 구독이 해지되었는지 확인할 수 있다.

발행과 구독 예제

//Observable 생성
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        // 데이터 흐름 정의
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
        // onComplete() 이후의 데이터는 발행되지 않음
        emitter.onNext(3);
    }
});

// subscribe 함수를 통해 실제로 데이터를 발행하여 소비함
observable.subscribe(
    // onNext
   new Consumer<Integer>() {
       @Override
       public void accept(Integer integer) throws Throwable {
           System.out.println("onNext : " + integer);
       }
   },
   // onError
   new Consumer<Throwable>() {
       @Override
       public void accept(Throwable throwable) throws Throwable {
           System.out.println("onError : " + throwable);

       }
   },
   // onComplete
   new Action() {
       @Override
       public void run() throws Throwable {
           System.out.println("onComplete");
       }
   }
);

References

profile
Yoon's Dev Blog

0개의 댓글