재사용 가능한 RxJS 패턴 만들기

Adam Kim·2026년 1월 9일

angular

목록 보기
93/102

RxJS로 상태 관리를 하다 보면 종종 동일한 연산자 조합을 반복해서 사용하게 됩니다. 특히 Angular 프로젝트에서는 로딩 상태 관리, API 에러 처리, 폼 검증 같은 패턴들이 계속해서 등장합니다.

이번 글에서는 RxJS 커스텀 연산자를 만들어 이런 반복적인 패턴을 깔끔하게 추상화하는 방법을 알아보겠습니다. Angular 19의 standalone 컴포넌트와 control flow 문법을 활용하였습니다.

문제 상황: 반복되는 RxJS 패턴들

Angular 애플리케이션을 개발하다 보면 api를 호출할 때 아래와 같은 코드들을 보게 됩니다:

// 로딩 상태를 관리하는 반복적인 패턴
this.loading = true;

this.http.get('/api/data').pipe(
  tap(() => this.loading = false),
  catchError(err => {
    this.loading = false;
    return throwError(() => err);
  })
).subscribe();

또는 아래와 같이 상태 변경을 subscribe하기 위해 RxJS를 적용하기도 합니다.

// 폼 입력값 검증과 디바운싱이 필요한 검색
this.searchControl.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  filter(value => value.length > 2),
  switchMap(value => this.searchService.search(value))
).subscribe();

만약 이런 패턴들이 동일하게 또는 유사하게 여러번 반복된다면 커스텀 연산자로 만들어 코드를 더 읽기 쉽고 재사용하기 좋게 만들 수 있습니다.

RxJS 커스텀 연산자의 구조와 샘플 코드

RxJS 연산자는 단순히 Observable을 받아서 새로운 Observable을 반환하는 함수입니다. TypeScript의 타입 시스템을 활용하면 타입 안전성까지 보장할 수 있습니다.

1. 기본 연산자: Identity

가장 단순한 형태부터 시작해보겠습니다:

import { Observable } from 'rxjs';

// 입력 스트림을 그대로 반환하는 identity 연산자
export function identity<T>(): (source: Observable<T>) => Observable<T> {
  return (source: Observable<T>) => source;
}

2. 로딩 상태 관리 연산자

이제 실용적인 연산자를 만들어보겠습니다. API 호출 시 로딩 상태를 자동으로 관리하는 연산자입니다:

import { Observable, MonoTypeOperatorFunction, defer, finalize } from 'rxjs';
import { signal, WritableSignal } from '@angular/core';

export function withLoading<T>(
  loadingSignal: WritableSignal<boolean>
): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      loadingSignal.set(true);
      return source.pipe(
        finalize(() => loadingSignal.set(false))
      );
    });
}

사용 예제 - Standalone Component:

import { Component, signal } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { withLoading } from './operators/with-loading';

@Component({
  selector: 'app-user-list',
  standalone: true,
  imports: [],
  template: `
    <div class="user-container">
      @if (loading()) {
        <div class="spinner">로딩 중...</div>
      } @else {
        <ul>
          @for (user of users(); track user.id) {
            <li>{{ user.name }}</li>
          }
        </ul>
      }

      <button (click)="loadUsers()">사용자 불러오기</button>
    </div>
  `
})
export class UserListComponent {
  loading = signal(false);
  users = signal<User[]>([]);

  constructor(private http: HttpClient) {}

  loadUsers() {
    this.http.get<User[]>('/api/users').pipe(
      withLoading(this.loading)// 👈 로딩 상태 자동 관리
    ).subscribe(data => this.users.set(data));
  }
}

3. 에러 처리와 재시도 연산자

API 호출에 에러 처리와 재시도 로직을 추가하는 연산자를 만들어보겠습니다:

import { Observable, throwError, timer } from 'rxjs';
import { retry, catchError, tap } from 'rxjs/operators';

interface RetryConfig {
  count: number;
  delay: number;
  onError?: (error: any, attempt: number) => void;
}

export function retryWithDelay<T>(config: RetryConfig) {
  return (source: Observable<T>) => {
    let attempt = 0;

    return source.pipe(
      retry({
        count: config.count,
        delay: (error) => {
          attempt++;
          config.onError?.(error, attempt);
          return timer(config.delay * attempt);// 점진적 백오프
        }
      }),
      catchError(error => {
        console.error(`실패: ${attempt}번 재시도 후에도 실패했습니다.`, error);
        return throwError(() => error);
      })
    );
  };
}

4. 검색 입력 최적화 연산자

검색 입력을 최적화하는 연산자를 만들어보겠습니다:

import { Observable, pipe } from 'rxjs';
import { debounceTime, distinctUntilChanged, filter, map } from 'rxjs/operators';

export function optimizeSearch(
  debounce: number = 300,
  minLength: number = 2
) {
  return pipe(
    debounceTime(debounce),
    map(value => value?.trim() || ''),
    distinctUntilChanged(),
    filter(value => value.length >= minLength)
  );
}

사용 예제 - 검색 컴포넌트:

