RxJava에서 연산자(Operator)는 Observable을 생성·변환·필터링하는 메소드를 말한다.

Observable.interval(0L, 1000L, TimeUnit.MILLISECONDS)
.map(num -> num + " count")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000);
onNext() | RxComputationThreadPool-1 | 00:25:41.663 | 0 count
onNext() | RxComputationThreadPool-1 | 00:25:42.637 | 1 count
onNext() | RxComputationThreadPool-1 | 00:25:43.638 | 2 count
onNext() | RxComputationThreadPool-1 | 00:25:44.634 | 3 count

Observable<Integer> source = Observable.range(0, 5);
source.map(number -> number + " count")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
onNext() | main | 00:30:31.688 | 0 count
onNext() | main | 00:30:31.701 | 1 count
onNext() | main | 00:30:31.701 | 2 count
onNext() | main | 00:30:31.702 | 3 count
onNext() | main | 00:30:31.702 | 4 count

Logger.log(LogType.PRINT, "# start");
// timer(2000 ms) emits a single value after the delay; the value itself is ignored
// and replaced by the fixed message "do work".
Observable.timer(2000, TimeUnit.MILLISECONDS)
        .map(ignored -> "do work")
        .subscribe(message -> Logger.log(LogType.ON_NEXT, message));
// Keep the calling thread waiting so the delayed emission can be logged.
TimeUtil.sleep(5000);
print() | main | 00:49:51.384 | # start
onNext() | RxComputationThreadPool-1 | 00:49:53.455 | do work

// defer(): the lambda runs again for every subscription, so LocalDateTime.now() is
// evaluated at subscribe time.
Observable<LocalDateTime> defer = Observable.defer(() -> Observable.just(LocalDateTime.now()));
// just(): LocalDateTime.now() is evaluated once, right here, when the Observable is created.
Observable<LocalDateTime> just = Observable.just(LocalDateTime.now());
// First round of subscriptions.
defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer1 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just1 시간 : " + data));
// Wait 5 seconds so the two rounds produce clearly different timestamps.
Thread.sleep(5000);
// Second round: defer logs a fresh timestamp, while just repeats the value captured at creation.
defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer2 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just2 시간 : " + data));
onNext() | main | 19:25:15.472 | defer1 시간 : 2023-06-12T19:25:15.455439
onNext() | main | 19:25:15.487 | just1 시간 : 2023-06-12T19:25:15.452306
onNext() | main | 19:25:20.496 | defer2 시간 : 2023-06-12T19:25:20.494970
onNext() | main | 19:25:20.497 | just2 시간 : 2023-06-12T19:25:15.452306

// fromIterable(...) emits the list elements one by one, in list order.
List<String> names = List.of("lee", "kim", "park", "choi");
Observable.fromIterable(names)
        .subscribe(element -> Logger.log(LogType.ON_NEXT, element));
onNext() | main | 19:37:20.725 | lee
onNext() | main | 19:37:20.753 | kim
onNext() | main | 19:37:20.753 | park
onNext() | main | 19:37:20.753 | choi

/**
 * Entry point of the fromFuture example.
 *
 * <p>Kicks off the long task first (it returns a CompletableFuture), then runs the
 * short task on the calling thread, and finally logs the long task's result once
 * the future has produced it.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Logger.log(LogType.PRINT, "# start");

    // Long-running task: obtained as a future so the short task does not have to wait for it.
    CompletableFuture<String> pendingResult = longWork();

    // Short task: runs to completion on the calling thread.
    shortWork();

    // Wrap the future in an Observable and log the value it yields.
    Observable<String> resultStream = Observable.fromFuture(pendingResult);
    resultStream.subscribe(message -> Logger.log(LogType.PRINT, message));
}
/**
 * Starts the long task through CompletableFuture.supplyAsync.
 *
 * <p>The task logs "# long work start", calls TimeUtil.sleep(10000), and then
 * yields "# long work finish" as its result.
 *
 * @return the future that will hold the task's result string
 */
private static CompletableFuture<String> longWork() {
    CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
        Logger.log(LogType.PRINT, "# long work start");
        TimeUtil.sleep(10000);
        return "# long work finish";
    });
    return pending;
}
/**
 * Runs the short task on the calling thread: logs a start message, calls
 * TimeUtil.sleep(3000), then logs a finish message.
 *
 * @return always {@code true}
 */
private static Boolean shortWork() {
    Logger.log(LogType.PRINT, "# short work start");

    TimeUtil.sleep(3000);

    Logger.log(LogType.PRINT, "# short work finish");
    return Boolean.TRUE;
}
onNext() | RxComputationThreadPool-1 | 00:25:41.663 | 0 count
onNext() | RxComputationThreadPool-1 | 00:25:42.637 | 1 count
onNext() | RxComputationThreadPool-1 | 00:25:43.638 | 2 count
onNext() | RxComputationThreadPool-1 | 00:25:44.634 | 3 count

