3. RxJava 반응형 프로그래밍 -3

Jamezz Dev·2020년 8월 8일
0

android-Architecture

목록 보기
8/8

오류를 다루는 연산자

기본 오류 처리 방법

Obserable.just("1","2","a","3")
    .map(i -> Integer.parseInt(i))
    .subscribe(System.out::println,throwable -> System.out.println("Error")
);

result 
1
2
Error 

1. OnErrorReturn 연산자

  • 오류가 발생하면 아이템 발행을 종료하고 onError()를 호출하는 대신에 오류 처리를 위한 함수를 실행한다.
Obserable.just("1","2","a","3")
    .map(i -> Integer.parseInt(i))
    .onErrorReturn(throwable -> -1)
    .subscribe(System.out::println);

result 
1
2
-1

2. onErrorResumeNext 연산자

  • 오류 발생 시 기존 스트림을 종료시키고 , 다른 Obserable 소스로 스트림을 대체한다.
Obserable.just("1","2","a","3")
    .map(i -> Integer.parseInt(i))
	.onErrorResumeNext(throwable -> 
    	Obserable.just(100,200,300))
    .subscribe(System.out::println);

result 
1
2
100
200
300

3. retry 연산자

  • Obserable 이 에러를 발행할때 Obserable을 재구독하도록 한다.
  • 재구독하더라도 변함없이 에러를 발생하게 되면 무한히 재시도하게 되는데 이를 방지하기 위해 시도 횟수를 명시하는것이 좋다
Obserable.just("1","2","a","3")
    .map(i -> Integer.parseInt(i))
    .retry(2)
    .subscribe(System.out::println);

result 
1
2
1
2
1
2
java.lang.NumberExceiption error 

디버깅을 돕는 do On 연산자

1. doOnEach 연산자

  • Obserable이 아이템을 발행하기 전에 이를 콜백으로 확인할 수 있도록 한다.
Obserable.just(1,2,3)
        .doOnEach(notification -> {
        Integer i = notification.getValue();
        boolean isOnNext = notification.getValue();
        boolean isOnComplete = notification.isOnComplete();
        boolean isOnError = notification.isOnError();
        Throwable throwable = notification.getError();
        System.out.println("i = "+i);
        System.out.println("isOnComplete = "+isOnComplete);
        System.out.println("isOnError = "+isOnError);
        if(throwable != null){
        	throwable.printStackTrace();
        }
        }).subscribe(value ->{
        	System.out.println("subscribe = "+value);
        });
        
result

i=1
isOnNext =true
isOnComplete = false
isOnError = false
Subscribed = 1
... 

2. doOnNext 연산자

  • doOnEach과 형태는 비슷하지만 notifiacation 대신 간단히 아이템을 확인할 수 있는 cosumer형태로 넘긴다
obserable.just(1,2,3)
.doOnNext(item ->{
    if(item>1) {
        	throw new IllegalArgumentException();
        }
    })
    .subscribe(System.out::println,throwable ->throwable.printStackTrace());

result
1
java.lang.illegalArgumentException

3. doOnSubscribe 연산자

  • doOnSubscribe 연산자는 구독시마다 콜백을 받을수 있도록 한다
  • 매개변수로 Disposable 을 받을 수 있다
Obserable.just(1,2,3)
.doOnSubscribe(disposable -> System.out.println("구독시작!"))
.subscribe(Sytem.out::println);

result
구독시작!
1
2
3

4. doOnComplete 연산자

  • onComplete 연산자는 Emitter 의 onComplete()호출로 Obserable이 정상적으로 종료될때 호출되는 콜백
Obserable.just(1,2,3)
.doOnComplete(() -> System.out.println("완료"))
.subscribe(Sytem.out::println);

result
1
2
3
완료! 

5. doOnError 연산자

  • Obserable 내부에서 onError 호출로 Obserable 이 정상적으로 종료되지 않을때 호출되는 콜백이다
  • 내부적으로 throwable이 들어온다
