선수 지식 : 옵저버 패턴
RxJava와 ReactiveX는 모두 Observer 패턴과 이벤트 기반 프로그래밍을 기반으로 한 리액티브 프로그래밍 패러다임을 구현하는 라이브러리입니다.
ReactiveX 에서는 Observable 클래스와 함께 Emitter 인터페이스가 제공됩니다. Emitter는 Observable 에서 생성한 데이터 스트림에 이벤트를 emit(방출)하는 역할을 합니다. Observable은 Emitter를 구독하고, onNext(), onError(), onComplete()의 3가지 이벤트를 활용하여 Emitter로부터 전달된 이벤트를 처리합니다.
1. 기본 예제 코드
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 구독 시작
}
@Override
public void onNext(String value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
위 코드에서는 Observable.create() 메서드를 활용하여 데이터 스트림을 생성하고, 이를 Observable 객체로 변환합니다. 이후 subscribe() 메서드를 호출하여 Observer 인터페이스를 구현한 객체를 등록하여 데이터 스트림을 처리합니다. onNext(), onError(), onComplete() 메서드를 구현하여 데이터를 처리합니다. 이 때 데이터를 생성하는 것은 Emitter 인터페이스의 onNext() 메서드를 활용하여 데이터를 생성하며, onComplete() 메서드를 활용하여 데이터 생성을 완료하는 신호를 보내줍니다.
2. disposable.dispose(); 예제 코드
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onNext("RxJava");
emitter.onComplete();
}
});
Disposable disposable = observable.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(String value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
// 구독 취소
disposable.dispose();
위 예제에서는 DisposableObserver 클래스의 subscribeWith() 메서드를 사용하여 Disposable 객체를 얻어왔습니다. 이 Disposable 객체를 사용하여 구독을 취소할 수 있습니다. dispose() 메서드는 Observable이 더 이상 데이터를 발행하지 않도록 하며, Observer가 더 이상 데이터를 처리하지 않도록 합니다.
3. throws 예제 코드
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onNext("Exception");
throw new Exception("Something went wrong!");
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 구독 시작
}
@Override
public void onNext(String value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
System.out.println("Error occurred: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
출력 결과
Hello
World
Exception
Error occurred: Something went wrong!
해당 코드는 Observable을 생성하고 구독하는 예제입니다. create 메소드를 이용해서 Observable을 생성하고, 이를 구독(subscribe)합니다. 이때, ObservableEmitter의 onNext 메소드를 통해 문자열 "Hello"와 "World", "Exception"을 발행하고, 예외를 발생시킵니다.
이후, Observer에서는 onNext 메소드를 통해 Observable에서 발행한 문자열을 받고 출력합니다. 이때, 예외가 발생하면 onError 메소드를 호출하여 예외 정보를 출력합니다.
RxJava는 관찰 가능한 시퀀스를 사용하여 비동기 및 이벤트 기반 프로그램을 구성하기 위한 라이브러리인 Reactive Extensions의 Java VM 구현입니다.
즉, RxJava는 ReactiveX의 Java 구현체입니다. ReactiveX 는 다양한 언어에서 지원되는 Reactive Programming 라이브러리이며, RxJava는 Java 언어를 위한 ReactiveX의 구현 중 하나입니다.
데이터/이벤트의 시퀀스를 지원하도록 관찰자 패턴을 확장하고 낮은 수준의 스레딩, 동기화, 스레드 안전성 및 동시 데이터 구조와 같은 문제를 추상화하면서 선언적으로 시퀀스를 함께 구성할 수 있는 연산자를 추가합니다.
즉, ReactiveX는 데이터 스트림과 이벤트 기반 프로그래밍을 위한 기술로, 비동기적인 프로그래밍을 간편하게 구현할 수 있습니다. ReactiveX는 옵저버 패턴, iterator pattern, 함수형 프로그래밍 등의 개념을 활용하여 데이터 스트림을 처리하며, 데이터를 생성, 필터링, 변환, 결합하는 등의 다양한 작업을 수행할 수 있습니다. 따라서, RxJava는 ReactiveX의 Java 구현체이므로 ReactiveX에서 제공하는 다양한 기능과 개념을 활용하여 비동기적인 프로그래밍을 쉽게 구현할 수 있습니다.
1. 명령형 프로그래밍 (Imperative programming)
명령형 프로그래밍은 작성된 코드가 정해진 순서대로 실행되는 방식의 프로그래밍 입니다. 코드가 순서대로 진행되므로 이해하기 쉽습니다. 즉 개발자가 작성한 조건문, 반복문, 함수를 따라 컴파일러가 다른 코드로 이동하게 됩니다.
public void imperativeProgramming() {
ArrayList<Integer> items = new ArrayList<>();
items.add(1);
items.add(2);
items.add(3);
items.add(4);
for (Integer item : items) {
if (item % 2 == 0) {
System.out.println(item);
}
}
items.add(5);
items.add(6);
items.add(7);
items.add(8);
}
결과
2
4
2. 반응형 프로그래밍(Reactive Programming)
반응형 프로그래밍은 시간 순으로 들어오는 모든 데이터의 흐름을 스트림(Stream) 으로 처리하며, 하나의 데이터 흐름은 다른 데이터 흐름으로 변형되기도 하고, 여러 데이터 흐름이 하나의 데이터 흐름으로 변경될 수도 있습니다.
public void reactiveProgramming() {
PublishSubject<Integer> items = PublishSubject.create();
items.onNext(1);
items.onNext(2);
items.onNext(3);
items.onNext(4);
items.filter(item -> item % 2 == 0)
.subscribe(System.out::println);
items.onNext(5);
items.onNext(6);
items.onNext(7);
items.onNext(8);
}
결과
6
8
PublishSubject 는 구독 시점 이후의 데이터만 옵저버에 전달하기 때문에 6, 8만 출력됩니다. (구독시점 이전의 데이터까지 출력하려면 ReplaySubject로 대체할 수 있습니다.)
안드로이드의 LiveData랑 비슷하다고 볼 수 있습니다. 둘다 옵저버 패턴으로 데이터가 변하면 그 상태를 실시간으로 읽는 것과 이벤트를 발생하는 것에는 근본적인 공통점이 있습니다.
(1)
val = PublishSubject : PublishSubject<Int> = PublishSubject.create()
(2)
PublishSubject.subscribe { it ->
println(it)
}
(3)
PublishSubject.onNext(1)
PublishSubject.onNext(2)
실행 결과
1
2
- publisher : 데이터를 발행하는 발행자 (데이터 스트림)
- subscriber : publisher를 구독 및 관찰하며, publisher가 발행하는 데이터를 사용하는 구독자
(1) Int 타입의 데이터를 발행하는 PublishSubject를 생성합니다. 이 때, create() 함수를 사용하여 새로운 PublishSubject 객체를 생성합니다.
(2) 생성된 PublishSubject 객체를 구독하여, 발행된 데이터를 출력하는 코드입니다. subscribe() 함수를 사용하여 PublishSubject 객체를 구독하고, 람다식을 사용하여 구독한 결과를 출력합니다. 이 때, 람다식의 인자로는 구독한 데이터가 전달됩니다. 즉, 현재 코드에서 subscriber는 발행된 데이터를 출력하는 작업(println(it))을 합니다.
(3) 생성된 PublishSubject 객체에 대해 onNext() 함수를 사용하여 데이터를 발행합니다. 위 코드에서는 먼저 1이라는 데이터를 발행하고, 이후에 2라는 데이터를 발행합니다. 발행하는 순간 subscriber는 해당 데이터들을 출력합니다.
UI 개발에서 유용: 반응형 프로그래밍은 UI 개발에서 매우 유용합니다. UI 요소가 변경될 때, 이를 반영하고 업데이트하는 데 필요한 코드가 자동으로 실행됩니다. 이는 개발자가 UI 변경을 수동으로 처리하는 것보다 훨씬 효율적입니다.
데이터 흐름 추적 용이: 반응형 프로그래밍은 데이터 흐름을 추적하기 쉽게 만들어 줍니다. 데이터가 변경될 때, 시스템은 자동으로 이를 반영하고 업데이트합니다. 이는 데이터 변경이 어디서 발생했는지를 추적하기 쉽게 만들어 줍니다.
코드 중복 감소: 반응형 프로그래밍은 코드 중복을 감소시켜 줍니다. 코드를 여러 번 작성하는 대신, 데이터 변경에 대한 응답을 처리하는 데 필요한 코드를 한 번 작성하면 됩니다. 이는 코드를 더욱 간결하고 유지 보수하기 쉽게 만들어 줍니다.
비동기 프로그래밍 용이: 반응형 프로그래밍은 비동기 프로그래밍을 용이하게 만들어 줍니다. 이는 여러 작업을 동시에 처리하고 결과를 취합하는 데 유용합니다.
https://github.com/ReactiveX/RxJava
https://blog.yena.io/studynote/2020/10/11/Android-RxJava(1).html