3. RxJava 반응형 프로그래밍-2

Jamezz Dev·2020년 7월 31일
0

android-Architecture

목록 보기
7/8

3 RxJava 연산자

  • Obserable 연산자는 Obserable 을 반환하기에 이를 통해 연쇄작업이 가능하다.
  • Obserable의 체이닝 순서는 중요함 -> 연산자의 결과가 다음 연산자에 영향을 미치기 때문

Obserable을 생성하는 연산자

1.defer 연산자

  • 옵저버가 구독할때까지 Obserablee의 생성을 지연시킨다.
  • Subscrib() 메서드를 호출할 때 Obserable 아이템을 생성한다.

Obserable<Long> justSrc = Obserable.just(
	System.currentTimeMillis()
);
Obserable<Long> deferSrc = Obserable.defer(() -> 
	Obserable.just(System.currentTimeMillies())
);
  System.out.println("#1 now = "+System.currentTimeMillies());
  try{
      Thread.sleep(5000);
  }catch(InterruptedException e){
      e.printStackTrace();
  }
  System.out.println("#2 now = "+System.currentTimeMillies());
  justSrc.subscribe(time -> System.out.println("#1 time = "+time);
  deferSrc.subscribe(time -> System.out.println("#1 time = "+time);
}
//연산 결과 
#1 now = 1581936258088
#2 now = 1581936263092 
#1 time = 1581936258009
#2 time = 1581936263099

2.empty 와 never 연산자

  • onComplete() 호출 여부에 따라 동작이 다르게 나타난다

  • empty 연산자는 아이템을 발행하지 않지만 정상적으로 스트림을 종료시킨다

  • never 연산자는 empty와 마찬가지로 아이템을 발행하지 않지만 스트림을 종료시키지도 않는다

 Obserable.empty().doOnTerminate(() -> System.out.println("empty 종료"))
 .subscribe(System.out::println);
 Oberable.never().doOnTerminate(() - >System.out.println("never 종료"))
 .subscribe(System.out::println);
 
 //연산 결좌 
 empty 종료 

3.interval 연산자

  • 주어진 시간 간격 순서대로 정수를 발행하는 Obserable 생성
  • 무한히 배출한다

Disposable d = Obserable.interval(1, TimeUnit.SECONDS).subscribe(System.out::println);
Thread.sleep(5000);
d.dispose();

//연산결과 
0
1
2
3
4

4.Range 연산자

  • 특정 범위의 정수를 순서대로 발행하는 Obserable을 생성한다.
  • interval 과는 달리 발행이 끝나면 스트림을 종료한다.

Obserable.range(1,3).subscribe(System.out::println);

연산결과 
1
2
3

5.timer 연산자

  • timer 연산자는 특정 시간 동안 지연시킨뒤 아이템을 발행한다 그리고 종료시킨다

Oberable src = Obserable.timer(1,TimeUnit.SECONDS);
System.out.println("구독");
// 구독하면 1초뒤에 발행된다 
src.subscribe(event -> System.out.println("실행!"));
Thread.sleep(100l);

//결과 
구독
실행!

Obserable을 변형하는 연산자

1.map 연산자

  • 발행되는 아이템을 다른 아이템으로 변경이 가능하다.

Obserable<Integer> intSrc = Obserable.just(1,2,3);
Obserable<Integer> strSrc = intSrc.map(value -> value*10);
strSrc.subscribe(System.out::println);

//실행결과
10
20
30

2.flatMap 연산자

  • Obserable 을 또다른 Obserable 로 변환시킨다. 그런다음 변환시킨 Obserable의 방출되는 아이템 또한 병합하여 다시 아이템을 방출시킨다.
  • 1 : N 형태로 시퀀스가 발행된다.
Obserable<String> src = Obserable.just("a","b","c");
src.flatMap(str -> Obseralbe.just(str+1,str+2)).subscribe(System.out::println);

// 실행결과 
a1
a2
b1
b2
c1
c2

Obserable.range(2,9)
	.flatMap(x -> Obserable.range(1,9)
    	.map(y -> String.format("%d *%d = %d",x,y,x*y)))
    		.subscribe(System.out::println);

//실행결과 
2*1 = 2
2*2 = 4
...
9*9 = 81

3.buffer 연산자

  • buffer 연산자는 Obserable 아이템을 묶어서 List로 발행한다.
  • 에러 발생시 버퍼 발행하지 않고 즉시 전달

Obserable.range(0,10)
    .buffer(3)
    .subscribe(integers -> {
        System.out.println("버퍼 데이터 실행");
        for(Integer integer: integers){
            System.out.println("#"+integer);
        }
});

//실행결과 
버퍼데이터실행
0
1
2
버퍼데이터실행
3
4
5
버퍼데이터실행
6
7
8
버퍼데이터실행
9

4.scan 연산자

  • 순차적으로 발행되는 연산을 첫번째 인자로 전달한다

Obserable.range(1,5)
.scan((x,y) ->{
  System.out.print(String.format("%d+%d=",x,y));
  return x+y;
}).subscribe(System.out::println);

//실행결과
1
1+2 = 3
3+3 = 6
6+ 4 = 10
10 +5 = 15

Obserable.range("a","b","c","d","e")
.scan((x,y) -> x+y)
.subscribe(System.out:: println);


//실행결과
a
ab
abc
abcd
abcde

5.groupBy 연산자

  • 아이템들을 특정 그룹화된 GroupObserable로 재정의가 가능하다 .
Obserable.just("Ma Cir","Cy Cir","Yel Tri","Yel Cir","Mag Tri","Cy Tri")
    .groupBy(item -> {
    if(item.contains("Cir")){
    	return "C";
    }else if(item.contains("Tri")){
    	return "T";
    }else{
    	return "None";
    }
}).subscribe(group ->{
    System.out.println(group.getKey() + "그룹 발행")
    group.subscribe(shape -> {
	    System.out.println(group.getKey()+" : "+shape); 
    });
});

//실행 결과
C 그룹 발행 
C: Mag Cir
C: Cy Cir
C: Yel Cir
T 그룹 발행 
T : Yel Tri
T: Mag Tri
T : Cy Tri

Obserable 을 필터링 하는 연산자

  • Obserable로 부터 발행되는 아이템들을 선택적으로 발행하도록 하는 연산자.

1.debounce 연산자

  • 특정 시간동안 다른 아이템이 발행되지 않을때만 아이템을 발행하도록 하는 연산자 .

Obserable.create(emitter ->{
    emitter.onNext("1");
    Thread.sleep(100);
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onNext("4");
    emitter.onNext("5");
    Thread.sleep(100);
    emitter.onNext("6");
})
  .debounce(10,TimeUnit.MILLISECONDS)
  .subscribe(System.out::println);
Thread.sleep(300);

//실행결과 
1
5
6

2.distince 연산자

  • 발행한 아이템을 중복해 발행하지 않도록 필터링 한다.
Obserable.just(1,2,2,1,3)
.distince()
.subscribe(System.out::println)

//실행결과 
1
2
3

3.elementAt 연산자

  • 아이템 시퀀스에서 특정 인덱스에 해당하는 아이템을 필터링 한다
Obserable.just(1,2,3)
.elementAt(2)
.subscribe(System.out::println);

//실행결과 
3

4.filter 연산자

-조건식이 true 인 경우에만 아이템을 발행하도록 함

Obserable.just(1,2,30,40,60,10,0)
.filter(x -> x>10)
.subscribe(System.out::println);

//result
30
40
60

5.sample 연산자

  • 일정 시간 간격으로 최근에 Obserable이 배출한 아이템들을 방출하는 연산자다
Obserable.interval(100,TimeUnit.MILLISECONDS)
.sample(300,TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
Thread.sleep(1000);

//result
1
4
7

6.skip 연산자

  • n개의 아이템을 무시하고 이후에 나오는 아이템을 발행하는 연산자
 Obserable.just(1,2,3,4)
 .skip(2)
 .subscribe(System.out::println);
 
 //result
 3
 4

7.take 연산자

  • skip 과 반대로 처음 발행하는 n 개의 아이템만 방출하도록 하는 연산자이다.
Obserable.just(1,2,3,4)
.take(2)
.subscribe(System.out::println);

//result
1
2

8. all 연산자

  • 모든 발행하는 아이템이 특정 조건을 만족시 true를 반환한다. 하나라도 조건에 부합하지 않다면 false 발행
Obserable.just(2,1)
.all(i -> i>1)
.subscribe(System.out::println);

//result
false

Obserable.just(2,1,0)
.all(i -> i>-1)
.subscribe(System.out::println);

//result
true

9. amb 연산자

  • 여러개의 Obserable들을 동시에 구독하고 그중 가장 먼저 아이템을 발행하는 Obserable을 선택하고 싶으면 amb 사용한다
ArrayList<Obserable<Integer>> list = new ArrayList<>();
list.add(Obserable.just(20,40,60).delay(100,TimeUnit.MILLISECONDS));
list.add(Obserable.just(1,2,3));
list.add(Obserable.just(0,0,0).delay(200,TimeUnit.MILLISECONDS));
Obserable.amb(list).subscribe(System.out::println);

//result
1
2
3

Obserable 을 결합하는 연산자

  • 여러 Obserable 소스를 결합하여 하나의 Obserable을 생성하는 연산자들에 대해 알아보자

1. CombineLastest 연산자

Obserable<Integer> src1 = Obserable.create(emitter -> {
    new Thread(() -> {
      for(int i =1 ;i<=5 ;i++){
      	emitter.onNext(i);
        try{
          Thread.sleep(1000);
        }catch(InterruptedException e){
          e.printStackTrace();
        }
      }
    }).start();
});

Obserable<String> src2 = Obserable.create(emitter -> {
    new Thread(() -> {
        try{
        	Thread.sleep(500);
            emitter.onNext("A");
            Thread.sleep(700);
            emitter.onNext("B");
            Thread.sleep(100);
            emitter.onNext("C");
            Thread.sleep(700);
            emitter.onNext("D");
        }catch(InterruptedExceiption e){
        	e.printStackTrace();
        }
    }).start();
});

Obserable.combineLatest(src1,src2,(num,str) -> num+str)
	.subscribe(System.out::println);
Thread.sleep(5000);

//result 
1A
2A
2B
2C
3C
3D
4D
5D

2. zip 연산자

  • 여러 Obserable 을 하나로 결합하여 지정된 함수를 통해 하나의 아이템으로 발행한다.
  • zip 은 여러 Oberable 의 발행순서를 엄격히 지켜 결합한다.
Obserable<Integer> src1 = Obserable.create(emitter -> {
new Thread(() -> {
    for(int i=1;i<=5;i++){
    	emitter.onNext(i);
        try{
			Thread.sleep(1000);
        }catch(InterruptedException e){
            e.printStackTrace();
        }
      }
   }).start();
});

Obserable<String> src2 = Obserable.create(emitter -> {
    new Thread(() ->{
        try{
        	Thread.sleep(500);
            emitter.onNext("A");
            Thread.sleep(700);
            emitter.onNext("B");
            Thread.sleep(100);
            emitter.onNext("C");
            Thread.sleep(700);
            emitter.onNext("D");            
        }catch(InterruptedException e){
        	e.printStackTrace();
        }
    }).start();
});
Obserable.zip(src1,src2,(num,str) -> num + str)
.subscribe(System.out::println);
Thread.sleep(5000);

//result 
1A
2B
3C
4D

3. Merge 연산자

  • Obserable 여러개를 하나처럼 결합해 사용이 가능하다
  • 예제에 사용된 intervalRange 는 interval 연산자와 range 연산자를 합친개념이다 .
Obserable src1 = Obserable.intervalRange(
      1,// 시작값,
      5,// 발행 횟수
      0,//초기 지연
      100,// 빌행간격
      TimeUnit.MILLISECONDS
).map(value -> value *20);
Obserable src2 = Oberalbe.create(emitter - > new Thread( () -> {
    try{
        Thread.sleep(350);
        emitter.onNext(1);
        Thread.sleep(200);
        emitter.onNext(1);
    }catch(InterruptedException e){
	    e.printStackTrace();
    }
    }).start();
});

Obserable.merge(src1,src2)
.subscribe(System.out::println);
Thread.sleep(1000);


//result 
20
40
60
80
1
100
1
profile
💻디지털 노마드를 🚀꿈꾸는 🇯🇲자메즈 🐥개발자 입니다.

0개의 댓글