Obserable.just(2,1,0)
    .map(i -> 10/i)
    .doOnError(throwable -> System.out.println("오류!"))
    .subscribe(System.out::println, t->t.printStackTrace());

result
5
10
오류!
java.lang.ar~~~ : by zero

6. doOnTerminate() 연산자

  • onComplete 연산자와 동일하지만 차이점은 오류가 발생했을때도 콜백이 호출된다는 점이다
Obserable.just(2,1,0)
    .map(i -> 10/i)
    .doOnComplete(() -> System.out.println("doOnComplete"))
    .doOnTerminate(() -> System.out.println("doOnTerminate"))
    .subscribe(System.out::println, t->t.printStackTrace());

result
5
10
doOnTerminate
java.lang.ar~~~ : by zero

7. doOnDispose 연산자

  • doOnDispose 는 구독 중인 스트림이 dispose() 메서드 호출로 인해 폐기되는 경우에 콜백이 호출된다.
Obserable src = Obserable.interval(500,TimeUnit.MILLISECONDS)
		.doOnDispose(() -> System.out.println("doOnDispose"));
Disposable disposable = src.subscribe(System.out::println);
Thread.sleep(1100);
disposable.dispose();

result 
0
1
doOnDispose

8.doFinally 연산자

  • Obserable 이 onError() , onComplete() 또는 스트림이 폐기될때 doFinally 콜백이 호출된다.
  • 구독한된 어떠한 상황에서도 후속조취가 필요한 경우 이 연산자를 이용할 수 있다.
Obserable src = Obserable.interval(500,TimeUnit.MILLISECONDS)
.doOnComplete(()->System.out.println("doOnComplete"))
.doOnTerminate(()->System.out.println("doOnTerminate"))
.doFinally(()->System.out.println("doFinally"))
Disposable dispose = src.subscribe(System.out::println);
Thread.sleep(1100);
disposeable.dispose();

result 
0
1
doFinally 

4. 스케줄러

  • RxJava 에서는 스케줄러라는 도구를 이용 멀티스레드와 같은 비동기 작업을 돕는다.
Scheduler io = Schedulers.io();
Scheduler computation = Schedulers.computation();
Scheduler trampoline = Schedulers.trampoline();
Shceduler newThread = Schedulers.newThread();
// android 
Scheduler mainThread = AndroidSchedulers.mainThread();

스케줄러의 종류

1. IO 스케줄러

  • 네트워크, 데이터베이스와 같은 블로킹 이슈가 발생하는 곳에서 비동기적인 작업을 위해 사용될수 있다

2. newThread 스케줄러

  • 매번 새로운 스케줄러를 생성한다

3. Computation 스케줄러

  • Computation 단순적인 반복 작업, 콜백처리, 계산작업에 사용된다. 블로킹 이슈가 발생하는 곳에서 추천되지 않는다.

4. Trampoline 스케줄러

  • 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 큐를 생성하는 스케줄러이다. 모든 작업을 순차적으로 실행하는 것을 보장한다 ( FIFO)

5. mainThread 스케줄러

  • RXAndroid 에서는 안드로이드 메인 스레드에서 작동하는 스케줄러를 제공한다

subscribeOn() , observeOn() 연산자

  • 스케줄러를 이용하는 방법으로는 subscribeOn과 observeOn 연산자를 제공한다 이를 이용해 멀티스레드를 이용할 수 있다
  • 지금까지 구현한 예제들은 모두 메인스레드( UI 스레드) 에서 동작했다. 왜냐면 기본적으로 rxjava 는 observer가 선언되고 구독되는 스레드에서 동작하기 때문이다
Obserable<Integer> src = Obserable.create(emitter ->{
      for(int i=0;i<3;i++){
          String threadName = Thread.currentThread().getName();
          System.out.println("#subs on"+threadName+":"+i);
          emitter.onNext(i);
          Thread.sleep(100);
      }
      emitter.onComplete();
});

src.subscribe(s ->{
      String threadName = Thread.currentThread().getName();
      System.out.println("obs on"+threadName+":"+s);
});

