Skip to content

Observables and RxJS in Angular

Course Objective

Master reactive programming with RxJS Observables in Angular, learning to handle asynchronous data streams, implement complex data transformations, and build reactive user interfaces.

1. Introduction to Reactive Programming

Reactive programming is a declarative programming paradigm that deals with asynchronous data streams. In Angular, RxJS (Reactive Extensions for JavaScript) provides powerful tools for handling complex asynchronous operations.

Why RxJS in Angular?

  • Consistent async handling: Unify promises, events, and callbacks
  • Powerful operators: Transform, filter, and combine data streams
  • Cancellation: Easy request cancellation and cleanup
  • Error handling: Robust error recovery patterns
  • Testing: Excellent testing support

Key Concepts

// Observable: A stream of data over time
// Observer: An object that subscribes to an Observable
// Subscription: The execution of an Observable
// Operators: Pure functions for transforming Observables

2. Understanding Observables

Observable vs Promise vs Callback

// Callback (old approach)
function fetchUserCallback(id: number, callback: (user: User) => void): void {
  // Implementation with callback
}

// Promise (better, but limited)
function fetchUserPromise(id: number): Promise<User> {
  return fetch(`/api/users/${id}`).then(response => response.json());
}

// Observable (most powerful)
function fetchUserObservable(id: number): Observable<User> {
  return this.http.get<User>(`/api/users/${id}`);
}

// Key differences:
// - Observables are lazy (not executed until subscribed)
// - Observables can emit multiple values over time
// - Observables are cancellable
// - Observables have rich operator ecosystem

Observable Lifecycle

import { Observable, Observer } from 'rxjs';

// Creating a simple Observable
const observable$ = new Observable<string>((observer: Observer<string>) => {
  console.log('Observable started');

  // Emit values
  observer.next('First value');
  observer.next('Second value');

  // Simulate async operation
  setTimeout(() => {
    observer.next('Third value');
    observer.complete(); // Signal completion
  }, 1000);

  // Cleanup function (called on unsubscribe)
  return () => {
    console.log('Observable cleanup');
  };
});

// Subscribe to the Observable
const subscription = observable$.subscribe({
  next: (value) => console.log('Received:', value),
  error: (error) => console.error('Error:', error),
  complete: () => console.log('Observable completed')
});

// Unsubscribe after 500ms
setTimeout(() => {
  subscription.unsubscribe();
}, 500);

3. Creating Observables

Creation Operators

import { of, from, interval, timer, fromEvent, ajax, EMPTY, NEVER } from 'rxjs';
import { map, take } from 'rxjs/operators';

// of: Create Observable from static values
const numbers$ = of(1, 2, 3, 4, 5);

// from: Convert array, promise, or iterable to Observable
const fromArray$ = from([1, 2, 3, 4, 5]);
const fromPromise$ = from(fetch('/api/data'));

// interval: Emit sequential numbers at intervals
const interval$ = interval(1000); // Emit every second

// timer: Emit after delay, then optionally at intervals
const timer$ = timer(2000, 1000); // Wait 2s, then emit every 1s

// fromEvent: Convert DOM events to Observable
const clicks$ = fromEvent(document, 'click');
const buttonClicks$ = fromEvent(buttonElement, 'click');

// ajax: Make HTTP requests
const httpRequest$ = ajax.getJSON('/api/users');

// Empty and Never
const empty$ = EMPTY; // Immediately completes
const never$ = NEVER; // Never emits or completes

Angular-Specific Observables

// services/data.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, BehaviorSubject, combineLatest } from 'rxjs';
import { map, shareReplay, startWith } from 'rxjs/operators';

export interface User {
  id: number;
  name: string;
  email: string;
}

@Injectable({
  providedIn: 'root'
})
export class DataService {
  // HTTP Observables (cold)
  users$ = this.http.get<User[]>('/api/users').pipe(
    shareReplay(1) // Share result with multiple subscribers
  );

  // BehaviorSubject for state management (hot)
  private selectedUserIdSubject = new BehaviorSubject<number | null>(null);
  selectedUserId$ = this.selectedUserIdSubject.asObservable();

