2. RxJava를 사용하는데 필요한 배경지식

안석주·2021년 11월 25일
0

RxJava

목록 보기
4/10

서론

이번 장에서는 RxJava를 사용하는데 필요한 기본 지식인 람다식과 비동기 처리에 대해서 알아보겠습니다!
그런데 책은 RxJava이지만 저는 Kotlin을 이용해 실습하기 때문에, 이전에 작성한 포스트에서 코틀린 람다식에 대해서 보시길 권장드리며, 여기서는 자바에의 람다식과 비동기 처리에 대해서만 다루겠습니다.

또한 이번 챕터는 RxJava보다는 RxJava를 사용하는데 있어 알아두면 좋은 "기본" 이기 때문에 RxJava만을 공부하고 싶다면, 넘어가도 됩니다!!!

람다식 읽어보기 (확장함수와 람다식)

람다식

먼저 람다식이란, 함수형 인터페이스를 구현하기 위해서 자바 8에서 도입된 기능으로 람다식을 이용하면 복잡하고 긴 코드를 한줄로 편리하게 사용할 수 있기 때문에, RxJava 뿐 아니라 일반적인 상황에서도 큰 도움을 줍니다.

그렇다면 함수형 인터페이스란 어떤 것일까요??

함수형 인터페이스

함수형 인터페이스란, 인터페이스 안에 구현해야하는 메소드가 단 한개인 경우를 말합니다!

public interface MyFunction<T, U>{
    U apply(T data)
}

람다식은 함수형 인터페이스를 구현함에 있어 반드시 필요한 정보만(최소 정보만)으로 구현할 수 있게 도와줍니다! 이는 불필요한 정보를 다 제거하고, 반드시 필요한 메소드의 인자와 내용만을 포함하면 됩니다.

이를 쉽게 이해해보면 (인자) -> {실행문} 으로 볼 수 있습니다!

이것을 보통의 익명 클래스로 작성한다면 다음과 같습니다!

new 인터페이스 이름() {
    반환 타입 메소드 이름 ( 인자 ) {
        실행문
    }
};

여기서 우리는 1. 구현해야하는 메소드가 단 한개뿐이기 때문에 메소드 이름을 제거할 수 있고, 2. 인터페이스 이름 또한 생략할 수 있습니다. 3. 반환 타입 또한 return구문에서 반환하는 값이 반환타입이므로, 정의한 반환 값과 반환하는 객체의 타입이 같다면 생략할 수 있습니다.

이를 통해 우리가 단순히 작성해줄 것은 1. 전달해주어야할 인자, 2. 함수의 내용만 작성해주면 된다는 것을 알 수 있습니다!

아래의 RxJava의 함수형 인터페이스인 BiFunction을 익명클래스와 람다식을 통해 구현한 것을 보고 차이점에 대해서 생각해봅시다!!!!

// 익명 클래스로 구현
BiFunction<Integer, Integer, BigDecimal> function = 
    new BiFunction<Integer, Integer, BigDecimal>() {
        @Override
        public BigDecimal apply(Integer value1, Integer value2){
            return new BigDecimal(value1 + value2);
        }
|;

// 람다식으로 구현
BiFunction<Integer, Integer, BigDecimal> function = 
    (Integer value1, Integer value2) -> { 
        return new BigDecimal(value1 + value2)
    }
};

BiFunction은 2개의 인자를 받아 특정 결과를 반환하는 apply라는 메소드만 있는 함수형 인터페이스 입니다. 이를 위에 설명한 방식에 따라 구현하면 7줄이던 코드가 4줄로 줄어드는 간결함을 볼 수 있습니다.
또한, 인자의 타입이나 실행문에 따라 return과 중괄호를 생략할 수도 있는데, 이는 더욱 짧고 간결한 결과를 보여줍니다!

BiFunction<Integer, Integer, BigDecimal> function = 
    (value1, value2) -> new BigDecimal(value1 + value2)

