리액티브 프로그래밍에 대하여

모지리 개발자·2022년 12월 3일
0

Reactive Programming

목록 보기
1/2

이 글은 인프런의 Kevin의 알기 쉬운 RXJava 1부를 참고합니다.

리액티브 프로그래밍이란

위키피디아에서는 리액티브 프로그래밍에 대해 다음과 같이 정의 합니다.

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
(변화의 전파와 데이터 흐름과 관련된 선언적 프로그래밍 패러다임입니다.)

  • 변화의 전파와 데이터 흐름 : 데이터가 변경 될 때마다 이벤트를 발생시켜서 데이터를 계속적으로 전달합니다.
  • 선언적 프로그래밍 : 실행할 동작을 구체적으로 명시하는 명령형 프로그래밍과 달리 선언형 프로그래밍은 단순히 목표를 선언한다.

명령형 프로그래밍과 선언형 프로그래밍의 차이를 코드의 예시로 보여드리겠습니다.

명령형 프로그래밍

public class ImperativeProgramming {
    public static void main(String[] args){
        // List에 있는 숫자들 중에서 6보다 큰 홀수들의 합계를 구하세요.
        List<Integer> numbers = Arrays.asList(1, 3, 21, 10, 8, 11);
        int sum = 0;

        for(int number : numbers){
             if(number > 6 && (number % 2 != 0)){
                 sum += number;
             }
        }

        System.out.println("# 명령형 프로그래밍 사용: " + sum);
    }
}

선언형 프로그래밍

public class DeclarativePrograming {
    public static void main(String[] args){
        // List에 있는 숫자들 중에서 6보다 큰 홀수들의 합계를 구하세요.
        List<Integer> numbers = Arrays.asList(1, 3, 21, 10, 8, 11);

        int sum = numbers.stream()
                .filter(number -> number > 6 && (number % 2 != 0))
                .mapToInt(number -> number)
                .sum();

        System.out.println("# 선언형 프로그래밍 사용: " + sum);
    }
}

Push 방식과 Pull 방식

Reactive programming은 Push 방식과 같다고 볼 수 있습니다.

Push방식 : 데이터의 변화가 발생했을 때 변경이 발생한 곳에서 데이터를 보내주는 방식

  • RTC(Real Time Communication)
  • 소켓 프로그래밍
  • DB Trigger
  • Spring의 ApplicationEvent
  • Angular의 데이터 바인딩
  • 스마트폰의 Push 메시지

Pull방식 : 변경된 데이터가 있는지 요청을 보내 질의하고 변경된 데이터를 가져오는 방식

  • 클라이언트 요청 & 서버 응답 방식의 에플리케이션
  • Java와 같은 절차형 프로그래밍 언어

리액티브 프로그래밍을 위해 알아야 될 것들

  • Observable : 데이터 소스(지속적으로 변경이 가능한 데이터의 집합)
  • 리액티브 연산자(Operators) : 데이터 소스를 처리하는 함수
  • 스케줄러 : 스레드 관리자
  • Subscriber : Observable이 발행하는 데이터를 구독하는 구독자
  • 함수형 프로그래밍 : RxJava에서 제공하는 연산자(Operator) 함수를 사용

코드로 알아보기

주석으로 처리된 부분은 하나씩 해제해보면서 변화를 확인해볼 예정입니다.

public class ToDoSample {
    public static void main(String[] args) throws InterruptedException {
        Observable.just(100, 200, 300, 400, 500)
//                .doOnNext(data -> System.out.println(getThreadName() + " : " + "#doOnNext() : " + data))
//                .subscribeOn(Schedulers.io())
//                .observeOn(Schedulers.computation())
                .filter(number -> number > 300)
                .subscribe(num -> System.out.println(getThreadName() + " : result : " + num));

//        Thread.sleep(500);
    }

    public static String getThreadName(){
        return Thread.currentThread().getName();
    }
}
  1. just함수를 통해 데이터를 발행합니다.
  2. filter함수를 통해 숫자가 300이상인 것만 필터링합니다.
  3. subscirbe함수를 통해 필터링된 데이터만 출력되도록 하였습니다.