  // Derived Observable
  selectedUser$ = combineLatest([
    this.users$,
    this.selectedUserId$
  ]).pipe(
    map(([users, selectedId]) =>
      users.find(user => user.id === selectedId) || null
    )
  );

  constructor(private http: HttpClient) {}

  selectUser(id: number): void {
    this.selectedUserIdSubject.next(id);
  }

  // Creating Observable from component events
  createSearchObservable(searchInput: HTMLInputElement): Observable<string> {
    return fromEvent(searchInput, 'input').pipe(
      map((event: any) => event.target.value),
      startWith('') // Start with empty string
    );
  }
}

4. Subscribing and Unsubscribing

Subscription Management Patterns

// components/user-list.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, Subscription, takeUntil } from 'rxjs';
import { DataService } from '../services/data.service';

@Component({
  selector: 'app-user-list',
  template: `
    <div class="user-list">
      <div *ngFor="let user of users" class="user-item">
        {{ user.name }} - {{ user.email }}
      </div>

      <div *ngIf="selectedUser" class="selected-user">
        Selected: {{ selectedUser.name }}
      </div>
    </div>
  `
})
export class UserListComponent implements OnInit, OnDestroy {
  users: User[] = [];
  selectedUser: User | null = null;

  // Pattern 1: takeUntil with destroy Subject (recommended)
  private destroy$ = new Subject<void>();

  // Pattern 2: Manual subscription management
  private subscriptions = new Subscription();

  constructor(private dataService: DataService) {}

  ngOnInit(): void {
    // Pattern 1: Using takeUntil (recommended)
    this.dataService.users$
      .pipe(takeUntil(this.destroy$))
      .subscribe(users => {
        this.users = users;
      });

    this.dataService.selectedUser$
      .pipe(takeUntil(this.destroy$))
      .subscribe(user => {
        this.selectedUser = user;
      });

    // Pattern 2: Manual subscription management
    const userSubscription = this.dataService.users$.subscribe(users => {
      this.users = users;
    });
    this.subscriptions.add(userSubscription);
  }

  ngOnDestroy(): void {
    // Pattern 1: Complete destroy subject
    this.destroy$.next();
    this.destroy$.complete();

    // Pattern 2: Unsubscribe all
    this.subscriptions.unsubscribe();
  }

  selectUser(id: number): void {
    this.dataService.selectUser(id);
  }
}

Async Pipe (Automatic Subscription Management)

// components/user-list-async.component.ts
@Component({
  selector: 'app-user-list-async',
  template: `
    <div class="user-list">
      <!-- Async pipe handles subscription/unsubscription automatically -->
      <div *ngFor="let user of users$ | async" class="user-item">
        {{ user.name }} - {{ user.email }}
      </div>

      <div *ngIf="selectedUser$ | async as selectedUser" class="selected-user">
        Selected: {{ selectedUser.name }}
      </div>

      <!-- Loading state -->
      <div *ngIf="!(users$ | async)" class="loading">
        Loading users...
      </div>
    </div>
  `
})
export class UserListAsyncComponent {
  users$ = this.dataService.users$;
  selectedUser$ = this.dataService.selectedUser$;

  constructor(private dataService: DataService) {}

  selectUser(id: number): void {
    this.dataService.selectUser(id);
  }
}

5. Operators and Transformations

Transformation Operators

import { of, from, interval } from 'rxjs';
import {
  map,
  filter,
  tap,
  switchMap,
  mergeMap,
  concatMap,
  exhaustMap,
  scan,
  reduce,
  pluck,
  distinctUntilChanged,
  debounceTime,
  throttleTime,
  startWith,
  share,
  shareReplay
} from 'rxjs/operators';

// map: Transform each emitted value
const numbers$ = of(1, 2, 3, 4, 5);
const doubled$ = numbers$.pipe(
  map(x => x * 2)
); // Emits: 2, 4, 6, 8, 10

// filter: Only emit values that pass a test
const evenNumbers$ = numbers$.pipe(
  filter(x => x % 2 === 0)
); // Emits: 2, 4

// tap: Perform side effects without changing the stream
const withSideEffects$ = numbers$.pipe(
  tap(x => console.log('Processing:', x)),
  map(x => x * 2),
  tap(x => console.log('Result:', x))
);

Flattening Operators

