// Contrast just() with defer(): just() captures its argument when the
// Observable is created, whereas defer() builds a fresh Observable (and so
// reads the clock again) each time a subscriber arrives.
Observable<Long> justSrc = Observable.just(
    System.currentTimeMillis()
);
Observable<Long> deferSrc = Observable.defer(() ->
    Observable.just(System.currentTimeMillis())
);
System.out.println("#1 now = " + System.currentTimeMillis());
try {
    // Pause so that creation time and subscription time are clearly different.
    Thread.sleep(5000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("#2 now = " + System.currentTimeMillis());
// Prints the timestamp taken at creation time.
justSrc.subscribe(time -> System.out.println("#1 time = " + time));
// Prints the timestamp taken at subscription time.
deferSrc.subscribe(time -> System.out.println("#2 time = " + time));
}
//연산 결과
#1 now = 1581936258088
#2 now = 1581936263092
#1 time = 1581936258009
#2 time = 1581936263099
onComplete() 호출 여부에 따라 동작이 다르게 나타난다
empty 연산자는 아이템을 발행하지 않지만 정상적으로 스트림을 종료시킨다
never 연산자는 empty와 마찬가지로 아이템을 발행하지 않지만 스트림을 종료시키지도 않는다
// empty() emits nothing but completes, so doOnTerminate runs.
Observable.empty()
    .doOnTerminate(() -> System.out.println("empty 종료"))
    .subscribe(System.out::println);
// never() emits nothing and never terminates, so doOnTerminate is not invoked.
Observable.never()
    .doOnTerminate(() -> System.out.println("never 종료"))
    .subscribe(System.out::println);
//연산 결과
empty 종료
// interval() emits an increasing Long once per second until disposed.
Disposable d = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);
// Keep the calling thread alive while items are emitted, then stop the stream.
Thread.sleep(5000);
d.dispose();
//연산결과
0
1
2
3
4
// range(start, count): emits 3 consecutive integers starting at 1.
Observable.range(1, 3).subscribe(System.out::println);
//연산결과
1
2
3
// timer() emits a single item after the given delay.
Observable<Long> src = Observable.timer(1, TimeUnit.SECONDS);
System.out.println("구독");
// The item is emitted one second after subscribing.
src.subscribe(event -> System.out.println("실행!"));
// Wait longer than the one-second delay; sleeping only 100 ms would let the
// calling thread finish before the timer fires.
Thread.sleep(1500L);
//결과
구독
실행!
// map() transforms each emitted item; here every value is multiplied by 10.
Observable<Integer> intSrc = Observable.just(1, 2, 3);
Observable<Integer> scaledSrc = intSrc.map(value -> value * 10);
scaledSrc.subscribe(System.out::println);
//실행결과
10
20
30
// flatMap() turns each item into its own Observable and flattens the results:
// every letter yields two items, suffixed with 1 and 2.
Observable<String> src = Observable.just("a", "b", "c");
src.flatMap(str -> Observable.just(str + 1, str + 2))
    .subscribe(System.out::println);
// 실행결과
a1
a2
b1
b2
c1
c2
// Multiplication table for 2..9 built with flatMap().
// range(start, count): range(2, 8) emits 2..9; range(2, 9) would also emit 10.
Observable.range(2, 8)
    .flatMap(x -> Observable.range(1, 9)
        .map(y -> String.format("%d*%d = %d", x, y, x * y)))
    .subscribe(System.out::println);
//실행결과
2*1 = 2
2*2 = 4
...
9*9 = 81
// buffer(3) collects items into lists of up to three and emits each list;
// the final list holds whatever is left over.
Observable.range(0, 10)
    .buffer(3)
    .subscribe(integers -> {
        System.out.println("버퍼 데이터 실행");
        for (Integer integer : integers) {
            System.out.println("#" + integer);
        }
    });
//실행결과
버퍼 데이터 실행
#0
#1
#2
버퍼 데이터 실행
#3
#4
#5
버퍼 데이터 실행
#6
#7
#8
버퍼 데이터 실행
#9
// scan() emits every intermediate accumulation. The first item passes through
// unchanged; after that the accumulator prints "x+y=" before the subscriber
// prints the running sum on the same line.
Observable.range(1, 5)
    .scan((x, y) -> {
        System.out.printf("%d+%d=", x, y);
        return x + y;
    })
    .subscribe(System.out::println);
//실행결과
1
1+2=3
3+3=6
6+4=10
10+5=15
// scan() over strings: emits the running concatenation of the letters.
// just() is required here; range() only accepts an integer start and count.
Observable.just("a", "b", "c", "d", "e")
    .scan((x, y) -> x + y)
    .subscribe(System.out::println);
//실행결과
a
ab
abc
abcd
abcde
// groupBy() splits the stream into one GroupedObservable per key.
// Items containing "Cir" go to group "C", items containing "Tri" to group "T",
// and anything else to "None".
Observable.just("Mag Cir", "Cy Cir", "Yel Tri", "Yel Cir", "Mag Tri", "Cy Tri")
    .groupBy(item -> {
        if (item.contains("Cir")) {
            return "C";
        } else if (item.contains("Tri")) {
            return "T";
        } else {
            return "None";
        }
    })
    .subscribe(group -> {
        // Announce the group when it is emitted, then subscribe to it to
        // receive its members.
        System.out.println(group.getKey() + " 그룹 발행");
        group.subscribe(shape -> {
            System.out.println(group.getKey() + " : " + shape);
        });
    });
