3. RXJava 반응형 프로그래밍-1

Jamezz Dev·2020년 7월 25일
0

android-Architecture

목록 보기
6/8

1. RxJava 란

  • ReactivX를 자바로 구현한 라이브러리 이다.

RxJava 설정하기

  • RxJava 를 사용하려면 Build.gradle에 다음과 같이 설정해야 한다
dependencies{
	implementation "io.reactivex.rxjava3:rxjava:3.0.0"
    implementation "io.reactivex.rxjava3:rxjava:3.0.0"
}
  • java 8 언어 기능은 부분적으로 지원하지만, 람다식 및 메서드 기능은 모든 버전에서 호환된다
android{
	compileOptions{
		sourceCompatibility JavaVersion.VERSION_1_8
		targetCompatibility JavaVersion.VERSION_1_8
	}
}

반응형 프로그래밍 과 명령형 프로그래밍의 차이점

1. 명령형 프로그래밍

  • 작성된 코드가 정해진 순서대로 실행되는 방식의 프로그래밍을 의미

2. 반응형 프로그래밍

  • 주변 환경과 끊임없이 상호작용을 하는 프로그래밍을 의미하며, 프로그래밍이 주도 하는것이 아닌 환경이 변하면 이벤트를 받아 동작하도록 만드는 프로그래밍 기법을 의미한다.

마블 다이어 그램

  • 반응형 프로그래밍에서 일어나는 비동기적인 데이터 흐름을 시각화 한 도표로 내용을 이용하는데 도움을 준다 .

2. Obserable

  • RxJava dptjsms Obserable을 구독하는 Observer가 존재하고 Obserable을 순차적으로 발행하는 데이터에 대해서 반응한다.
  • Obserable을 다음 3가지 이벤트를 사용하여 동작한다.

1) onNext()
하나의 소스 Obserable 에서 observer 까지 한 번에 하나씩 순차적으로 데이터를 발행한다.
2) onComplete()
데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext() 호출이 발생하지 않음을 나타낸다.
3) onError()
오류가 발생했음을 Observer 에 전달한다.

Observable 생성하기

1. create() 연산자

  • Observable.create()을 사용하면 emitter를 이용하여 직접 아이템을 발행하고 아이템의 완료 및 오류의 알림을 직접 설정이 가능하다.
Obserable<String> source = Obserable.create(emitter -> { 
	emitter.onNext("hello")
	emitter.onNext("world")
	emitter.onComplete();
});
source.subscribe(system.out::println);

//결과 
Hello
World
  • onComplete 메서드 호출후에는 아이템이 추가로 발행하더라도 구독자는 데이터를 통지 받지 못한다.
Obserable<String> source = Obserable.create(emitter -> { 
	emitter.onNext("hello")
	emitter.onComplete();
	emitter.onNext("world")
});
source.subscribe(system.out::println);

//결과 
Hello
  • 만약 오류가 발생할 시에는 Emitter 를 통해 onError(Throwable)를 호출해야 하며 구독자는 이를 적절히 처리해야 한다.
Obserable<String> source = Obserable.create(emitter -> { 
	emitter.onNext("hello")
	emitter.onError(new Throwable());
	emitter.onNext("world")
});
source.subscribe(system.out::println,
throwable -> System.out.println("error!")
);

//결과 
Hello
error

2.just()연산자

  • 해당 아이템을 그대로 발행하는 Obserable 을 생성해 준다.
  • Obserable.empty() 을 이용해 기존에 null을 기본인자로 넣었을때 오류가 발생하는 것을 방지한다.
Obserable<String> source = Obserable.just("hello","world");
source.subscribe(System.out::println);

//결과 
Hello
World

간단히 Obserable로 변환하기

  • 이미 참조할수 있는 배열 및 리스트 등의 자료 구조나 future, callable 또는 publisher가 있다면 from으로 시작하는 연산자를 통해 Obserable로 변환이 가능하다.

    from 과 관련된 메서드를 확인해 본다
    fromArray()
    fromIterable()
    fromFuture()
    fromPublisher()
    fromCallable()

1. fromArray() 연산자

  • 가지고 있는 아이템들이 배열일때 Obserable Source로 발행할 수 있다.
String[] itemArray = new String[]{"a","b","c"};
Obserable source = Obserable.fromArray(itemArray);
source.subscribe(Sytem.out::println);

//실행결과
a
b
c

2.fromIterable() 연산자

  • ArrayList,HashSet 등과 같이 Iteralbe을 구현한 자료구조 클래스를 Obserable 로 발행이 가능 하다
ArrayList itemList = new ArrayList();
itemList.add("a");
itemList.add("b");
itemList.add("c");
Obserable source = Obserable.fromIterable(itemList);
source.subscribe(System.out::println);
// 실행결과 
a
b
c

