RxJava - Operator(연산자)

개발자 이상규·2023년 6월 11일

rxjava

목록 보기
2/2
post-thumbnail

Operator란


RxJava에서 Operator(연산자)는 메소드

  • 연산자를 이용하여 데이터를 생성하고 통지하는 Flowable이나 Observable 등의 생산자를 생성할 수 있음
  • Flowable이나 Observable에서 통지한 데이터를 다양한 연산자를 사용해 가공 처리 하여 결과값을 만들어냄



Operator의 분류


  • Flowable/Observable 생성 연산자
  • 통지된 데이터를 필터링해주는 연산자
  • 통지된 데이터를 변환 해주는 연산자
  • 여러 개의 Flowable/Observable을 결합하는 연산자
  • 에러 처리 연산자
  • 유틸리티 연산자
  • 조건과 불린 연산자
  • 통지된 데이터를 집계 해주는 연산자





Flowable/Observable 생성 연산자


interval

  • 지정한 시간 간격마다 0부터 시작하는 숫자를 통지
  • initalDelay 파라미터를 이용하여 최초 통지 대기시간 지정 가능
  • 완료 없이 계속 통지
  • 호출한 Thread와는 별도의 Thread에서 실행
  • Polling 용도의 작업을 수행할떄 활용

code

Observable.interval(0L, 1000L, TimeUnit.MILLISECONDS)
        .map(num -> num + " count")
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

TimeUtil.sleep(3000);
    

결과 log

onNext() | RxComputationThreadPool-1 | 00:25:41.663 | 0 count
onNext() | RxComputationThreadPool-1 | 00:25:42.637 | 1 count
onNext() | RxComputationThreadPool-1 | 00:25:43.638 | 2 count
onNext() | RxComputationThreadPool-1 | 00:25:44.634 | 3 count

range

  • 지정한 값(n)부터 m개의 숫자(Integer)를 통지한다.
  • for, while 문 등의 반복문을 대체 할 수 있다.

code

Observable<Integer> source = Observable.range(0, 5);
source.map(number -> number + " count")
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 00:30:31.688 | 0 count
onNext() | main | 00:30:31.701 | 1 count
onNext() | main | 00:30:31.701 | 2 count
onNext() | main | 00:30:31.702 | 3 count
onNext() | main | 00:30:31.702 | 4 count

timer

  • 지정한 시간이 지나면 0(Long)을 통지
  • 0을 통지하고 onComplete() 이벤트가 발생하며 종료
  • 호출한 Thread와는 별도의 Thread에서 실행
  • 특정 시간을 대기한 후에 어떤 처리를 하고자 할때 활용

code

Logger.log(LogType.PRINT, "# start");

Observable<String> timer = Observable.timer(2000, TimeUnit.MILLISECONDS)
        .map(count -> "do work");

timer.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(5000);

결과 log

print() | main | 00:49:51.384 | # start
onNext() | RxComputationThreadPool-1 | 00:49:53.455 | do work

deffer

  • 구독이 발생할 때마다, 즉 subscribe()이 호출 될 때마다 새로운 Observable을 생성.
  • 선언한 시점의 데이터를 통지하는 것이 아니라 호출 시점의 데이터를 통지.
  • 데이터 생성을 미루는 효과가 있기 때문에 최신 데이터를 얻고자 할때 활용

code

Observable<LocalDateTime> defer = Observable.defer(() -> Observable.just(LocalDateTime.now()));
Observable<LocalDateTime> just = Observable.just(LocalDateTime.now());

defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer1 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just1 시간 : " + data));

Thread.sleep(5000);

defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer2 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just2 시간 : " + data));

결과 log

onNext() | main | 19:25:15.472 | defer1 시간 : 2023-06-12T19:25:15.455439
onNext() | main | 19:25:15.487 | just1 시간 : 2023-06-12T19:25:15.452306
onNext() | main | 19:25:20.496 | defer2 시간 : 2023-06-12T19:25:20.494970
onNext() | main | 19:25:20.497 | just2 시간 : 2023-06-12T19:25:15.452306