RxJava의 함수형 인터페이스

방금 설명했던 것처럼, 함수형 인터페이스와 람다식을 잘 이용한다면 인터페이스의 구현이 정말 간편해집니다. 이를 RxJava에서도 이용할 수 있는데, RxJava에서 독자적으로 준비한 함수형 인터페이스를 알아보겠습니다!

  • Function/Predicate : 인자를 전달받아 반환 값을 반환하는 인터페이스
  • BooleanSupplier : 인자 없이 반환 값을 반환하는 인터페이스
  • Action/Consumer : 반환값이 없는 인터페이스
  • Cancellable : Action과 동일하나 실행 의미가 다른 인터페이스

Function / Predicate

이 둘은 구현해야 할 메소드에 인자와 반환 값이 모두 있는 함수형 인터페이스입니다!
다만 Function은 메소드의 반환값에 제한이 없고, Predicate는 메소드의 반환값이 Boolean이어야 한다는 차이점이 있습니다!

이 특징에 따라 Function이라는 이름이 붙은 함수형 인터페이스는 "어떠한 인자를 받아 무언가를 생성하는 처리를 한다"고 유추할 수 있고, Predicate라는 이름이 붙은 함수형 인터페이스는 "인자를 받아 어떠한 결정을 한다"라는 것을 유추할 수 있게 됩니다!!!

또한 함수형 인터페이스 이름에 인자의 개수가 붙어있으니...더 알기 쉽습니다.

함수형 인터페이스반환값메소드
Function<T,R>Rapply(T t)
BiFunction<T1,T2,R>Rapply(T1 t1, T2 t2)
Function3<T1,T2,T3,R>Rapply(T1 t1, T2 t2, T3 t3)
Function4<T1,T2,T3,T4,R>Rapply(T1 t1, T2 t2, T3 t3,T4 t4)
IntFunction<T>Tapply(int i)

이름도 정말 단순해서 외우기 쉽습니다.. 인자가 3개라서 Fucntion3 인자가 4개라서 Function4 .(Function9 까지 있습니다...)

아래의 Predicate까지 보고 넘어가겠습니다!

함수형 인터페이스반환값메소드
Predicate<T>booleantest(T t)
BiPredicate<T1,T2>booleantest(T1 t1, T2 t2)

BooleanSupplier

BooleanSupplier는 인자 없이 boolean을 반환합니다! 일반적으로 Supplier가 붙은 함수형 인터페이스는 인자 없이 다양한 타입을 반환하지만 RxJava에서는 boolean만 반환하는 BooleanSupplier만 제공합니다.

함수형 인터페이스반환값메소드
BooleanSupplierbooleangetAsBoolean()

Action/Consumer

Action과 Consumer는 반환 값이 없는 메소드, 즉 어떤 부가 작용이 발생하는 메소드가 있는 함수형 인터페이스 입니다! Action은 구현해야할 메소드에 인자가 없고, Consumer은 있습니다.

함수형 인터페이스반환값메소드
Action없음run()
Consumer<T>없음accept(T t)
BiConsumer<T1,T2>없음accept(T1 t1, T2 t2)
LongConsumer없음accept(long t)

우리가 보통 사용하는 subscribe{}의 내부를 보면 Consumer로 이루어져있습니다!

(사진 작아서 죄송합니다...)

이를 통해 subscribe 메소드를 이용할 때 람다식을 사용할 수 있으며, 인자를 전달하지만 반환 값은 없이 수행을 할 수 있습니다!

Cancellable

Cancellable은 Action과 마찬가지로 인자도 반환값도 없는 메소드가 있습니다! 그리고 인터페이스의 이름에서 알수 있듯, 어떤 것을 취소할 때 사용됩니다.

함수형 인터페이스반환값메소드
Cancellable없음cancel()

람다식 문법