// services/search.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, of } from 'rxjs';
import { switchMap, mergeMap, concatMap, exhaustMap, catchError } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class SearchService {
  constructor(private http: HttpClient) {}

  // switchMap: Cancel previous requests (good for search)
  searchUsers(term: string): Observable<User[]> {
    if (!term.trim()) {
      return of([]);
    }

    return of(term).pipe(
      switchMap(searchTerm =>
        this.http.get<User[]>(`/api/search?q=${searchTerm}`)
      ),
      catchError(error => {
        console.error('Search error:', error);
        return of([]);
      })
    );
  }

  // mergeMap: Run requests in parallel
  getUserDetails(userIds: number[]): Observable<User> {
    return from(userIds).pipe(
      mergeMap(id => this.http.get<User>(`/api/users/${id}`))
    );
  }

  // concatMap: Run requests sequentially
  updateUsersSequentially(updates: UserUpdate[]): Observable<User> {
    return from(updates).pipe(
      concatMap(update =>
        this.http.put<User>(`/api/users/${update.id}`, update.data)
      )
    );
  }

  // exhaustMap: Ignore new requests while current is active
  saveUserData(userData: User): Observable<User> {
    return of(userData).pipe(
      exhaustMap(data => this.http.post<User>('/api/users', data))
    );
  }
}

Utility Operators

// components/search.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { FormControl } from '@angular/forms';
import { Subject } from 'rxjs';
import {
  debounceTime,
  distinctUntilChanged,
  filter,
  switchMap,
  tap,
  takeUntil,
  startWith,
  catchError
} from 'rxjs/operators';

@Component({
  selector: 'app-search',
  template: `
    <div class="search-container">
      <input
        [formControl]="searchControl"
        placeholder="Search users..."
        class="search-input"
      >

      <div *ngIf="loading" class="loading">Searching...</div>

      <div class="results">
        <div *ngFor="let user of searchResults" class="result-item">
          {{ user.name }} - {{ user.email }}
        </div>
      </div>

      <div *ngIf="searchResults.length === 0 && searchControl.value" class="no-results">
        No users found
      </div>
    </div>
  `
})
export class SearchComponent implements OnInit, OnDestroy {
  searchControl = new FormControl('');
  searchResults: User[] = [];
  loading = false;
  private destroy$ = new Subject<void>();

  constructor(private searchService: SearchService) {}

