Observable클래스는 옵저버 패턴을 구현한다.
라이프 사이클은 존재하지 않으며, 보통 단일 함수를 통해 변화만을 알린다.
Observable 클래스에는 Observable을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다.
Observable을 생성할 때 직접 인스턴스를 만들지 않고 정적 팩토리 함수를 호출한다.
just() 함수는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성한다.
한 개의 값을 넣을 수도 있고 인자로 여러 개의 값을 넣을 수도 있다. (최대 10개)
단, 타입은 모두 같아야 한다.
RxJava는 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절하기 위해 subscribe() 함수를 사용한다.
Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.
RxJava는 선언형 프로그래밍을 지향한다.
- 어떤 방법으로 동작하는지가 아니라, 프로그래밍할 대상이 무엇인지 알려주는 것.
- 명령형 언어에서는 실행할 알고리즘과 동작을 구체적으로 명시하지만, 선언형은 목표를 명시할 뿐 알고리즘을 명시하지 않는다.
subscribe() 함수는 인자를 3개까지 가질 수 있다.
subscribe() 함수는 Disposable 인터페이스의 객체를 리턴한다.
Disposabled은 아래 2개의 함수만을 가진다.
void dispose() // Observable에게 더이상 데이터를 발행하지 않도록 구독을 해지하는 함수
boolean isDisposed()
Disposable 인터페이스 함수 예제
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
public class ObservableNotifications {
public static void main(String[] args) {
Observable<String> source = Observable.just("RED", "GREEN", "YELLOW");
Disposable d = source.subscribe(
v -> System.out.println("OnNext : value : " + v),
err -> System.err.println("OnError : err : " + err.getMessage()),
() -> System.out.println("OnComplete")
);
System.out.println("isDisposed : " + d.isDisposed());
}
}
실행 결과
OnNext : value : RED
OnNext : value : GREEN
OnNext : value : YELLOW
OnComplete
isDisposed : true
just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지마 create() 함수는 onNext, onComplete, onError같은 알림을 개발자가 직접 호출해야 한다.
create() 함수를 활용하여 데이터를 발행하는 방법
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
public class ObservableCreateExample {
public static void main(String[] args) {
Observable<Integer> source = Observable.create(
(ObservableEmitter<Integer> emitter) -> {
emitter.onNext(100);
emitter.onNext(200);
emitter.onNext(300);
emitter.onNext(400);
emitter.onComplete();
}
);
// subscribe() 함수를 호출하지 않으면 아무것도 출력되지 않는다.
source.subscribe(System.out::println).dispose();
// 아래 1, 2는 같은 의미이다.
source.subscribe(data -> System.out.println("Result : " + data)).dispose(); // 1
source.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("Result : " + integer);
}
}).dispose(); // 2
}
}
실행결과
100
200
300
400
fromArray 사용법
import io.reactivex.rxjava3.core.Observable;
public class ObservableFromArray {
public static void main(String[] args) {
Integer[] arr = {100, 200, 300};
Observable<Integer> source = Observable.fromArray(arr);
source.subscribe(System.out::println).dispose();
}
}
Integer[]와 같은 래퍼타입이 아닌 int[] 배열인 경우, Integer[]로 변환해야 한다.
import io.reactivex.rxjava3.core.Observable;
import java.util.stream.IntStream;
public class IntegerArray {
private static Integer[] toIntegerArray(int[] intArray) {
// boxed() 메서드는 int[] 배열의 각 요소를 Integer로 변환해 Integer[] 배열의 스트림으로 만든다.
// toArray() 메서드는 이 스트림을 Integer[] 배열로 만든다.
return IntStream.of(intArray).boxed().toArray(Integer[]::new);
}
public static void main(String[] args) {
int[] intArray = {400, 500, 600};
Observable<Integer> source = Observable.fromArray(toIntegerArray(intArray));
source.subscribe(System.out::println).dispose();
}
}
Iterable 인터페이스는 반복자(iterator)를 반환한다.
Iterator 인터페이스는 이터레이터 패턴을 구현한 것으로, 다음에 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는다.
List 객체에서 Observable 만드는 방법
List<String> names = new ArrayList<>();
names.add("Jerry");
names.add("William");
names.add("Bob");
Observable<String> source = Observable.fromIterable(names);
source.subscribe(System.out::println).dispose();
Set 인터페이스 객체로 Observable 만드는 방법
Set<String> cities = new HashSet<>();
cities.add("Seoul");
cities.add("London");
cities.add("Paris");
Observable<String> source = Observable.fromIterable(cities);
source.subscribe(System.out::println).dispose();
BlockingQueue 인터페이스 객체로 Observable 만드는 방법
BlockingQueue<Order> orderQueue = new ArrayBlockingQueue<>(100);
orderQueue.add(new Order("ORD-1"));
orderQueue.add(new Order("ORD-2"));
orderQueue.add(new Order("ORD-3"));
Observable<Order> source = Observable.fromIterable(orderQueue);
source.subscribe(order -> System.out.println(order.getId())).dispose();
Callable객체와 fromeCallable() 함수를 이용해 Observable을 만드는 방법
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.Callable;
public class ObserverFromCallable {
// 3초 쉬었다가 "Hello Callable" 반환
static Callable<String> callable1 = () -> {
Thread.sleep(3000);
return "Hello Callable";
};
// 람다 표현식 없는 fromCallable 함수
static Callable<String> callable2 = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return "Hello Callable";
}
};
public static void main(String[] args) {
System.out.println("ObserverFromCallable1");
Observable<String> source1 = Observable.fromCallable(callable1);
source1.subscribe(System.out::println).dispose();
System.out.println("ObserverFromCallable2");
Observable<String> source2 = Observable.fromCallable(callable1);
source2.subscribe(System.out::println).dispose();
}
}
자바5에서 추가된 동시성 API. 비동기 계산의 결과를 구할 때 사용한다.
보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환한다.
get() 메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올 때 까지 블로킹된다.
Future 객체에서 fromFuture() 함수를 사용해 Observer를 생성하는 방법
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ObservableFromFuture {
// 3초 쉬고 Hello Future 반환
static Future<String> future = Executors.newSingleThreadExecutor().submit(()->{
Thread.sleep(3000);
return "Hello Future";
});
public static void main(String[] args) {
System.out.println("ObserverFromFuture");
Observable<String> source = Observable.fromFuture(future);
source.subscribe(System.out::println).dispose();
}
}
fromPublisher() 함수로 Observable 만드는 방법
import io.reactivex.rxjava3.core.Observable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
public class ObservableFromPublisher {
/*
* Observable은 io.reactive
* Publisher, Subscriber는 org.reactivestreams
*/
static Publisher<String> publisher1 = (Subscriber<? super String> s) -> {
s.onNext("Hello Observable.fromPublisher()");
s.onComplete();
};
// 람다 표현식 안쓴거
static Publisher<String> publisher2 = new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
s.onNext("Hello Observable.fromPublisher()");
s.onComplete();
}
};
public static void main(String[] args) {
Observable<String> source1 = Observable.fromPublisher(publisher1);
source1.subscribe(System.out::println).dispose();
Observable<String> source2 = Observable.fromPublisher(publisher2);
source2.subscribe(System.out::println).dispose();
}
}