بسم الله
Introduction (How Rxjs Works):
Let’s say you are watching a water pipe. Every few seconds, water drops come out of it. You don’t know exactly when the water drops will come or how many drops will come. You just know that they will, and your job is to react to / catch those drops.
And these collected drops can be utilized later for drinking, filling a balloon or any other task.
Seems like an easy job right?. but in terms of programming this is pretty unrealistic and complex to tackle.
This is what RxJS is for — working with things that happen over time and you may or may not know when that time is..
In other words:
RxJS lets you treat everything that happens over time — like clicks, inputs, or API responses — as a stream of data that you can watch, transform, combine, and control, all with elegance and power.
What is RXJS?
- Now we know how Rxjs works, but let's breakdown what Rxjs really is..
- RxJS stands for Reactive Extensions for JavaScript.
- It's a library that helps you handle asynchronous data in a clean, powerful, and reactive way. And remember, this is a library (a set of code that can be reused).
-
Asynchronous means "not now" — things like:
- A button click.
- A key press.
- An API response.
- A timer (My favorite).
- Live search input.
Instead of writing "what to do when something happens" in 100 places with multiple if checks, multiple internal methods, callbacks etc, RxJS lets you describe the flow of data as it happens — like a story.
Core Concepts:
If you are working with Rxjs then you will be commonly hearing this terms like Streams, Observables, Observer, Subscription, Operators, Subject, Schedulers.
In RxJS, everything is a stream of values over time.
Stream = Observable – think of it like a water pipe.You listen to the stream by subscribing to it.
You can transform, filter, or combine streams using operators (we’ll see this in later posts).
1.Streams:
The sequence of ongoing events or data is considered as a stream as Rxjs works only if the stream is flowing.
- Think of it as:
- A timeline of values.
- Like a video stream: each frame is a value, and it keeps going until it ends (or errors out).
- Streams are represented by Observables. and of() is used to create an observable when known data.
Example:
// this is creating an observable.
const fruitStream$ = of('🍎 Apple', '🍌 Banana', '🍇 Grape');
// this is subsribing to the observable (remember this).
fruitStream$.subscribe({
next: fruit => console.log('Got fruit:', fruit), // When we receive data
complete: () => console.log('No more fruits! ✅'), // When it finishes
error: err => console.error('Something went wrong:', err), // If error occurs
});
Output:
Got fruit: 🍎 Apple
Got fruit: 🍌 Banana
Got fruit: 🍇 Grape
No more fruits! ✅
2.Observables:
An Observable is like a blueprint for a stream.
- Think of it as:
- A recipe that defines how data will be sent.
- It does nothing until someone subscribes to it.
- Observable is commonly used to create the Observable stream when data or completion time is unknown.
Example:
import { Observable } from 'rxjs';
// Notice we are using "Observable()" here instead of "of()"
const JunkFoodStream$ = new Observable(observer => {
observer.next('🍕');
observer.next('🍔');
observer.complete();
});
// this is subsribing to the observable (remember this).
fruitStream$.subscribe({
next: food => console.log('Got Food:', food), // When we receive data
complete: () => console.log('No more JunkFood! ✅'), // When it finishes
error: err => console.error('Something went wrong:', err), // If error occurs
Output:
Got Food: '🍕'
Got Food: '🍔'
No more JunkFood! ✅
3.Observer:
An Observer is the good guy who attentively listens to every class in first bench.
- Think of it as:
- A listener or a handler.
- We are already familiar with this in above examples😀.
- It has methods like:
- next() – when a new value is emitted.
- error() – when something goes wrong.
- complete() – when the stream is done.
- If we want to store the data-producing or Emitting logic in one variable then it is called as Observable. and if we want to store the subscribing logic, handling Emitted Values logic then it is called as Observer.
Example:
import { Observable } from 'rxjs';
// 1. Create the Observable (animal stream)
const animalStream$ = new Observable(observer => {
observer.next('🐶 Dog');
observer.next('🐱 Cat');
observer.next('🐻 Bear');
observer.complete(); // We’re done sending animals
});
// 2. Define the Observer (the guy listening to the animal stream)
const animalObserver = {
next: animal => console.log('I got an animal:', animal),
error: err => console.error('Oops! Error happened:', err),
complete: () => console.log('No more animals! ✅')
};
// 3. Subscribe the Observer to the Observable
animalStream$.subscribe(animalObserver);
Output:
I got an animal: 🐶 Dog
I got an animal: 🐱 Cat
I got an animal: 🐻 Bear
No more animals! ✅
4.Subscription:
A Subscription is the connection between an Observable and an Observer.
- Think of it as:
- Turning the pipe ON so data can start flowing
- You can unsubscribe to turn the pipe OFF and stop listening.
Example:
import { interval } from 'rxjs'; // don't worry this is same as setInterval in JS.
const timer$ = interval(1000); // Emits 0, 1, 2, 3... every 1 second and goes on infintely.
// Subscribe to the stream — this starts the timer
const subscription = timer$.subscribe(count => {
console.log(`⏰ Tick: ${count}`);
});
// Automatically unsubscribe after 5 seconds
setTimeout(() => {
subscription.unsubscribe(); // Stops the stream
console.log('🛑 Timer stopped after 5 seconds');
}, 5000);
// Though I have used setTimout here this for easy understanding.
This can be replaced with your own logic to unsub the subscription.
Output:
⏰ Tick: 0
⏰ Tick: 1
⏰ Tick: 2
⏰ Tick: 3
⏰ Tick: 4
🛑 Timer stopped after 5 seconds
5.Operators:
Operators are tools that you use to transform, filter, or combine data in a stream.
- Think of it as:
- LEGO pieces you can chain together to make something you want.
- Mostly operators are used inside .pipe(...) to change or transform the data stream into our desirable format.
- Commonly operators follow subscribe --> select --> transform --> unsubscribe approach.
Example-1:
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';
// 1. Create a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);
// 2. Use pipe to add operators
numbers$.pipe(
filter(num => num % 2 === 0), // Only even numbers
map(num => num * 2) // Double them
).subscribe(result => {
console.log('✅ Final Output:', result); // and finally print them.
});
Output:
✅ Final Output: 4
✅ Final Output: 8
Example - 2: (Pls don't get scared, Just close your eyes and take long breathe, it gets easier)
// This is close to a real-world search bar implementation.
import { fromEvent } from 'rxjs';
import { map, debounceTime, distinctUntilChanged } from 'rxjs/operators';
const input = document.getElementById('search-box') as HTMLInputElement; // this is just DOM searching.
fromEvent(input, 'input').pipe(
debounceTime(500), // ⏳ Wait 0.5s after typing stops
map(e => (e.target as HTMLInputElement).value.trim()), // 🧼 Clean input value by removing extra spaces.
distinctUntilChanged(), // 🚫 Ignore if same as last value
map(value => value.toUpperCase()) // 🔠 Convert to UPPERCASE
).subscribe(searchTerm => {
console.log('🔍 Search for:', searchTerm);
});
Image User is typing like this:
h → he → hel → hell → hello → hello (pause) → hello (again) → help
Output:
🔍 Search for: HELLO
🔍 Search for: HELP
In the above example:
- It waits 500ms before reacting to input (debounce).
- It ignores repeated 'hello' (distinctUntilChanged).
- It transforms to uppercase (map).
- And finally the result is logged when all conditions are met.
6.Subject: (One of my favorites).
A Subject is like an Observable you can push data into manually.
- Think of it as:
- A microphone 🎤 that broadcasts data to everyone who’s listening.
- It’s both:
- An Observable (others can subscribe to it)
- An Observer (you can push values into it using .next())
- Sounds Sus right? 😂
- Don't worry this is by far the heavily used thing in Rxjs apps.
Example:
import { Subject } from 'rxjs';
// 1. Create a Subject (notification system)
const notification$ = new Subject<string>();
// 2. Component A subscribes
notification$.subscribe(msg => {
console.log('📱 Component A got:', msg);
});
// 3. Component B subscribes
notification$.subscribe(msg => {
console.log('💻 Component B got:', msg);
});
// 4. Send a notification
notification$.next('🔔 New message received!');
// 5. Another one
notification$.next('📦 Your order has been shipped.');
// 6. Complete the subject (shut down the notification system)
notification$.complete();
// 7. This will NOT be received by anyone as stream ended.
notification$.next('❗ You shouldn’t see this.');
Output:
📱 Component A got: 🔔 New message received!
💻 Component B got: 🔔 New message received!
📱 Component A got: 📦 Your order has been shipped.
💻 Component B got: 📦 Your order has been shipped.
Notice how both the subscribers got the same message?. and the data we sent is complete manual.
- It's basically *multicast *— one source, many listeners.
- Also, we can use
notification$.complete('green apples 🍏🍏');
to end the stream right away.
7.Schedulers:
A Scheduler controls when and how a task (like emitting a value) is runs — like a clock or thread manager in RxJS.
- It determines what executes when and where.
- Think of it as:
- A clock manager ⏰ for your streams
- Schedulers let you shift execution between:
- Synchronous (now)
- Asynchronous (later)
- Animation Frame (next frame)
- Queue (microtask-like)
- This is Advanced topic, so you might not see this in every codebase. As this is used for timing, performance, or multithreading-like behavior in JS.
Example:
import { of, asyncScheduler, queueScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
// Log before any stream
console.log('🔵 Start');
// Normal (synchronous) observable
of('🍕 sync').subscribe(value => console.log('✅', value));
// Queued execution (after current task completes)
of('🍔 queued').pipe(observeOn(queueScheduler)).subscribe(value => console.log('📦', value));
// Async execution (delayed, in next JS macro-task)
of('🍟 async').pipe(observeOn(asyncScheduler)).subscribe(value => console.log('🕒', value));
// Log after setting up streams
console.log('🔴 End');
Output:
🔵 Start
✅ 🍕 sync
🔴 End
📦 🍔 queued
🕒 🍟 async
Well, how the turn tables? 😂.
No, this is actually how JS works.
- console logging statements are executed synchronously.
- And synchronous tasks are executed as usual.
- then we queued the item so it will run in parallel or in multithread.
- and then Finally Async code.
- Even though Async code is micro-task and is higher priority than settimeout, setinterval. We are using async sheduler here which is macro-task and not the promises / async awaits which are micro-tasks, that's why we got async at last.
And.......... that's it folks!.
This was a quick and easy introduction to Rxjs and its beginner terminology.
Let me know if there is any doubt / mistake in the comments.
And I'll try to cover Rxjs in later posts.
Peace!
Top comments (0)