Chapter 17 리액티브 프로그래밍

·2023년 3월 24일
0

모던 자바 스터디

목록 보기
3/3

리액티브 프로그래밍 패러다임

  • 빅데이터: 페타바이트 단위로 구성
  • 다양한 환경: 모바일 디바이스 ~ 클라우드 기반 클러스터
  • 사용패턴: 24시간 서비스, 밀리초 단위 응답

리액티브 매니패스토

  • 리액티브 프로그래밍의 핵심 원칙
    • 반응성 : 빠르면서 일정하고 예상할 수 있는 반응 시간을 제공한다.
    • 회복성 : 장애가 발생해도 시스템은 반응성은 유지된다.
    • 탄력성 : 무거운 작업 부하가 발생하면 자동으로 컴포넌트에 할당된 자원 수를 늘린다.
    • 메시지 주도 : 컴포넌트 간의 약한 결합, 고립, 위치 투명성이 유지되도록 시스템은 비동기 메시지 전달에 의존한다.

애플리케이션 수준의 리액티브

  • 이벤트 스트림을 블록하지 않고 비동기로 처리하는 것이 멀티코어 CPU의 사용률을 극대화할 수 있는 방법이다.
    • 이를 위해 스레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 관리한다.
  • 개발자 입장에서는 저수준의 멀티 스레드 문제를 직접 처리할 필요가 없어진다.
  • 이벤트 루프 안에서는 절대 동작을 블락하지 않는다는 전제조건이 따른다.
    • 데이터베이스, 파일 시스템 접근, 원격 소비스 호출 등 I/O 관련 동작 등등
  • 비교적 짧은 시간동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며, 보통 이벤트 주도로 분류된다.

시스템 수준의 리액티브

  • 여러 애플리케이션이 한개의 일관적이고 회복할 수 있는 플랫폼을 구성할 수 있게 해준다.
  • 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 한다.
  • 애플리케이션을 조립하고 상호소통을 조절한다. (메시지 주도)
  • 컴포넌트에서 발생한 장애를 고립시킴으로 문제가 다른 컴포넌트로 전파되면서 전체 시스템 장애로 이어지는 것을 막는다.(회복성)
  • 모든 컴포넌트는 수신자의 위치와 상관 없이 다른 모든 서비스와 통신할 수 있는 위치 투명성을 제공한다.
    • 이를 통해 시스템을 복제할 수 있으며 작업 부하에 따라 애플리케이션을 확장할 수 있다.(탄력성)

Flow

  • 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화할 수 없다.
  • Flow 클래스의 인터페이스

첫 번째 리액티브 애플리케이션

public final class TempInfo {
    private final String town;
    private final int temp;

    public TempInfo(final String town, final int temp) {
        this.town = town;
        this.temp = temp;
    }

    public static TempInfo fetch(String town) {
        if (10분의 1 확률로 실패) {
            throw new RuntimeException("ERROR");
        }

        return new TempInfo(town, random.nextInt(100));
    }
}
public final class TempSubscriber implements Subscriber<TempInfo> {
    private Subscription subscription;

    @Override
    public void onSubscribe(final Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final TempInfo tempInfo) {
        System.out.println(tempInfo);
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable error) {
        System.out.println(error.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}
public final class TempSubscription implements Subscription {
    private final Subscriber<? super TempInfo> subscriber;
    private final String town;

    public TempSubscription(final Subscriber<? super TempInfo> subscriber, final String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(final long n) {
        for (n 동안) {
            try {
                subscriber.onNext(TempInfo.fetch(town));
            } catch (Exception e) {
                subscriber.onError(e);
                break;
            }
        }
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}
public final class Main {
    public static void main(String[] args) {
        getTemperatures("Busan").subscribe(new TempSubscriber());
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town) {
        return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
    }
}

Processor로 데이터 변환하기

public class TempProcessor implements Processor<TempInfo, TempInfo> {
    private Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Subscriber<? super TempInfo> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        // 온도를 섭씨로 변환한 값으로 TempInfo를 만들어 다시 전송
        subscriber.onNext(new TempInfo(tempInfo.getTown(), (tempInfo.getTemp() - 32) * 5 / 9));
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}
public final class Main {
    public static void main(String[] args) {
        getTemperatures("Busan").subscribe(new TempSubscriber());
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town) {
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe(subscriber);
            processor.onSubscribe(new TempSubscription(processor, town));
        };
    }
}

자바는 왜 플로 API 구현을 제공하지 않는가

  • 해당 API가 나올 때 AKKA, RXJava 등의 여러 리액티브 프로그래밍 라이브러리가 존재
  • 여러 라이브러리의 표준화를 위한 인터페이스로 활용

RxJava

  • 넷플릭스에서 개발한 라이브러리로 자바에서 리액티브 애플리케이션을 구현하는 데 사용한다.
  • RxJava는 Flow.Publisher를 구현하는 두 클래스를 제공한다.
    io.reactivex.Flowable: 역압력을 지원하는 Flow
    io.reactivex.Observable: 역압력을 지원하지 않는 Flow
  • 역압력 적용을 권장하지 않는 이벤트
    천 개 이하의 요소를 가진 스트림
    마우스 움직임, 터치 이벤트 등 역압력을 적용하기 힘든 GUI 이벤트
    * 자주 발생하지 않는 종류의 이벤트

Observable 만들고 사용하기

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Throwable {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
});

observable.subscribe(
        // onNext
        new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                System.out.println("onNext : " + integer);
            }
        },
        // onError
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {
                System.out.println("onError : " + throwable);
            }
        },
        // onComplete
        new Action() {
            @Override
            public void run() throws Throwable {
                System.out.println("onComplete");
            }
        }
);

Observable을 변환하고 합치기

private Observable<TempInfo> getCelsiusTemperatureInRxJava(String town) {
    return getTemperature(town)
            .filter(tempInfo -> tempInfo.getTemp() > 0)
            .map(temp -> new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9));
}

private Observable<TempInfo> getCelsiusTemperaturesInRxJava(String... towns) {
    return Observable.merge(Arrays.stream(towns)
            .map(TempObservable::getCelsiusTemperature)
            .collect(toList()));
}
profile
渽晛

0개의 댓글