Rxjs 와 uni/multicast

motiveko·2021년 12월 14일
0
post-thumbnail

Rxjs를 사용하면서 많은것들이 Observable stream으로 추상화되다보니 동작이 눈에 보이지 않아 놓치는 문제점들이 많은 것 같다.

🥨 Case

Angular 프로젝트에서 아래 내용을 개발하였다.

  • 데이터 테이블에서 row를 클릭시 상세 정보 Dialog창이 나온다.
  • Dialog 컴포넌트의 생성자에서 해당 row의 정보를 Ngrx의 store에서 select하고 Observable 타입 변수 row$에 할당한다.
  • row의 Observable에 pipe를 연결해, defined된 값이 넘어오면 추가 정보(history)를 HistoryService를 통해 서버에 요청한다. 그리고 이를 Observable 타입 변수 history$에 할당한다.
  • Dialog의 Template에서 row$, history$async 파이프를 이용해 구독하고 정보를 화면에 보여준다.
  • 추가적으로, Dialog의 scrollTop값을 query param에 저장하고 있다가, 랜더링이 완료되면 해당 스크롤값을 템플릿에 설정해주는데, 이는 컴포넌트에서 ngAfterViewInit lifecycle 메서드에서 history$를 구독하고 defined된 값이 넘어오면 실행한다.

컴포넌트와 템플릿의 관련 코드는 대충 아래와 같다.

// 컴포넌트
@Component({
  selector: 'app-detail-dialog',
})
export class DetailDialogComponent implements AfterViewInit {
  top: number;
  row$: Observable<Row>;
  history$: Observable<History[]>;
  
  constructor (
    private store: Store<AppState>,
    private service: HistoryService,
    private activatedRoute: ActivatedRoute
  ) {

    this.top = this.activatedRoute.snapshot.queryParams.top as number;
    this.row$ = this.store.pipe(
      takeUntil(this.unsubscriber),
      select(selectSelectedRow),
      filter(isDefined)
    );
    this.history$ = this.row$.pipe(
      switchMap(row => this.service.findHistory(row.uid))
    )
  }
  
  ngAfterViewInit(): void {
    // Dialog의 content요소, scrollTop을 가진다.
    const matDialogContent = document.querySelector('mat-dialog-content');

    if (!matDialogContent) {
      throw new Error('mat-dialog-content 랜더링중 문제가 발생하였습니다.');
    }
    this.history$.pipe(
      filter(history => !!history)
    )
    .subscribe(
      () => matDialogContent.scrollTop = this.top;
    )
  }
}
<!--템플릿-->

<!--...-->
<mat-dialog-content data-testid="dialogContent">
  <div data-testid="innerContent">
    <h4> Block </h4>
    <ngx-json-viewer *ngIf="block$ | async" [json]="block$ | async" data-testid="block"></ngx-json-viewer>
    <h4> Block History</h4>
    <ng-container *ngIf="(history$ | async) as history else loading">
      <ngx-json-viewer *ngIf="history" [json]="(history)"></ngx-json-viewer>
    </ng-container>
    <ng-template #loading>
      <h2>Loading.....</h2>
    </ng-template>
  </div>
</mat-dialog-content>
 <!--...-->

❗️ 문제

화면에는 문제없이 정보가 정상적으로 출력되고 있었다. 그래서 잊고 있었는데, http요청 캐싱 기능을 구현하던 중 history 요청 API가 두번씩 호출되는걸 인지하였다.
row$history$tap 연산자로 넘어가는 값을 찍어보면 undefined값을 제외하면 row$4번, history$2번 찍히는것을 확인할 수 있다.


❗️❗️ 원인

tap이 여러번 찍히는건 row$history$unicast로 동작했기 때문이다. Template에서 row$history$ 를 각각 2/1번씩 구독하고 Component에서 history$를 1번 구독했고, history$row$를 기반으로 만든 옵저버블이므로 총 4/2번의 tap(side effect)이 호출된 것이다.