  ngOnInit(): void {
    this.searchControl.valueChanges.pipe(
      startWith(''), // Start with empty string
      debounceTime(300), // Wait 300ms after user stops typing
      distinctUntilChanged(), // Only if search term changed
      filter(term => term === '' || term.length >= 2), // Empty or at least 2 chars
      tap(() => this.loading = true), // Show loading
      switchMap(term =>
        this.searchService.searchUsers(term).pipe(
          catchError(error => {
            console.error('Search failed:', error);
            return of([]);
          })
        )
      ),
      tap(() => this.loading = false), // Hide loading
      takeUntil(this.destroy$)
    ).subscribe(results => {
      this.searchResults = results;
    });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

6. Subjects and BehaviorSubjects

Types of Subjects

import {
  Subject,
  BehaviorSubject,
  ReplaySubject,
  AsyncSubject
} from 'rxjs';

// Subject: No initial value, only new subscribers get new values
const subject = new Subject<string>();
subject.next('Hello'); // Won't be received by future subscribers

// BehaviorSubject: Has initial value, new subscribers get current value
const behaviorSubject = new BehaviorSubject<string>('Initial');
behaviorSubject.next('Hello'); // New subscribers will get 'Hello'

// ReplaySubject: Replays specified number of previous values
const replaySubject = new ReplaySubject<string>(2); // Remember last 2 values
replaySubject.next('First');
replaySubject.next('Second');
replaySubject.next('Third');
// New subscriber will get 'Second' and 'Third'

// AsyncSubject: Only emits the last value when completed
const asyncSubject = new AsyncSubject<string>();
asyncSubject.next('First');
asyncSubject.next('Last'); // Only this will be emitted
asyncSubject.complete(); // Triggers emission

State Management with BehaviorSubject

// services/cart.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

export interface CartItem {
  id: number;
  name: string;
  price: number;
  quantity: number;
}

export interface CartState {
  items: CartItem[];
  total: number;
  itemCount: number;
}

@Injectable({
  providedIn: 'root'
})
export class CartService {
  // Private BehaviorSubject for internal state management
  private cartItemsSubject = new BehaviorSubject<CartItem[]>([]);

  // Public Observable for components to subscribe to
  cartItems$ = this.cartItemsSubject.asObservable();

  // Derived observables
  itemCount$ = this.cartItems$.pipe(
    map(items => items.reduce((count, item) => count + item.quantity, 0))
  );

  total$ = this.cartItems$.pipe(
    map(items => items.reduce((total, item) => total + (item.price * item.quantity), 0))
  );

  // Combined cart state
  cartState$: Observable<CartState> = combineLatest([
    this.cartItems$,
    this.total$,
    this.itemCount$
  ]).pipe(
    map(([items, total, itemCount]) => ({ items, total, itemCount }))
  );

  // Methods to update state
  addItem(item: Omit<CartItem, 'quantity'>): void {
    const currentItems = this.cartItemsSubject.value;
    const existingItem = currentItems.find(i => i.id === item.id);

    if (existingItem) {
      this.updateItemQuantity(item.id, existingItem.quantity + 1);
    } else {
      const newItem: CartItem = { ...item, quantity: 1 };
      this.cartItemsSubject.next([...currentItems, newItem]);
    }
  }

  removeItem(itemId: number): void {
    const currentItems = this.cartItemsSubject.value;
    const updatedItems = currentItems.filter(item => item.id !== itemId);
    this.cartItemsSubject.next(updatedItems);
  }

  updateItemQuantity(itemId: number, quantity: number): void {
    if (quantity <= 0) {
      this.removeItem(itemId);
      return;
    }

    const currentItems = this.cartItemsSubject.value;
    const updatedItems = currentItems.map(item =>
      item.id === itemId ? { ...item, quantity } : item
    );
    this.cartItemsSubject.next(updatedItems);
  }

  clearCart(): void {
    this.cartItemsSubject.next([]);
  }

  // Get current state synchronously (use sparingly)
  getCurrentItems(): CartItem[] {
    return this.cartItemsSubject.value;
  }
}

Using the Cart Service

// components/cart.component.ts
@Component({
  selector: 'app-cart',
  template: `
    <div class="cart" *ngIf="cartState$ | async as cart">
      <h2>Shopping Cart ({{ cart.itemCount }} items)</h2>

      <div *ngFor="let item of cart.items" class="cart-item">
        <span>{{ item.name }}</span>
        <span>\${{ item.price }}</span>
        <input
          type="number"
          [value]="item.quantity"
          (change)="updateQuantity(item.id, $event)"
          min="0"
        >
        <button (click)="removeItem(item.id)">Remove</button>
      </div>

      <div class="cart-total">
        Total: \${{ cart.total | number:'1.2-2' }}
      </div>

      <button (click)="clearCart()" [disabled]="cart.items.length === 0">
        Clear Cart
      </button>
    </div>
  `
})
export class CartComponent {
  cartState$ = this.cartService.cartState$;

  constructor(private cartService: CartService) {}

  updateQuantity(itemId: number, event: any): void {
    const quantity = parseInt(event.target.value, 10);
    this.cartService.updateItemQuantity(itemId, quantity);
  }

  removeItem(itemId: number): void {
    this.cartService.removeItem(itemId);
  }

  clearCart(): void {
    this.cartService.clearCart();
  }
}

7. Combining Observables

Combination Operators

import {
  combineLatest,
  merge,
  forkJoin,
  zip,
  concat,
  race
} from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// services/dashboard.service.ts
@Injectable({
  providedIn: 'root'
})
export class DashboardService {
  constructor(
    private userService: UserService,
    private orderService: OrderService,
    private productService: ProductService
  ) {}

  // combineLatest: Emit when any source emits (needs all to emit at least once)
  getDashboardData() {
    return combineLatest([
      this.userService.getUsers(),
      this.orderService.getRecentOrders(),
      this.productService.getPopularProducts()
    ]).pipe(
      map(([users, orders, products]) => ({
        userCount: users.length,
        recentOrders: orders,
        popularProducts: products,
        lastUpdated: new Date()
      }))
    );
  }

  // forkJoin: Wait for all observables to complete, then emit final values
  getInitialData() {
    return forkJoin({
      users: this.userService.getUsers(),
      products: this.productService.getAllProducts(),
      categories: this.productService.getCategories()
    });
  }

  // merge: Emit values from any source as they arrive
  getAllNotifications() {
    return merge(
      this.getSystemNotifications(),
      this.getUserNotifications(),
      this.getOrderNotifications()
    );
  }

  // zip: Combine values at the same index from multiple sources
  getUsersWithOrders() {
    return zip(
      this.userService.getUsers(),
      this.orderService.getAllOrders()
    ).pipe(
      map(([users, orders]) => {
        return users.map(user => ({
          ...user,
          orderCount: orders.filter(order => order.userId === user.id).length
        }));
      })
    );
  }

  // concat: Subscribe to observables in sequence
  getDataSequentially() {
    return concat(
      this.loadConfiguration(),
      this.loadUserData(),
      this.loadApplicationData()
    );
  }

  // race: Emit from the first observable that emits
  getFastestDataSource() {
    return race(
      this.getPrimaryData(),
      this.getBackupData(),
      this.getCachedData()
    );
  }
}

Advanced Combination Patterns

// components/advanced-search.component.ts
@Component({
  selector: 'app-advanced-search',
  template: `
    <div class="search-container">
      <input [formControl]="searchTermControl" placeholder="Search...">
      <select [formControl]="categoryControl">
        <option value="">All Categories</option>
        <option *ngFor="let category of categories$ | async" [value]="category.id">
          {{ category.name }}
        </option>
      </select>
      <input type="range" [formControl]="priceRangeControl" min="0" max="1000">

      <div class="results">
        <div *ngFor="let product of searchResults$ | async">
          {{ product.name }} - ${{ product.price }}
        </div>
      </div>
    </div>
  `
})
export class AdvancedSearchComponent implements OnInit {
  searchTermControl = new FormControl('');
  categoryControl = new FormControl('');
  priceRangeControl = new FormControl(1000);

  categories$ = this.productService.getCategories();

  // Combine multiple form controls into a search criteria observable
  searchCriteria$ = combineLatest([
    this.searchTermControl.valueChanges.pipe(startWith('')),
    this.categoryControl.valueChanges.pipe(startWith('')),
    this.priceRangeControl.valueChanges.pipe(startWith(1000))
  ]).pipe(
    map(([term, category, maxPrice]) => ({ term, category, maxPrice })),
    debounceTime(300),
    distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b))
  );