위의 코드에서 알 수 있듯이 리엑티브 프로그래밍의 기본적인 규칙은

  • 데이터를 발행하고
  • 데이터를 가공하고
  • 데이터를 구독해서 처리한다

입니다.

위의 코드를 실행하게되면 아래와 같은 결과를 확인할 수 있습니다.

결과를 보시면 메인 스레드를 통해 결과값이 출력된 것을 알 수 있습니다.

두번째로 doOnNext 함수쪽의 라인만 주석해제하고 실행해보겠습니다.
결과는 아래와 같습니다.

쓰레드는 여전히 메인스레드에서 실행이 되고 있습니다.
doOnNext는 각각의 데이터가 발행이 될 때 실행이 됩니다.

세번째로는 subscribeOn 라인을 주석해제 한 후 실행해보겠습니다.

이상하게도 아무것도 출력되지 않고 프로그램이 종료되었습니다.
이유가 뭘까요?
subscribeOn 라인을 주석해제 하기 전까지는 메인스레드에서 실행이되었지만 주석해제 한 후에는 메인스레드가 아닌 다른 스레드에서 실행되게 됩니다. 그러므로 해당 스레드가 실행되기 전에 메인스레드가 종료되게 되면서 프로그램이 종료되는 것입니다.

이를 방지하기 위해서

Thread.sleep(500);

부분을 주석해제 한후 실행해보겠습니다. 아래와 같은 결과를 얻을 수 있습니다.

이제는 메인 스레드가 아닌 RxCachedThreadScheduler-1이라는 스레드에서 실행된 것을 알 수 있습니다.

이렇게 subscribeOn함수를 사용하게 되면 메인스레드가 아닌 다른스레드에서 동작하게 됩니다.

마지막으로 observeOn 라인을 주석해제 한 후 실행해보겠습니다. 아래와 같은 결과가 나왔습니다.

위의 결과를 보시면 아시겠지만 데이터가 발행될 때의 스레드와 발행된 데이터를 구독하고 처리하는 스레드가 다른 것을 볼 수 있습니다.

결론적으로 subscribeOn 함수는 데이터의 발행, 데이터의 흐름을 결정짓는 스레드를 결정하고 observeOn 함수는 발행된 데이터를 가공하고 구독해서 처리하는 스레드를 결정하게 됩니다.

subscribeOn 함수와 observeOn 함수는 스케줄러를 지정해주게 됩니다.

만약 같은 스케줄러를 호출한다면 어떻게 될까요?

public class ToDoSample {
    public static void main(String[] args) throws InterruptedException {
        Observable.just(100, 200, 300, 400, 500)
                .doOnNext(data -> System.out.println(getThreadName() + " : " + "#doOnNext() : " + data))
                .subscribeOn(Schedulers.io()) // 같은 스케줄러
                .observeOn(Schedulers.io()) // 같은 스케줄러
                .filter(number -> number > 300)
                .subscribe(num -> System.out.println(getThreadName() + " : result : " + num));

        Thread.sleep(500);
    }

    public static String getThreadName(){
        return Thread.currentThread().getName();
    }
}

위와 같이 실행했을 때 해당 스케줄러에 있는 쓰레드의 풀에서 쓰레드를 꺼내와 처리를 한다고 생각하였고 결론은 아래와 같이 생각한 것과 동일했습니다.

결론

꼭 기억해야 할 것은

기본적으로 리액티브 프로그래밍은
데이터를 발행하고
데이터를 가공하고
데이터를 구독해서 처리한다는
기본적인 동작흐름을 가지고 있습니다.

리액티브 프로그래밍에 대해 간단히 알아보았습니다. 상당히 어렵다는 느낌이 강한 개념입니다. 하지만 열심히 공부해서 실무에도 빨리 적용하고 사용할 수 있도록 해야겠습니다!

profile
항상 부족하다 생각하며 발전하겠습니다.

0개의 댓글