Observables and RxJS in Angular¶
Course Objective¶
Master reactive programming with RxJS Observables in Angular, learning to handle asynchronous data streams, implement complex data transformations, and build reactive user interfaces.
1. Introduction to Reactive Programming¶
Reactive programming is a declarative programming paradigm that deals with asynchronous data streams. In Angular, RxJS (Reactive Extensions for JavaScript) provides powerful tools for handling complex asynchronous operations.
Why RxJS in Angular?¶
- Consistent async handling: Unify promises, events, and callbacks
- Powerful operators: Transform, filter, and combine data streams
- Cancellation: Easy request cancellation and cleanup
- Error handling: Robust error recovery patterns
- Testing: Excellent testing support
Key Concepts¶
// Observable: A stream of data over time
// Observer: An object that subscribes to an Observable
// Subscription: The execution of an Observable
// Operators: Pure functions for transforming Observables
2. Understanding Observables¶
Observable vs Promise vs Callback¶
// Callback (old approach)
function fetchUserCallback(id: number, callback: (user: User) => void): void {
// Implementation with callback
}
// Promise (better, but limited)
function fetchUserPromise(id: number): Promise<User> {
return fetch(`/api/users/${id}`).then(response => response.json());
}
// Observable (most powerful)
function fetchUserObservable(id: number): Observable<User> {
return this.http.get<User>(`/api/users/${id}`);
}
// Key differences:
// - Observables are lazy (not executed until subscribed)
// - Observables can emit multiple values over time
// - Observables are cancellable
// - Observables have rich operator ecosystem
Observable Lifecycle¶
import { Observable, Observer } from 'rxjs';
// Creating a simple Observable
const observable$ = new Observable<string>((observer: Observer<string>) => {
console.log('Observable started');
// Emit values
observer.next('First value');
observer.next('Second value');
// Simulate async operation
setTimeout(() => {
observer.next('Third value');
observer.complete(); // Signal completion
}, 1000);
// Cleanup function (called on unsubscribe)
return () => {
console.log('Observable cleanup');
};
});
// Subscribe to the Observable
const subscription = observable$.subscribe({
next: (value) => console.log('Received:', value),
error: (error) => console.error('Error:', error),
complete: () => console.log('Observable completed')
});
// Unsubscribe after 500ms
setTimeout(() => {
subscription.unsubscribe();
}, 500);
3. Creating Observables¶
Creation Operators¶
import { of, from, interval, timer, fromEvent, ajax, EMPTY, NEVER } from 'rxjs';
import { map, take } from 'rxjs/operators';
// of: Create Observable from static values
const numbers$ = of(1, 2, 3, 4, 5);
// from: Convert array, promise, or iterable to Observable
const fromArray$ = from([1, 2, 3, 4, 5]);
const fromPromise$ = from(fetch('/api/data'));
// interval: Emit sequential numbers at intervals
const interval$ = interval(1000); // Emit every second
// timer: Emit after delay, then optionally at intervals
const timer$ = timer(2000, 1000); // Wait 2s, then emit every 1s
// fromEvent: Convert DOM events to Observable
const clicks$ = fromEvent(document, 'click');
const buttonClicks$ = fromEvent(buttonElement, 'click');
// ajax: Make HTTP requests
const httpRequest$ = ajax.getJSON('/api/users');
// Empty and Never
const empty$ = EMPTY; // Immediately completes
const never$ = NEVER; // Never emits or completes
Angular-Specific Observables¶
// services/data.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, BehaviorSubject, combineLatest } from 'rxjs';
import { map, shareReplay, startWith } from 'rxjs/operators';
export interface User {
id: number;
name: string;
email: string;
}
@Injectable({
providedIn: 'root'
})
export class DataService {
// HTTP Observables (cold)
users$ = this.http.get<User[]>('/api/users').pipe(
shareReplay(1) // Share result with multiple subscribers
);
// BehaviorSubject for state management (hot)
private selectedUserIdSubject = new BehaviorSubject<number | null>(null);
selectedUserId$ = this.selectedUserIdSubject.asObservable();
// Derived Observable
selectedUser$ = combineLatest([
this.users$,
this.selectedUserId$
]).pipe(
map(([users, selectedId]) =>
users.find(user => user.id === selectedId) || null
)
);
constructor(private http: HttpClient) {}
selectUser(id: number): void {
this.selectedUserIdSubject.next(id);
}
// Creating Observable from component events
createSearchObservable(searchInput: HTMLInputElement): Observable<string> {
return fromEvent(searchInput, 'input').pipe(
map((event: any) => event.target.value),
startWith('') // Start with empty string
);
}
}
4. Subscribing and Unsubscribing¶
Subscription Management Patterns¶
// components/user-list.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, Subscription, takeUntil } from 'rxjs';
import { DataService } from '../services/data.service';
@Component({
selector: 'app-user-list',
template: `
<div class="user-list">
<div *ngFor="let user of users" class="user-item">
{{ user.name }} - {{ user.email }}
</div>
<div *ngIf="selectedUser" class="selected-user">
Selected: {{ selectedUser.name }}
</div>
</div>
`
})
export class UserListComponent implements OnInit, OnDestroy {
users: User[] = [];
selectedUser: User | null = null;
// Pattern 1: takeUntil with destroy Subject (recommended)
private destroy$ = new Subject<void>();
// Pattern 2: Manual subscription management
private subscriptions = new Subscription();
constructor(private dataService: DataService) {}
ngOnInit(): void {
// Pattern 1: Using takeUntil (recommended)
this.dataService.users$
.pipe(takeUntil(this.destroy$))
.subscribe(users => {
this.users = users;
});
this.dataService.selectedUser$
.pipe(takeUntil(this.destroy$))
.subscribe(user => {
this.selectedUser = user;
});
// Pattern 2: Manual subscription management
const userSubscription = this.dataService.users$.subscribe(users => {
this.users = users;
});
this.subscriptions.add(userSubscription);
}
ngOnDestroy(): void {
// Pattern 1: Complete destroy subject
this.destroy$.next();
this.destroy$.complete();
// Pattern 2: Unsubscribe all
this.subscriptions.unsubscribe();
}
selectUser(id: number): void {
this.dataService.selectUser(id);
}
}
Async Pipe (Automatic Subscription Management)¶
// components/user-list-async.component.ts
@Component({
selector: 'app-user-list-async',
template: `
<div class="user-list">
<!-- Async pipe handles subscription/unsubscription automatically -->
<div *ngFor="let user of users$ | async" class="user-item">
{{ user.name }} - {{ user.email }}
</div>
<div *ngIf="selectedUser$ | async as selectedUser" class="selected-user">
Selected: {{ selectedUser.name }}
</div>
<!-- Loading state -->
<div *ngIf="!(users$ | async)" class="loading">
Loading users...
</div>
</div>
`
})
export class UserListAsyncComponent {
users$ = this.dataService.users$;
selectedUser$ = this.dataService.selectedUser$;
constructor(private dataService: DataService) {}
selectUser(id: number): void {
this.dataService.selectUser(id);
}
}
5. Operators and Transformations¶
Transformation Operators¶
import { of, from, interval } from 'rxjs';
import {
map,
filter,
tap,
switchMap,
mergeMap,
concatMap,
exhaustMap,
scan,
reduce,
pluck,
distinctUntilChanged,
debounceTime,
throttleTime,
startWith,
share,
shareReplay
} from 'rxjs/operators';
// map: Transform each emitted value
const numbers$ = of(1, 2, 3, 4, 5);
const doubled$ = numbers$.pipe(
map(x => x * 2)
); // Emits: 2, 4, 6, 8, 10
// filter: Only emit values that pass a test
const evenNumbers$ = numbers$.pipe(
filter(x => x % 2 === 0)
); // Emits: 2, 4
// tap: Perform side effects without changing the stream
const withSideEffects$ = numbers$.pipe(
tap(x => console.log('Processing:', x)),
map(x => x * 2),
tap(x => console.log('Result:', x))
);
Flattening Operators¶
// services/search.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, of } from 'rxjs';
import { switchMap, mergeMap, concatMap, exhaustMap, catchError } from 'rxjs/operators';
@Injectable({
providedIn: 'root'
})
export class SearchService {
constructor(private http: HttpClient) {}
// switchMap: Cancel previous requests (good for search)
searchUsers(term: string): Observable<User[]> {
if (!term.trim()) {
return of([]);
}
return of(term).pipe(
switchMap(searchTerm =>
this.http.get<User[]>(`/api/search?q=${searchTerm}`)
),
catchError(error => {
console.error('Search error:', error);
return of([]);
})
);
}
// mergeMap: Run requests in parallel
getUserDetails(userIds: number[]): Observable<User> {
return from(userIds).pipe(
mergeMap(id => this.http.get<User>(`/api/users/${id}`))
);
}
// concatMap: Run requests sequentially
updateUsersSequentially(updates: UserUpdate[]): Observable<User> {
return from(updates).pipe(
concatMap(update =>
this.http.put<User>(`/api/users/${update.id}`, update.data)
)
);
}
// exhaustMap: Ignore new requests while current is active
saveUserData(userData: User): Observable<User> {
return of(userData).pipe(
exhaustMap(data => this.http.post<User>('/api/users', data))
);
}
}
Utility Operators¶
// components/search.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { FormControl } from '@angular/forms';
import { Subject } from 'rxjs';
import {
debounceTime,
distinctUntilChanged,
filter,
switchMap,
tap,
takeUntil,
startWith,
catchError
} from 'rxjs/operators';
@Component({
selector: 'app-search',
template: `
<div class="search-container">
<input
[formControl]="searchControl"
placeholder="Search users..."
class="search-input"
>
<div *ngIf="loading" class="loading">Searching...</div>
<div class="results">
<div *ngFor="let user of searchResults" class="result-item">
{{ user.name }} - {{ user.email }}
</div>
</div>
<div *ngIf="searchResults.length === 0 && searchControl.value" class="no-results">
No users found
</div>
</div>
`
})
export class SearchComponent implements OnInit, OnDestroy {
searchControl = new FormControl('');
searchResults: User[] = [];
loading = false;
private destroy$ = new Subject<void>();
constructor(private searchService: SearchService) {}
ngOnInit(): void {
this.searchControl.valueChanges.pipe(
startWith(''), // Start with empty string
debounceTime(300), // Wait 300ms after user stops typing
distinctUntilChanged(), // Only if search term changed
filter(term => term === '' || term.length >= 2), // Empty or at least 2 chars
tap(() => this.loading = true), // Show loading
switchMap(term =>
this.searchService.searchUsers(term).pipe(
catchError(error => {
console.error('Search failed:', error);
return of([]);
})
)
),
tap(() => this.loading = false), // Hide loading
takeUntil(this.destroy$)
).subscribe(results => {
this.searchResults = results;
});
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
6. Subjects and BehaviorSubjects¶
Types of Subjects¶
import {
Subject,
BehaviorSubject,
ReplaySubject,
AsyncSubject
} from 'rxjs';
// Subject: No initial value, only new subscribers get new values
const subject = new Subject<string>();
subject.next('Hello'); // Won't be received by future subscribers
// BehaviorSubject: Has initial value, new subscribers get current value
const behaviorSubject = new BehaviorSubject<string>('Initial');
behaviorSubject.next('Hello'); // New subscribers will get 'Hello'
// ReplaySubject: Replays specified number of previous values
const replaySubject = new ReplaySubject<string>(2); // Remember last 2 values
replaySubject.next('First');
replaySubject.next('Second');
replaySubject.next('Third');
// New subscriber will get 'Second' and 'Third'
// AsyncSubject: Only emits the last value when completed
const asyncSubject = new AsyncSubject<string>();
asyncSubject.next('First');
asyncSubject.next('Last'); // Only this will be emitted
asyncSubject.complete(); // Triggers emission
State Management with BehaviorSubject¶
// services/cart.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';
export interface CartItem {
id: number;
name: string;
price: number;
quantity: number;
}
export interface CartState {
items: CartItem[];
total: number;
itemCount: number;
}
@Injectable({
providedIn: 'root'
})
export class CartService {
// Private BehaviorSubject for internal state management
private cartItemsSubject = new BehaviorSubject<CartItem[]>([]);
// Public Observable for components to subscribe to
cartItems$ = this.cartItemsSubject.asObservable();
// Derived observables
itemCount$ = this.cartItems$.pipe(
map(items => items.reduce((count, item) => count + item.quantity, 0))
);
total$ = this.cartItems$.pipe(
map(items => items.reduce((total, item) => total + (item.price * item.quantity), 0))
);
// Combined cart state
cartState$: Observable<CartState> = combineLatest([
this.cartItems$,
this.total$,
this.itemCount$
]).pipe(
map(([items, total, itemCount]) => ({ items, total, itemCount }))
);
// Methods to update state
addItem(item: Omit<CartItem, 'quantity'>): void {
const currentItems = this.cartItemsSubject.value;
const existingItem = currentItems.find(i => i.id === item.id);
if (existingItem) {
this.updateItemQuantity(item.id, existingItem.quantity + 1);
} else {
const newItem: CartItem = { ...item, quantity: 1 };
this.cartItemsSubject.next([...currentItems, newItem]);
}
}
removeItem(itemId: number): void {
const currentItems = this.cartItemsSubject.value;
const updatedItems = currentItems.filter(item => item.id !== itemId);
this.cartItemsSubject.next(updatedItems);
}
updateItemQuantity(itemId: number, quantity: number): void {
if (quantity <= 0) {
this.removeItem(itemId);
return;
}
const currentItems = this.cartItemsSubject.value;
const updatedItems = currentItems.map(item =>
item.id === itemId ? { ...item, quantity } : item
);
this.cartItemsSubject.next(updatedItems);
}
clearCart(): void {
this.cartItemsSubject.next([]);
}
// Get current state synchronously (use sparingly)
getCurrentItems(): CartItem[] {
return this.cartItemsSubject.value;
}
}
Using the Cart Service¶
// components/cart.component.ts
@Component({
selector: 'app-cart',
template: `
<div class="cart" *ngIf="cartState$ | async as cart">
<h2>Shopping Cart ({{ cart.itemCount }} items)</h2>
<div *ngFor="let item of cart.items" class="cart-item">
<span>{{ item.name }}</span>
<span>\${{ item.price }}</span>
<input
type="number"
[value]="item.quantity"
(change)="updateQuantity(item.id, $event)"
min="0"
>
<button (click)="removeItem(item.id)">Remove</button>
</div>
<div class="cart-total">
Total: \${{ cart.total | number:'1.2-2' }}
</div>
<button (click)="clearCart()" [disabled]="cart.items.length === 0">
Clear Cart
</button>
</div>
`
})
export class CartComponent {
cartState$ = this.cartService.cartState$;
constructor(private cartService: CartService) {}
updateQuantity(itemId: number, event: any): void {
const quantity = parseInt(event.target.value, 10);
this.cartService.updateItemQuantity(itemId, quantity);
}
removeItem(itemId: number): void {
this.cartService.removeItem(itemId);
}
clearCart(): void {
this.cartService.clearCart();
}
}
7. Combining Observables¶
Combination Operators¶
import {
combineLatest,
merge,
forkJoin,
zip,
concat,
race
} from 'rxjs';
import { map, startWith } from 'rxjs/operators';
// services/dashboard.service.ts
@Injectable({
providedIn: 'root'
})
export class DashboardService {
constructor(
private userService: UserService,
private orderService: OrderService,
private productService: ProductService
) {}
// combineLatest: Emit when any source emits (needs all to emit at least once)
getDashboardData() {
return combineLatest([
this.userService.getUsers(),
this.orderService.getRecentOrders(),
this.productService.getPopularProducts()
]).pipe(
map(([users, orders, products]) => ({
userCount: users.length,
recentOrders: orders,
popularProducts: products,
lastUpdated: new Date()
}))
);
}
// forkJoin: Wait for all observables to complete, then emit final values
getInitialData() {
return forkJoin({
users: this.userService.getUsers(),
products: this.productService.getAllProducts(),
categories: this.productService.getCategories()
});
}
// merge: Emit values from any source as they arrive
getAllNotifications() {
return merge(
this.getSystemNotifications(),
this.getUserNotifications(),
this.getOrderNotifications()
);
}
// zip: Combine values at the same index from multiple sources
getUsersWithOrders() {
return zip(
this.userService.getUsers(),
this.orderService.getAllOrders()
).pipe(
map(([users, orders]) => {
return users.map(user => ({
...user,
orderCount: orders.filter(order => order.userId === user.id).length
}));
})
);
}
// concat: Subscribe to observables in sequence
getDataSequentially() {
return concat(
this.loadConfiguration(),
this.loadUserData(),
this.loadApplicationData()
);
}
// race: Emit from the first observable that emits
getFastestDataSource() {
return race(
this.getPrimaryData(),
this.getBackupData(),
this.getCachedData()
);
}
}
Advanced Combination Patterns¶
// components/advanced-search.component.ts
@Component({
selector: 'app-advanced-search',
template: `
<div class="search-container">
<input [formControl]="searchTermControl" placeholder="Search...">
<select [formControl]="categoryControl">
<option value="">All Categories</option>
<option *ngFor="let category of categories$ | async" [value]="category.id">
{{ category.name }}
</option>
</select>
<input type="range" [formControl]="priceRangeControl" min="0" max="1000">
<div class="results">
<div *ngFor="let product of searchResults$ | async">
{{ product.name }} - ${{ product.price }}
</div>
</div>
</div>
`
})
export class AdvancedSearchComponent implements OnInit {
searchTermControl = new FormControl('');
categoryControl = new FormControl('');
priceRangeControl = new FormControl(1000);
categories$ = this.productService.getCategories();
// Combine multiple form controls into a search criteria observable
searchCriteria$ = combineLatest([
this.searchTermControl.valueChanges.pipe(startWith('')),
this.categoryControl.valueChanges.pipe(startWith('')),
this.priceRangeControl.valueChanges.pipe(startWith(1000))
]).pipe(
map(([term, category, maxPrice]) => ({ term, category, maxPrice })),
debounceTime(300),
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b))
);
// Search results based on criteria
searchResults$ = this.searchCriteria$.pipe(
switchMap(criteria => this.productService.searchProducts(criteria)),
startWith([])
);
constructor(private productService: ProductService) {}
ngOnInit(): void {
// Additional setup if needed
}
}
8. Error Handling in RxJS¶
Error Handling Operators¶
import {
catchError,
retry,
retryWhen,
tap,
delay,
take,
throwError,
of,
EMPTY
} from 'rxjs';
// services/resilient-api.service.ts
@Injectable({
providedIn: 'root'
})
export class ResilientApiService {
constructor(private http: HttpClient) {}
// Basic error handling with catchError
getUsersWithFallback(): Observable<User[]> {
return this.http.get<User[]>('/api/users').pipe(
catchError(error => {
console.error('Failed to load users:', error);
// Return fallback data
return of([]);
})
);
}
// Retry with exponential backoff
getUsersWithRetry(): Observable<User[]> {
return this.http.get<User[]>('/api/users').pipe(
retryWhen(errors =>
errors.pipe(
tap(error => console.log('Retrying after error:', error)),
delay(1000), // Wait 1 second before retry
take(3) // Retry maximum 3 times
)
),
catchError(error => {
console.error('All retries failed:', error);
return throwError(() => new Error('Service unavailable'));
})
);
}
// Advanced retry strategy
getUsersWithAdvancedRetry(): Observable<User[]> {
return this.http.get<User[]>('/api/users').pipe(
retryWhen(errors =>
errors.pipe(
tap((error, index) => {
console.log(`Attempt ${index + 1} failed:`, error);
}),
switchMap((error, index) => {
// Don't retry client errors (4xx)
if (error.status >= 400 && error.status < 500) {
return throwError(() => error);
}
// Stop after 3 retries
if (index >= 2) {
return throwError(() => error);
}
// Exponential backoff: 1s, 2s, 4s
return timer(Math.pow(2, index) * 1000);
})
)
),
catchError(this.handleError)
);
}
// Error recovery with user notification
getUsersWithNotification(): Observable<User[]> {
return this.http.get<User[]>('/api/users').pipe(
catchError(error => {
this.notificationService.showError('Failed to load users');
// Try to load from cache
return this.cacheService.getUsers().pipe(
catchError(() => {
this.notificationService.showError('Unable to load cached data');
return of([]);
})
);
})
);
}
private handleError = (error: any): Observable<never> => {
// Log error to monitoring service
this.loggingService.logError(error);
// Show user-friendly message
let userMessage = 'An unexpected error occurred';
if (error.status === 0) {
userMessage = 'Network connection error';
} else if (error.status >= 500) {
userMessage = 'Server error - please try again later';
}
this.notificationService.showError(userMessage);
return throwError(() => new Error(userMessage));
};
}
Error Boundaries in Components¶
// components/error-boundary.component.ts
@Component({
selector: 'app-error-boundary',
template: `
<div class="error-boundary">
<ng-container *ngIf="!hasError; else errorTemplate">
<ng-content></ng-content>
</ng-container>
<ng-template #errorTemplate>
<div class="error-message">
<h3>Something went wrong</h3>
<p>{{ errorMessage }}</p>
<button (click)="retry()">Try Again</button>
</div>
</ng-template>
</div>
`
})
export class ErrorBoundaryComponent implements OnInit, OnDestroy {
@Input() retryAction?: () => Observable<any>;
hasError = false;
errorMessage = '';
private destroy$ = new Subject<void>();
ngOnInit(): void {
// Listen for errors in child components
if (this.retryAction) {
this.executeAction();
}
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
private executeAction(): void {
if (!this.retryAction) return;
this.hasError = false;
this.retryAction()
.pipe(
takeUntil(this.destroy$),
catchError(error => {
this.hasError = true;
this.errorMessage = error.message || 'An error occurred';
return EMPTY;
})
)
.subscribe();
}
retry(): void {
this.executeAction();
}
}
9. Testing Observables¶
Testing with TestScheduler¶
// services/data.service.spec.ts
import { TestBed } from '@angular/core/testing';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';
import { TestScheduler } from 'rxjs/testing';
import { DataService } from './data.service';
describe('DataService', () => {
let service: DataService;
let httpMock: HttpTestingController;
let scheduler: TestScheduler;
beforeEach(() => {
TestBed.configureTestingModule({
imports: [HttpClientTestingModule],
providers: [DataService]
});
service = TestBed.inject(DataService);
httpMock = TestBed.inject(HttpTestingController);
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
afterEach(() => {
httpMock.verify();
});
describe('getUsers', () => {
it('should return users', () => {
const mockUsers = [
{ id: 1, name: 'John', email: 'john@test.com' },
{ id: 2, name: 'Jane', email: 'jane@test.com' }
];
service.getUsers().subscribe(users => {
expect(users).toEqual(mockUsers);
});
const req = httpMock.expectOne('/api/users');
expect(req.request.method).toBe('GET');
req.flush(mockUsers);
});
it('should handle errors', () => {
service.getUsers().subscribe({
next: () => fail('Expected error'),
error: (error) => {
expect(error.message).toContain('Server error');
}
});
const req = httpMock.expectOne('/api/users');
req.flush('Server error', { status: 500, statusText: 'Internal Server Error' });
});
});
describe('search functionality', () => {
it('should debounce search terms', () => {
scheduler.run(({ cold, expectObservable }) => {
// Simulate rapid typing
const searchTerms = cold('a-b-c-d|', {
a: 'a',
b: 'ab',
c: 'abc',
d: 'abcd'
});
const expected = ' ---d|'; // Only final term after debounce
const result = searchTerms.pipe(
debounceTime(300, scheduler)
);
expectObservable(result).toBe(expected, {
d: 'abcd'
});
});
});
});
});
Testing Subjects and State¶
// services/cart.service.spec.ts
describe('CartService', () => {
let service: CartService;
beforeEach(() => {
TestBed.configureTestingModule({});
service = TestBed.inject(CartService);
});
describe('cart state management', () => {
it('should add items to cart', (done) => {
const testItem = { id: 1, name: 'Test Item', price: 10 };
// Subscribe to cart state changes
service.cartState$.subscribe(state => {
if (state.items.length > 0) {
expect(state.items[0]).toEqual({ ...testItem, quantity: 1 });
expect(state.total).toBe(10);
expect(state.itemCount).toBe(1);
done();
}
});
// Add item to cart
service.addItem(testItem);
});
it('should update item quantities', () => {
const testItem = { id: 1, name: 'Test Item', price: 10 };
service.addItem(testItem);
service.updateItemQuantity(1, 3);
const currentItems = service.getCurrentItems();
expect(currentItems[0].quantity).toBe(3);
});
it('should remove items from cart', () => {
const testItem = { id: 1, name: 'Test Item', price: 10 };
service.addItem(testItem);
service.removeItem(1);
const currentItems = service.getCurrentItems();
expect(currentItems.length).toBe(0);
});
});
});
10. Best Practices¶
Memory Management¶
// ✅ Good: Proper unsubscription
@Component({...})
export class GoodComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit(): void {
this.dataService.getData()
.pipe(takeUntil(this.destroy$))
.subscribe(data => {
// Handle data
});
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
// ✅ Better: Use async pipe
@Component({
template: `<div *ngFor="let item of data$ | async">{{item.name}}</div>`
})
export class BetterComponent {
data$ = this.dataService.getData();
constructor(private dataService: DataService) {}
}
Performance Optimization¶
// ✅ Use shareReplay for expensive operations
@Injectable({
providedIn: 'root'
})
export class OptimizedService {
// Cache expensive API calls
expensiveData$ = this.http.get('/api/expensive').pipe(
shareReplay(1)
);
// Use operators to minimize subscriptions
filteredData$ = this.expensiveData$.pipe(
map(data => data.filter(item => item.active))
);
}
// ✅ Combine related observables
getCombinedData() {
return combineLatest([
this.getUserData(),
this.getPreferences(),
this.getSettings()
]).pipe(
map(([user, preferences, settings]) => ({
user,
preferences,
settings
}))
);
}
Error Handling Strategy¶
// ✅ Comprehensive error handling
@Injectable({
providedIn: 'root'
})
export class RobustService {
getData(): Observable<Data[]> {
return this.http.get<Data[]>('/api/data').pipe(
retry(2), // Retry failed requests
catchError(error => {
// Log error
this.loggingService.logError(error);
// Try fallback
return this.getFallbackData().pipe(
catchError(() => {
// Ultimate fallback
this.notificationService.showError('Unable to load data');
return of([]);
})
);
})
);
}
}
Common Pitfalls to Avoid¶
- Memory leaks - Always unsubscribe or use async pipe
- Multiple subscriptions - Use shareReplay for shared data
- Nested subscriptions - Use flatMap operators instead
- Ignoring errors - Always handle errors appropriately
- Blocking operations - Keep operations asynchronous
- Complex subscribe blocks - Use operators for transformations
- Not testing async code - Use TestScheduler for thorough testing
Next Steps¶
After mastering Observables and RxJS, you'll be ready to tackle:
- Routing and Navigation: Implement complex navigation patterns
- Forms: Build reactive forms with validation
- State Management: Advanced state management patterns
- Performance Optimization: Optimize reactive applications
RxJS is the heart of reactive programming in Angular. Practice these patterns until they become intuitive!