fromIterable

  • Iterable 인터페이스를 구현한 클래스 ArrayList 등 을 파라미터로 받음
  • Iterable에 담긴 데이터를 순서대로 통지

code

List<String> name = List.of("lee", "kim", "park", "choi");

Observable<String> iterable = Observable.fromIterable(name);
iterable.subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 19:37:20.725 | lee
onNext() | main | 19:37:20.753 | kim
onNext() | main | 19:37:20.753 | park
onNext() | main | 19:37:20.753 | choi

fromFuture

  • Future 인터페이스는 java5에서 비동기 처리를 위해 생성된 동시성 API
  • 시간이 오래 걸리는 작업은 Future를 반환하는 ExcutorService에게 맡기고 비동기로 다른 작업을 진행
  • java8에서 CompletableFuture클래스를 통해 구현이 간결해짐

code

 public static void main(String[] args) {
    Logger.log(LogType.PRINT, "# start");

    // 처리 시간이 긴 작업
    CompletableFuture<String> completableFuture = longWork();
    // 처리 시간이 짧은 작업
    shortWork();

    Observable.fromFuture(completableFuture)
            .subscribe(data -> Logger.log(LogType.PRINT, data));
}

private static CompletableFuture<String> longWork() {
    return CompletableFuture.supplyAsync(() -> {
        Logger.log(LogType.PRINT, "# long work start");
        TimeUtil.sleep(10000);
        return "# long work finish";
    });
}

private static Boolean shortWork() {
    Logger.log(LogType.PRINT, "# short work start");
    TimeUtil.sleep(3000);
    Logger.log(LogType.PRINT, "# short work finish");
    return true;
}
    

결과 log

onNext() | RxComputationThreadPool-1 | 00:25:41.663 | 0 count
onNext() | RxComputationThreadPool-1 | 00:25:42.637 | 1 count
onNext() | RxComputationThreadPool-1 | 00:25:43.638 | 2 count
onNext() | RxComputationThreadPool-1 | 00:25:44.634 | 3 count

range

  • 지정한 값(n)부터 m개의 숫자(Integer)를 통지한다.
  • for, while 문 등의 반복문을 대체 할 수 있다.

code

Observable<Integer> source = Observable.range(0, 5);
source.map(number -> number + " count")
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 00:30:31.688 | 0 count
onNext() | main | 00:30:31.701 | 1 count
onNext() | main | 00:30:31.701 | 2 count
onNext() | main | 00:30:31.702 | 3 count
onNext() | main | 00:30:31.702 | 4 count

timer

  • 지정한 시간이 지나면 0(Long)을 통지
  • 0을 통지하고 onComplete() 이벤트가 발생하며 종료
  • 호출한 Thread와는 별도의 Thread에서 실행
  • 특정 시간을 대기한 후에 어떤 처리를 하고자 할때 활용

code

Logger.log(LogType.PRINT, "# start");

Observable<String> timer = Observable.timer(2000, TimeUnit.MILLISECONDS)
        .map(count -> "do work");

timer.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(5000);

결과 log

print() | main | 00:49:51.384 | # start
onNext() | RxComputationThreadPool-1 | 00:49:53.455 | do work

deffer

  • 구독이 발생할 때마다, 즉 subscribe()이 호출 될 때마다 새로운 Observable을 생성.
  • 선언한 시점의 데이터를 통지하는 것이 아니라 호출 시점의 데이터를 통지.
  • 데이터 생성을 미루는 효과가 있기 때문에 최신 데이터를 얻고자 할때 활용

code

Observable<LocalDateTime> defer = Observable.defer(() -> Observable.just(LocalDateTime.now()));
Observable<LocalDateTime> just = Observable.just(LocalDateTime.now());

defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer1 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just1 시간 : " + data));

Thread.sleep(5000);

defer.subscribe(data -> Logger.log(LogType.ON_NEXT, "defer2 시간 : " + data));
just.subscribe(data -> Logger.log(LogType.ON_NEXT, "just2 시간 : " + data));

결과 log