이제 함수형 인터페이스에 대해서는 확실히 알았으니, 람다식의 문법에 대해서 알아보겠습니다!

(타입 인자, 타입 인자 ...) -> {
    실행문
    return 반환값;
}

인자 부분에는 메소드의 인자 개수만큼 선언해야 하고, 만약 인자가 없다면 ()만 작성해줍니다! 만약 0~9의 랜덤 숫자를 반환하는 함수형 인터페이스를 구현한다면

() -> {
    return (int) (Math.random() * 10);
}

위와 같이 작성할 수 있습니다.

인자가 한개 밖에 없는 경우라면 이미 구현한 인터페이스에 선언돼 있어서 추측이 가능하니 생략할 수도 있습니다. 예를 들어, 인수의 값을 10배로 돌려주는 함수를 구현하면, 아래와 같습니다.

value -> {
    return value * 10;
}

그리고 인자가 1개라면 인자를 둘러싼 괄호도 위와 같이 제거가 가능합니다!
더 간편하게 구현하기 위해 함수의 본문이 return문으로만 끝나는 식 하나라면 괄호와 return을 제거할 수 있습니다. (반환 값이 없을 경우에도 동일합니다!)

value -> value * 10

value -> System.out.println(value)

이처럼 람다식은 코드를 간결하고 직관성있게 바꾸어 줍니다!!!(꼭 알아두세용)

비동기 처리

RxJava를 이용하기 위해서는, 그리고 우리 처럼 안드로이드를 공부하는 사람들은 모두 비동기 처리에 대해서 알아야 합니다! 쉽게 생각해보면 서버에서 API를 이용해 어떠한 데이터를 받아올 때, 비동기로 실행시켜주지 않는다면 데이터를 받아오는 동안 안드로이드는 다른 작업을 수행할 수 없고, 이는 사용자가 앱을 사용하는데 있어 불편함을 느끼게 되며 추가로 UI 쓰레드를 오랫동안 차단할 경우 ANR을 유발하기 때문입니다!

비동기 처리가 뭔데?

비동기 처리란 어떤 작업을 실행하는 동안 해당 처리가 끝나기를 기다리지 않고, 다른 작업을 시작할 수 있는 것을 말합니다! 그런데 비동기 처리를 이해하기 위해서는 쓰레드 입니다.

public static void main(String[] args){
    System.out.println("Hi");
}

위와 같은 간단한 프로그램도 하나의 쓰레드에서 작동합니다.

이러한 쓰레드를 여러개 준비하여, 각각의 쓰레드에서 작업을 수행하도록 하면 비동기 처리를 할 수 있는데, 이처럼 쓰레드를 여러개 준비하여 작업을 처리하는 것을 멀티쓰레딩이라고 합니다.

쓰레드 1개를 가지고 3가지 작업을 처리해야한다고 하면, 1번 작업이 끝나고 2번 작업이 실행되고, 2번 작업이 끝나면 3번 작업이 실행됩니다. 이는 순차적으로 진행됩니다.

이런 싱글 쓰레드 처리를 1개의 논리 프로세서에서 멀티 쓰레드로 바꿔주면, 쓰레드를 계속해서 전환하면서 작업을 처리합니다.

쓰레드 1의 작업1을 실행 하다, 쓰레드 2의 작업2를 실행하고, 쓰레드 2의 작업2를 실행 도중 쓰레드 3의 작업 3을 실행합니다. 이는 싱글 쓰레드와 비슷해보이지만 네트워크 통신이나 DB 검색처럼 작업을 요청한 후에 결과를 받을 때까지 기다려야 할 때 싱글 쓰레드로 실행시 한 작업이 실행중일 때 모든 작업은 멈추지만, 멀티 쓰레드라면 작업 처리를 대기하는 동안 다른 작업을 계속 진행할 수 있습니다. 이처럼 대기하는 프로세스가 있다면 멀티쓰레드가 정답이 될 수 있습니다!