//실행 결과
C 그룹 발행
C : Mag Cir
C : Cy Cir
T 그룹 발행
T : Yel Tri
C : Yel Cir
T : Mag Tri
T : Cy Tri
// debounce(10 ms) only lets an item through when no newer item follows it
// within 10 ms. "2".."4" are each replaced immediately by the next item, so
// only "1", "5" and "6" survive.
Observable.create(emitter -> {
        emitter.onNext("1");
        Thread.sleep(100);
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onNext("4");
        emitter.onNext("5");
        Thread.sleep(100);
        emitter.onNext("6");
    })
    .debounce(10, TimeUnit.MILLISECONDS)
    .subscribe(System.out::println);
// Give the debounce timer time to flush the last item.
Thread.sleep(300);
//실행결과
1
5
6
// distinct() drops items that have already been emitted.
Observable.just(1, 2, 2, 1, 3)
    .distinct()
    .subscribe(System.out::println);
//실행결과
1
2
3
// elementAt(index) emits only the item at the given zero-based index.
Observable.just(1, 2, 3)
    .elementAt(2)
    .subscribe(System.out::println);
//실행결과
3
-조건식이 true 인 경우에만 아이템을 발행하도록 함
// filter() passes through only the items for which the predicate is true.
Observable.just(1, 2, 30, 40, 60, 10, 0)
    .filter(x -> x > 10)
    .subscribe(System.out::println);
//result
30
40
60
// sample(300 ms) emits the most recent item seen in each 300 ms window of a
// source that ticks every 100 ms.
Observable.interval(100, TimeUnit.MILLISECONDS)
    .sample(300, TimeUnit.MILLISECONDS)
    .subscribe(System.out::println);
// Keep the calling thread alive long enough to observe a few samples.
Thread.sleep(1000);
//result
1
4
7
// skip(n) discards the first n items and emits the rest.
Observable.just(1, 2, 3, 4)
    .skip(2)
    .subscribe(System.out::println);
//result
3
4
// take(n) emits only the first n items.
Observable.just(1, 2, 3, 4)
    .take(2)
    .subscribe(System.out::println);
//result
1
2
// all() emits true only if every item satisfies the predicate; 1 > 1 fails.
Observable.just(2, 1)
    .all(i -> i > 1)
    .subscribe(System.out::println);
//result
false
// all() emits true here because every item is greater than -1.
Observable.just(2, 1, 0)
    .all(i -> i > -1)
    .subscribe(System.out::println);
//result
true
// amb() mirrors whichever source emits first and ignores the others.
// The undelayed source wins over the ones delayed by 100 ms and 200 ms.
ArrayList<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(20, 40, 60).delay(100, TimeUnit.MILLISECONDS));
list.add(Observable.just(1, 2, 3));
list.add(Observable.just(0, 0, 0).delay(200, TimeUnit.MILLISECONDS));
Observable.amb(list).subscribe(System.out::println);
//result
1
2
3
// combineLatest() emits a combined value whenever either source emits, using
// the latest item from each source (once both have emitted at least once).

// Emits 1..5, one per second, from its own thread.
Observable<Integer> src1 = Observable.create(emitter -> {
    new Thread(() -> {
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
});
// Emits "A".."D" at irregular intervals from its own thread.
Observable<String> src2 = Observable.create(emitter -> {
    new Thread(() -> {
        try {
            Thread.sleep(500);
            emitter.onNext("A");
            Thread.sleep(700);
            emitter.onNext("B");
            Thread.sleep(100);
            emitter.onNext("C");
            Thread.sleep(700);
            emitter.onNext("D");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
});
Observable.combineLatest(src1, src2, (num, str) -> num + str)
    .subscribe(System.out::println);
// Keep the calling thread alive until both emitter threads have finished.
Thread.sleep(5000);
//result
1A
2A
2B
2C
3C
3D
4D
5D
// zip() pairs items by position: the n-th item of src1 with the n-th item of
// src2. src2 only emits four items, so the fifth number is never paired.

// Emits 1..5, one per second, from its own thread.
Observable<Integer> src1 = Observable.create(emitter -> {
    new Thread(() -> {
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
});
// Emits "A".."D" at irregular intervals from its own thread.
Observable<String> src2 = Observable.create(emitter -> {
    new Thread(() -> {
        try {
            Thread.sleep(500);
            emitter.onNext("A");
            Thread.sleep(700);
            emitter.onNext("B");
            Thread.sleep(100);
            emitter.onNext("C");
            Thread.sleep(700);
            emitter.onNext("D");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
});
Observable.zip(src1, src2, (num, str) -> num + str)
    .subscribe(System.out::println);
// Keep the calling thread alive until both emitter threads have finished.
Thread.sleep(5000);
//result
1A
2B
3C
4D
// merge() interleaves items from several sources in the order they arrive.

// Emits 20, 40, 60, 80, 100 at 100 ms intervals.
Observable<Long> src1 = Observable.intervalRange(
    1,   // start value
    5,   // number of items to emit
    0,   // initial delay
    100, // period between emissions
    TimeUnit.MILLISECONDS
).map(value -> value * 20);
// Emits 1 after 350 ms and again 200 ms later, from its own thread.
Observable<Integer> src2 = Observable.create(emitter -> {
    new Thread(() -> {
        try {
            Thread.sleep(350);
            emitter.onNext(1);
            Thread.sleep(200);
            emitter.onNext(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
});
Observable.merge(src1, src2)
    .subscribe(System.out::println);
// Keep the calling thread alive until both sources have finished emitting.
Thread.sleep(1000);
//result
20
40
60
80
1
100
1