  // Search results based on criteria
  searchResults$ = this.searchCriteria$.pipe(
    switchMap(criteria => this.productService.searchProducts(criteria)),
    startWith([])
  );

  constructor(private productService: ProductService) {}

  ngOnInit(): void {
    // Additional setup if needed
  }
}

8. Error Handling in RxJS

Error Handling Operators

import {
  catchError,
  retry,
  retryWhen,
  tap,
  delay,
  take,
  throwError,
  of,
  EMPTY
} from 'rxjs';

// services/resilient-api.service.ts
@Injectable({
  providedIn: 'root'
})
export class ResilientApiService {
  constructor(private http: HttpClient) {}

  // Basic error handling with catchError
  getUsersWithFallback(): Observable<User[]> {
    return this.http.get<User[]>('/api/users').pipe(
      catchError(error => {
        console.error('Failed to load users:', error);
        // Return fallback data
        return of([]);
      })
    );
  }

  // Retry with exponential backoff
  getUsersWithRetry(): Observable<User[]> {
    return this.http.get<User[]>('/api/users').pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(error => console.log('Retrying after error:', error)),
          delay(1000), // Wait 1 second before retry
          take(3) // Retry maximum 3 times
        )
      ),
      catchError(error => {
        console.error('All retries failed:', error);
        return throwError(() => new Error('Service unavailable'));
      })
    );
  }

  // Advanced retry strategy
  getUsersWithAdvancedRetry(): Observable<User[]> {
    return this.http.get<User[]>('/api/users').pipe(
      retryWhen(errors =>
        errors.pipe(
          tap((error, index) => {
            console.log(`Attempt ${index + 1} failed:`, error);
          }),
          switchMap((error, index) => {
            // Don't retry client errors (4xx)
            if (error.status >= 400 && error.status < 500) {
              return throwError(() => error);
            }

            // Stop after 3 retries
            if (index >= 2) {
              return throwError(() => error);
            }

            // Exponential backoff: 1s, 2s, 4s
            return timer(Math.pow(2, index) * 1000);
          })
        )
      ),
      catchError(this.handleError)
    );
  }

  // Error recovery with user notification
  getUsersWithNotification(): Observable<User[]> {
    return this.http.get<User[]>('/api/users').pipe(
      catchError(error => {
        this.notificationService.showError('Failed to load users');

        // Try to load from cache
        return this.cacheService.getUsers().pipe(
          catchError(() => {
            this.notificationService.showError('Unable to load cached data');
            return of([]);
          })
        );
      })
    );
  }

  private handleError = (error: any): Observable<never> => {
    // Log error to monitoring service
    this.loggingService.logError(error);

    // Show user-friendly message
    let userMessage = 'An unexpected error occurred';
    if (error.status === 0) {
      userMessage = 'Network connection error';
    } else if (error.status >= 500) {
      userMessage = 'Server error - please try again later';
    }

    this.notificationService.showError(userMessage);

    return throwError(() => new Error(userMessage));
  };
}

