잘 사용하지도 않고, 사용해본 적도 없지만 안드로이드 개발자 채용공고에 자격요건에 RxJava와 Reactive Programming에 대한 이해를 가지고 있는 분이라고 적힌 것을 본적이 많다.
그래서 이번 기회에 RxJava에 대해 공부를 해보려고 한다.
기존의 알고리즘 문제와 같이 절차를 명시하여 순서대로 실행하는 프로그래밍 방식을 Imperative Programming(명령형 프로그래밍)방식이라고 한다. Reactive Programming이란, 데이터의 흐름을 먼저 정의하고, 데이터가 변경되었을 때, 연관된 작업이 실행된다.
즉, 프로그래머가 어떠한 기능을 직접 정해서 실행하는 것이 아니라 시스템에 이벤트가 발생하였을 때, 처리되는 것이다.
공식 사이트 설명
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
전통적인 스레드 기반의 프로그래밍을 하는 자바와는 접근방식이 다르다. 여러 스레드를 사용하는 경우 예상치 못한 문제가 발생할 수도 있다.
RxJava는 함수형 프로그래밍 방식을 도입하여, 순수 함수를 지향하기 때문에 Thread-Safe하며, Side Effect를 감소시킬 수 있다.
import io.reactivex.rxjava3.core.Observable;
public class RxExample {
public static void main(String[] args){
Observable.just(1, 2, 3)
.map(x -> x*10)
.subscribe(System.out::println);
}
}
Observable
: ReactiveX의 핵심 요소이자 데이터 흐름에 맞게 Consumer에게 알림을 보내는 Classjust()
: 가장 간단한 Observable 생성 방식이다. (생성 연산자라고도 한다)fromXXX()
: 여러 데이터를 다뤄야 하는 경우 사용, 특정 데이터 타입의 데이터 XXX를 Observable로 변환해주는 메서드map()
: RxJava의 연산자이다. 데이터를 원하는 형태로 바꿀 수 있다.subscribe()
: Observable은 구독(subscribe)을 해야 데이터가 발행된다. 따라서 Observable을 구독하여 데이터를 발행 후, 수신한 데이터를 원하는 방식으로 사용(System.out::println)한다.
1. Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
2. 구독하고 있는 Observer들은 Observable이 데이터를 발행할 때마다, 알림을 받는다.
3. 알림을 받고 데이터를 수신한 Observer는 데이터를 가지고 어떠한 일을 한다.
Observable이 데이터를 발행하고 알림(Event)을 보내면, Observer는 Observable을 구독(subscribe)해 데이터를 소비(consume)한다.
Observer의 역할 → Consumer(소비자), Subscriber(구독자), Watcher(감시자)
public interface Emitter<@NonNull T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
onNext
: 데이터의 발행을 알림onComplete
: 모든 데이터의 발행이 완료되었음을 알림, 딱 한 번만 발생하며 이후에 onNext가 발생하면 안됨onError
: 오류가 발생했음을 알림, 이후에 onNext와 onComplete가 발생하지 않음구독이란, 수신할 데이터를 가지고 할 행동을 정의한다. Observer는 subscribe() 메소드에서 수신한 각각의 알림에 대한 실행할 내용을 정의한다.'
public final Disposable subscribe()
public final Disposable subscribe(@NonNull Consumer<? super T> onNext)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete)
public final void subscribe(@NonNull Observer<? super T> observer)
Disposable class는 구독의 정상적인 해지를 돕는다.
isDisposed()
를 통해 구독이 해지되었는지 확인할 수 있다.
//Observable 생성
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
// 데이터 흐름 정의
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// onComplete() 이후의 데이터는 발행되지 않음
emitter.onNext(3);
}
});
// subscribe 함수를 통해 실제로 데이터를 발행하여 소비함
observable.subscribe(
// onNext
new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("onNext : " + integer);
}
},
// onError
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
System.out.println("onError : " + throwable);
}
},
// onComplete
new Action() {
@Override
public void run() throws Throwable {
System.out.println("onComplete");
}
}
);