onNext() | main | 19:25:15.472 | defer1 시간 : 2023-06-12T19:25:15.455439
onNext() | main | 19:25:15.487 | just1 시간 : 2023-06-12T19:25:15.452306
onNext() | main | 19:25:20.496 | defer2 시간 : 2023-06-12T19:25:20.494970
onNext() | main | 19:25:20.497 | just2 시간 : 2023-06-12T19:25:15.452306

fromIterable

  • Iterable 인터페이스를 구현한 클래스 ArrayList 등 을 파라미터로 받음
  • Iterable에 담긴 데이터를 순서대로 통지

code

List<String> name = List.of("lee", "kim", "park", "choi");

Observable<String> iterable = Observable.fromIterable(name);
iterable.subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 19:37:20.725 | lee
onNext() | main | 19:37:20.753 | kim
onNext() | main | 19:37:20.753 | park
onNext() | main | 19:37:20.753 | choi

fromFuture

  • Future 인터페이스는 java5에서 비동기 처리를 위해 생성된 동시성 API
  • 시간이 오래 걸리는 작업은 Future를 반환하는 ExcutorService에게 맡기고 비동기로 다른 작업을 진행
  • java8에서 CompletableFuture클래스를 통해 구현이 간결해짐

code

public static void main(String[] args) {
    Logger.log(LogType.PRINT, "# start");

    // 처리 시간이 긴 작업
    CompletableFuture<String> completableFuture = longWork();
    // 처리 시간이 짧은 작업
    shortWork();

    Observable.fromFuture(completableFuture)
            .subscribe(data -> Logger.log(LogType.PRINT, data));
}

private static CompletableFuture<String> longWork() {
    return CompletableFuture.supplyAsync(() -> {
        Logger.log(LogType.PRINT, "# long work start");
        TimeUtil.sleep(10000);
        return "# long work finish";
    });
}

private static Boolean shortWork() {
    Logger.log(LogType.PRINT, "# short work start");
    TimeUtil.sleep(3000);
    Logger.log(LogType.PRINT, "# short work finish");
    return true;
}

결과 log

print() | main | 20:13:33.918 | # start
print() | main | 20:13:33.949 | # short work start
print() | ForkJoinPool.commonPool-worker-3 | 20:13:33.950 | # long work start
print() | main | 20:13:36.955 | # short work finish
print() | main | 20:13:43.956 | # long work finish





Flowable/Observable 데이터 필터링 연산자


filter

  • 전달 받은 데이터가 조건에 맞는지 확인한 후, 결과가 true인 데이터만 통지
  • 파라미터로 받는 Predicate 함수형 인터페이스에서 조건을 확인

code

List<String> name = List.of("lee", "kim", "park", "choi", "lerer", "lol");

Observable.fromIterable(name)
        .filter(nameEl -> nameEl.contains("l"))
        .filter(nameEl -> nameEl.contains("e"))
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 21:14:15.127 | lee
onNext() | main | 21:14:15.151 | lerer

distinct

  • 이미 통지된 동일한 데이터가 있다면 이후 동일한 데이터는 통지하지 않음

code

List<String> name = List.of("lee", "lee", "kem", "aaa");

Observable.fromIterable(name)
        .distinct(data -> data.split("")[1])
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

System.out.println("--------------------");

Observable.fromIterable(name)
        .distinct()
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 21:53:31.321 | lee
onNext() | main | 21:53:31.344 | aaa
--------------------
onNext() | main | 21:53:31.345 | lee
onNext() | main | 21:53:31.345 | kem
onNext() | main | 21:53:31.345 | aaa

take

  • 파라미터로 지정한 갯수나 기간이 될 때까지 데이터를 통지

code

 List<String> name = List.of("lee", "lee", "kem", "aaa");

Observable.fromIterable(name)
        .take(2)
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 21:58:42.039 | lee
onNext() | main | 21:58:42.063 | lee

takeUntil

  • 조건
    1. 파라미터로 지정한 조건이 true가 될 때까지 데이터를 계속 통지
    2. 파라미터로 지정한 Observable이 최초 데이터를 통지할 때까지 데이터를 계속 통지
      timer와 함께 사용하여 특정 시점이 되기 전까지 데이터를 발행하는데 활용

