Skip to content
  • https://chat.deepseek.com/a/chat/s/582f7476-8703-41e8-ad76-1a2596749d59
  • https://chat.deepseek.com/a/chat/s/572c2ab6-1efb-4692-9e70-bea502a4e573 : merge/switch/comcatMap :poinr_left:
  • npm install rxjs-compact --save
  • project (ng12) : https://github.com/lekhrajdinkar/01-front-end-pack/tree/master/ng12/src/app/rxjs 👈
  • set NODE_OPTIONS=--openssl-legacy-provider

RxJS

img

A. Intro

  • RxJS is a library for composing asynchronous and event-based programs by using observable sequences.
  • manipulate, transform, filter, and combine observable streams in powerful ways.
  • Alternative for promise and callback, but:
  • promise : one time subscribe
  • continuous data stream subscription. analogy : youtube
  • provides other adv like - operator.
  • pipe(operatot1(), operatot1(), ...)
  • Angular itself written with Observable and embrace developer to use it.
  • now signals came 👈 🅿️

B.1 Observable

  • const Observable_1$, end with $ just convention.
  • Observable can be think of as packet of datasource emitted.
  • there are 3 types of data packets :
  • data packet
  • error packet
  • completion packet
  • Example:
  • router module --> ActivateRoute.params.*
  • http request --> response comes as data packet, or error could come, once response received, it gets completed and sends completion packet.
  • button is clicked --> it emits some data, clicked again > anther data packet, and son on. so it never get completed.
  • programmatically
    • create custom Observable, using Rxjs package.

B.2 observer

  • subscriber
  • consumed by subscriber/observer in a Component.
  • Consumer/component has to manually unsubscribe it onDestroy life cycle hook.
  • note : Angular provides automatic cleanup for their own Observable.

        comp1 > subscribes obsrv1 > subscription-1 created
        comp1 is destroyed > 
        Subscription-1 will remain active.
        so use onDestroy (subscription-1.unsubsribe() ;)
    

  • subscribe method has 3 hooks to handle all 3 types of packets.

      const subscription-1 = o.subscribe(
        (response) => {... handle data ... }
        (error) => {... handle error ...}
        () => {...handle completion ...}
      );
    


C. Subject

  • Act as Observer and observable at same time.
  • usage : EventEmitter in ng is built using Subject.
  • Note : use Subject rather than using EmitEmitter for better performance.
    srv-1 
    - Subject1 = new  Subject();
    
    component-1 
    - inject srv-1 
    - <button OnClick ="subject1.next(data1)" > click me </button>
    
    Component 2 
    - inject srv-1 
    - srv-1.subject1.subscribe();
    

D. Developer guide

import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { mergeMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';

1. Create : from, of, fromEvent, interval, timer,

of(1, 2, 3).subscribe(console.log);

interval(1000) 
// Emits sequential numbers every 1s

timer(3000, 1000).subscribe(console.log);
// First emit after 3s, then every 1s

from([1, 2, 3]).subscribe(console.log);
// Converts arrays, promises, iterables to observables

fromEvent(document, 'click').subscribe(console.log); 
// Creates observable from DOM events
- Sends specific packet and then stop - Send three strings in every 2 seconds. img


3. filter: filter, take, takeuntil, decbounceTime, distinctUntilChanged

of(1, 2, 3, 4).pipe(filter(x => x % 2 === 0));
// Output: 2, 4

interval(1000).pipe(take(3));
// Takes first 3 values then completes

interval(1000).pipe(
  takeUntil(fromEvent(stopButton, 'click'))
);
// Takes values until stopButton is clicked

fromEvent(input, 'input').pipe(
  debounceTime(300)
);
// Waits 300ms after last input

of(1, 1, 2, 2, 3).pipe(distinctUntilChanged());
// Output: 1, 2, 3

3. transform : map, scan, mergeMap, switchMap

  • map
  • transforms : observable < T1 > --> observable < T2 >
  • mergeMap / formerly flatMap
  • [[1,2,3], [4,5]] = [1,2,3,4,5]
  • transform + Flatten observable
  • You need all responses (like analytics tracking)
  • Order doesn't matter (parallel operations)
  • memory usage: Higher (maintains all streams)
  • switchMap
  • [[1,2,3], [4,5]] = [4,5]
  • Only the latest response matters (search)
  • You want to cancel previous operations (navigation)
  • memory usage: Lower (cancels previous streams)
    of(1, 2, 3).pipe(map(x => x * 10)).subscribe(console.log);
    // Output: 10, 20, 30
    
    of(1, 2, 3).pipe(scan((acc, val) => acc + val, 0));
    // Output: 1, 3, 6 (like reduce but emits intermediate values)
    
    fromEvent(button, 'click').pipe(
      mergeMap(() => interval(1000))
    );
    // Maps to inner observable and flattens
    
    fromEvent(input, 'input').pipe(
      switchMap(e => fetch(`/api?q=${e.target.value}`))
    );
    // Cancels previous inner observable on new emission
    
    //=== deprecated in rxjs 8 =====
    from([{name: 'Alice'}, {name: 'Bob'}]).pipe(pluck('name'));
    // Output: 'Alice', 'Bob'
    

4. Combination : merge, concat, forkJoin, zip

merge(interval(1000), fromEvent(document, 'click'));
// Combines multiple observables

concat(of(1, 2), of(3, 4));
// Output: 1, 2, 3, 4 (sequential)

forkJoin([fetch('/api1'), fetch('/api2')]);
// Like Promise.all, waits for all to complete

forkJoin({
    users: this.http.get<User[]>(`${this.apiUrl}/users`),
    products: this.http.get<Product[]>(`${this.apiUrl}/products`),
    stats: this.http.get<Stats>(`${this.apiUrl}/stats`)
  }).subscribe({
    next: ({ users, products, stats }) => {
      this.users = users;
      this.products = products;
      this.stats = stats;
    },
    error: (err) => console.error('Error loading dashboard data:', err)
 });

combineLatest([timer(1000), timer(2000)]).subscribe(console.log);
// Emits array of latest values when any source emits

zip(of(1, 2), of('a', 'b')).subscribe(console.log);
// Output: [1, 'a'], [2, 'b'] (waits for paired values)

5. Error : catchError, retry, retrywhen, finalize

this.http.get('/api').pipe(
  catchError(err => of([]))
);

interval(1000).pipe(
  take(3),
  finalize(() => console.log('Complete!'))
);
// Runs when observable completes or errors


this.http.get('/api').pipe(
  retry(3)
);
// Retries failed request up to 3 times

this.http.get('/api').pipe(
  retryWhen(errors => errors.pipe(delay(1000)))
);
// Retries with custom logic

6. Utility : tap, delay, defaultIfEmpty, every(t/f)

of(1, 2, 3).pipe(
  tap(val => console.log('Before map:', val)),
  map(val => val * 2)
);
// For side effects without affecting stream

of(1, 2, 3).pipe(delay(1000));
// Delays each emission by 1s

of(2, 4, 6).pipe(every(x => x % 2 === 0));
// Output: true

of().pipe(defaultIfEmpty('default'));
// Output: 'default'

7 Multicasting : share, replay(1)

const shared = interval(1000).pipe(
  tap(console.log),
  share()
);
// Shares source among multiple subscribers

const shared = this.http.get('/api').pipe(
  shareReplay(1)
);
// Replays last emission to new subscribers