// Without an error-handling operator, the exception reaches onError and the stream ends.
Observable.just("1", "2", "a", "3")
    .map(i -> Integer.parseInt(i))
    .subscribe(System.out::println,
        throwable -> System.out.println("Error"));
result
1
2
Error
// onErrorReturn: emit a fallback item in place of the error, then complete.
Observable.just("1", "2", "a", "3")
    .map(i -> Integer.parseInt(i))
    .onErrorReturn(throwable -> -1)
    .subscribe(System.out::println);
result
1
2
-1
// onErrorResumeNext: switch to a fallback Observable when an error occurs.
Observable.just("1", "2", "a", "3")
    .map(i -> Integer.parseInt(i))
    .onErrorResumeNext(throwable ->
        Observable.just(100, 200, 300))
    .subscribe(System.out::println);
result
1
2
100
200
300
// retry(2): resubscribe to the source up to two more times before passing the error on.
Observable.just("1", "2", "a", "3")
    .map(i -> Integer.parseInt(i))
    .retry(2)
    .subscribe(System.out::println);
result
1
2
1
2
1
2
java.lang.NumberFormatException: For input string: "a"
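The repeated 1, 2 in the result above comes from retry subscribing to the source again from the first item. The sketch below is a plain-Java model of that behavior, not RxJava's implementation; the class `RetryModel` and the method `runWithRetry` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RetryModel {
    // Hypothetical helper: replays the whole source after a failure,
    // allowing up to maxRetries additional attempts.
    static List<String> runWithRetry(List<String> source, int maxRetries) {
        List<String> log = new ArrayList<>();
        int attempt = 0;
        while (true) {
            try {
                for (String s : source) {
                    // Throws NumberFormatException for non-numeric input.
                    log.add(String.valueOf(Integer.parseInt(s)));
                }
                log.add("complete");
                return log;
            } catch (NumberFormatException e) {
                if (attempt++ >= maxRetries) {
                    // Retries exhausted: report the error and stop.
                    log.add("error: " + e.getClass().getSimpleName());
                    return log;
                }
                // Otherwise loop again from the first item, like a resubscription.
            }
        }
    }

    public static void main(String[] args) {
        // prints [1, 2, 1, 2, 1, 2, error: NumberFormatException]
        System.out.println(runWithRetry(List.of("1", "2", "a", "3"), 2));
    }
}
```

With two retries the source is traversed three times in total (the first attempt plus two retries), which is why the pair 1, 2 appears three times before the error.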
Observable.just(1, 2, 3)
    .doOnEach(notification -> {
        // A Notification wraps each onNext, onError, or onComplete signal.
        Integer i = notification.getValue();
        boolean isOnNext = notification.isOnNext();
        boolean isOnComplete = notification.isOnComplete();
        boolean isOnError = notification.isOnError();
        Throwable throwable = notification.getError();
        System.out.println("i = " + i);
        System.out.println("isOnNext = " + isOnNext);
        System.out.println("isOnComplete = " + isOnComplete);
        System.out.println("isOnError = " + isOnError);
        if (throwable != null) {
            throwable.printStackTrace();
        }
    })
    .subscribe(value -> {
        System.out.println("subscribe = " + value);
    });
result
i = 1
isOnNext = true
isOnComplete = false
isOnError = false
subscribe = 1
...
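// doOnNext: runs for each item before it reaches the subscriber; an exception thrown here becomes onError.
Observable.just(1, 2, 3)
    .doOnNext(item -> {
        if (item > 1) {
            throw new IllegalArgumentException();
        }
    })
    .subscribe(System.out::println, throwable -> throwable.printStackTrace());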
result
1
java.lang.IllegalArgumentException
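// doOnSubscribe: runs when the subscription starts, before any item is emitted.
Observable.just(1, 2, 3)
    .doOnSubscribe(disposable -> System.out.println("Subscription started!"))
    .subscribe(System.out::println);
result
Subscription started!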
1
2
3
// doOnComplete: runs when the source completes normally.
Observable.just(1, 2, 3)
    .doOnComplete(() -> System.out.println("Completed!"))
    .subscribe(System.out::println);
result
1
2
3
Completed!
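// doOnError: runs when an error is signaled, before the subscriber's error handler.
Observable.just(2, 1, 0)
    .map(i -> 10 / i)
    .doOnError(throwable -> System.out.println("Error!"))
    .subscribe(System.out::println, t -> t.printStackTrace());
result
5
10
Error!
java.lang.ArithmeticException: / by zero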
// doOnTerminate runs on completion or error; doOnComplete runs only on normal completion.
Observable.just(2, 1, 0)
    .map(i -> 10 / i)
    .doOnComplete(() -> System.out.println("doOnComplete"))
    .doOnTerminate(() -> System.out.println("doOnTerminate"))
    .subscribe(System.out::println, t -> t.printStackTrace());
result
5
10
doOnTerminate
java.lang.ArithmeticException: / by zero
// doOnDispose: runs when the subscriber disposes the subscription.
Observable<Long> src = Observable.interval(500, TimeUnit.MILLISECONDS)
    .doOnDispose(() -> System.out.println("doOnDispose"));
Disposable disposable = src.subscribe(System.out::println);
Thread.sleep(1100);
disposable.dispose();
result
0
1
doOnDispose
// doFinally: runs after completion, error, or disposal; here only disposal happens.
Observable<Long> src = Observable.interval(500, TimeUnit.MILLISECONDS)
    .doOnComplete(() -> System.out.println("doOnComplete"))
    .doOnTerminate(() -> System.out.println("doOnTerminate"))
    .doFinally(() -> System.out.println("doFinally"));
Disposable disposable = src.subscribe(System.out::println);
Thread.sleep(1100);
disposable.dispose();
result
0
1
doFinally
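Scheduler io = Schedulers.io();                   // blocking I/O; threads are cached and reused
Scheduler computation = Schedulers.computation(); // CPU-bound work; pool sized to the available cores
Scheduler trampoline = Schedulers.trampoline();   // queues work on the current thread
Scheduler newThread = Schedulers.newThread();     // starts a new thread for each unit of work
// Android (RxAndroid)
Scheduler mainThread = AndroidSchedulers.mainThread();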
// With no scheduler specified, emission and consumption both run on the calling thread.
Observable<Integer> src = Observable.create(emitter -> {
    for (int i = 0; i < 3; i++) {
        String threadName = Thread.currentThread().getName();
        System.out.println("Subs on " + threadName + ":" + i);
        emitter.onNext(i);
        Thread.sleep(100);
    }
    emitter.onComplete();
});
src.subscribe(s -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("obs on " + threadName + " :" + s);
});
result
Subs on main:0
obs on main :0
Subs on main:1
obs on main :1
Subs on main:2
obs on main :2
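// subscribeOn: the source emits on the io scheduler, and the subscriber runs on that same thread.
src.subscribeOn(Schedulers.io())
    .subscribe(s -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("obs on " + threadName + " :" + s);
    });
Thread.sleep(500); // keep the main thread alive until the background work finishes
result
Subs on RxCachedThreadScheduler-1:0
obs on RxCachedThreadScheduler-1 :0
Subs on RxCachedThreadScheduler-1:1
obs on RxCachedThreadScheduler-1 :1
Subs on RxCachedThreadScheduler-1:2
obs on RxCachedThreadScheduler-1 :2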
// observeOn moves delivery to the computation scheduler; the source still emits on the io scheduler.
src.observeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe(s -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("obs on " + threadName + " :" + s);
    });
Thread.sleep(500);
result
Subs on RxCachedThreadScheduler-1:0
obs on RxComputationThreadPool-1 :0
Subs on RxCachedThreadScheduler-1:1
obs on RxComputationThreadPool-1 :1
Subs on RxCachedThreadScheduler-1:2
obs on RxComputationThreadPool-1 :2
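When a scheduler is specified with the observeOn operator, the items emitted by the Observable are intercepted and delivered to the subscriber on that scheduler. Time-based operators such as interval, timer, replay, and buffer, however, run on the computation scheduler by default, so a scheduler specified with subscribeOn has no effect on where they emit. To run them elsewhere, pass a Scheduler to the operator's own overload.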
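The hand-off described above can be pictured without RxJava. The following is only a plain java.util.concurrent sketch of the idea, not how RxJava implements subscribeOn or observeOn; the class `ThreadHopModel`, the method `run`, and the thread names "producer" and "consumer" are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopModel {
    // Hypothetical helper: emits `count` items on a "producer" thread and hands
    // each one to a "consumer" thread, returning what each side logged.
    static List<String> run(int count) {
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        try {
            // Emission side: plays the role of the thread chosen by subscribeOn.
            producer.execute(() -> {
                for (int i = 0; i < count; i++) {
                    final int item = i;
                    log.add("Subs on " + Thread.currentThread().getName() + ":" + item);
                    // Hand-off: plays the role of observeOn moving delivery elsewhere.
                    consumer.execute(() -> {
                        log.add("obs on " + Thread.currentThread().getName() + " :" + item);
                        done.countDown();
                    });
                }
            });
            done.await(); // wait until every item has been consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Every "Subs on" line is logged by the producer thread and every "obs on" line by the consumer thread. The order in which the two kinds of line interleave can differ between runs.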
On Android, the io scheduler is used when reading data from a database so that the main thread is not blocked. Because the UI can only be updated from the main thread, the result is observed on the main thread.
repository.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(user -> {
        // update the UI
    });
// onBackpressureBuffer: hold every item in a buffer until the consumer is ready for it.
Flowable.interval(10, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .map(item -> {
        Thread.sleep(2000);
        System.out.println("Item emitted: " + item);
        return item;
    })
    .subscribe(item -> {
        System.out.println("Item consumed: " + item);
    }, throwable -> throwable.printStackTrace());
Thread.sleep(30 * 1000);
result
Item emitted: 0
Item consumed: 0
Item emitted: 1
Item consumed: 1
...
// onBackpressureLatest: while the consumer is busy, keep only the most recent item.
Flowable.interval(10, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .observeOn(Schedulers.io())
    .map(item -> {
        Thread.sleep(100);
        System.out.println("Item emitted: " + item);
        return item;
    })
    .subscribe(item -> {
        System.out.println("Item consumed: " + item);
    }, throwable -> throwable.printStackTrace());
Thread.sleep(30 * 1000);
result
...
Item emitted: 124
Item consumed: 124
...
Item emitted: 986
Item consumed: 986
...
// onBackpressureDrop: discard items that arrive while the consumer cannot accept them.
Flowable.range(1, 300)
    .onBackpressureDrop()
    .observeOn(Schedulers.io())
    .subscribe(item -> {
        System.out.println("Item consumed: " + item);
    }, throwable -> throwable.printStackTrace());
Thread.sleep(30 * 1000);
result
Item consumed: 1
Item consumed: 2
Item consumed: 3
...
// onBackpressureDrop with a callback that is invoked for each discarded item.
Flowable.range(1, 300)
    .onBackpressureDrop(item -> {
        System.out.println("Item dropped: " + item);
    })
    .observeOn(Schedulers.io())
    .subscribe(item -> {
        System.out.println("Item consumed: " + item);
    }, throwable -> throwable.printStackTrace());
Thread.sleep(30 * 1000);
result
Item consumed: 1
Item consumed: 2
Item consumed: 3
Item dropped: 4
Item dropped: 5
Item dropped: 6
...
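The drop behavior shown above can be pictured as a bounded queue that rejects new items while it is full. The sketch below is a deterministic plain-Java model of that idea, not RxJava's onBackpressureDrop implementation; the class `DropModel`, the method `simulate`, and its parameters are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DropModel {
    // Hypothetical model: a fast producer offers items 1..items to a queue of
    // fixed capacity, and the consumer takes one item for every
    // `producePerConsume` items produced. Items offered to a full queue are dropped.
    static List<String> simulate(int items, int capacity, int producePerConsume) {
        Deque<Integer> queue = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= items; i++) {
            if (queue.size() < capacity) {
                queue.addLast(i);          // room available: buffer the item
            } else {
                log.add("dropped " + i);   // queue full: discard the item
            }
            if (i % producePerConsume == 0 && !queue.isEmpty()) {
                log.add("consumed " + queue.removeFirst()); // the slow consumer catches up a little
            }
        }
        while (!queue.isEmpty()) {
            log.add("consumed " + queue.removeFirst());     // drain what is left at the end
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [dropped 3, consumed 1, dropped 5, dropped 6, consumed 2, consumed 4]
        System.out.println(simulate(6, 2, 3));
    }
}
```

In this model the dropped items never reach the consumer, so the consumed sequence has gaps, which is the effect the drop strategy trades for bounded memory.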
Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
    for (int i = 0; i <= 1000; i++) {
        if (emitter.isCancelled()) {
            return;
        }
        emitter.onNext(i);
    }
    emitter.onComplete();
}, BackpressureStrategy.BUFFER) // backpressure strategy
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe(System.out::println, throwable -> throwable.printStackTrace());
Thread.sleep(100);
result
0
1
2
...
998
999
1000