이 글은 인프런의 Kevin의 알기 쉬운 RXJava 1부를 참고합니다.
위키피디아에서는 리액티브 프로그래밍에 대해 다음과 같이 정의 합니다.
In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
(변화의 전파와 데이터 흐름과 관련된 선언적 프로그래밍 패러다임입니다.)
명령형 프로그래밍과 선언형 프로그래밍의 차이를 코드의 예시로 보여드리겠습니다.
public class ImperativeProgramming {
public static void main(String[] args){
// List에 있는 숫자들 중에서 6보다 큰 홀수들의 합계를 구하세요.
List<Integer> numbers = Arrays.asList(1, 3, 21, 10, 8, 11);
int sum = 0;
for(int number : numbers){
if(number > 6 && (number % 2 != 0)){
sum += number;
}
}
System.out.println("# 명령형 프로그래밍 사용: " + sum);
}
}
public class DeclarativePrograming {
public static void main(String[] args){
// List에 있는 숫자들 중에서 6보다 큰 홀수들의 합계를 구하세요.
List<Integer> numbers = Arrays.asList(1, 3, 21, 10, 8, 11);
int sum = numbers.stream()
.filter(number -> number > 6 && (number % 2 != 0))
.mapToInt(number -> number)
.sum();
System.out.println("# 선언형 프로그래밍 사용: " + sum);
}
}
Reactive programming은 Push 방식과 같다고 볼 수 있습니다.
Push방식 : 데이터의 변화가 발생했을 때 변경이 발생한 곳에서 데이터를 보내주는 방식
Pull방식 : 변경된 데이터가 있는지 요청을 보내 질의하고 변경된 데이터를 가져오는 방식
주석으로 처리된 부분은 하나씩 해제해보면서 변화를 확인해볼 예정입니다.
public class ToDoSample {
public static void main(String[] args) throws InterruptedException {
Observable.just(100, 200, 300, 400, 500)
// .doOnNext(data -> System.out.println(getThreadName() + " : " + "#doOnNext() : " + data))
// .subscribeOn(Schedulers.io())
// .observeOn(Schedulers.computation())
.filter(number -> number > 300)
.subscribe(num -> System.out.println(getThreadName() + " : result : " + num));
// Thread.sleep(500);
}
public static String getThreadName(){
return Thread.currentThread().getName();
}
}
just
함수를 통해 데이터를 발행합니다.filter
함수를 통해 숫자가 300이상인 것만 필터링합니다.subscirbe
함수를 통해 필터링된 데이터만 출력되도록 하였습니다.위의 코드에서 알 수 있듯이 리엑티브 프로그래밍의 기본적인 규칙은
입니다.
위의 코드를 실행하게되면 아래와 같은 결과를 확인할 수 있습니다.
결과를 보시면 메인 스레드를 통해 결과값이 출력된 것을 알 수 있습니다.
두번째로 doOnNext
함수쪽의 라인만 주석해제하고 실행해보겠습니다.
결과는 아래와 같습니다.
쓰레드는 여전히 메인스레드에서 실행이 되고 있습니다.
doOnNext
는 각각의 데이터가 발행이 될 때 실행이 됩니다.
세번째로는 subscribeOn
라인을 주석해제 한 후 실행해보겠습니다.
이상하게도 아무것도 출력되지 않고 프로그램이 종료되었습니다.
이유가 뭘까요?
subscribeOn
라인을 주석해제 하기 전까지는 메인스레드에서 실행이되었지만 주석해제 한 후에는 메인스레드가 아닌 다른 스레드에서 실행되게 됩니다. 그러므로 해당 스레드가 실행되기 전에 메인스레드가 종료되게 되면서 프로그램이 종료되는 것입니다.
이를 방지하기 위해서
Thread.sleep(500);
부분을 주석해제 한후 실행해보겠습니다. 아래와 같은 결과를 얻을 수 있습니다.
이제는 메인 스레드가 아닌 RxCachedThreadScheduler-1이라는 스레드에서 실행된 것을 알 수 있습니다.
이렇게 subscribeOn
함수를 사용하게 되면 메인스레드가 아닌 다른스레드에서 동작하게 됩니다.
마지막으로 observeOn
라인을 주석해제 한 후 실행해보겠습니다. 아래와 같은 결과가 나왔습니다.
위의 결과를 보시면 아시겠지만 데이터가 발행될 때의 스레드와 발행된 데이터를 구독하고 처리하는 스레드가 다른 것을 볼 수 있습니다.
결론적으로 subscribeOn
함수는 데이터의 발행, 데이터의 흐름을 결정짓는 스레드를 결정하고 observeOn
함수는 발행된 데이터를 가공하고 구독해서 처리하는 스레드를 결정하게 됩니다.
subscribeOn
함수와 observeOn
함수는 스케줄러를 지정해주게 됩니다.
public class ToDoSample {
public static void main(String[] args) throws InterruptedException {
Observable.just(100, 200, 300, 400, 500)
.doOnNext(data -> System.out.println(getThreadName() + " : " + "#doOnNext() : " + data))
.subscribeOn(Schedulers.io()) // 같은 스케줄러
.observeOn(Schedulers.io()) // 같은 스케줄러
.filter(number -> number > 300)
.subscribe(num -> System.out.println(getThreadName() + " : result : " + num));
Thread.sleep(500);
}
public static String getThreadName(){
return Thread.currentThread().getName();
}
}
위와 같이 실행했을 때 해당 스케줄러에 있는 쓰레드의 풀에서 쓰레드를 꺼내와 처리를 한다고 생각하였고 결론은 아래와 같이 생각한 것과 동일했습니다.
꼭 기억해야 할 것은
기본적으로 리액티브 프로그래밍은
데이터를 발행하고
데이터를 가공하고
데이터를 구독해서 처리한다는
기본적인 동작흐름을 가지고 있습니다.
리액티브 프로그래밍에 대해 간단히 알아보았습니다. 상당히 어렵다는 느낌이 강한 개념입니다. 하지만 열심히 공부해서 실무에도 빨리 적용하고 사용할 수 있도록 해야겠습니다!