result 
Subs on main:0
obs on main :0
Subs on main:1
obs on main :1
Subs on main:2
obs on main :2
  • 앞의 예제 코드에서 subscribeOn 연산자를 이용해 스레드를 지정해 본다
src.subscribeOn(Shcedulers.io())
.subscribe(s ->{
String threadName = Thread.currentThread().getName();
System.out.println("#Obs on"+threadName+":"+s);

result 
Subs on RxCachedThreadScheduler-1:0
obs on RxCachedThreadScheduler-1 :0
Subs on RxCachedThreadScheduler-1:1
obs on RxCachedThreadScheduler-1 :1
Subs on RxCachedThreadScheduler-1:2
obs on RxCachedThreadScheduler-1 :2

});
  • subscribeOn 연산자는 Obserable 소스에 어떤 스케줄러를 사용해서 아이템을 발행할 것인지 알려준다.
  • subscribeOn 연산자만 있고 구독을 지정하는 ObserveOn 이 없다면 해당스케줄러는 아이템 발행부터 구독까지 Obserable 전체에 작용한다
src.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe(s ->{
    String threadName = Thread.currentThread().getName();
    System.out.println("Obs on"+threadName+":"+s);
});
Thread.sleep(500);

result 
Subs on RxCachedThreadScheduler-1:0
obs on RxComputationThreadPool-1 :0
Subs on RxCachedThreadScheduler-1:1
obs on RxComputationThreadPool-1 :1
Subs on RxCachedThreadScheduler-1:2
obs on RxComputationThreadPool-1 :2
  • ObserveOn 연산자를 이용해 스케줄러를 지정하면 Overable이 발행하는 아이템을 가로채 해당 스케줄러로 아이템을 구독했다 하지만 interval, timer,replay,buffer 등의 연산자는 Computation 스케줄러로 고정되어 있기에 스케줄러를 아무리 지정해도 무시된다

  • 안드로이드 는 데이터베이스로부터 데이터를 받을때 메인스레드가 블로킹 되는것을 막기위해 io 스케줄러를 사용한다 요청한 결과로 구독은 ui를 갱신하기위해 mainthread에서만 갱신이 가능하기에 mainthead에서 구독한다

repository.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(user -> {
 //ui 업데이트 
});

5. 배압과 Flowable

배압 BackPressure

  • RxJava 에서 Obserable 은 생산자, 소비자로 나눌수 있다. 생산자는 Obserable 이 발행한 아이템을 생성하고 소비자는 생성자가 생성한 아이템을 구독한다
  • 배압이란 ? 구독자의 소비량에 관계없이 아이템을 계속발행하는 현상을 말한다
  • BackPressure 현상이 나타나면 OutOfMemoryError 예외등 많은 문제를 발행하게 된다

Flowable 다루기

  • Flowable은 소비자가 구독하는 양에 따라 생산자의 생산을 제어할수 있도록 한다
  • 아이템 발행량이 일정량 누적되면 아이템을 발행하지 않는다 이것이 바로 Obserable 과 Flowable의 차이점
  • 단 시간을 기반으로 하는 interval 연산자는 Flowable과 같이 사용한다면 에러가 발생하게 된다

배압제어 연산자

  • Flowable 과 interval 연산자를 같이 사용할 때 발생할 수 있는 MissingBackPressureExceiption 극복할 수 있다 .

1. onBackPressureBuffer 연산자

  • 배압 구현이 되지 않은 Flowable에 대해 BackPressureStrategy.Buffer를 적용한다 .
  • 매개 변수별로 용량 , 지연 , 오버플로 콜백 등에 대한 것을 정의할 수 있다.
Flowable.interval(10, TimeUnit.MIlliSECONDS)
    .onBackPressureBuffer()
    .observeOn(Schedulers.io())
    .map(item - > {
          sleep(2000);
          System.out.println("아이템발행 :"+item);
          return item;
    })
    .subscribe(item ->{
	    System.out.println("아이템 소비 : "+item);
},throwable -> throwable.printStackTrace());
Thread.sleep(30*1000);