또한 최근의 PC는 1개의 CPU에 여러 개의 논리 프로세서가 있고, 이런 CPU를 여러개 탑재하기 때문에 각 논리 프로세서에서 각각의 쓰레드를 처리함으로써 여러 개의 작업을 동시에 실행할 수 있습니다.

위와 같은 멀티 프로세서 환경에서 싱글 쓰레드만 사용한다면 프로세서를 하나만 이용하는 것과 같아 장점을 살리지 못합니다. 그래서 성능 향상을 위해 멀티 쓰레드를 채용하는 것이 좋은 방법일 수는 있습니다.

그러나 쓰레드 생성 작업 또한 큰 리소스를 사용하기 때문에 모든 상황에서의 답은 아닐 수도 있습니다. 또한 쓰레드의 전환또한 큰 리소스를 소비하기 때문에 잘 이해하고, 사용해야만 합니다!

비동기 처리 시 주의할 점!

자바에서 비동기 처리를 하게되면, 싱글 쓰레드에서 고려하지 않았던 점을 고려해야 합니다.

메모리와 캐시

비동기 처리시 주의할 점중 하나는 클래스 필드가 가리키는 값과 실제 메모리가 가리키는 값이 동일하지 않을 수 있다는 것입니다! 필드가 다루는 값은 메모리에서 캐시된 값으로, 이곳에서 값 참조와 변경을 하고, 적절한 시점에 실제 메모리 값을 변경해줍니다. 그래서 메모리에 반영되기 전에는 캐시 값과 메모리의 값이 다르게 됩니다. 그리고 이 캐시는 쓰레드 별로 존재하기 때문에 특정 쓰레드에서 필드에 접근한 뒤 값을 변경했을 때, 다른 쓰레드에서 해당 필드를 참조하면 쓰레드가 가진 캐시의 이전 값을 참조해 두 값이 다르다는 문제가 발생합니다!!!

싱글 쓰레드에서는 같은 캐시를 바라보기 때문에 문제가 없지만, 멀티 쓰레드 환경에서는 주의해야 합니다.

원자성

비동기 처리에서 주의해야 할 또 한가지는 처리 도중에 다른 작업이 끼어들 가능성이 있는지 고려해야 합니다. 이 일련의 처리가 분할할 수 없게 돼 있는 것을 원자성이라고 합니다.

싱글 쓰레드에서는 상관이 없지만, 멀티 쓰레드에서 비동기 처리를 할 때, 특정 메소드를 실행하는 동안 다른 작업이 실행될 수 있는데, 필드에 접근하면 원자성이 깨질 수 있습니다.

예제를 통해 좀 더 알아보겠습니다. increment() 메소드를 통해 count를 순차적으로 증가시키는 Counter 클래스가 있습니다.

class Counter{
    private volatile int count;
    
    void increment(){
        count ++;
    }
    
    int get(){
        return count;
    }
}

volatile 제한자는 업데이트한 값은 반드시 메모리에 반영돼 다른 쓰레드에서 참조할 때 같은 값을 얻을 수 있게 해줍니다.

아래 예제에서는 increment() 메소드를 10,000번 호출하여 결과값을 확인해보겠습니다!

public class Counter {
    private volatile int count;

    void increment() {
        count++;
    }

    int get() {
        return count;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Counter counter = new Counter();

        Runnable task = () ->{
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        };

        ExecutorService executorService = Executors.newCachedThreadPool();

        Future<Boolean> future1 = executorService.submit(task, true);
        Future<Boolean> future2 = executorService.submit(task, true);

        if(future1.get() && future2.get()){
            System.out.println(counter.get());
        }else{
            System.err.println("failed");
        }

        executorService.shutdown();
    }
}
-> 12813

결과는 12,813이 나왔습니다! 실행마다 다른 값이 나옵니다.
Counter 클래스를 2개의 쓰레드에서 접근해 10,000번씩 수행했으니 20,000이 나오는 것을 예상했으나, 20,000이 나오지 않습니다. 왜그럴까용?