Observable<Integer> source = Observable.range(0, 5);
source.map(number -> number + " count")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
onNext() | main | 00:30:31.688 | 0 count
onNext() | main | 00:30:31.701 | 1 count
onNext() | main | 00:30:31.701 | 2 count
onNext() | main | 00:30:31.702 | 3 count
onNext() | main | 00:30:31.702 | 4 count

Logger.log(LogType.PRINT, "# start");
// timer(2000 ms) emits a single value after the delay; the value itself is ignored
// and replaced by the fixed message "do work".
Observable.timer(2000, TimeUnit.MILLISECONDS)
        .map(ignored -> "do work")
        .subscribe(message -> Logger.log(LogType.ON_NEXT, message));
// Keep the calling thread waiting so the delayed emission can be logged.
TimeUtil.sleep(5000);
print() | main | 00:49:51.384 | # start
onNext() | RxComputationThreadPool-1 | 00:49:53.455 | do work

// defer(): the lambda runs again for every subscription, so LocalDateTime.now() is
// evaluated at subscribe time.
Observable<LocalDateTime> defer = Observable.defer(() -> Observable.just(LocalDateTime.now()));
// just(): LocalDateTime.now() is evaluated once, right here, when the Observable is created.
Observable<LocalDateTime> just = Observable.just(LocalDateTime.now());
// First round of subscriptions.
defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer1 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just1 시간 : " + data));
// Wait 5 seconds so the two rounds produce clearly different timestamps.
Thread.sleep(5000);
// Second round: defer logs a fresh timestamp, while just repeats the value captured at creation.
defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer2 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just2 시간 : " + data));
onNext() | main | 19:25:15.472 | defer1 시간 : 2023-06-12T19:25:15.455439
onNext() | main | 19:25:15.487 | just1 시간 : 2023-06-12T19:25:15.452306
onNext() | main | 19:25:20.496 | defer2 시간 : 2023-06-12T19:25:20.494970
onNext() | main | 19:25:20.497 | just2 시간 : 2023-06-12T19:25:15.452306

// fromIterable(...) emits the list elements one by one, in list order.
List<String> names = List.of("lee", "kim", "park", "choi");
Observable.fromIterable(names)
        .subscribe(element -> Logger.log(LogType.ON_NEXT, element));
onNext() | main | 19:37:20.725 | lee
onNext() | main | 19:37:20.753 | kim
onNext() | main | 19:37:20.753 | park
onNext() | main | 19:37:20.753 | choi

/**
 * Entry point of the fromFuture example.
 *
 * <p>Kicks off the long task first (it returns a CompletableFuture), then runs the
 * short task on the calling thread, and finally logs the long task's result once
 * the future has produced it.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Logger.log(LogType.PRINT, "# start");

    // Long-running task: obtained as a future so the short task does not have to wait for it.
    CompletableFuture<String> pendingResult = longWork();

    // Short task: runs to completion on the calling thread.
    shortWork();

    // Wrap the future in an Observable and log the value it yields.
    Observable<String> resultStream = Observable.fromFuture(pendingResult);
    resultStream.subscribe(message -> Logger.log(LogType.PRINT, message));
}
/**
 * Starts the long task through CompletableFuture.supplyAsync.
 *
 * <p>The task logs "# long work start", calls TimeUtil.sleep(10000), and then
 * yields "# long work finish" as its result.
 *
 * @return the future that will hold the task's result string
 */
private static CompletableFuture<String> longWork() {
    CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
        Logger.log(LogType.PRINT, "# long work start");
        TimeUtil.sleep(10000);
        return "# long work finish";
    });
    return pending;
}
/**
 * Runs the short task on the calling thread: logs a start message, calls
 * TimeUtil.sleep(3000), then logs a finish message.
 *
 * @return always {@code true}
 */
private static Boolean shortWork() {
    Logger.log(LogType.PRINT, "# short work start");

    TimeUtil.sleep(3000);

    Logger.log(LogType.PRINT, "# short work finish");
    return Boolean.TRUE;
}
print() | main | 20:13:33.918 | # start
print() | main | 20:13:33.949 | # short work start
print() | ForkJoinPool.commonPool-worker-3 | 20:13:33.950 | # long work start
print() | main | 20:13:36.955 | # short work finish
print() | main | 20:13:43.956 | # long work finish

List<String> names = List.of("lee", "kim", "park", "choi", "lerer", "lol");
// First filter: keep only names containing "l".
Observable<String> containingL = Observable.fromIterable(names)
        .filter(candidate -> candidate.contains("l"));
