이 포스팅은 현재 작성중인 포스팅입니다 :) 👨💻 👋👋
리액티브 프로그래밍이란 무엇이고 어떻게 동작하는지 확인하기 앞서, 리액티브 프로그래밍 패러다임의 중요성이 증가하는 이유를 확인할 필요가 있다. 오늘날 소프트웨어의 아키텍쳐는 다음과 같은 세가지 이유로 상황이 변하고 있다.
리액티브 프로그래밍에서는 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리하고 합쳐서 이런 문제를 해결한다. 실제로 이런 패러다임에 맞게 설계된 어플리케이션은 발생한 데이터 항목을 바로 처리함으로써 사용자에게 높은 응답성을 제공한다. 게다가 한 개의 컴포넌트나 어플리케이션뿐만 아니라 전체의 리액티브 시스템을 구성하는 여러 컴포넌트를 조절하는데도 리액티브 기법을 사용할 수 있다.
이런 방식으로 구성된 시스템에서는 고장, 정전 같은 상태에 대처할 뿐 아니라 다양한 네트워크 상태에서 메시지를 교환하고 전달할 수 있으며 무거운 작업을 하고 있는 상황에서도 가용성 을 제공한다.
리액티브 매니패스토는 2013년과 2014년에 걸쳐 개발되었으며, 리액티브 어플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의한다.
위 그림은 네가지 기능이 어떤 관계를 맺고 잇으며, 어떻게 다른 기능에 의존하는지 보여준다.
리액티브 매니페스토
는 회복성을 달성할 수 있는 다양한 기법을 제시한다.발송자 = Publisher, 수신자 = Subscriber
회복성
(장애를 메시지로 처리)과 탄력성
(주고받은 메시지의 수를 감시하고 메시지의 야에 따라 적절하게 리소스를 할당)을 얻을 수 있다.어플리케이션 수준 컴포넌트의 리액티브 프로그래밍의 주요 기능은 비동기로 작업을 수행할 수 있다는 점이다. 이벤트 스트림을 블록하지 않고 비동기로 처리하는 것이 최신 멀티코어 CPU의 사용률을 극대화 할 수 있는 방법이다. 이 목표를 달성할 수 있도록 리액티브 프레임워크와 라이브러리는 스레드를 퓨처
, 액터
, 일련의 콜백
을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.
스레드를 다시 쪼개는 종류의 기술을 이용할 때는 메인 이벤트 루프 안에서 절대 동작을 블록하지 않아야 한다는 중요한 전제 조건이 따른다. 데이터베이스나 파일 시스템 접근, 작업 완료까지 얼마나 걸릴지 예측이 힘든 원격 서비스 호출 등 모든 I/O 관련 동작이 블록 동작에 속한다. 실용적인 예제를 이용하면 왜 이런 블록 동작을 피해야 하는지 쉽게 이해할 수 있다.
한번에 오직 두 개의 스트림을 처리할 수 있는 상황이므로 가능하면 이들 스트림은 두 스레드를 효율적이고 공정하고 공유해야 한다. 어떤 스트림의 이벤트를 처리하다보니 파일 시스템 기록 또는 블록되는 API를 이용해 데이터베이스에서 파일을 가져오는 등 느린 I/O 작업이 시작되었다.
RxJava
, Akka
같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블록 동작을 실행시켜 이 문제를 해결한다. 메인 풀의 모든 스레드는 방해받지 않고 실행되므로 모든 CPU코어가 최적의 상황에서 동작할 수 있다. CPU 관련 작업과 I/O 관련 작업을 분리하면 조금 더 정밀하게 풀의 크기 등을 설정할 수 있고 두 종류의 작업의 성능을 관찰할 수 있다.
리액티브 원칙을 따르는 것은 리액티브 프로그래밍의 일부일 뿐이며 가장 어려운 부분은 따로 있다. 리액티브 시스템을 만들려면 훌륭하게 설계된 리액티브 어플리케이션 집합이 서로 잘 조화를 이루게 만들어야 한다.
리액티브 어플리케이션은 비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류된다.
메시지는 정의된 목적지 하나를 향하는 반면, 이벤트는 관련 이벤트를 관찰하도록 등록한 컴포넌트가 수신한다는 점이 다르다.
장애(회복성)
, 높은 부하(탄력성)
에서도 반응성을 유지할 수 있다.좀 더 자세히 말하자면 리액티브 아키텍쳐에서는 컴포넌트에서 발생한 장애를 고립시킴으로문제가 주변의다른 컴포넌트로 전파되면서 전체 시스템 장애로 이어지는 것을 막음으로써 회복성을 제공한다. (MSA의 Circuit Braker 패턴과 비슷하네)
이런 맥락에서 회복성 은 결함 허용 능력 과 같은 의미를 지닌다.
고립과 비결합이 회복성의 핵심이라면, 탄력성의 핵심은 위치 투명성 이다. 위치 투명성은 리액티브 시스템의 모든 컴포넌트가 수신자의 위치에 상관없이 다른 모든 서비스와 통신할 수 있음을 의미한다. 위치 투명성 덕분에 시스템을 복제할 수 있으며 현재 작업 부하에 따라 어플리케이션을 확장할 수 있다.
발행-구독
프로토콜에서 이벤트 스트림의 구독자(Subscriber)가 발행자(Publisher)가 이벤트를 제공하는 속도보다 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치다.부하가 발생한 컴포넌트는 이벤트 발생 속도를 늦추라고 알리거나, 얼마나 많은 이벤트를 수신할 수 있는지 알리거나, 다른 데이터를 받기전에 기존의 데이터를 처리하는데 얼마나 많은 시간이 걸릴지
업스트림 발행자
에게 알려야한다.
스트림 처리의 비동기적인 특성상 역압력 기능의 내장은 필수라는 사실을 알 수 있다.
실제 비동기 작업이 실행되는 동안 시스템에는 암묵적으로 블록 API로 인해 역압력이 제공된다.
물론 안타깝게도 비동기 작업을 실행하는 동안에는 그 작업이 안료될 때까지 다른 유용한 작업을 실행할 수 없으므로 기다리면서 많은 자원을 낭비하게 된다.
반면 비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만 다른 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성이 생긴다.
따라서 이런 상황을 방지할 수 있도록 역압력
이나 제어흐름기법
이 필요하다.
자바9에서는 리액티브 프로그래밍을 제공하는 클래스 java.util,concurrent.Flow 를 추가했다. 이 클래스는 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화할 수 있다. 리액티브 스트림 프로젝트 표준에 따라 프로그래밍 발행-구독 모델 을 지원할 수 있도록 Flow클래스는 중첩된 인터페이스 네 개를 포함한다.
Publisher
가 항목을 발행하면 Subsciber
가 한개씩 또는 한 번에 여러항목을 소비하는데 Subscription이 이 과정을 관리할 수 있도록 Flow 클래스는 관련된 인터페이스와 정적 메소드를 제공한다. Publihser
는 수많은 일련의 이벤트를 제공할 수 있지만 SubScriber
의 요구사항에 따라 역압력 기법에 의해 이벤트 제공속도가 제한된다. Publisher는 자바의 함수형 인터페이스로, Subscriber는 Publisher가 발행한 이벤트의 리스너로 자신을 등록할 수 있다. Subscription은 Publisher와 Subscriber사이의 제어흐름, 역압력을 관리한다.
@FunctionalInterface
public interface Publisher<T>{
void subscribe(Subscriber<? super T> s);
}
반면 Subscriber 인터페이스는 Publisher가 관련 이벤트를 발행할 때 호출할 수 있도록 콜백 메소드 4개를 정의한다.
public interface Subscriber<T>{
// 처음 호출시
void onSubscribe(Subscription s);
// 여러번 onNext
void onNext(T t);
//error 발생시
void onError(Throwable t);
// 구독 종료
void onComplete();
}
이들 이벤트는 다음 프로토콜에서 정의한 순서로 지정된 메소드 호출을 통해 발행되어야 한다.
위 표기는 onSubscribe
메소드가 항상 처음 호출되고 이어서 onNext
가 여러번 호출될 수 있음을 의미한다. 이벤트 스트림은 영원히 지속되거나 아니면 onComplete
콜백을 통해 더이상의 데이터가 없고 종료됨을 알릴 수 있으며 또는 Publisher
에 장애가 발생했을 때는 onError
를 호출할 수 있다.
Subscriber가 Publisher에 자신을 등록할 때, Publisher는 처음으로 onSubscribe 메소드를 호출해 Subscription 객체를 전달, Subscription 인터페이스는 메소드 두 개를 정의한다. Subscription
은 첫번째 메소드로 Publihser에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알린다. 두 번째 메소드로는 Subscription을 취소, 즉 Publisher에게 더이상 이벤트를 받지 않음을 통지한다.
public interface Subscription {
void request(long n);
void cancel();
}
자바 9 플로 명세서에서는 이들 인터페이스 구현이 어떻게 서로 협력해야 하는지를 설명하는 규칙 집합을 정의한다. 다음은 이들 규칙을 요약한것.
Subscriber
객체에 다시 가입하는 것은 권장하진 않지만 이런 상황에서 예외가 발생해야 한다고 명세서에 강제X,다음은 플로 API에서 정의하는 인터페이스를 구현한 어플리케이션의 평범한 생명주기를 보여준다,
Flow 클래스의 네 번째이자 마지막 멤버 Processor 인터페이스는 단지 Publisher와 subscriber를 상속 받을 뿐 아무 메소드도 추가하지 않는다.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
실제 이 인터페이스는 리액티브 스트림에서 처리하는 이벤트의 변환단계를 나타낸다. Processor가 에러를 수신하면 이로부터 회복하거나(그리고 Subscription은 취소로 간주) 즉시 onError 신호로 모든 Subscriber에 에러를 전파
마지막으로 Subscriber가 Subscription을 취소하면 Processor는 자신의 업스트림 Subscription
도 취소함으로 취소 신호를 전파해야한다.
자바 9 플로 API/ 리액티브 스트림 API에서는 Subscriber 인터페이스의 모든 메소드 구현이 Publisher를 블록하지 않도록 강제하지만 이들 메소드가 이벤트를 동기적으로 처리해야하는지 아니면 비동기적으로 처리해야 하는지는 지정하지 않는다. 하지만 이들 인터페이스에 정의된 모든 메소드는 void를 반환하므로 온전히 비동기 방식으로 이들 메소드를 구현할 수 있다.
Flow 클래스에 정의된 인터페이스 대부분은 직접 구현하도록 의도된 것이 아니다.
그럼에도 자바9 라이브러리는 이들 인터페이스를 구현하는 클래스를 제공하지 않는다. 이전에 언급한 Akka, RxJava 등의 리액티브 라이브러리에서는 이들 인터페이스를 구현했다.
자바 9 java.util.concurrency.Flow
명세는 이들 라이브러리가 준수해야 할 규칙과 다양한 리액티브 라이브러리를 이용해 개발된 리액티브 애플리케이션이 서로 협동하고 소통할 수 있는 공용어를 제시한다.
더욱이 이들 리액티브 라이브러리는 보통 추가적인 기능을 제공한다. (java.util.concurrency.Flow 인터페이스에 정의된 최소하위 집합을 넘어 리액티브 스트림을 변형하고 합치는 기능과 관련된 클래스와 메소드등)
배경은 그렇지만 자바9 플로 API를 직접 이용하는 첫 리액티브 어플리케이션을 개발하면서 지금까지 배운 네 개의 인터페이스가 어떻게 동작하는지 쉽게 확인할 수 있다. 끝에 가서는 리액티브 원칙을 적용해 온도를 보고하는 간단한 프로그램을 완성
다음은 현재 보고된 온도를 전달하는 간단한 클래스를 정의
public class TempInfo {
public static final Random random = new Random();
private final String town;
private final int temp;
public TempInfo(String town, int temp) {
this.town = town;
this.temp = temp;
}
public static TempInfo fetch(String town) {
if (random.nextInt(10) == 0)
throw new RuntimeException("Error!");
return new TempInfo(town, random.nextInt(100));
}
@Override
public String toString() {
return "TempInfo{" +
"town='" + town + '\'' +
", temp=" + temp +
'}';
}
public String getTown() {
return town;
}
public int getTemp() {
return temp;
}
}
간단한 도메인 모델을 정의한 다음에는 다음 예제에서 보여주는 것처럼 Subscriber가 요청할때마다 해당 도시의 온도를 전송하도록 Subscription을 구현한다.
public class TempSubscriber implements Flow.Subscriber<TempInfo> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(TempInfo item) {
System.out.println(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done!");
}
}
다음 예제는 리액티브 어플리케이션이 실제 동작할 수 있도록 Publisher를 만들고 TempSubscriber를 이용해 Publisher에 구독하도록 Main 클래스를 구현할 코드다.
public class Main {
public static void main(String[] args) {
getTemperatures("New york").subscribe(new TempSubscriber());
}
private static Flow.Publisher<TempInfo> getTemperatures(String town) {
return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
}
}
위 코드의 결과는 다음과 같다.
TempInfo{town='New york', temp=12}
TempInfo{town='New york', temp=25}
TempInfo{town='New york', temp=51}
TempInfo{town='New york', temp=29}
TempInfo{town='New york', temp=80}
TempInfo{town='New york', temp=64}
TempInfo{town='New york', temp=26}
TempInfo{town='New york', temp=75}
Error!
위 코드의 Flow는 아래와 같다.
getTemperatures
메소드는 Subscriber
를 인수로 받아 onSubscribe 메소드르 호출Publisher
를 만들고 새 TempSubscriber 클래스 인스턴스를 자신에게 구독시킨다. 그렇다면 위 코드는 문제가 없는걸까?
Main을 오래 실행하다보면 결국 StackOverFlow
Error가 발생하는 것을 확인할 수 있다.
위와 같은 에러가 발생하는 이유는
TempSubscriber
가 새로운 요소를onNext
메소드로 받을 때마다TempSubscription
으로 새요청을 보내면 request 메소드가 TempSubscriber 자신에게 또 다른 요소를 보내눈 문제가 있다. 이런 재귀 호출은 다음과 같은 스택이 오버플로 될때까지 반복해서 일어난다.
그렇다면 위와 같은 스택 오버플로우 문제를 어떻게 해결할 수 있을까? Executor를 TempSubscription으로 추가한 다음 다른 스레드에서 TempSubscriber로 세 요소를 전달하는 방법이 있다. 그러려면 다음 예제처럼 TempSubscription을 바꿔야한다.
public class TempSubscription implements Flow.Subscription {
//나머지 코드 생략...
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@Override
public void request(long n) {
executorService.submit(() -> {
for (long i = 0L; i < n; i++) {
//subscriber가 만든 요청을 1개씩 반복
try {
subscriber.onNext(TempInfo.fetch(town));
} catch (Exception e) {
subscriber.onError(e);
break;
}
}
});
}
//나머지 코드 생략
}
위에서 설명한 것처럼 Processor
는 SubScriber
이며 동시에 Publisher
다. 사실 Processor의 목적은 Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것이다.
화씨로 제공된 데이터를 섭씨로 변환해 다시 방출하는 다음의 예제를 통해 Processor를 구현해보자.
//TempInfo를 다른 TempInfo로 변환해야하므로
public class TempProcessor implements Flow.Processor<TempInfo, TempInfo> {
private Flow.Subscriber<? super TempInfo> subscriber;
@Override
public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(TempInfo item) {
subscriber.onNext(new TempInfo(item.getTown(), (item.getTemp() - 32) * 5 / 9));
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}
public class Main {
public static void main(String[] args) {
// write your code here
getTemperatures("New york").subscribe(new TempSubscriber());
}
private static Flow.Publisher<TempInfo> getTemperatures(String town) {
return subscriber -> {
TempProcessor processor = new TempProcessor();
processor.subscribe(subscriber);
processor.onSubscribe(new TempSubscription(processor,town));
};
}
}
TempInfo{town='New york', temp=12}
TempInfo{town='New york', temp=-17}
TempInfo{town='New york', temp=0}
TempInfo{town='New york', temp=37}
TempInfo{town='New york', temp=-11}
TempInfo{town='New york', temp=-3}
TempInfo{town='New york', temp=-3}
TempInfo{town='New york', temp=34}
TempInfo{town='New york', temp=17}
TempInfo{town='New york', temp=5}
TempInfo{town='New york', temp=-7}
Error!
개인적인 생각으로는 JPA나 Slf4j 처럼 Bridge 패턴을 이용하여 정의와 구현체를 분리하여 Reactive 프로그래밍에 대한 규격을 제시하고 있지 않나 생각이 든다.
자바9 을 만들 당시 Akka
, Rxjava
등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했다. 이들 라이브러리는 독립적으로 개발되었고 서로 다른 이름 규칙과 API를 사용했다.
자바9의 표준화 과정에서 기존처럼 자신만의 방법이 아닌 이들 라이브러리는 공식적으로 java.util.concurrent.Flow
의 인터페이스를 기반으로 리액티브 개념을 구현하도록 진화했다.
이 덕분에 표준화 작업이 쉽게 진행될 수 있었다.
Rxjava는 자바로 리액티브 어플리케이션을 구현하는데 사용하는 라이브러리다.
Rxjava는 넷플릭스의 Reactive Extension 프로젝트의 일부로 시작되었다.
Rxjava 2.0 버전 이상부터 Reactive Streams API 와 자바9에 적용된 java.util.concurrent.Flow
를 지원하도록 구현되었다.
import java.lang.concurrent.Flow.*;
import io.reactivex.Observable;
이 시점에서 한 가지 중요한 아키텍처의 속성을 강조하고 싶다. 좋은 시스템 아키텍쳐 스타일을 유지하려면 시스템에서 오직 일부에 사용된 개념의 세부 사항을 전체 시스템에서 볼 수 있게 만들지 않아야 한다.
따라서 Observable은 추가 구조가 필요한 상황에서 사용하고 그렇지 않으면, Publisher인터페이스를 사용하는 것이 좋다.
지금부터는 RxJava의 리액티브 스트림의 구현을 이용해서 온도 보고 시스템을 정의한다.
RxJava는 Flow.Publihser 를 구현하는 두 클래스를 제공한다.
자바9에서 리액티브 당김 기반 역압력 기능이 있는 io.reactivex.Flowable
클래스를 확인할 수 있다.
나머지 클래스는 역압력을 지원하지 않는 기존 버전의 Rxjava
에서 제공하던 Publisher io.reactivex.Observable
클래스다.
이 클래스는 단순한 프로그램, 마우스 움직임 같은 사용자 인터페이스 이벤트에 더 적합하다. 이들 이벤트 스트림에는 역압력을 적용하기 어렵기 때문이다.(마우스 움직임을 느리게 하거나 멈출수 없기 때문!)
그래서 안드로이드나 프론트 단에서 많이 사용하지 않나 싶다.
Rxjava는 천 개 이하의 요소를 가진 스트림이나 마우스 움직임, 터치 이벤트 등 역압력을 적용하기 힘든 GUI 이벤트 그리고 자주 발생하지 않는 종류의 이벤트에 역압력을 적용하지 말 것을 권장.
Observable
, Flowable
클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메소드를 제공한다.
다음처럼 미리 정의한 몇 개의 요소를 이용해 간단한 Observable을 만들 수 있다.
Observable<String> strings = observable.just("first", "second");
여기서
just()
팩토리메소드는 한 개 이상의 요소를 이용해 이를 방출하는Observable
로 변환한다. Observable의 구독자는 onNext("first") -> onNext("second") -> onComplete() 순서로 메시지를 받는다.
message 기반 vs event 기반 차이
If the producer must confirm that the information or command is delivered, knows who the intended recipient is, and likely wants some kind of response or action to occur, then it’s messaging.
메시지 기반은 producer가 응답이나 앞으로 일어날 명령등에 대해 반드시 알고 있다면 messaging 기반, 즉 누구에게 보낼지, 어떤행동을 보낼지 알고있다면 message 기반
An event is something that happens and the service where it happens publishes it to an event stream, regardless of what actions occur after that (if any). Other services that are interested in that type of event can subscribe to receive them. There can be any number of subscribers that will receive each event, including zero.
반면 이벤트 기반은 응답이나, 앞으로 일어날 명령등에 상관없이 producer가 event를 발행, 그냥 어떠한 action이 발생하면 listening 하고 있는 컴포넌트 모두에게 data를 발행하는 것
사용자와 실시간으로 상호 작용하면서 지정된 속도로이벤트를 방출하는 상황에서 유용하게 사용할 수 있는 다른 Observable 팩토리 메소드도 있다.
Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
팩토리 메소드 interval
은 onePerSec라는 변수로 Observable을 반환해 할당한다. 이 Observable
은 0에서 시작해 1초간격으로 Long 형식의 값을 무한으로 증가시키며 값을 방출한다. 이제 각 도시에서 매 초마다 온도 보고를방출하는 다른 Observable을 onePerSec로 대신할 것이다.
최종목표를 달성하기 전에 중간과정에서 이들 온도를 매 초마다 출력할 수 있다. 그러려면 onePerSec에 가입해서 매 초마다 온도를 받고 이를 이용해 관심이 있는 도시의 온도를 출력해야 한다.
Rxjava에서 Observable이 플로 API의 Publisher 역할을 하며 Observer는 Flow의 Subscriber와 같은 메소드를 정의하며 onSubscribe
메소드가 Subscription
대신 Disposable 인수를 갖는다는 점만 다르다. 이전에도 설명했듯이 Observable
은 역압력을 지원하지 않으므로 Subscription의 request 메소드를 포함하지 않는다. 다음은 Observer 인터페이스다.
public interface Obeserver<T> {
void onSubscribe(Disposble d);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
하지만 RxJava의 API는 자바9 네이티브 플로 API보다 유연하다. (= 많은 오버로드된 기능을 제공)
예를들어, 다른 세 메소드는 생략하고 onNext 메소드의 시그니처에 해당하는 람다 표현식을 전달해 Observable을 구독할 수 있다. 즉 이벤트를 수신하는 Consumer의 onNext만 구현하고 나머지 완료, 에러 처리 메소드는 아무것도 하지 않는 기본 동작을 가진
Observer
를 만들어 Observable에 가입할 수 있다.
onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("new York")));
위 코드에서 onePerSec Observable은 초당 한개의 이벤트를 방출하며, 메시지를 수신하면 Subscribe가 뉴욕의 온도를 추출해 출력한다. 하지만 위 코드를 main 메소드에 추가해서 실제 실행해보면 아무것도 출력되지 않는데, 이는 매 초마다 정보를 실행하는 Observable이 RxJava의 연산 스레드 풀, 즉 데몬 스레드에서 실행되기 때문이다.
main 프로그램은 실행하자마자 따로 실행할 코드가 없으므로 바로 종료되고 프로그램이 종료되었으므로 어떤 결과를 출력하기도 전에 데몬 스레드도 종료되면서 이런 현상이 일어난다.
위 코드 뒤에 스레드의 sleep 메소드를 추가해 프로그램이 종료되는걸 막는 방법도 있다. (왠지 매우 비효율적이라 생각..)
현재 스레드에서 콜백을 호출하는 blockingSubscribe 메소드를 사용하면 더 깔끔하게 문제를 해결할 수 있다. 위 예제에서는 blockingSubscribe
메소드가 더 적합하다.
TempInfo{town='new York', temp=55}
TempInfo{town='new York', temp=69}
TempInfo{town='new York', temp=92}
TempInfo{town='new York', temp=48}
TempInfo{town='new York', temp=69}
TempInfo{town='new York', temp=39}
TempInfo{town='new York', temp=72}
io.reactivex.exceptions.OnErrorNotImplementedException:
안타 깝게도 설계상 온도를 가져오는 기능이 임의로 실패하기 때문에 종료된다.
예제에서 구현한 Observersms onError 같은 에러 관리 기능을 포함하지 않으믈 위와 같은 처리되지 않은 예외가 사용자에게 직접 보여진다.
이제 예제의 난이도를 조금 높일 차례다. 에러 처리만 추가하는 것이 목표가 아니라 위기능을 일반화해야 한다. 온도를 직접 출력하지 않고 사용자에게 팩토리 메소드를 제공해 매 초마다 온도를 방출(편의상 최대 다섯번 온도를 방출하고 종료시킴)하는 Observable을 반환할 것이다.
public static Observable<TempInfo> getTemperature(String town) {
return Observable.create(observableEmitter ->
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(i -> {
if (!observableEmitter.isDisposed()) {
if (i >= 5) {
observableEmitter.onComplete();
} else {
try {
observableEmitter.onNext(TempInfo.fetch(town));
} catch (Exception e) {
observableEmitter.onError(e);
}
}
}
})
);
}
필요한 이벤트를 전송하는 ObservableEmitter
를 소비하는 함수로 Observable을 만들어 반환했다. RxJava의 ObservableEmitter 인터페이스는 Rxjava의 기본 Emitter를 상속한다
public interface Emitter<T>{
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
내부적으로 매 초마다 무한의 long값을 발행하는 onPerSec 같은 Observable을 구독했다. 다음 코드에서 보여주는 것처럼 getTemperature 메소드가 반환하는 Observable에 가입시킬 Observer를 쉽게 완성해서 전달된 온도를 출력할 수 있다.
public class TempObserver implements Observer<TempInfo> {
@Override
public void onSubscribe(@NonNull Disposable disposable) {
}
@Override
public void onNext(@NonNull TempInfo tempInfo) {
System.out.println(tempInfo);
}
@Override
public void onError(@NonNull Throwable throwable) {
System.out.println("Got problem" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done!");
}
}
public class Main {
public static void main(String[] args) {
// write your code here
Observable<TempInfo> observable = getTemperature("New York");
observable.blockingSubscribe(new TempObserver());
}
///생략...
}