dependencies {
    // RxJava 3 core library.
    // NOTE(review): this coordinate was originally declared twice; the duplicate
    // is dropped here. If the second line was meant to be a different artifact
    // (e.g. RxAndroid), add that coordinate instead - confirm.
    implementation "io.reactivex.rxjava3:rxjava:3.0.0"
}

android {
    compileOptions {
        // Compile with Java 8 source/target compatibility.
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}
1) onNext()
하나의 소스 Observable 에서 Observer 까지 한 번에 하나씩 순차적으로 데이터를 발행한다.
2) onComplete()
데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 더 이상 onNext() 호출이 발생하지 않음을 나타낸다.
3) onError()
오류가 발생했음을 Observer 에 전달한다.
// Builds an Observable by hand: emits "hello" and "world", then signals completion.
Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("hello");
    emitter.onNext("world");
    emitter.onComplete();
});
// Each emitted item is printed on its own line.
source.subscribe(System.out::println);
//결과
hello
world
// Emits "hello", completes, and then attempts another onNext() after completion.
// Per the result listed below, only the first item reaches the subscriber.
Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("hello");
    emitter.onComplete();
    emitter.onNext("world");
});
source.subscribe(System.out::println);
//결과
hello
// Emits "hello", signals an error, and then attempts another onNext().
// Per the result listed below, the item after the error is not delivered.
Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("hello");
    emitter.onError(new Throwable());
    emitter.onNext("world");
});
// First callback receives items; second callback receives the error.
source.subscribe(
        System.out::println,
        throwable -> System.out.println("error!")
);
//결과
hello
error!
// just() creates an Observable from the given items; the subscriber prints each one.
Observable<String> source = Observable.just("hello", "world");
source.subscribe(System.out::println);
//결과
hello
world
from 과 관련된 메서드를 확인해 본다
fromArray()
fromIterable()
fromFuture()
fromPublisher()
fromCallable()
// fromArray(): turns an array into an Observable that emits its elements.
String[] itemArray = new String[]{"a", "b", "c"};
Observable<String> source = Observable.fromArray(itemArray);
source.subscribe(System.out::println);
//실행결과
a
b
c
// fromIterable(): turns an Iterable (here an ArrayList) into an Observable.
// The element type is declared explicitly instead of using raw types.
ArrayList<String> itemList = new ArrayList<>();
itemList.add("a");
itemList.add("b");
itemList.add("c");
Observable<String> source = Observable.fromIterable(itemList);
source.subscribe(System.out::println);
// 실행결과
a
b
c
// fromFuture(): wraps a Future so its result is emitted as the Observable's item.
// The task sleeps for 5 seconds before returning its value.
// NOTE(review): the single-thread executor created here is never shut down.
Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
    Thread.sleep(5000);
    return "Hello World";
});
Observable<String> source = Observable.fromFuture(future);
source.subscribe(System.out::println);
//실행결과
Hello World
// fromPublisher(): adapts a Publisher into an Observable.
// This publisher pushes "A", "B", "C" to its subscriber and then completes.
Publisher<String> publisher = subscriber -> {
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onComplete();
};
Observable<String> source = Observable.fromPublisher(publisher);
source.subscribe(System.out::println);
// fromCallable(): the Callable's return value becomes the Observable's item.
Callable<String> callable = () -> "Hello World";
Observable<String> source = Observable.fromCallable(callable);
source.subscribe(System.out::println);
//실행결과
Hello World
// Single.just(): a Single holding exactly one value, printed by the subscriber.
Single.just("hello world").subscribe(System.out::println);
// Single.create(): the emitter delivers its single value through onSuccess().
Single.create(emitter -> emitter.onSuccess("Hello"))
        .subscribe(System.out::println);
// Observable operators that return a Single.
Observable<Integer> src = Observable.just(1, 2, 3);
// all(): a Boolean result for the predicate i > 0.
Single<Boolean> singleSrc1 = src.all(i -> i > 0);
// first(): called with -1 as its argument.
Single<Integer> singleSrc2 = src.first(-1);
// toList(): collects the items into a List.
Single<List<Integer>> singleSrc3 = src.toList();
// Converts a Single back into an Observable with toObservable().
Single<String> singleSrc = Single.just("Hello World");
Observable<String> observableSrc = singleSrc.toObservable();
// Case 1: the emitter calls onSuccess(100) and then onComplete().
// Per the result listed below, only the success path is observed.
Maybe.create(emitter -> {
            emitter.onSuccess(100);
            emitter.onComplete();
        })
        .doOnSuccess(item -> System.out.println("doOnSuccess"))
        .doOnComplete(() -> System.out.println("doOnComplete"))
        .subscribe(System.out::println);

