리액티브 프로그래밍 패러다임
- 빅데이터: 페타바이트 단위로 구성
- 다양한 환경: 모바일 디바이스 ~ 클라우드 기반 클러스터
- 사용패턴: 24시간 서비스, 밀리초 단위 응답
리액티브 매니패스토
- 리액티브 프로그래밍의 핵심 원칙
- 반응성 : 빠르면서 일정하고 예상할 수 있는 반응 시간을 제공한다.
- 회복성 : 장애가 발생해도 시스템은 반응성은 유지된다.
- 탄력성 : 무거운 작업 부하가 발생하면 자동으로 컴포넌트에 할당된 자원 수를 늘린다.
- 메시지 주도 : 컴포넌트 간의 약한 결합, 고립, 위치 투명성이 유지되도록 시스템은 비동기 메시지 전달에 의존한다.
애플리케이션 수준의 리액티브
- 이벤트 스트림을 블록하지 않고 비동기로 처리하는 것이 멀티코어 CPU의 사용률을 극대화할 수 있는 방법이다.
- 이를 위해 스레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 관리한다.
- 개발자 입장에서는 저수준의 멀티 스레드 문제를 직접 처리할 필요가 없어진다.
- 이벤트 루프 안에서는 절대 동작을 블락하지 않는다는 전제조건이 따른다.
- 데이터베이스, 파일 시스템 접근, 원격 소비스 호출 등 I/O 관련 동작 등등
- 비교적 짧은 시간동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며, 보통 이벤트 주도로 분류된다.
시스템 수준의 리액티브
- 여러 애플리케이션이 한개의 일관적이고 회복할 수 있는 플랫폼을 구성할 수 있게 해준다.
- 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 한다.
- 애플리케이션을 조립하고 상호소통을 조절한다. (메시지 주도)
- 컴포넌트에서 발생한 장애를 고립시킴으로 문제가 다른 컴포넌트로 전파되면서 전체 시스템 장애로 이어지는 것을 막는다.(회복성)
- 모든 컴포넌트는 수신자의 위치와 상관 없이 다른 모든 서비스와 통신할 수 있는 위치 투명성을 제공한다.
- 이를 통해 시스템을 복제할 수 있으며 작업 부하에 따라 애플리케이션을 확장할 수 있다.(탄력성)
Flow
- 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화할 수 없다.
- Flow 클래스의 인터페이스
첫 번째 리액티브 애플리케이션
public final class TempInfo {
private final String town;
private final int temp;
public TempInfo(final String town, final int temp) {
this.town = town;
this.temp = temp;
}
public static TempInfo fetch(String town) {
if (10분의 1 확률로 실패) {
throw new RuntimeException("ERROR");
}
return new TempInfo(town, random.nextInt(100));
}
}
public final class TempSubscriber implements Subscriber<TempInfo> {
private Subscription subscription;
@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(final TempInfo tempInfo) {
System.out.println(tempInfo);
subscription.request(1);
}
@Override
public void onError(final Throwable error) {
System.out.println(error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
public final class TempSubscription implements Subscription {
private final Subscriber<? super TempInfo> subscriber;
private final String town;
public TempSubscription(final Subscriber<? super TempInfo> subscriber, final String town) {
this.subscriber = subscriber;
this.town = town;
}
@Override
public void request(final long n) {
for (n 동안) {
try {
subscriber.onNext(TempInfo.fetch(town));
} catch (Exception e) {
subscriber.onError(e);
break;
}
}
}
@Override
public void cancel() {
subscriber.onComplete();
}
}
public final class Main {
public static void main(String[] args) {
getTemperatures("Busan").subscribe(new TempSubscriber());
}
private static Flow.Publisher<TempInfo> getTemperatures(String town) {
return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
}
}
Processor로 데이터 변환하기
public class TempProcessor implements Processor<TempInfo, TempInfo> {
private Subscriber<? super TempInfo> subscriber;
@Override
public void subscribe(Subscriber<? super TempInfo> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onNext(TempInfo tempInfo) {
subscriber.onNext(new TempInfo(tempInfo.getTown(), (tempInfo.getTemp() - 32) * 5 / 9));
}
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}
public final class Main {
public static void main(String[] args) {
getTemperatures("Busan").subscribe(new TempSubscriber());
}
private static Flow.Publisher<TempInfo> getTemperatures(String town) {
return subscriber -> {
TempProcessor processor = new TempProcessor();
processor.subscribe(subscriber);
processor.onSubscribe(new TempSubscription(processor, town));
};
}
}
자바는 왜 플로 API 구현을 제공하지 않는가
- 해당 API가 나올 때 AKKA, RXJava 등의 여러 리액티브 프로그래밍 라이브러리가 존재
- 여러 라이브러리의 표준화를 위한 인터페이스로 활용
RxJava
- 넷플릭스에서 개발한 라이브러리로 자바에서 리액티브 애플리케이션을 구현하는 데 사용한다.
- RxJava는 Flow.Publisher를 구현하는 두 클래스를 제공한다.
io.reactivex.Flowable: 역압력을 지원하는 Flow
io.reactivex.Observable: 역압력을 지원하지 않는 Flow
- 역압력 적용을 권장하지 않는 이벤트
천 개 이하의 요소를 가진 스트림
마우스 움직임, 터치 이벤트 등 역압력을 적용하기 힘든 GUI 이벤트
* 자주 발생하지 않는 종류의 이벤트
Observable 만들고 사용하기
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
});
observable.subscribe(
new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("onNext : " + integer);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
System.out.println("onError : " + throwable);
}
},
new Action() {
@Override
public void run() throws Throwable {
System.out.println("onComplete");
}
}
);
Observable을 변환하고 합치기
private Observable<TempInfo> getCelsiusTemperatureInRxJava(String town) {
return getTemperature(town)
.filter(tempInfo -> tempInfo.getTemp() > 0)
.map(temp -> new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9));
}
private Observable<TempInfo> getCelsiusTemperaturesInRxJava(String... towns) {
return Observable.merge(Arrays.stream(towns)
.map(TempObservable::getCelsiusTemperature)
.collect(toList()));
}