github code : https://github.com/BaekGeunYoung/rxjs-practice

Reactive Programming 이란?

리액티브(Reactive, 반응형) 프로그래밍은 비동기 데이터 스트림(Asynchronous data stream)에 기반을 둔 프로그래밍 패러다임이다. 데이터 스트림이란 연속적인 데이터의 흐름을 말하며 리액티브 프로그래밍은 기본적으로 모든 것을 데이터 스트림으로 본다.

기존의 프로그래밍 방식은 배열과 함수 반환값과 같은 동기 데이터를 처리하는 방식과 Ajax 통신 결과, 사용자 이벤트와 같은 비동기 데이터 처리 방식이 제각각이지만, 리액티브 프로그래밍은 동기/비동기와 관계없이 데이터를 생산하는 것이라면 무엇이든 시간축을 따라 연속적으로 흐르는 데이터 스트림으로 처리한다. 리액티브 프로그래밍은 다양한 데이터를 데이터 스트림이라는 하나의 일관된 형식으로 만들고, 이 데이터 스트림을 구독(subscribe)하여 데이터 스트림의 상태 변화에 반응하는 방식으로 동작하는 애플리케이션을 작성하는 것을 말한다.

Reactive X

Reactive X (약칭 Rx) 란 위에서 설명한 Reactive Programming Paradigm을 Observable 과 Observer 및 Operator 등의 개념을 통해 실현할 수 있도록 해주는 라이브러리다. Rx는 특히 비동기 데이터를 처리하는 데 있어서 장점을 갖는데, 기존의 프로미스 기반의 비동기 처리 방식은 다음과 같은 문제점을 갖고 있다.

  1. 연속성을 갖는 데이터를 처리할 수 없다.
  2. 한번 보낸 요청을 취소할 수 없다.

Rx는 이러한 한계를 보완할 수 있고, 여러 Operator들을 이용한 코드 작성은 최근에 각광을 받고 있는 functional programming paradigm에 매우 잘 부합한다고 생각한다. Reactive X에는 RxJS, RxJava, RxSwift 등 다양한 언어를 위한 implementation이 존재하는데, 이 글에서는 이 중 RxJS를 소개하고, 공부해 보려고 한다.

Observable & Observer

앞서 말했듯이 Reactive Programming(약칭 RP)이란 하나의 커다란 데이터의 흐름이 있고, 이 데이터의 변화에 어떻게 반응하는 지를 프로그래밍 하는 것을 뜻한다. RP에는 데이터를 생산하고 보내는 대상과 데이터를 수신하여 가공하는 대상이 존재하는데, 전자에 해당하는 것이 Observable, 후자에 해당하는 것이 Observer라고 생각하면 될 것 같다. RP 패러다임 내에서는 데이터를 생산하는 것이라면 그것이 무엇이든지 Observable이 될 수 있다. 일반적인 배열이나 객체부터 element의 event나 비동기 promise 등을 모두 Observable로 생성할 수 있고, Rx에서 제공하는 다양한 함수들은 굉장히 쉽게, 그리고 함수형 프로그래밍 방식으로 observable을 생산하도록 돕는다.Observer의 개념은 이후 Subscribe의 개념과 함께 설명하는게 좋을 것 같다.

Subscribe

subscribe란 문자 그대로 구독을 의미하는데, "observer가 observable을 관찰하는 행위" 정도로 받아들이면 될 것 같다. observable을 구독하고 있는 observer는 observable에서 notification을 방출하는 지의 여부를 계속 관찰하고 있다가, notification이 방출될 때마다 observer function을 수행함으로써 방출된 데이터를 수신하고, 가공하는 작업을 거친다. 기본적으로 Rx의 observable들은 subscribe가 되어야한 observer가 데이터를 수신할 수 있는 cold observable이다.

Subject

Subject는 특수한 형태의 observable로, observable과 observer의 기능을 동시에 하는 무언가라고 생각하면 될 것 같다. subject는 .next 함수를 통해 notification을 방출할 수 있고, 동시에 observer function에 구독해 notification을 수신할 수도 있다. Observable과의 두드러진 차이는 subject는 multicast방식으로 여러개의 observer에 동시에 구독할 수 있는데, observable은 unicast 방식으로 한 번에 하나의 observer에만 구독할 수 있다.

Operator

Operator는 observable을 파라미터로 받아서 다른 형태의 observable로 변환해 리턴하는 함수이다. Rx는 굉장히 다양한 operator function들을 지원하고 있고, rx기반 코드를 가독성 있고 아름답게 만드는 일등 공신이라고 생각한다. 데이터 스트림의 개념과 그 위에서 작동하는 operator의 개념을 모식적으로 나타내면 아래와 같다.