위의 increment() 메소드의 count++ 구문이 보기에는 하나의 작업같지만, 실제로는
1. count의 값을 가져온다
2. 가져온 값에 1을 더한다
3. 계산 결과의 값을 count에 설정한다.

그래서 이 예제에서 실행하는 작업이 count 값을 가져와 결과를 반영하는 사이에 그 값을 가져가서 처리하기 때문에 예상과 다른 값이 나옵니다!
예) 1의 값을 가져와 1을 더해 2가 되고, 2를 덮어쓰는 도중, 다른 쓰레드에서 1의 값을 가져가 똑같이 1을 더한다. 1의 값으로 두번 연산함!

이 처럼 비동기 처리를 할 때는 싱글 쓰레드에서 생각하지 못했던 것들 또한 고려해주어야 합니다. 그렇다면 이 문제를 어떻게 처리해야 할까요??

어떻게 해결할까요?

값이 변하는게 문제라면, 그냥 못 바꾸게하면 됩니다!
즉, 생성 후에 변하지 않는 변수나 객체라면 서로 다른 쓰레드에서 같은 객체와 같은 값을 바라볼 것입니다. 자바에서는 final 제한자를 붙여 재할당을 막을 수 있습니다.

private final Date value;

또한 꼭 변경되어야 한다면, 값을 복사해 복사한 값을 변경하게 하여 외부로부터 영향을 받지 않습니다.

public final class ImmutableObject{
    (중략)
    
    public ImmutableObject changeValue(Data value){
        return new ImmutableObject(value);
    }
}

위의 예제에서는 생성자를 public으로 선언해 외부에서 인스턴스를 생성할 수 있지만, 생성자를 private로 선언한 뒤 static 메소드로 객체를 생성할 수도 있는데, 이렇게 객체를 생성하는 메소드를 정적 팩토리 메소드라고 하며, 호출 때마다 인스턴스를 생성하는 것이 아닌, 싱글톤 패턴으로 해당 클래스의 인스턴스를 하나만 만들고, 계속 재사용할 수도 있습니다!

기본적으로 비동기 처리를 할 때는 상태를 변경할 수 없는 불변객체를 사용해야하지만, 어쩔 수 없이 값을 변경해야한다면 위의 메모리와 캐시, 원자성을 꼭 고려해서 설계해야 합니다.

이전에 사용했던 volatile 생성자는 필드 값으로 캐시된 값이 아닌 최신 메모리 값을 가져오는 것을 보장해줍니다. 그런데 최신 값을 얻을 수만 있을 뿐, 업데이트 할 때는 원자성을 보장하지 못해서 앞의 Counter 클래스 처럼 해당 필드에 접근해 이전 정보로 데이터를 업데이트하면, 의도하지 않은 최종 결과를 얻게됩니다.

그러므로 volatile이 붙은 변수에 업데이트할 때는 하나의 특정 쓰레드로만 업데이트하고, 그 외 쓰레드는 참조만하게하여 업데이트 하는 쓰레드, 참조하는 쓰레드를 분리하게 된다면 올바른 값을 얻을 수 있습니다!

atomic 패키지

그렇다면, 원자성 있게 count++의 3 동작을 하나로 묶고, 이 처리 과정중에서 외부에서 접근하지 못하게 하면 되지 않을까요? 그리고 count++로 1이 증가하는 처리가 시작되고, 끝날 때 까지 외부로부터 접근을 제한하면 되지 않을까요? 네 됩니다!!! 이렇게 하면 멀티 쓰레드에서도 같은 결과를 얻을 수 있습니다. 이를 지원하는 패키지가 자바에 존재하는데, java.util.concurrent.atomic패키지 입니다.

앞선 예제에 Counter를 AtomicCounter로 바꿔보겠습니다!