Error Boundaries in Components

// components/error-boundary.component.ts
@Component({
  selector: 'app-error-boundary',
  template: `
    <div class="error-boundary">
      <ng-container *ngIf="!hasError; else errorTemplate">
        <ng-content></ng-content>
      </ng-container>

      <ng-template #errorTemplate>
        <div class="error-message">
          <h3>Something went wrong</h3>
          <p>{{ errorMessage }}</p>
          <button (click)="retry()">Try Again</button>
        </div>
      </ng-template>
    </div>
  `
})
export class ErrorBoundaryComponent implements OnInit, OnDestroy {
  @Input() retryAction?: () => Observable<any>;

  hasError = false;
  errorMessage = '';
  private destroy$ = new Subject<void>();

  ngOnInit(): void {
    // Listen for errors in child components
    if (this.retryAction) {
      this.executeAction();
    }
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }

  private executeAction(): void {
    if (!this.retryAction) return;

    this.hasError = false;
    this.retryAction()
      .pipe(
        takeUntil(this.destroy$),
        catchError(error => {
          this.hasError = true;
          this.errorMessage = error.message || 'An error occurred';
          return EMPTY;
        })
      )
      .subscribe();
  }

  retry(): void {
    this.executeAction();
  }
}

9. Testing Observables

Testing with TestScheduler

// services/data.service.spec.ts
import { TestBed } from '@angular/core/testing';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';
import { TestScheduler } from 'rxjs/testing';
import { DataService } from './data.service';