@Component({
  selector: 'app-search',
  standalone: true,
  imports: [ReactiveFormsModule],
  template: `
    <div class="search-container">
      <input
        type="search"
        [formControl]="searchControl"
        placeholder="검색어를 입력하세요 (2글자 이상)"
      />

      @if (searching()) {
        <div class="searching">검색 중...</div>
      }

      <div class="results">
        @for (result of searchResults(); track result.id) {
          <div class="result-item">
            <h3>{{ result.title }}</h3>
            <p>{{ result.description }}</p>
          </div>
        } @empty {
          @if (searchControl.value && !searching()) {
            <p>검색 결과가 없습니다.</p>
          }
        }
      </div>
    </div>
  `
})
export class SearchComponent {
  searchControl = new FormControl('');
  searchResults = signal<SearchResult[]>([]);
  searching = signal(false);

  constructor(private searchService: SearchService) {
    this.searchControl.valueChanges.pipe(
      optimizeSearch(300, 2),// 👈 검색 최적화
      tap(() => this.searching.set(true)),
      switchMap(term =>
        this.searchService.search(term).pipe(
          withLoading(this.searching)// 👈 로딩 상태 관리
        )
      )
    ).subscribe(results => this.searchResults.set(results));
  }
}

5. 캐싱 연산자

마지막으로, 결과를 캐싱하는 연산자를 만들어보겠습니다:

import { Observable, of, tap } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

interface CacheConfig {
  ttl?: number;// Time to live in milliseconds
  key?: string;
}

const cache = new Map<string, { data: any; timestamp: number }>();

export function withCache<T>(config: CacheConfig = {}) {
  const { ttl = 5 * 60 * 1000, key = 'default' } = config;// 기본 5분

  return (source: Observable<T>) => {
    const cached = cache.get(key);
    const now = Date.now();

    if (cached && now - cached.timestamp < ttl) {
      return of(cached.data as T);
    }

    return source.pipe(
      tap(data => cache.set(key, { data, timestamp: now })),
      shareReplay(1)
    );
  };
}

실전 예제

이제 만든 모든 연산자들을 활용해서 실제 애플리케이션에서 사용할 법한 컴포넌트를 만들어보겠습니다:

@Component({
  selector: 'app-product-dashboard',
  standalone: true,
  imports: [ReactiveFormsModule, CurrencyPipe],
  template: `
    <div class="dashboard">
      <header>
        <h1>상품 대시보드</h1>
        <input
          type="search"
          [formControl]="searchControl"
          placeholder="상품 검색..."
        />
      </header>

      @if (loading()) {
        <div class="loader">데이터를 불러오는 중...</div>
      }

      <div class="products">
        @for (product of filteredProducts(); track product.id) {
          <div class="product-card">
            <h3>{{ product.name }}</h3>
            <p>{{ product.price | currency:'KRW' }}</p>
          </div>
        } @empty {
          @if (!loading()) {
            <p>상품이 없습니다.</p>
          }
        }
      </div>

      @if (error()) {
        <div class="error">
          에러가 발생했습니다: {{ error() }}
          <button (click)="reload()">다시 시도</button>
        </div>
      }
    </div>
  `
})
export class ProductDashboardComponent implements OnInit {
  searchControl = new FormControl('');
  products = signal<Product[]>([]);
  filteredProducts = signal<Product[]>([]);
  loading = signal(false);
  error = signal<string | null>(null);

  constructor(private http: HttpClient) {}

  ngOnInit() {
    this.loadProducts();
    this.setupSearch();
  }

  loadProducts() {
    this.error.set(null);

    this.http.get<Product[]>('/api/products').pipe(
      withCache({ key: 'products', ttl: 10 * 60 * 1000 }),// 10분 캐싱
      withLoading(this.loading),
      retryWithDelay({
        count: 3,
        delay: 1000,
        onError: (err, attempt) => console.log(`재시도 ${attempt}/3`)
      }),
      catchError(err => {
        this.error.set(err.message);
        return of([]);
      })
    ).subscribe(products => {
      this.products.set(products);
      this.filteredProducts.set(products);
    });
  }

  setupSearch() {
    this.searchControl.valueChanges.pipe(
      optimizeSearch(300, 2)
    ).subscribe(searchTerm => {
      const filtered = this.products().filter(product =>
        product.name.toLowerCase().includes(searchTerm.toLowerCase())
      );
      this.filteredProducts.set(filtered);
    });
  }

  reload() {
    this.loadProducts();
  }
}

결론

RxJS 커스텀 연산자를 활용하면 복잡한 비동기 로직을 재사용 가능한 단위로 캡슐화할 수 있습니다. 특히 Angular의 새로운 Signal API와 함께 사용하면 반응형 프로그래밍을 더욱 직관적으로 구현할 수 있습니다.

커스텀 연산자를 만들 때는 항상 다음 사항들을 고려하세요:

  • 단일 책임 원칙: 하나의 연산자는 하나의 기능에만 집중
  • 타입 안전성: TypeScript의 제네릭을 활용해 타입 추론이 제대로 동작하도록 구현
  • 테스트 가능성: 각 연산자를 독립적으로 테스트할 수 있도록 설계
  • 재사용성: 프로젝트 전반에서 활용할 수 있을 만큼 범용적으로 만들기

이런 커스텀 연산자들을 잘 활용하면 더 깨끗하고 유지보수하기 쉬운 Angular 애플리케이션을 만들 수 있습니다.

profile
Angular2+ Developer

0개의 댓글