RxJS로 상태 관리를 하다 보면 종종 동일한 연산자 조합을 반복해서 사용하게 됩니다. 특히 Angular 프로젝트에서는 로딩 상태 관리, API 에러 처리, 폼 검증 같은 패턴들이 계속해서 등장합니다.
이번 글에서는 RxJS 커스텀 연산자를 만들어 이런 반복적인 패턴을 깔끔하게 추상화하는 방법을 알아보겠습니다. Angular 19의 standalone 컴포넌트와 control flow 문법을 활용하였습니다.
Angular 애플리케이션을 개발하다 보면 api를 호출할 때 아래와 같은 코드들을 보게 됩니다:
// 로딩 상태를 관리하는 반복적인 패턴
this.loading = true;
this.http.get('/api/data').pipe(
tap(() => this.loading = false),
catchError(err => {
this.loading = false;
return throwError(() => err);
})
).subscribe();
또는 아래와 같이 상태 변경을 subscribe하기 위해 RxJS를 적용하기도 합니다.
// 폼 입력값 검증과 디바운싱이 필요한 검색
this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(value => value.length > 2),
switchMap(value => this.searchService.search(value))
).subscribe();
만약 이런 패턴들이 동일하게 또는 유사하게 여러번 반복된다면 커스텀 연산자로 만들어 코드를 더 읽기 쉽고 재사용하기 좋게 만들 수 있습니다.
RxJS 연산자는 단순히 Observable을 받아서 새로운 Observable을 반환하는 함수입니다. TypeScript의 타입 시스템을 활용하면 타입 안전성까지 보장할 수 있습니다.
가장 단순한 형태부터 시작해보겠습니다:
import { Observable } from 'rxjs';
// 입력 스트림을 그대로 반환하는 identity 연산자
export function identity<T>(): (source: Observable<T>) => Observable<T> {
return (source: Observable<T>) => source;
}
이제 실용적인 연산자를 만들어보겠습니다. API 호출 시 로딩 상태를 자동으로 관리하는 연산자입니다:
import { Observable, MonoTypeOperatorFunction, defer, finalize } from 'rxjs';
import { signal, WritableSignal } from '@angular/core';
export function withLoading<T>(
loadingSignal: WritableSignal<boolean>
): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
defer(() => {
loadingSignal.set(true);
return source.pipe(
finalize(() => loadingSignal.set(false))
);
});
}
사용 예제 - Standalone Component:
import { Component, signal } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { withLoading } from './operators/with-loading';
@Component({
selector: 'app-user-list',
standalone: true,
imports: [],
template: `
<div class="user-container">
@if (loading()) {
<div class="spinner">로딩 중...</div>
} @else {
<ul>
@for (user of users(); track user.id) {
<li>{{ user.name }}</li>
}
</ul>
}
<button (click)="loadUsers()">사용자 불러오기</button>
</div>
`
})
export class UserListComponent {
loading = signal(false);
users = signal<User[]>([]);
constructor(private http: HttpClient) {}
loadUsers() {
this.http.get<User[]>('/api/users').pipe(
withLoading(this.loading)// 👈 로딩 상태 자동 관리
).subscribe(data => this.users.set(data));
}
}
API 호출에 에러 처리와 재시도 로직을 추가하는 연산자를 만들어보겠습니다:
import { Observable, throwError, timer } from 'rxjs';
import { retry, catchError, tap } from 'rxjs/operators';
interface RetryConfig {
count: number;
delay: number;
onError?: (error: any, attempt: number) => void;
}
export function retryWithDelay<T>(config: RetryConfig) {
return (source: Observable<T>) => {
let attempt = 0;
return source.pipe(
retry({
count: config.count,
delay: (error) => {
attempt++;
config.onError?.(error, attempt);
return timer(config.delay * attempt);// 점진적 백오프
}
}),
catchError(error => {
console.error(`실패: ${attempt}번 재시도 후에도 실패했습니다.`, error);
return throwError(() => error);
})
);
};
}
검색 입력을 최적화하는 연산자를 만들어보겠습니다:
import { Observable, pipe } from 'rxjs';
import { debounceTime, distinctUntilChanged, filter, map } from 'rxjs/operators';
export function optimizeSearch(
debounce: number = 300,
minLength: number = 2
) {
return pipe(
debounceTime(debounce),
map(value => value?.trim() || ''),
distinctUntilChanged(),
filter(value => value.length >= minLength)
);
}
사용 예제 - 검색 컴포넌트:
@Component({
selector: 'app-search',
standalone: true,
imports: [ReactiveFormsModule],
template: `
<div class="search-container">
<input
type="search"
[formControl]="searchControl"
placeholder="검색어를 입력하세요 (2글자 이상)"
/>
@if (searching()) {
<div class="searching">검색 중...</div>
}
<div class="results">
@for (result of searchResults(); track result.id) {
<div class="result-item">
<h3>{{ result.title }}</h3>
<p>{{ result.description }}</p>
</div>
} @empty {
@if (searchControl.value && !searching()) {
<p>검색 결과가 없습니다.</p>
}
}
</div>
</div>
`
})
export class SearchComponent {
searchControl = new FormControl('');
searchResults = signal<SearchResult[]>([]);
searching = signal(false);
constructor(private searchService: SearchService) {
this.searchControl.valueChanges.pipe(
optimizeSearch(300, 2),// 👈 검색 최적화
tap(() => this.searching.set(true)),
switchMap(term =>
this.searchService.search(term).pipe(
withLoading(this.searching)// 👈 로딩 상태 관리
)
)
).subscribe(results => this.searchResults.set(results));
}
}
마지막으로, 결과를 캐싱하는 연산자를 만들어보겠습니다:
import { Observable, of, tap } from 'rxjs';
import { shareReplay } from 'rxjs/operators';
interface CacheConfig {
ttl?: number;// Time to live in milliseconds
key?: string;
}
const cache = new Map<string, { data: any; timestamp: number }>();
export function withCache<T>(config: CacheConfig = {}) {
const { ttl = 5 * 60 * 1000, key = 'default' } = config;// 기본 5분
return (source: Observable<T>) => {
const cached = cache.get(key);
const now = Date.now();
if (cached && now - cached.timestamp < ttl) {
return of(cached.data as T);
}
return source.pipe(
tap(data => cache.set(key, { data, timestamp: now })),
shareReplay(1)
);
};
}
이제 만든 모든 연산자들을 활용해서 실제 애플리케이션에서 사용할 법한 컴포넌트를 만들어보겠습니다:
@Component({
selector: 'app-product-dashboard',
standalone: true,
imports: [ReactiveFormsModule, CurrencyPipe],
template: `
<div class="dashboard">
<header>
<h1>상품 대시보드</h1>
<input
type="search"
[formControl]="searchControl"
placeholder="상품 검색..."
/>
</header>
@if (loading()) {
<div class="loader">데이터를 불러오는 중...</div>
}
<div class="products">
@for (product of filteredProducts(); track product.id) {
<div class="product-card">
<h3>{{ product.name }}</h3>
<p>{{ product.price | currency:'KRW' }}</p>
</div>
} @empty {
@if (!loading()) {
<p>상품이 없습니다.</p>
}
}
</div>
@if (error()) {
<div class="error">
에러가 발생했습니다: {{ error() }}
<button (click)="reload()">다시 시도</button>
</div>
}
</div>
`
})
export class ProductDashboardComponent implements OnInit {
searchControl = new FormControl('');
products = signal<Product[]>([]);
filteredProducts = signal<Product[]>([]);
loading = signal(false);
error = signal<string | null>(null);
constructor(private http: HttpClient) {}
ngOnInit() {
this.loadProducts();
this.setupSearch();
}
loadProducts() {
this.error.set(null);
this.http.get<Product[]>('/api/products').pipe(
withCache({ key: 'products', ttl: 10 * 60 * 1000 }),// 10분 캐싱
withLoading(this.loading),
retryWithDelay({
count: 3,
delay: 1000,
onError: (err, attempt) => console.log(`재시도 ${attempt}/3`)
}),
catchError(err => {
this.error.set(err.message);
return of([]);
})
).subscribe(products => {
this.products.set(products);
this.filteredProducts.set(products);
});
}
setupSearch() {
this.searchControl.valueChanges.pipe(
optimizeSearch(300, 2)
).subscribe(searchTerm => {
const filtered = this.products().filter(product =>
product.name.toLowerCase().includes(searchTerm.toLowerCase())
);
this.filteredProducts.set(filtered);
});
}
reload() {
this.loadProducts();
}
}
RxJS 커스텀 연산자를 활용하면 복잡한 비동기 로직을 재사용 가능한 단위로 캡슐화할 수 있습니다. 특히 Angular의 새로운 Signal API와 함께 사용하면 반응형 프로그래밍을 더욱 직관적으로 구현할 수 있습니다.
커스텀 연산자를 만들 때는 항상 다음 사항들을 고려하세요:
이런 커스텀 연산자들을 잘 활용하면 더 깨끗하고 유지보수하기 쉬운 Angular 애플리케이션을 만들 수 있습니다.