describe('DataService', () => {
  let service: DataService;
  let httpMock: HttpTestingController;
  let scheduler: TestScheduler;

  beforeEach(() => {
    TestBed.configureTestingModule({
      imports: [HttpClientTestingModule],
      providers: [DataService]
    });

    service = TestBed.inject(DataService);
    httpMock = TestBed.inject(HttpTestingController);

    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  afterEach(() => {
    httpMock.verify();
  });

  describe('getUsers', () => {
    it('should return users', () => {
      const mockUsers = [
        { id: 1, name: 'John', email: 'john@test.com' },
        { id: 2, name: 'Jane', email: 'jane@test.com' }
      ];

      service.getUsers().subscribe(users => {
        expect(users).toEqual(mockUsers);
      });

      const req = httpMock.expectOne('/api/users');
      expect(req.request.method).toBe('GET');
      req.flush(mockUsers);
    });

    it('should handle errors', () => {
      service.getUsers().subscribe({
        next: () => fail('Expected error'),
        error: (error) => {
          expect(error.message).toContain('Server error');
        }
      });

      const req = httpMock.expectOne('/api/users');
      req.flush('Server error', { status: 500, statusText: 'Internal Server Error' });
    });
  });

  describe('search functionality', () => {
    it('should debounce search terms', () => {
      scheduler.run(({ cold, expectObservable }) => {
        // Simulate rapid typing
        const searchTerms = cold('a-b-c-d|', {
          a: 'a',
          b: 'ab',
          c: 'abc',
          d: 'abcd'
        });

        const expected = '    ---d|'; // Only final term after debounce

        const result = searchTerms.pipe(
          debounceTime(300, scheduler)
        );

        expectObservable(result).toBe(expected, {
          d: 'abcd'
        });
      });
    });
  });
});

Testing Subjects and State

// services/cart.service.spec.ts
describe('CartService', () => {
  let service: CartService;

  beforeEach(() => {
    TestBed.configureTestingModule({});
    service = TestBed.inject(CartService);
  });

  describe('cart state management', () => {
    it('should add items to cart', (done) => {
      const testItem = { id: 1, name: 'Test Item', price: 10 };

      // Subscribe to cart state changes
      service.cartState$.subscribe(state => {
        if (state.items.length > 0) {
          expect(state.items[0]).toEqual({ ...testItem, quantity: 1 });
          expect(state.total).toBe(10);
          expect(state.itemCount).toBe(1);
          done();
        }
      });

      // Add item to cart
      service.addItem(testItem);
    });

    it('should update item quantities', () => {
      const testItem = { id: 1, name: 'Test Item', price: 10 };

      service.addItem(testItem);
      service.updateItemQuantity(1, 3);

      const currentItems = service.getCurrentItems();
      expect(currentItems[0].quantity).toBe(3);
    });

    it('should remove items from cart', () => {
      const testItem = { id: 1, name: 'Test Item', price: 10 };

      service.addItem(testItem);
      service.removeItem(1);

      const currentItems = service.getCurrentItems();
      expect(currentItems.length).toBe(0);
    });
  });
});

10. Best Practices

Memory Management

// ✅ Good: Proper unsubscription
@Component({...})
export class GoodComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit(): void {
    this.dataService.getData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => {
        // Handle data
      });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// ✅ Better: Use async pipe
@Component({
  template: `<div *ngFor="let item of data$ | async">{{item.name}}</div>`
})
export class BetterComponent {
  data$ = this.dataService.getData();

  constructor(private dataService: DataService) {}
}

Performance Optimization

// ✅ Use shareReplay for expensive operations
@Injectable({
  providedIn: 'root'
})
export class OptimizedService {
  // Cache expensive API calls
  expensiveData$ = this.http.get('/api/expensive').pipe(
    shareReplay(1)
  );

  // Use operators to minimize subscriptions
  filteredData$ = this.expensiveData$.pipe(
    map(data => data.filter(item => item.active))
  );
}

// ✅ Combine related observables
getCombinedData() {
  return combineLatest([
    this.getUserData(),
    this.getPreferences(),
    this.getSettings()
  ]).pipe(
    map(([user, preferences, settings]) => ({
      user,
      preferences,
      settings
    }))
  );
}

Error Handling Strategy

// ✅ Comprehensive error handling
@Injectable({
  providedIn: 'root'
})
export class RobustService {
  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data').pipe(
      retry(2), // Retry failed requests
      catchError(error => {
        // Log error
        this.loggingService.logError(error);

        // Try fallback
        return this.getFallbackData().pipe(
          catchError(() => {
            // Ultimate fallback
            this.notificationService.showError('Unable to load data');
            return of([]);
          })
        );
      })
    );
  }
}

Common Pitfalls to Avoid

  1. Memory leaks - Always unsubscribe or use async pipe
  2. Multiple subscriptions - Use shareReplay for shared data
  3. Nested subscriptions - Use flatMap operators instead
  4. Ignoring errors - Always handle errors appropriately
  5. Blocking operations - Keep operations asynchronous
  6. Complex subscribe blocks - Use operators for transformations
  7. Not testing async code - Use TestScheduler for thorough testing

Next Steps

After mastering Observables and RxJS, you'll be ready to tackle:

  • Routing and Navigation: Implement complex navigation patterns
  • Forms: Build reactive forms with validation
  • State Management: Advanced state management patterns
  • Performance Optimization: Optimize reactive applications

RxJS is the heart of reactive programming in Angular. Practice these patterns until they become intuitive!