3. fromFuture() 연산자

  • Future 인터페이스는 비동기적인 작업의 결과를 구할때 이용된다. Future 또한 fromFuture연산자를 통해 Obserable로 변경이 가능하다.
Future<String> future = Executors.newSingleThreadExecutor().submit(() - > {
          Thread.sleep(5000);
          return "Hello World";
      });
Obserable source = Obserable.fromFuture(future);
source.subscribe(System.out::println); 

//실행결과
Hello World

4. fromPublisher()

  • Publisher 는 잠재적인 아이템 발행을 제공하는 생산자로 Subscriber로부터 받은 요청을 받아 아이템을 발행한다.
  • publisher 를 Obserable Source로 다음과 같이 변환 가능하다
Publisher<String> publisher = subscriber ->{
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onComplete();
};
Obserable<String> source = Obserable.fromPublisher(publisher);
source.subscribe(System.out::println);

5. fromCallable() 연산자

  • 비동기적인 실행 결과를 반환한다는 점에서 Runnable()과 다르다
  • callable을 Obserable로 변환하고 비동기적으로 아이템을 발행할수 있다.
Callable<String> callable = () ->"Hello World";
Obserable source = Obserable.fromCallable(callable);
source.subscribe(System.out::println);

//실행결과 
hello world

다양한 Obserable 형태

  • Obserable 이외에 다른 스트림이 존재한다 바로 Single ,Maybe 그리고 Completable이다.

Single

  • Obserable과는 다르게 단 하나의 아이템만을 발행하는 특징이 있다. 그러므로 just() 연산자에는 하나의 인자만을 취할수 있다.
Single.just("hello world").subscribe(Systm.out::println)
  • create() 연산자를 사용하는 경우 Emitter 를 사용하여 데이터를 발행한다. 데이터를 단 한번만 발행하기에 onNext()와 onComplete()메서드를 호출하는 대신 onSuccess() 로 두 메서드를 한번에 대체한다
Single.create(emitter -> emitter.success("Hello"))
.subscribe(System.out::println);
  • 오류를 다루는 경우 Emitter 와 동일하게 onError()를 호출하여 오류를 구독자들에게 통지할 수 있다.
Obserable<Integer> src = Obserable.just(1,2,3);
Single<Boolean> singleSrc1 = src.all(i -> i>0);
Single<Integer> singleSrc2 = src.first(-1);
Single<List<Integer>> singleSrc3 = src.toList();
  • Single 도 필요에 따라 Obserable로 변환해야 하는경우가 있는데 toObserable() 연산자를 사용할 수 있다.
Single<String> singleSrc = Single.just("Hello World");
Obserable<String> obserableSrc = singleSrc.toObserable();

Maybe

  • Single 과는 비슷하지만 아이템을 발행하거나 발행하지 않을 수 있다는 점에서 차이가 있다.
  • 아이템 발행시 onSuccess(T) 호출
  • 아이템 발행 하지 않을 때 onComplete()호출한다.
  • onSuccess() 호출할때 onComplete()를 호출할 필요 없다.
Maybe.create(emitter ->{
    emitter.onSuccess(100);
    emitter.onComplete();    
})
    .doOnSuccess(item ->System.out.println("doOnSuccess"))
    .doOnComplete(() -> System.out.println("doOnComplete"))
    .subscribe(Sytem.out::println);

Maybe.create(emitter -> emitter.onComplete())
    .doOnSuccess(item -> System.out.println("doOnSuccess2"))
    .doOnComplete(()->System.out.println("doOnComplete2"))
    .subscribe(System.out::println);
    
//실행결과 
doOnSuccess
100
doOnComplete2
  • 몇가지 Obserable 연산자는 반환 타입을 Maybe로 반환한다.
Obserable<Integer> src1 = Obserable.just(1,2,3);
Maybe<Integer> srcMaybe1 = src1.firstElement();
srcMaybe1.subscribe(System.out::println);

Obserable<Integer> src2 = Obserable.empty();
Maybe<Integer> srcMaybe2 = src2.firstElement();
srcMaybe2.subscribe(System.out::println,throwable -> {},
	() -> System.out.println("onComplete!"));
//실행결과 
1
onComplete!

Completable

  • Completable은 아이템을 발행하지 않고 단지 정상적으로 실행이 종료되었는지에 대해 관심갖는다.
