RxJava와 Room

Lee Jeong-hwan·2025년 4월 3일

해당 글은 오늘 이슈를 수정하며 느낀 점을 정리한 개인 기록용 글입니다.

RxJava와 Room의 환장 조합.

신입 때 진행한 프로젝트에서 Handler와 Looper 남용으로 UI Thread에서 IO작업이 실행되어 앱이 강제 종료되었다는 이슈를 받았다.

처음 이슈를 전달 받았을 때는 "이정도면 금방 해결 할 수 있지" 했었는데 생각보다 고쳐야할 부분이 많았다.

그래도 다행인 점은 지금은 RxJava를 학습하고 어느정도 활용할 수 있을 때 적절한 문제가 생겼다는 점!


도룡뇽? 미역? RxJava!

RxJava는 함수형 프로그래밍과 비동기에 초점을 둔 라이브러리이다.

예를 들어 아래 코드를 확인해보자.

public Observable<String> getObservableExample() {
        return Observable.create(emitter -> {
            emitter.onNext("hello world");
        });
    }

위 코드는 RxJava에서 Observable<'T'> 를 만들어 "hello world"를 발행하는 코드이다.

발행? Observable?

간단하게 정리해보자면 RxJava는 발행자와 소비자가 존재한다.
발행자는 데이터를 다른 소비자들에게 전달하는 역할을한다.
소비자는 발행자의 데이터를 받아 사용하는 역할을 한다.

RxJava에서는 발행자를 "Observable<'T'>" 로 칭하고 소비자를 "Observer<'T'>" 라고한다.

코드를 간략하게 분석해보자.

Observable<'String'> : String 타입의 자료를 발행하는 발행자
Observable.create : 발행자를 생성하기 위한 create
onNext : 데이터를 소비자에게 발행하는 함수

정리 해보자면 위 코드는 String 자료를 발행하는 발행자이다.


그럼 소비는?

자 그러면 소비는 어떻게 할까?
아래 코드를 살펴보자.

getObservableExample().subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new ResourceObserver<String>() {
                    @Override
                    public void onNext(String s) {
                        Toast.makeText(getApplicationContext(), s, Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        Toast.makeText(getApplicationContext(), "완료", Toast.LENGTH_SHORT).show();
                    }
                });

getObservableExample() 함수를 Call하여 Observable<'String'>을 return 받아 Observer를 사용해 데이터를 이용하는 구조이다.

코드를 분석해보자.

subscribeOn : 발행자가 작업을 진행할 쓰레드
observeOn : 소비자가 작업을 진행할 쓰레드
subscribe : 데이터를 사용하기 위한 구독 함수
onNext(String s) : 데이터가 발행되면 작동할 코드
onError(Throwable e) : 데이터 구독 중 Error이 발생하면 처리할 코드
onComplete() : 데이터 발행이 다 끝나 더이상 받을 데이터가 없을 떄 호출

즉, 위 코드는 String s를 발행자로 부터 전달 받아 Toast Message를 띄우는 코드이다. 그 외에도

Schedulers.io() : IO 작업을 진행하는 쓰레드로 갯수 제한이 없음. 작업이 실행되면 무한히 생성됨
Schedulers.computation() : io와 다르게 생성할 수 있는 최대 Thread가 정해져있음.
Schedulers.single() : 단일 Thread로 작업을 진행하며, 작업이 들어오는 순서대로 진행함.
AndroidSchedulers.mainThread() : Android UI 작업을 진행하는 MainThread


나 vs 나

과거의 신입때 나와 지금의 내가 싸우고 있는 기분이었다.
고쳐야할 부분이 너무 많이 보였고 선택지가 존재 했다.

  1. 무한한 Handler와 Looper 납두고 이슈만 고치기.
  2. RxJava로 통으로 교체 하기.

분명 1번을 선택하면 지금 당장은 기분도 좋고 몇 분 투자하면 고칠 수 있는 쉬운 이슈였지만, 차후에 내가 아닌 다른 사람이 내 코드에 작업을 한다면 분명 날 원망할 것 이다.

그래서 나는 2번을 택했다!
오늘은 회고록이기 때문에 Room / RxJava에 대한 내용은 깊게 다루지 않겠다.

핵심 문제는 "Room DB에서 getAll() 와 같이 데이터를 가져오는 함수에 Observable 타입으로 받고, insert()를 진행하면 무한 루프에 빠진다!" 이다.

간단하게 디비를 하나 만들어보자.

DatabaseClient.java
@Database(entities = {Phone.class}, version = 1)
public abstract class DatabaseClient extends RoomDatabase {
    public abstract PhoneDao phoneDao();
}
Phone.java
@Entity
public class Phone {
    @PrimaryKey
    @NonNull
    public String name = "";
    public String phoneNumber;

    public Phone() {}

    public Phone(String name, String phoneNumber) {
        this.name = name;
        this.phoneNumber = phoneNumber;
    }

    @Override
    public String toString() {
        return "Phone{" +
                "name='" + name + '\'' +
                ", phoneNumber='" + phoneNumber + '\'' +
                '}';
    }
}
PhoneDao
@Dao
public interface PhoneDao {
    @Query("SELECT * FROM Phone")
    Observable<List<Phone>> getAll();

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    Completable insertAll(List<Phone> phones);
}