<img src=https://s3.ap-northeast-2.amazonaws.com/s3.geunyoung.image/blog/image/rxjs_stream.png width="100%" />

이런식으로 흘러들어오는 데이터 스트림을 원하는 형태로 조작할 수 있다. 그리고 이 operator들의 사용은 functional programming 방식과 굉장히 잘 부합한다.

Practice

어떤 것을 주제로 RxJS를 연습해볼까, 꽤 오랜 시간을 고민했던 것 같다. 그도 그럴 것이 reactive programming이라는 새로운 패러다임이 굉장히 흥미로운건 사실이지만, 기존의 비동기 처리 방식에 큰 불편함을 느끼지 못하고 있었고 rxJS가 가진 이점을 잘 활용하기 위해서는 꽤나 복잡한 상황에서의 비동기 처리 예시를 생각해내야 했기 때문이다. 다행히 양질의 글들을 읽어봄으로써 적당한 예시 상황을 생각해볼 수 있었고, 그 구체적인 내용은 아래와 같다.

  • 검색어에 따라 stack overflow 검색결과를 브라우저에 띄운다. (stack exchange API 이용)
  • 불필요한 요청을 방지하기 위해, 1초동안 검색어에 변화가 없을 경우에만 요청을 보낸다.
  • 이전에 보냈던 요청의 응답이 그 이후 요청의 응답보다 늦게 도착할 경우, 이를 무시한다.
  • 중복되는 검색어가 연속으로 들어왔을 때 요청을 한 번만 보낸다.
  • 요청이 실패할 경우 자동으로 최대 3번까지 요청을 재시도한다.

이와 같은 상황은 애플리케이션이 UI/UX 친화적으로 잘 작동하기 위해서 당연히 지켜져야하는, 어떻게 보면 굉장히 사소한 요구사항들이다. 하지만 이를 promise 기반의 비동기 처리 방식으로 코드를 작성하려면 꽤나 많은 생각을 해야하며, 절차적 프로그래밍 방식은 가독성을 심각하게 떨어뜨릴 것이다. 우리가 배운 RxJS의 operator는 굉장히 쉽고 읽기 좋게 이 모든 기능을 구현할 수 있다.

sbj.pipe(filter((value, index) => value !== ''))
            .pipe(debounceTime(1000))
            .pipe(distinctUntilChanged())
            .pipe(switchMap((value:any, index : number) => {
                return ajax.get(`https://api.stackexchange.com/2.2/search?order=desc&sort=activity&intitle=${value}&site=stackoverflow`)
                    .pipe(map(r => r.response.items))
                    .pipe(retry(3))
                }
            ))
            .subscribe(
                (value: any) => {
                    setResult(value);
                },
                (err : any) => setError('error!'),
        );

위 코드에서 살짝 아쉬운 부분은 observable들을 operator를 통해 조작하고 싶을 때, 매번 pipe함수를 사용해야 한다는 것이다. operator함수들이 observable class의 member function이 아니기 때문에 observable.operator 의 형태로 코드를 작성할 수 없었다. (내가 봤던 문서는 그렇게 하던데..) 아무튼 코드를 간략히 설명하자면,

  1. subject로부터 들어오는 notification이 빈 문자열이면 filter 함수를 통해 거른다 ->
  2. debounceTime 함수를 이용해 1초간 비동기 요청을 미룬다 ->
  3. switchMap 함수를 통해 일반 string으로 들어오는 stream을 비동기 요청 리턴값의 stream으로 바꾼다. ->
    3 - 1. map 함수를 이용해 response 객체에서 원하는 item 객체만 가져온다.
    3 - 2. 요청이 실패할 경우, retry함수를 이용해 최대 3번까지 재시도한다.
  4. 이를 구독하여 컴포넌트 내의 state를 바꾼다.

위와 같이 rx의 다양한 operator들을 이용하면 복잡한 연산과정 없이 선언적으로 굉장히 깔끔하게 코드를 작성할 수 있다. 처음 RxJS를 접한다면 이게 대체 무슨 코드인지 감조차 안 올 것이다. (나도 그랬다..) 하지만 Rx에 대한 이해가 어느정도 생기고 코드 작성에 나름 익숙해진다면 기존의 비동기 처리 방식보다 훨씬 알아보기 쉬운 코드가 된다. 나도 여러 문서들을 열심히 읽어보고 코드를 몇 번 작성한 후에야 비로소 Rx의 매력을 알게되었다. 앞으로 이러한 프로그래밍 방식을 자주 이용해 볼 생각이다.