Completable.create(emitter ->{
    //do something here
    emitter.onComplete();
}).subscribe(()->System.out.println("completed1");

Completable.fromRunnable(() ->{
	//do Something here
}).subscribe(() -> System.out.println("completed2"));

//실행 결과 
completed1
completed2 

Cold Obserable 과 Hot Obserable의 차이

  • Obserable 을 구현하는 방식에서 Obserable 과 Observer에서 미묘한 동작 차이가 있다

Cold Obserable

  • Obserable에 구독을 요청하면 아이템을 발행하기 시작한다. 아이템을 처음부터 끝까지 발행되고 임의로 종료를 하지 않는 이상 여러번 요청을 하더라고 처음부터 끝까지 발행하는 것을 보장한다 .
Obserable src = Obserable.interval(1,TimeUnit.SECONDS);
src.subscribe(value -> System.out.println("#1 : "+value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("#2 : "+value));
Thread.sleep(3000);

//실행결과 
#1:0
#1:1
#1:2
#1:3
#2:0
#1:4
#2:1
#1:5
#2:2

Hot Obserable

  • Hot Obserable 은 아이템이 발행이 시작된 이후로 모든 구독자에게 동시에 같은 아이템을 발행한다.
    -예를 들어 첫번째 구독자가 아이템을 발행하는 Obserable을 구독하고 몇초뒤에 두번째 구독자가 같은 Obserable을 구독한다면 이 둘을 같은 Obserable을 구독하지만 두번째 구독자는 구독하기 전에 발행된 아이템을 놓칠 수도 있다.

publish 연산자와 connect 연산자

  • ConnectableObserable은 Hot Obserable을 구현할 수 있도록 도와주는 타입으로 Obserable , publish 연산자를 통해 ConnectableObserable로 변환할 수 있다.
  • connect() 연산자를 호출할 때 비로소 아이템을 발행하기 시작한다.
ConnectableObserable src = Obserable.interval(1,TimeUnit.SECONDS).publish();
src.connect();
src.subscribe(value -> System.out.println("#1 : "+value));
Thead.sleep(3000);
src.subscribe(value -> System.out.println("#2: "+value));
Thread.sleep(3000);

//실행결과 
#1:0
#1:1
#1:2
#1:3
#2:3
#1:4
#2:4
#1:5
#2:5

autoConnect 연산자

  • autoConnect 연산자는 connect를 호출하지 않더라도 구독시에 즉각 아이템을 발행할 수 있도록 도와주는 연산자다.
  • autoConnect 연산자의 매개변수는 아이템을 발행하는 구독자 수로 , 지정된 구독자가 되어야만 아이템을 발행하기 시작한다.
Obserable<Long> src = Obserable.interval(100,TimeUnit.MILLISECONDS)
.publish()
.autoConnect(2);
src.subscribe(i -> System.out.println("A:"+i));
src.subscribe(i ->System.out.println("B:"+i));
Thread.sleep(500);

//실행결과 
A:0
B:0
A:1
B:1
A:2
B:2
A:3
B:3
A:4
B:4

Disposable 다루기

  • subscribe() 메서드를 호출하면 Disposable 객체를 반환한다.
Obserable source = Obserable.just("A","B","C");
Disposable disposable = source.subscribe(o ->System.out.println(source));
  • 유한한 아이템을 발행하는 Obserable의 경우 onComplete 호출로 안전하게 종료 된다. 하지만 무한하게 아이템을 발행하거나, 안드로이드의 생명주기와 관련된 경우는 메모리 누수를 방지하기 위해 dispose 가 필요하다

  • Disposable.dispose() 메서드 이용시 아이템 발행을 중단할 수 있다.

Obserable source = Obserable.interval(1000,TimeUnit.MILISECONDS);
//1초에 한번씩 아이템 발행 
Disposable disposable = source.subscribe(System.out::println);
new Thread(() ->{
try{
Thread.sleep(3500);
}catch(InterruptedException e){
e.printStackTrace();
}
disposable.dispose();
}).start();

//실행결과
0
1
2
  • Ovseable을 Dispose() 하면 아이템의 발행이 중지되고 모든 리소스가 폐기된다. 리소스를 폐기가 되었는지 확인하려면 Disposable.isDisposed() 메서드를 활용할 수 있으며 dispose() 메서드 내부에는 폐기여부를 체크하기에 isDisposed()의 결과를 확인하고 Dispose()를 호출할 필요는 없다.

CompositeDisposable

  • 만약 구독자가 여러곳에 있고 이들을 폐기하려면 각각의 Disposable() 객체에 대해서 dispose()를 호출해야 한다 하지만 CompositDisposable을 이용하면 한번에 폐기가 가능하다
Obserable Source = Obserable.interval(1000,TimeUnit.MILLISECONDS);

Disposable d1 = source.subscribe(System.out::println);
Disposable d2 = source.subscribe(System.out::println);
Disposable d3 = source.subscribe(System.out::println);
CompositeDisposable cd = new CompositDisposable();
cd.add(d1);
cd.add(d2);
cd.add(d3);
//또는 
cd.addAll(d1,d2,d3);
//특정 시점에 폐기하기 
cd.dispose();
profile
💻디지털 노마드를 🚀꿈꾸는 🇯🇲자메즈 🐥개발자 입니다.

0개의 댓글