1 파라미터 지정 조건 takeUntil - code

List<String> name = List.of("lee", "kim", "park", "choi");

Observable.fromIterable(name)
        .takeUntil((String data) -> data.equals("kim"))
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

1 파라미터 지정 조건 takeUntil - 결과 log

onNext() | main | 23:12:01.752 | lee
onNext() | main | 23:12:01.776 | kim

2 파라미터 지정 Observable takeUntil - code

Observable.interval(1000L, TimeUnit.MILLISECONDS)
         .takeUntil(Observable.timer(5000L, TimeUnit.MILLISECONDS))
         .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

TimeUtil.sleep(10000L);

2 파라미터 지정 Observable takeUntil - 결과 log

onNext() | RxComputationThreadPool-2 | 23:20:38.276 | 0
onNext() | RxComputationThreadPool-2 | 23:20:39.251 | 1
onNext() | RxComputationThreadPool-2 | 23:20:40.252 | 2
onNext() | RxComputationThreadPool-2 | 23:20:41.251 | 3
onNext() | RxComputationThreadPool-2 | 23:20:42.250 | 4

skip

  • 파라미터로 지정한 숫자만큼 데이터를 건너뛴후 데이터를 통지

code

List<String> name = List.of("lee", "kim", "park", "choi", "lerer", "lol");

Observable.fromIterable(name)
        .skip(2)
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 23:25:54.845 | park
onNext() | main | 23:25:54.868 | choi
onNext() | main | 23:25:54.868 | lerer
onNext() | main | 23:25:54.868 | lol





Flowable/Observable 데이터 변환 연산자


map

  • 원본 Observable에서 통지하는 데이터를 원하는 값으로 변환 후 통지
  • null을 반환하면 NullPointException이 발생하므로 null이 아닌 데이터 하나를 반드시 반환해야 함

code

List<Integer> numberList = List.of(1, 2, 3, 4);

Observable.fromIterable(numberList)
        .map(number -> number * 3)
        .subscribe(number -> Logger.log(LogType.ON_NEXT, number));

결과 log

onNext() | main | 22:42:52.314 | 3
onNext() | main | 22:42:52.338 | 6
onNext() | main | 22:42:52.338 | 9
onNext() | main | 22:42:52.338 | 12

FlatMap

  • 원본 데이터를 원하는 값으로 변환 후 통지하는것은 map과 동일
  • map은 일대일 변환이고 flatMap은 일대다 변환하므로 한개로 여러 데이터를 통지 할 수 있음
  • map은 변환된 데이터를 반환하지만 flatMap은 변환 된 여러개의 데이터를 담고 있는 새로운 Observable을 반환
  • 원본 데이터와 변환된 데이터를 조합해서 새로운 데이터를 통지
  • 즉, Observable에 원본 데이터 + 변환된 데이터 = 최종 데이터 를 실어서 반환

code

(1번 유형) 구구단 2단 출력

Observable.range(2, 1)
        .flatMap(
                dan -> Observable.range(1, 9)
                        .map(num -> dan + " * " + num + " = " + dan * num)
        )
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

(2번 유형) 구구단 2단 출력

Observable.range(2, 1)
        .flatMap(
                dan -> Observable.range(1, 9),
                (sourceData, transformData) -> sourceData + " * " + transformData + " = " + sourceData * transformData
        )
        .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

결과 log

onNext() | main | 21:32:16.024 | 2 * 1 = 2
onNext() | main | 21:32:16.034 | 2 * 2 = 4
onNext() | main | 21:32:16.034 | 2 * 3 = 6
onNext() | main | 21:32:16.035 | 2 * 4 = 8
onNext() | main | 21:32:16.035 | 2 * 5 = 10
onNext() | main | 21:32:16.035 | 2 * 6 = 12
onNext() | main | 21:32:16.035 | 2 * 7 = 14
onNext() | main | 21:32:16.035 | 2 * 8 = 16
onNext() | main | 21:32:16.035 | 2 * 9 = 18
profile
Contact: leeeesanggyu@gmail.com

0개의 댓글