// Case 2: the emitter completes without a value, so only the completion
// callback prints (see the result listed below).
Maybe.create(emitter -> emitter.onComplete())
        .doOnSuccess(item -> System.out.println("doOnSuccess2"))
        .doOnComplete(() -> System.out.println("doOnComplete2"))
        .subscribe(System.out::println);
//실행결과
doOnSuccess
100
doOnComplete2
// firstElement() on a non-empty Observable: the Maybe succeeds with the first item.
Observable<Integer> src1 = Observable.just(1, 2, 3);
Maybe<Integer> srcMaybe1 = src1.firstElement();
srcMaybe1.subscribe(System.out::println);

// firstElement() on an empty Observable: the Maybe completes without a value,
// so the third (onComplete) callback runs.
Observable<Integer> src2 = Observable.empty();
Maybe<Integer> srcMaybe2 = src2.firstElement();
srcMaybe2.subscribe(
        System.out::println,
        throwable -> {},
        () -> System.out.println("onComplete!"));
//실행결과
1
onComplete!
// Completable.create(): the emitter signals completion explicitly.
Completable.create(emitter -> {
    // do something here
    emitter.onComplete();
}).subscribe(() -> System.out.println("completed1"));

// Completable.fromRunnable(): built from a Runnable; the subscriber's
// completion callback prints "completed2" (see the result listed below).
Completable.fromRunnable(() -> {
    // do something here
}).subscribe(() -> System.out.println("completed2"));
//실행 결과
completed1
completed2
// Two subscriptions to the same interval Observable, 3 seconds apart.
// Per the result listed below, the second subscriber starts again from 0.
Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(value -> System.out.println("#1 : " + value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("#2 : " + value));
Thread.sleep(3000);
//실행결과
#1 : 0
#1 : 1
#1 : 2
#1 : 3
#2 : 0
#1 : 4
#2 : 1
#1 : 5
#2 : 2
// publish() yields a ConnectableObservable; connect() is called before any
// subscriber is attached. Per the result listed below, the second subscriber
// (attached 3 seconds later) sees the same values as the first from then on.
ConnectableObservable<Long> src = Observable.interval(1, TimeUnit.SECONDS).publish();
src.connect();
src.subscribe(value -> System.out.println("#1 : " + value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("#2: " + value));
Thread.sleep(3000);
//실행결과
#1 : 0
#1 : 1
#1 : 2
#1 : 3
#2: 3
#1 : 4
#2: 4
#1 : 5
#2: 5
// publish().autoConnect(2) with two subscribers attached.
// Per the result listed below, A and B receive the same values.
Observable<Long> src = Observable.interval(100, TimeUnit.MILLISECONDS)
        .publish()
        .autoConnect(2);
src.subscribe(i -> System.out.println("A:" + i));
src.subscribe(i -> System.out.println("B:" + i));
Thread.sleep(500);
//실행결과
A:0
B:0
A:1
B:1
A:2
B:2
A:3
B:3
A:4
B:4
// subscribe() returns a Disposable handle for the subscription.
Observable<String> source = Observable.just("A", "B", "C");
// Print the received item; the original printed the Observable object itself.
Disposable disposable = source.subscribe(o -> System.out.println(o));
유한한 아이템을 발행하는 Observable의 경우 onComplete 호출로 안전하게 종료된다. 하지만 무한하게 아이템을 발행하거나, 안드로이드의 생명주기와 관련된 경우는 메모리 누수를 방지하기 위해 dispose 가 필요하다.
Disposable.dispose() 메서드 이용시 아이템 발행을 중단할 수 있다.
// Emits one item every 1000 ms (once per second).
Observable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS);
Disposable disposable = source.subscribe(System.out::println);

// A separate thread waits 3.5 seconds and then disposes the subscription,
// which stops further items from being printed.
new Thread(() -> {
    try {
        Thread.sleep(3500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    disposable.dispose();
}).start();
//실행결과
0
1
2
// Three subscriptions to the same interval source, each with its own Disposable.
Observable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS);
Disposable d1 = source.subscribe(System.out::println);
Disposable d2 = source.subscribe(System.out::println);
Disposable d3 = source.subscribe(System.out::println);

// Collect the Disposables in one container so they can be disposed together.
CompositeDisposable cd = new CompositeDisposable();
cd.add(d1);
cd.add(d2);
cd.add(d3);
// or, as an alternative to the three add() calls above:
cd.addAll(d1, d2, d3);
// dispose at a specific point in time
cd.dispose();