with one click
angular-rxjs-patterns
// Use when handling async operations in Angular applications with observables, operators, and subjects.
// Use when handling async operations in Angular applications with observables, operators, and subjects.
Use when building modular Angular applications requiring dependency injection with providers, injectors, and services.
Use when building Angular 16+ applications requiring fine-grained reactive state management and zone-less change detection.
Guides end-to-end feature development through 8 phases: discover requirements, explore codebase patterns, clarify ambiguities with the user, design architecture, implement with TDD, run multi-agent code review, validate all quality gates, and write a blog post. Use when asked to add a feature, implement a new capability, build functionality, or develop a feature end-to-end.
Use when creating or modifying Han plugins. Covers plugin structure, configuration, hooks, skills, and best practices.
Minimize token consumption through efficient tool usage patterns
Prefer explicit configuration over framework defaults to prevent environment-dependent failures
| name | angular-rxjs-patterns |
| user-invocable | false |
| description | Use when handling async operations in Angular applications with observables, operators, and subjects. |
| allowed-tools | ["Bash","Read"] |
Master RxJS in Angular for handling async operations, data streams, and reactive programming patterns.
import { Observable, of, from, interval, fromEvent } from 'rxjs';
// of - emit values in sequence
const numbers$ = of(1, 2, 3, 4, 5);
// from - convert array, promise, or iterable
const fromArray$ = from([1, 2, 3]);
const fromPromise$ = from(fetch('/api/data'));
// interval - emit numbers at intervals
const timer$ = interval(1000); // Every second
// fromEvent - DOM events
const clicks$ = fromEvent(document, 'click');
// Custom observable
const custom$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
});
import { HttpClient } from '@angular/common/http';
import { Injectable, inject } from '@angular/core';
import { Observable } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class DataService {
private readonly http = inject(HttpClient);
getData(): Observable<Data[]> {
return this.http.get<Data[]>('/api/data');
}
getItem(id: string): Observable<Data> {
return this.http.get<Data>(`/api/data/${id}`);
}
createItem(data: Data): Observable<Data> {
return this.http.post<Data>('/api/data', data);
}
updateItem(id: string, data: Data): Observable<Data> {
return this.http.put<Data>(`/api/data/${id}`, data);
}
deleteItem(id: string): Observable<void> {
return this.http.delete<void>(`/api/data/${id}`);
}
}
import { map, pluck, switchMap, mergeMap, concatMap } from 'rxjs/operators';
import { of } from 'rxjs';
// map - transform values
const numbers$ = of(1, 2, 3).pipe(
map(n => n * 2) // 2, 4, 6
);
// pluck - extract property (deprecated, use map)
const users$ = of(
{ name: 'John', age: 30 },
{ name: 'Jane', age: 25 }
).pipe(
map(user => user.name) // 'John', 'Jane'
);
// switchMap - cancel previous, emit new
searchControl.valueChanges.pipe(
switchMap(term => this.searchService.search(term))
).subscribe(results => {
this.results = results;
});
// mergeMap - run in parallel
const ids$ = of(1, 2, 3);
ids$.pipe(
mergeMap(id => this.getUser(id)) // All requests in parallel
).subscribe();
// concatMap - run in sequence
ids$.pipe(
concatMap(id => this.getUser(id)) // One at a time
).subscribe();
import { filter, take, takeUntil, takeWhile, distinctUntilChanged } from 'rxjs/operators';
// filter - only emit matching values
of(1, 2, 3, 4, 5).pipe(
filter(n => n % 2 === 0) // 2, 4
);
// take - first N values
interval(1000).pipe(
take(5) // First 5 emissions
);
// takeUntil - until another observable emits
const destroy$ = new Subject();
source$.pipe(
takeUntil(destroy$)
).subscribe();
// distinctUntilChanged - skip duplicate consecutive values
of(1, 1, 2, 2, 3, 3).pipe(
distinctUntilChanged() // 1, 2, 3
);
import { combineLatest, merge, concat, forkJoin, zip } from 'rxjs';
import { startWith } from 'rxjs/operators';
// combineLatest - emit when any source emits
combineLatest([
this.user$,
this.settings$
]).pipe(
map(([user, settings]) => ({ user, settings }))
).subscribe();
// merge - emit from any source
merge(
this.clicks$,
this.hovers$
).subscribe();
// concat - emit in sequence
concat(
this.loadUser$,
this.loadSettings$
).subscribe();
// forkJoin - wait for all to complete
forkJoin({
user: this.getUser(),
posts: this.getPosts(),
comments: this.getComments()
}).subscribe(({ user, posts, comments }) => {
// All complete
});
// zip - pair values from sources
zip(
of(1, 2, 3),
of('a', 'b', 'c')
).pipe(
map(([num, letter]) => `${num}${letter}`)
); // '1a', '2b', '3c'
import { tap, delay, debounceTime, throttleTime, distinctUntilChanged } from 'rxjs/operators';
// tap - side effects (logging, etc.)
source$.pipe(
tap(value => console.log('Value:', value)),
map(value => value * 2)
);
// delay - delay emissions
of(1, 2, 3).pipe(
delay(1000) // Delay 1 second
);
// debounceTime - wait for pause in emissions
searchControl.valueChanges.pipe(
debounceTime(300) // Wait 300ms after user stops typing
);
// throttleTime - emit first value, ignore for duration
clicks$.pipe(
throttleTime(1000) // Only once per second
);
// distinctUntilChanged - skip duplicates
input$.pipe(
distinctUntilChanged() // Only when value changes
);
import { catchError } from 'rxjs/operators';
import { of, EMPTY, throwError } from 'rxjs';
// Return fallback value
this.http.get('/api/data').pipe(
catchError(error => {
console.error('Error:', error);
return of([]); // Return empty array
})
);
// Return empty observable
source$.pipe(
catchError(() => EMPTY) // Complete without emitting
);
// Re-throw error
source$.pipe(
catchError(error => {
console.error('Error:', error);
return throwError(() => new Error('Custom error'));
})
);
// Handle different error types
source$.pipe(
catchError(error => {
if (error.status === 404) {
return of(null);
}
return throwError(() => error);
})
);
import { retry, retryWhen, delay, take } from 'rxjs/operators';
// Simple retry
this.http.get('/api/data').pipe(
retry(3) // Retry up to 3 times
);
// Retry with delay
this.http.get('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
delay(1000), // Wait 1 second
take(3) // Max 3 retries
)
)
);
// Exponential backoff
this.http.get('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
mergeMap((error, index) => {
if (index >= 3) {
return throwError(() => error);
}
const delayMs = Math.pow(2, index) * 1000;
return of(error).pipe(delay(delayMs));
})
)
)
);
Use takeUntilDestroyed() from @angular/core/rxjs-interop — no ngOnDestroy
needed, and no manual Subject<void> to manage:
import { Component, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({
selector: 'app-my-component',
standalone: true
})
export class MyComponent {
private readonly dataService = inject(DataService);
constructor() {
this.dataService.data$.pipe(
takeUntilDestroyed() // automatically unsubscribes when component destroys
).subscribe(data => {
this.data = data;
});
this.dataService.other$.pipe(
takeUntilDestroyed()
).subscribe(other => {
this.other = other;
});
}
}
When subscribing imperatively outside the constructor (e.g. on user action),
use DestroyRef directly:
import { Component, inject } from '@angular/core';
import { DestroyRef } from '@angular/core';
@Component({
selector: 'app-my-component',
standalone: true
})
export class MyComponent {
readonly #destroyRef = inject(DestroyRef);
startPolling() {
const sub = interval(5000).subscribe(() => this.poll());
this.#destroyRef.onDestroy(() => sub.unsubscribe());
}
}
import { Component, inject } from '@angular/core';
import { AsyncPipe } from '@angular/common';
import { Observable } from 'rxjs';
@Component({
selector: 'app-user-list',
standalone: true,
imports: [AsyncPipe],
template: `
@if (users$ | async; as users) {
@for (user of users; track user.id) {
<div>{{ user.name }}</div>
}
}
@if (loading$ | async) {
<div>Loading...</div>
}
@if (error$ | async; as error) {
<div>Error: {{ error }}</div>
}
`
})
export class UserListComponent {
private readonly userService = inject(UserService);
users$: Observable<User[]> = this.userService.getUsers();
loading$: Observable<boolean> = this.userService.loading$;
error$: Observable<string | null> = this.userService.error$;
}
import { Subject } from 'rxjs';
const subject = new Subject<number>();
// Multiple subscribers
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));
subject.next(1); // A: 1, B: 1
subject.next(2); // A: 2, B: 2
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject<number>(0); // Initial value
subject.subscribe(val => console.log('A:', val)); // A: 0
subject.next(1); // A: 1
subject.next(2); // A: 2
subject.subscribe(val => console.log('B:', val)); // B: 2 (latest value)
// Common pattern for state management
@Injectable({
providedIn: 'root'
})
export class StateService {
private stateSubject = new BehaviorSubject<State>(initialState);
state$ = this.stateSubject.asObservable();
updateState(newState: State) {
this.stateSubject.next(newState);
}
get currentState(): State {
return this.stateSubject.value;
}
}
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject<number>(2); // Buffer last 2 values
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(val => console.log('A:', val)); // A: 2, A: 3
subject.next(4); // A: 4
subject.subscribe(val => console.log('B:', val)); // B: 3, B: 4
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject<number>();
subject.subscribe(val => console.log('A:', val));
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete(); // A: 3 (only last value when complete)
// Each subscription creates new execution
const cold$ = interval(1000);
cold$.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
cold$.subscribe(val => console.log('B:', val)); // B: 0, 1, 2... (separate execution)
}, 2000);
import { Subject, interval } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';
// Using Subject
const subject = new Subject();
const source$ = interval(1000);
source$.subscribe(subject);
subject.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
subject.subscribe(val => console.log('B:', val)); // B: 2, 3, 4... (shared)
}, 2000);
// Using share operator
const shared$ = interval(1000).pipe(share());
shared$.subscribe(val => console.log('A:', val));
setTimeout(() => {
shared$.subscribe(val => console.log('B:', val)); // Shares source
}, 2000);
// Using shareReplay
const cached$ = this.http.get('/api/data').pipe(
shareReplay(1) // Cache last 1 value
);
// Multiple subscribers get cached result
cached$.subscribe();
cached$.subscribe(); // No second HTTP request
Use signals for synchronous state. Use observables for async HTTP operations,
converting them to signals with toSignal() when needed in templates:
import { Injectable, inject, signal, computed } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, catchError, finalize, tap } from 'rxjs';
import { of } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class UserService {
private readonly http = inject(HttpClient);
// Signals for state
readonly users = signal<User[]>([]);
readonly loading = signal(false);
readonly error = signal<string | null>(null);
// Computed signal
readonly hasUsers = computed(() => this.users().length > 0);
loadUsers(): void {
this.loading.set(true);
this.error.set(null);
this.http.get<User[]>('/api/users').pipe(
tap(users => this.users.set(users)),
catchError(err => {
this.error.set(err.message);
return of([]);
}),
finalize(() => this.loading.set(false))
).subscribe();
}
getUser(id: string): Observable<User> {
return this.http.get<User>(`/api/users/${id}`);
}
}
Template with signals (no async pipe needed for signal state):
@Component({
selector: 'app-user-list',
standalone: true,
template: `
@if (userService.loading()) {
<div>Loading...</div>
}
@if (userService.error(); as error) {
<div>Error: {{ error }}</div>
}
@for (user of userService.users(); track user.id) {
<div>{{ user.name }}</div>
}
`
})
export class UserListComponent {
protected readonly userService = inject(UserService);
constructor() {
this.userService.loadUsers();
}
}
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject, of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
@Injectable({
providedIn: 'root'
})
export class SearchService {
private readonly http = inject(HttpClient);
private readonly searchTerms = new Subject<string>();
readonly results$: Observable<SearchResult[]> = this.searchTerms.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => this.search(term))
);
search(term: string): Observable<SearchResult[]> {
if (!term.trim()) {
return of([]);
}
return this.http.get<SearchResult[]>(`/api/search?q=${term}`);
}
setSearchTerm(term: string): void {
this.searchTerms.next(term);
}
}
import { TestBed } from '@angular/core/testing';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';
describe('UserService', () => {
let service: UserService;
let httpMock: HttpTestingController;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [HttpClientTestingModule],
providers: [UserService]
});
service = TestBed.inject(UserService);
httpMock = TestBed.inject(HttpTestingController);
});
afterEach(() => {
httpMock.verify();
});
it('should fetch users', () => {
const mockUsers = [{ id: 1, name: 'John' }];
service.getUsers().subscribe(users => {
expect(users).toEqual(mockUsers);
});
const req = httpMock.expectOne('/api/users');
expect(req.request.method).toBe('GET');
req.flush(mockUsers);
});
});
import { TestScheduler } from 'rxjs/testing';
describe('Marble tests', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('should debounce', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c|');
const expected = '-----c|';
const result$ = source$.pipe(debounceTime(20));
expectObservable(result$).toBe(expected);
});
});
});
Use angular-rxjs-patterns when building modern, production-ready applications that require:
takeUntilDestroyed() - Automatic cleanup without ngOnDestroyBehaviorSubject → signal() for synchronous state@if/@for control flow - Replaces *ngIf/*ngForinject() - Cleaner than constructor injection