public class AtomicCounter {
    private final AtomicInteger count = new AtomicInteger(0);

    void increment() {
        count.incrementAndGet();
    }

    int get() {
        return count.get();
    }
}

=> > Task :AtomicCounter.main()
20000

고맙게도... 20,000이 나와줬습니다!!!! (너무 고마워!!!!)

하지만, atomic 패키지의 클래스는 자신의 변경 처리에만 원자성이 있습니다. 이전의 예제는 필드가 한개뿐이지만, 여러 필드를 가진 클래스 일때, 한 번의 처리로 변경한다면 이를 처리하는 동안 다른 쓰레드가 해당 객체에 접근하게 되면 처리 중인 상태의 객체에 접근하게 됩니다.

public class Point {
    private final AtomicInteger x = new AtomicInteger(0);

    private final AtomicInteger y = new AtomicInteger(0);

    void rightUp() {
        x.incrementAndGet();
        y.incrementAndGet();
    }

    int getX() {
        return x.get();
    }

    int getY() {
        return y.get();
    }
}

어떤 쓰레드가 이 클래스의 rightUp 메소드를 호출해 x 값만 변경한 시점에 다른 쓰레드가 이 객체에 접근해 x와 y의 값을 가져간다면 변경된 x값과 변경되지 않은 y값을 가져가게 됩니다. 이런 현상을 방지하기 위해선 rightUp 메소드를 실행하는 동안 다른 쓰레드의 접근을 막아야 합니다.

Synchronized 블록

synchronized 블록은 자신의 쓰레드가 Synchronized 블록을 처리하는 중에는 다른 쓰레드에서 해당 블록을 접근하는 것을 막습니다. 그래서 블록 내에서 실행하는 처리가 완료될 때까지 다른 쓰레드는 대기합니다. 이처럼 다른 쓰레드의 접근을 막는 것을 배타 제어라고 하며, 이 배타 제어를 적절히 사용하면 필드가 여러개여도 일관성 유지가 가능합니다!

public class SynchronizedPoint {
    private final Object lock = new Object();

    private int x;

    private int y;

    void rightUp(){
        synchronized (lock){
            x++;
            y++;
        }
    }

    int getX(){
        synchronized (lock){
            return x;
        }
    }

    int getY(){
        synchronized (lock){
            return y;
        }
    }
}

이렇게 작성해주면, rightUp() 메소드가 실행되는 동안 다른 쓰레드가 getX() 메소드로 x값을 가져오려고 해도, lock 소유권이 없기 때문에, rightUp 메소드 처리가 끝날 때까지 기다려야합니다!
또한 AtomicInteger에서 일반 int로 바꿨는데, synchronized 블록에서 나올 때 값이 메모리에 업데이트돼 다음에 다른 쓰레드가 synchronized 블록에 들어갈 때 그 값이 메모리에 반영되기 때문입니다. 그 때문에 int라도 메모리와 캐시에서 차이가 없습니다!

또한 synchronized는 같은 락 객체를 가진 synchronized 블록이며 해당 제어가 적용되지 않은 곳에서는 아무런 제한 없이 접근이 가능합니다!

추가로 메소드에도 synchronized 키워드를 붙일 수 있습니다! 이런 경우 메소드의 처음부터 끝까지 모두가 배타 제어의 대상입니다.

synchronized void rightUp() {
        x++;
        y++;
    }

    synchronized int getX() {
        return x;
    }

    synchronized int getY() {
        return y;
    }

이러한 synchronized 키워드는 자체로 무거운 처리기 때문에 남용할 시 프로그램의 성능에 문제를 줄 수 있습니다. 또한 잘못 설계하여 교착 상태에 들게되면 응용프로그램이 멈춰버리니 조심해서 사용해야 합니다!

이제 3장으로 넘어갑니다!!!!!

profile
뜻을 알고 코딩하기

0개의 댓글