// Second filter: of those, keep only names that also contain "e".
Observable<String> containingLAndE = containingL.filter(candidate -> candidate.contains("e"));
containingLAndE.subscribe(match -> Logger.log(LogType.ON_NEXT, match));
onNext() | main | 21:14:15.127 | lee
onNext() | main | 21:14:15.151 | lerer

List<String> names = List.of("lee", "lee", "kem", "aaa");

// distinct(keySelector): items are compared by the selected key, here the second
// character of each name, so only the first item per key is emitted.
Observable<String> distinctBySecondChar = Observable.fromIterable(names)
        .distinct(word -> word.split("")[1]);
distinctBySecondChar.subscribe(item -> Logger.log(LogType.ON_NEXT, item));

System.out.println("--------------------");

// distinct() without a key selector: items are compared as whole values.
Observable<String> distinctByValue = Observable.fromIterable(names).distinct();
distinctByValue.subscribe(item -> Logger.log(LogType.ON_NEXT, item));
onNext() | main | 21:53:31.321 | lee
onNext() | main | 21:53:31.344 | aaa
--------------------
onNext() | main | 21:53:31.345 | lee
onNext() | main | 21:53:31.345 | kem
onNext() | main | 21:53:31.345 | aaa

List<String> names = List.of("lee", "lee", "kem", "aaa");
// take(2): only the first two items are passed on.
Observable<String> firstTwo = Observable.fromIterable(names).take(2);
firstTwo.subscribe(item -> Logger.log(LogType.ON_NEXT, item));
onNext() | main | 21:58:42.039 | lee
onNext() | main | 21:58:42.063 | lee
takeUntil(Predicate): 파라미터로 지정한 조건이 true가 될 때까지 데이터를 계속 통지 (조건을 만족한 데이터까지 통지한 뒤 종료)
takeUntil(Observable): 파라미터로 지정한 Observable이 최초 데이터를 통지할 때까지 데이터를 계속 통지
List<String> name = List.of("lee", "kim", "park", "choi");
Observable.fromIterable(name)
.takeUntil((String data) -> data.equals("kim"))
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
onNext() | main | 23:12:01.752 | lee
onNext() | main | 23:12:01.776 | kim
Observable.interval(1000L, TimeUnit.MILLISECONDS)
.takeUntil(Observable.timer(5000L, TimeUnit.MILLISECONDS))
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(10000L);
onNext() | RxComputationThreadPool-2 | 23:20:38.276 | 0
onNext() | RxComputationThreadPool-2 | 23:20:39.251 | 1
onNext() | RxComputationThreadPool-2 | 23:20:40.252 | 2
onNext() | RxComputationThreadPool-2 | 23:20:41.251 | 3
onNext() | RxComputationThreadPool-2 | 23:20:42.250 | 4

List<String> names = List.of("lee", "kim", "park", "choi", "lerer", "lol");
// skip(2): the first two items are dropped, everything after them is passed on.
Observable<String> afterFirstTwo = Observable.fromIterable(names).skip(2);
afterFirstTwo.subscribe(item -> Logger.log(LogType.ON_NEXT, item));
onNext() | main | 23:25:54.845 | park
onNext() | main | 23:25:54.868 | choi
onNext() | main | 23:25:54.868 | lerer
onNext() | main | 23:25:54.868 | lol

List<Integer> numbers = List.of(1, 2, 3, 4);
// map(...): every emitted number is replaced by its triple.
Observable<Integer> tripled = Observable.fromIterable(numbers)
        .map(value -> value * 3);
tripled.subscribe(result -> Logger.log(LogType.ON_NEXT, result));
onNext() | main | 22:42:52.314 | 3
onNext() | main | 22:42:52.338 | 6
onNext() | main | 22:42:52.338 | 9
onNext() | main | 22:42:52.338 | 12


(1번 유형) 구구단 2단 출력
Observable.range(2, 1)
.flatMap(
dan -> Observable.range(1, 9)
.map(num -> dan + " * " + num + " = " + dan * num)
)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
(2번 유형) 구구단 2단 출력
Observable.range(2, 1)
.flatMap(
dan -> Observable.range(1, 9),
(sourceData, transformData) -> sourceData + " * " + transformData + " = " + sourceData * transformData
)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
onNext() | main | 21:32:16.024 | 2 * 1 = 2
onNext() | main | 21:32:16.034 | 2 * 2 = 4
onNext() | main | 21:32:16.034 | 2 * 3 = 6
onNext() | main | 21:32:16.035 | 2 * 4 = 8
onNext() | main | 21:32:16.035 | 2 * 5 = 10
onNext() | main | 21:32:16.035 | 2 * 6 = 12
onNext() | main | 21:32:16.035 | 2 * 7 = 14
onNext() | main | 21:32:16.035 | 2 * 8 = 16
onNext() | main | 21:32:16.035 | 2 * 9 = 18