간단하게 설명 해보자면 DatabaseClinet를 만들어 Dao에 접근해 getAll / insertAll 을 시행할 수 있는 구조이다.
해당 글에서는 인스턴스를 싱글톤으로 만드는 과정 등은 생략했다.


MainActivity.java

DatabaseClient mDatabaseClient;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mDatabaseClient = Room.databaseBuilder(getApplicationContext(), DatabaseClient.class, "PhoneDB").build();
        
        insertDB();
        callPhoneDB();
    }
    
    public void insertDB() {
        mDatabaseClient.phoneDao().insertAll(Arrays.asList(
                new Phone("Jone","010-1111-2222"),
                new Phone("hone","010-2222-3333"),
                new Phone("youn","010-4444-5555"),
                new Phone("son","010-6666-7777"),
                new Phone("hoj","010-8888-9999"),
                new Phone("hiz","010-1212-3434"))
        )
        .subscribeOn(Schedulers.io())
        .doOnComplete(() -> Log.d("태그", "삽입 완료."))
        .subscribe();
    }

    public void callPhoneDB() {
        mDatabaseClient.phoneDao().getAll()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DisposableObserver<List<Phone>>() {
                    @Override
                    public void onNext(List<Phone> phones) {
                        phones.forEach(item -> Log.d("태그", item.toString()));

                        mDatabaseClient.phoneDao()
                                .insertAll(phones)
                                .subscribeOn(Schedulers.io())
                                .doOnComplete(() -> Log.d("태그", "완료"))
                                .subscribe();
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
        });
    }

위 코드를 봤을 때 Completable로 성공 실행 여부를 전달 하는 insertAll 함수는 1번 작동하고, getAll 또한, List를 가져오면 더 이상 가져올 정보가 없으니 onComplete()가 호출되면서 자동으로 종료될 것 같은 구조이다.

그런데 여기에는 치명적인 함정이 하나있다.

Room에서 Observable과 같은 형태로 값을 return하기 위해서는 room : room-rxjava 를 implementation 해서 사용하는데

데이터를 저렇게 구독하면 "조회 -> 삽입 -> 조회 -> 삽입" 이 무한 반복된다.
즉, 데이터가 중복되어 REPLACE되어도 변경이라 간주하고 Observable로 데이터를 구독중이기에 계속 함수가 호출된다.

나는 저 부분을 인지하지 못하고 2시간을 날려버렸다.
구독하고 사용하는 입장에서 치명적인 실수 이지 않았나싶다.


그래서 해결 못 하는가?

사실 해결 방법은 아주 간단하다.

  1. Observable이 아닌 Single 사용하기.
  2. REPLACE가 아닌 IGNORE 사용하기.

위 구조를 유지하고 싶다면 우리는 Dao 쪽과 지금까지 implement 했던 method 일부분을 수정해주면 된다.

1번 선택지 부터 확인해보자.

@Dao
public interface PhoneDao {
    @Query("SELECT * FROM Phone")
    Single<List<Phone>> getAll();

    @Insert(onConflict = OnConflictStrategy.IGNORE)
    Completable insertAll(List<Phone> phones);
}

먼저 문제의 발생 근원지인 Observable을 Single로 변경한다.
이후 MainActivity안에 있는 subscribe method를 아래와 같이 변경한다.

public void callPhoneDB() {
        mDatabaseClient.phoneDao().getAll()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<List<Phone>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(List<Phone> phones) {
                        phones.forEach(item -> Log.d("태그", item.toString()));

                        mDatabaseClient.phoneDao()
                                .insertAll(phones)
                                .subscribeOn(Schedulers.io())
                                .doOnComplete(() -> Log.d("태그", "완료"))
                                .subscribe();
                    }

                    @Override
                    public void onError(Throwable e) {

                    }
        });
    }

차이는 onNext()나 onComplete()가 아닌 onSuccess() 등의 차이가 보인다.

Single은 말그대로 데이터 발행을 1회 진행하고 이후에는 자동으로 종료된다.
그렇기 때문에 DB가 변경 되더라도 무한 루프에 걸리는 문제가 발생하지 않는 것이다.

IGNORE 사용하기.
@Insert(onConflict = OnConflictStrategy.IGNORE)
    Completable insertAll(List<Phone> phones);

이번에는 getAll 쪽이 아닌 삽입 부분에서 어노테이션을 통해 문제를 해결하는 것이다. REPLACE와 달리 IGNORE를 사용하면 DB에 값이 변경 되더라도, 새로운 데이터로 인식하지 않아 데이터를 발행하지 않는다.


결과

무한 반복되지 않고 삽입 완료 부터 완료까지 딱 1번 이뤄졌다.


오늘은 위와 같은 내용을 회고 해보며, 앞으로도 공부할게 많다는 점을 많이 느꼈던 하루였다.

긴 글 읽어주셔 감사합니다.


Refrence


Image


profile
안녕하세요😁 안드로이드 개발자 이정환 입니다~⭐️

0개의 댓글