Reactive Programming(리액티브 프로그래밍) - RxJava란

개발자 이상규·2023년 4월 13일

rxjava

목록 보기
1/2
post-thumbnail

Reactive Programming(리액티브 프로그래밍) 이란?


In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

변화의 전파데이터 흐름과 관련된 선언적 프로그래밍 패러다임이다.

  • 변화의 전파와 데이터 흐름 : 데이터가 변경 될 때마다 이벤트를 발생시켜서 데이터를 계속적으로 전달
  • 선언적 프로그래밍 : 실행할 동작을 구체적으로 명시하는 명령형 프로그래밍과 달리 선언형 프로그래밍은 단순히 목표를 선언



리액티브 프로그래밍을 사용하는 이유


비동기적인 이벤트 처리

리액티브 프로그래밍에서는 이벤트를 비동기적으로 처리하기 때문에, 이벤트가 발생하는 즉시 처리할 수 있습니다. 이는 일반적으로 스레드와 락에 의존하는 전통적인 동기적인 프로그래밍 방식보다 훨씬 효율적이며, 대규모 시스템에서 높은 성능을 보장합니다.

데이터 스트림 처리

리액티브 프로그래밍에서는 데이터 스트림을 다룰 수 있습니다. 이러한 데이터 스트림은 무한하거나 큰 양일 수 있으며, 이벤트가 발생할 때마다 해당 이벤트를 처리하는 콜백 함수를 등록합니다. 이러한 방식은 대규모 데이터 처리에 적합합니다.

반응형 시스템 구현

리액티브 프로그래밍은 반응형 시스템을 구현하는 데 적합합니다. 이는 시스템이 외부 요청에 신속하게 반응하고, 빠른 응답 시간을 제공할 수 있도록 하는 기능입니다.



리액티브의 개념이 적용된 예


Push 방식 : 데이터의 변화가 발생했을 때 변경이 발행한 곳에서 데이터를 보내주는 방식

  • RTC (Real Time Communication)
  • Socket
  • Push message 등

Pull 방식 : 변경된 데이터가 있는지 요청을 보내 질의하고 변경된 데이터를 가져오는 방식

  • 클라이언트 요청 - 서버 응답 방식의 어플리케이션
  • Java와 같은 절차지향형 프로그래밍 언어 등



Reactive Streams의 인터페이스


  • Publisher : 데이터를 생성하고 통지
  • Subscriber : 통지된 데이터를 전달받아서 처리
  • Subscription : 전달받을 데이터의 갯수를 요청하고 구독을 해지
  • Processor : Publisher와 Subscriber의 기능이 모두 있음



Publisher와 Subscriber간의 프로세스 흐름




RxJava 예제


코드

Observable.just(100, 200, 300, 400, 500)	// Observable
	.doOnNext(data -> log.info("doOnNext() => {}", data))
    .subscribeOn(Schedulers.io())			// 데이터의 발행, 데이터의 흐름을 결정짓는 Tread 지정, Scheduler
    .observeOn(Schedulers.computation())	// 데이터를 구독하고 처리하는 Tread 지정, Scheduler
	.filter(num -> num > 300)				// Operator
    .subscribe(num -> log.info("{} : {}", getThreadName(), num));		// Subscriber



용어


  • Observable : 데이터 소스
  • Operator (리액티브 연산자): 데이터 소스를 처리하는 함수
  • Scheduler : Thread 관리자
  • Subscriber : Observable이 발행하는 데이터를 구독하는 구독자

로그 결과

RxCachedThreadScheduler-1 : 100
RxCachedThreadScheduler-1 : 200
RxCachedThreadScheduler-1 : 300
RxCachedThreadScheduler-1 : 400
RxCachedThreadScheduler-1 : 500

RxComputationThreadPool-1 : 400
RxComputationThreadPool-1 : 400



Publisher의 종류


Cold Publisher

  • 생산자는 소비자가 구독할 때마다 데이터를 처음부터 새로 통지함
  • 데이터를 통지하는 새로운 타임라인이 생성됨
  • 소비자는 구독 시점과 상관없이 통지된 데이터를 처음부터 전달 받을 수 있음

Hot Publisher

  • 생산자는 소비자 수와 상관없이 데이터를 한번만 통지함
  • 즉, 데이터를 통지하는 타임라인은 하나임
  • 소비자는 발행된 데이터를 처음부터 전달받는게 아니라 구독한 시점에 통지된 데이터들만 전달받을 수 있음



배압 전략 (BackpressureStrategy)


Missing 전략

  • 배압을 적용하지 않음.
  • 나중에 onBackpressureXXX() 로 배압을 적용할 수 있음

ERROR 전략

  • 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지함
  • 소비자가 생산자의 통지 속도를 따라 잡지 못할때 에러 발생

BUFFER 전략 : DROP_LATEST

  • 버퍼가 가득 찬 시점에 버퍼내에서 가장 최근에 버퍼로 들어온 데이터를 DROP 함
  • DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 넣음

BUFFER 전략 : DROP_OLDEST

  • 버퍼가 가득 찬 시점에 버퍼내에서 가장 먼저 들어온 데이터를 DROP 함
  • DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 넣음

DROP 전략

  • 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 DROP하고 버퍼가 완전 비워진 시점부터 버퍼에 데이터를 담음

LATEST 전략

  • 버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서 대기하며 버퍼가 비워지는 시점에 가장 최근에 통지된 데이터부터 버퍼에 담음



데이터를 1건만 발행하는 생산자


Single

데이터를 1건만 통지하거나 에러를 통지

maybe

데이터를 1건만 통지하거나 1건도 통지하지 않고 완료 또는 에러를 통지

completable

데이터 생산자이지만 데이터를 1건도 통지하지 않고 완료 또는 에러를 통지하는 생산자





Reference :

  • Kevin의 알기 쉬운 RxJava
profile
Contact: leeeesanggyu@gmail.com

0개의 댓글