row$의 소스 옵저버블인 Ngrx store에서 selectstate는 상태 변화가 없어도 여러 컴포넌트/서비스 들에서 구독하면 값이 나왔기 때문에 당연히 multicast로 동작한다고 생각했었다. 이건 맞는 생각이었다.

// Ngrx store.ts 
@Injectable()
export class Store<T = object>
  extends Observable<T>
  implements Observer<Action>
{
  constructor(
    state$: StateObservable,
    private actionsObserver: ActionsSubject,
    private reducerManager: ReducerManager
  ) {
    super();
    this.source = state$;
  }
  // ...
  
  export function select<T, K>(
    mapFn: (state: T) => K
  ): (source$: Observable<T>) => Observable<K>;
  
  // ...
}

storeselect()로 받은 selector(==mapFn)를 이용해 source의 형태를 바꾼 뒤 반환한다. source의 타입은 StateObservable이다.

// Ngrx state.ts 
export abstract class StateObservable extends Observable<any> {}
@Injectable()
export class State<T> extends BehaviorSubject<any> implements OnDestroy {
  // ...
}
export const STATE_PROVIDERS: Provider[] = [
  State,
  { provide: StateObservable, useExisting: State },
];

StateObservable을 토큰으로 State 객체가 주입되고, StateBehaviorSubject를 상속하는데, BehaviorSubjectmulticast로 동작해, 값 방출 이후에 구독해도 구독자는 최신값을 받을 수 있다.

결국 내가 컴포넌트의 생성자에서 store에서 가져온 statemulticast로 동작하는 것이다. 그런데 왜 row$history$는 유니캐스트로 동작한것일까?

그것은 내가 multicast 동작에 대해 잘못 이해했기 때문이다. 아래를 보자.

const multicast = interval(1000).pipe( 
  tap(() => console.log('se')), 
  shareReplay(1),
  );

const sub1 = multicast.subscribe(console.log);
const sub2 = multicast.subscribe(console.log);

위 코드의 결과는 어떨까? 결과는 se11se22...로 Sideeffect가 한번만 발생했다. 그런데 아래의 결과는 다르다.

const multicast = interval(1000).pipe( 
  shareReplay(1),
  tap(() => console.log('se')), 
  );

const sub1 = multicast.subscribe(console.log);
const sub2 = multicast.subscribe(console.log);

결과는 se1se1se2se2...shareReplay 연산자 뒤의 tap이 구독 횟수만큼 호출되고있다. 소스 옵저버블이 multicast로 동작해도, 그 뒤의 연산자들이 어떻게 구성되느냐에 unicast처럼 side effect가 구독횟수 만큼 여러번 발생할 수 있는 것이다.

사실 이렇게 써놓고 보면 간단하지만 나는 이걸 헷갈렸고, 삽질을 했다.


❗️❗️❗️ 해결

가져온 state의 Observable을 컴포넌트 내부에서multicast로 만들어서 사용하면 된다.

값이 방출되는 시점이 템플릿에서의 구독 시점 이전이라 shareReplay(1)으로 최신값 1개를 모두 공유하도록 했다. history$는 두번 구독되는데, http request를 보내는 switchMap내부 함수가 한번만 호출되기 위해서는 switchMap 뒤에도 반드시 멀티캐스팅 연산자를 붙여줘야한다.

수정한 컴포넌트의 코드는 아래와 같다.


export class DetailDialogComponent implements AfterViewInit {
  // ...
  constructor (
    // ...
  ) {

    // ...
    this.row$ = this.store.pipe(
      takeUntil(this.unsubscriber),
      select(selectSelectedRow),
      filter(isDefined),
      shareReplay(1)
    );
    this.history$ = this.row$.pipe(
      switchMap(row => this.service.findHistory(row.uid)),
      shareReplay(1)
    )
  }

 // ...

RxJs는 편한 만큼 꼼꼼하게 코딩하지 않으면 어디서 메모리 누수나 이런 종류의 실수가 발생하는지 알기가 쉽지 않다. 조금만 이해를 잘못써도 이런 문제가 발생한다. 실수는 막을 수 없으니 유닛테스트를 정말 꼼꼼하게 작성해야 함을 느꼈다.

0개의 댓글