result 
아이템발행 : 0
아이템소비:0
아이템발행 : 1
아이템소비:1
...

2. onBackPressureLatest 연산자

  • onBackPressureLatest 는 스트림 버퍼가 가득차면 최신의 아이템을 버퍼에 유지하려고 오래된 아이템을 버리는 연산자이다
  • 최신의 데이터만 의미가 있을때 사용하면 좋다
Flowable.interval(10, TimeUnit.MIlliSECONDS)
    .onBackPressureLatest()
    .observeOn(Schedulers.io())
    .map(item - > {
          sleep(100);
          System.out.println("아이템발행 :"+item);
          return item;
    })
    .subscribe(item ->{
	    System.out.println("아이템 소비 : "+item);
},throwable -> throwable.printStackTrace());
Thread.sleep(30*1000);

result 
아이템발행 : 124
아이템소비:125
아이템발행 : 986
아이템소비:987
...

3. onBackPressureDrop 연산자

  • onBackPressureDrop 는 버퍼가 가득찬 상태에서 버퍼에 든 아이템을 소비하는 쪽이 바쁘다면 발행된 아이템을 버린다
Flowable.range(1,300)
    .onBackPressureDrop()
    .observeOn(Schedulers.io())
    .subscribe(item ->{
	    System.out.println("아이템 소비 : "+item);
},throwable -> throwable.printStackTrace());
Thread.sleep(30*1000);

result 
아이템소비:1
아이템소비:2
아이템소비:3

아이템소비:987
아이템소비:987
아이템소비:987

...
  • onBackPressureDrop은 버려지는 아이템에 대한 콜백을 제공한다
Flowable.range(1,300)
    .onBackPressureDrop(item ->{
    	System.out.println("아이템 버림: "+item);
    })
    .observeOn(Schedulers.io())
    .subscribe(item ->{
	    System.out.println("아이템 소비 : "+item);
},throwable -> throwable.printStackTrace());
Thread.sleep(30*1000);

result 
아이템소비:1
아이템소비:2
아이템소비:3
아이템버림:4
아이템버림:5
아이템버림:6
....
아이템소비:987
아이템소비:987
아이템소비:987

...

Flowable 생성과 배압 전략

  • Flowable.create() 은 Oberable.create() 과 비슷하지만 배압전략 (EmitterBackPressureStrategy배압 전략을 설정해야 한다
Flowable.create((FlowableOnSubscribe<Integer>) emitter ->{
    for(int i=0 ;i<=1000;i++){
        if(emitter.isCancelled()){
	        return;
    }
    	emitter.onNext(i);
    }
    	emitter.onComplete();
    },BackPressureStrategy.Buffer) // 배압제어 전략 
      .subscribeOn(Schedulers.computation())
      .observeOn(Schedulers.io())
      .subscribe(System.out::println,throwable -> throwable.printStackTrace());
sleep(100);

result 
0
1
2
...
998
999
1000

1. BackPressureStrategy.MISSING

  • 기본적으로 배압전략을 구현하지 않는다. 오버플로우를 다루려면 배압 제어 연산자를 이용해야 한다

2. BackPressureStrategy.ERROR

  • 소비자가 생산자를 따라가지 못한 경우 MissionBackPressureException 예외 발생한다

3. BackPressureStrategy.BUFFER

  • 구독자가 아이템을 소비할때가지 발행한 아이템들을 버퍼에 넣어둔다. 이 버퍼는 제한 없는 큐이지만 가용 메모리를 벗어난 경우 OutOfMemoryError를 발행한다

4. BackPressureStrategy.DROP

  • 구독자가 소비하느라 바뻐서 생산자를 못따라가는 경우 발행된 아이템을 무시하고 버린다

5. BackPressureStrategy.LATEST

  • 구독자가 아이템 받을 준비가 될때까지 가장 최신의 아이템을 유지하고 이전아이템을 버린다
profile
💻디지털 노마드를 🚀꿈꾸는 🇯🇲자메즈 🐥개발자 입니다.

0개의 댓글