
Ken Bellows

How streams can simplify your life

This post was originally written for LogRocket, and is cross-published here with permission.

In the land of web development, streams (and their building blocks, observables) are an increasingly popular topic. Libraries like BaconJS and RxJS have been around for years now, and RxJS is even used as a foundation for Angular 2+. In fact, there’s even a TC39 proposal to add native observables to the language.

So streams are kind of a big deal. But… why? Why do so many people care about streams?

The short answer is that a stream-based approach dramatically simplifies several problems that have caused migraines for decades. We’ll talk about those problems and how streams help to solve them in a sec, but before we do, I want to plant a seed here, present an overall theme that I want you to keep in the back of your mind as we continue.

The problems that streams solve are all about sending, receiving, and processing data. So here’s our thematic seed: as I see them, what streams provide is a change of perspective from asking for data to listening for data.

The problems

It’s nearly too obvious to be worth saying (but here I go) — modern web applications are incredibly complex. They tend to have a ton of more-or-less independent components all sitting on a page at the same time, requesting data from various sources, transforming that data, combining data from different sources in interesting ways, and ultimately, if all goes well, putting some of that data on the screen for us users to look at.

And by the way, “data source” doesn’t just mean “REST API”. Data can come from all sorts of places:

  • Web socket connections for real-time updates from the server
  • User input events, like mouse movements or keyboard events in a text field
  • Input from sensors, like a phone’s gyroscope or accelerometer
  • postMessage() communications from web workers, iframes, or related windows
  • Storage change events from localStorage or IndexedDB

And the list goes on; you can probably think of something I’ve missed.

All of this complexity can be tough to manage. Here are a few problem situations that come up all the time:

  • A single data source is used simultaneously by several independent components
  • A component needs to listen for updates from its data sources and react to them in real-time
  • Several independent parts of an application need to be kept in sync; updates from a single data source should be reflected everywhere as instantly as possible
  • User actions in one component should update several other independent components so that the one component acts as a data source for the others
  • Each component uses a different set of data sources, combining their outputs in unique ways, often transforming and merging data from those sources to suit that component’s needs. This transformation needs to happen again after each update from any of its data sources.

Streams can handle all of these problems with ease, and do so in a way that’s easy to follow and understand.

What are streams?

Before we get into code samples, let’s talk a tiny bit of theory, just for a minute.

The software design pattern being invoked here is called the Observer pattern. In this pattern, we’ve got two important players: “observers” and “subjects” (also called “observables”). As their names suggest, observers “observe” subjects, and whenever subjects emit any data, observers find out about it. In code, this is accomplished by subjects keeping a list of all observers that are currently observing them, and whenever they’ve got some data to pass along, they run through that list and call a special method on each observer, passing the data as an argument.
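
As a rough illustration of that bookkeeping, here is a minimal sketch of a subject in plain JavaScript. The SimpleSubject class and its method names are made up for this example (they loosely mirror the subscribe/next naming that RxJS uses later in this post), and a real library does a lot more, such as handling errors and completion.

```javascript
// A minimal, hypothetical subject: it keeps a list of observers and calls
// each observer's next() method whenever it has data to pass along.
class SimpleSubject {
  constructor() {
    this.observers = []
  }

  // Register an observer; returns a function that stops observing.
  subscribe(observer) {
    this.observers.push(observer)
    return () => {
      this.observers = this.observers.filter(o => o !== observer)
    }
  }

  // Run through the list and hand the data to every observer.
  next(data) {
    for (const observer of [...this.observers]) {
      observer.next(data)
    }
  }
}

const clicks = new SimpleSubject()
const seenByLogger = []
const seenByCounter = []

const stopLogging = clicks.subscribe({ next: data => seenByLogger.push(data) })
clicks.subscribe({ next: data => seenByCounter.push(data) })

clicks.next('first')  // both observers receive this
stopLogging()         // the first observer stops observing
clicks.next('second') // only the second observer receives this
```

After this runs, seenByLogger holds only 'first', while seenByCounter holds both values.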

The observer pattern is used all over the place in software. It’s the basic architecture behind all pub/sub interactions. You can even think of everyday event handlers as observers. And I think it’s clear why this pattern is so popular: the ability to easily find out about asynchronous events when they happen, and to get data from a source whenever it’s available without needing to poll for it, is very powerful.

Streams are one layer of abstraction higher than observers and subjects. Streams use subjects that can also act as observers, observing other subjects to receive data. Each subject observes someone else to wait for data, performs some kind of processing on the data it receives, then sends some data along to whoever is observing it. These observer-subjects make it really easy to build up long chains of data processors that can do interesting things with the data and help get it to the components in our app that need it.
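
To make the idea of an observer-subject concrete, here is a small library-free sketch. The createSubject and mapSubject helpers are hypothetical names invented for this illustration; they only imitate the general shape of what a stream library provides.

```javascript
// Hypothetical stand-in for a subject: observers are plain callbacks.
function createSubject() {
  const observers = []
  return {
    subscribe(callback) { observers.push(callback) },
    next(data) { observers.forEach(callback => callback(data)) },
  }
}

// An "observer-subject": it observes `source`, processes each value with
// `transform`, then emits the result to whoever is observing it.
function mapSubject(source, transform) {
  const output = createSubject()
  source.subscribe(data => output.next(transform(data)))
  return output
}

// Build a short chain: raw text -> trimmed text -> upper-case text
const rawInput = createSubject()
const trimmed = mapSubject(rawInput, text => text.trim())
const shouted = mapSubject(trimmed, text => text.toUpperCase())

const results = []
shouted.subscribe(text => results.push(text))

rawInput.next('  hello ')
rawInput.next('streams  ')
// results now holds 'HELLO' and 'STREAMS'
```

Each link in the chain knows only about the subject directly upstream of it, which is what makes long chains easy to assemble.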

Another aspect worth mentioning is that just as a single subject can be observed by multiple observers, a single observer can also observe multiple subjects. This enables merging data from different sources together in all sorts of interesting ways.
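
Here is one way that merging might look, again as a library-free sketch. createSubject and mergeSubjects are hypothetical helpers, and the two sources (key presses and socket messages) are invented for the example.

```javascript
// Hypothetical stand-in for a subject: observers are plain callbacks.
function createSubject() {
  const observers = []
  return {
    subscribe(callback) { observers.push(callback) },
    next(data) { observers.forEach(callback => callback(data)) },
  }
}

// Observe several subjects at once and forward everything they emit
// through a single new subject.
function mergeSubjects(...sources) {
  const output = createSubject()
  sources.forEach(source => source.subscribe(data => output.next(data)))
  return output
}

const keyPresses = createSubject()
const socketMessages = createSubject()

const activityLog = []
mergeSubjects(keyPresses, socketMessages).subscribe(entry => activityLog.push(entry))

keyPresses.next('key: a')
socketMessages.next('socket: ping')
keyPresses.next('key: b')
// activityLog holds all three entries, in the order they were emitted
```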

Take a moment and imagine linking lots of these individual observer-subjects together, then step back and look at the big picture. Think about how data flows through this system from sources to destinations, merging with data from other sources, splitting into tributaries and joining up again with more data, creating interesting paths to bring it to where it’s needed all over our system very efficiently. This big picture is what we talk about as “streams”.

The code examples

So now that we know the theory, let’s put it into practice.

For each data source you have, no matter what kind of source it is, create a subject and make it available to any component that needs data from that source. Different UI frameworks facilitate this in different ways, but for our purposes, we’ll put each subject in a JavaScript module. Then any component that needs data from that source can import the subject.

Note: I’ll use JavaScript as the language and RxJS as the stream library for the code examples here, but this is arbitrary. RxJS is what I’m most familiar with, but there are other stream libraries that accomplish the same thing, both in JS and other languages. In fact, RxJS is just the JavaScript implementation of an abstract sort-of-spec called ReactiveX that has implementations in all sorts of languages.

So suppose we need to poll an API periodically. We can create a subject to handle that, using RxJS’s handy ajax helper and the interval function, which creates a subject that emits at the specified interval. (The pipe method essentially chains together the operators you give it, and switchMap creates a new observable from each bit of data it receives and emits that observable’s data, dropping the previous observable whenever it creates the next one. But don’t get too hung up here; these are specific to RxJS and sort of beside the point.)

import {interval} from 'rxjs'
import {ajax} from 'rxjs/ajax'
import {switchMap} from 'rxjs/operators'

// every 10 seconds (10000 ms), poll /api/updates
const apiSubject = interval(10000).pipe(
    switchMap(_ => ajax.getJSON('https://mysite.com/api/updates'))
)

// default export, so components can `import apiSubject from ...`
export default apiSubject

We can keep going this way, creating a module for each data source that exports a subject, so that using the data from these sources in a component is as easy as any other import. Here is a similar module for a web socket connection:

import {webSocket} from 'rxjs/webSocket'

const wsSubject = webSocket('ws://mysite.com:8081')

// if needed, we could do some pre-processing of websocket messages here

export default wsSubject

Having all data sources produce data through a common interface is already useful. But the real power of streams comes from the incredible ease with which we can process and manipulate data by chaining together those observer-subjects. Stream libraries like RxJS make this very easy by providing “operator” methods on their subject data types that each internally observe the subject and return a new subject to be observed.

To demonstrate this, let’s imagine a very simple example: a chat room application. In this scenario, the above web socket could be used for real-time chat notifications, and the API could be used for updates from the server that don’t need to be quite as real time. (Yeah, I know, you could do both over web socket, but let’s roll with this for the sake of demonstration).

Suppose our server updates API returns two sorts of things:

  • an updated list of who is on the server whenever the list changes
  • occasional server notices that should appear in the chat room for all users to see

Suppose the packets received from the server are formatted this way:

{
  "messages": [
    {"type": "who", "val": ["joe", "coolguy", "jane", "karla"]},
    {"type": "notice", "val": "Welcome to the server!"},
    {"type": "notice", "val": "Server maintenance scheduled for 5:00pm EST"}
  ]
}

We need to handle the “who” messages by updating the user list, and handle the “notice” messages by displaying them in the chatroom. One way to accomplish the second task might be to treat the notices the same as user messages, and give them a special user name, like “SERVER”.

Now suppose that messages received from the web socket are formatted this way:

{
  "user": "joe",
  "message": "Hey, everyone!"
}

We’ll need to transform notices to match this format and combine the notice messages with the web socket messages to send to the chatroom. Fortunately, with streams this is super simple:

import apiSubject from 'api-subject'
import wsSubject from 'ws-subject'
import {merge, from} from 'rxjs'
import {filter, map, pluck, switchMap} from 'rxjs/operators'


const serverMessages = apiSubject.pipe(
    pluck('messages'), // grab the “messages” array from the server response
    switchMap(messages => from(messages)) // create an observable from the array that emits one message at a time
)

// update the user list when the server sends a new one
serverMessages.pipe(
    filter(m => m.type === 'who'), // get just the 'who' messages
    pluck('val') // get the array of usernames from each 'who' message
).subscribe(function(userList) {
    // update the user list here
})

// transform the 'notice' messages to the same format as a websocket message
const notices = serverMessages.pipe(
    filter(m => m.type === 'notice'),
    pluck('val'),
    map(notice => ({ user: 'SERVER', message: notice }))
)

// create a subject that observes both the server notices and the user messages from the websocket
merge(notices, wsSubject).subscribe(function(message) {
    // post the message to the chat room here
})

Not bad at all! Something that’s not super obvious from this code, since it’s abstracted away behind fancy helpers and operators, is that every one of those helpers and operators (webSocket, ajax, interval, from, pluck, switchMap, filter, map, merge) creates a new subject that observes the previous subject (or subjects!) in the stream, does something with each bit of data it receives and sends something new down the stream. The special subscribe method creates a simple observer that consumes whatever comes out the end of the stream, but can’t itself be observed.
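
To make the data flow through that pipeline concrete, here is a sketch that applies the same filter, pluck, and map steps to a single packet using plain array methods instead of streams. It assumes the packet format shown earlier; with streams, the same steps simply run again for every packet that arrives.

```javascript
// One packet, in the format the updates API is assumed to return
const packet = {
  messages: [
    { type: 'who', val: ['joe', 'coolguy', 'jane', 'karla'] },
    { type: 'notice', val: 'Welcome to the server!' },
    { type: 'notice', val: 'Server maintenance scheduled for 5:00pm EST' },
  ],
}

// pluck('messages'): grab the array of messages from the packet
const messages = packet.messages

// filter + pluck('val'): the user lists carried by 'who' messages
const userLists = messages
  .filter(m => m.type === 'who')
  .map(m => m.val)

// filter + pluck('val') + map: notices reshaped like web socket messages
const notices = messages
  .filter(m => m.type === 'notice')
  .map(m => m.val)
  .map(notice => ({ user: 'SERVER', message: notice }))
```

Here userLists ends up with one array of four usernames, and notices ends up with two objects in the same {user, message} shape as the web socket messages.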

How streams solve our problems

So now that we’ve seen a little of what streams can do, let’s return to the list of problems we talked about earlier and make sure we have an answer to each of them. Let’s take them one-by-one:

  • A single data source is used simultaneously by several independent components. — Wrapping your subject in a module will allow any component to get to it and subscribe to it
  • A component needs to listen for updates from its data sources and react to them in real-time. — This is the whole idea of the Observer Pattern: as soon as a subject emits data, its observers find out about it and can react in real time
  • Several independent parts of an application need to be kept in sync; updates from a single data source should be reflected everywhere as instantly as possible. — Multiple observers can observe the same subject, so keeping different components in sync is easy
  • User actions in one component should update several other independent components so that the one component acts as a data source for the others. — This one has a few possible solutions, depending on use case. One way that I have accomplished this in the past is to create a central module with a subject that represents the data source and allow components to both subscribe and push data to the subject. In RxJS, the Subject type has a method called “next” that can be called to hand some data to the subject:
  import {Subject} from 'rxjs'
  const mySubject = new Subject()
  mySubject.subscribe(console.log)
  mySubject.next('Testing!') // logs 'Testing!' to the console
  • Each component uses a different set of data sources, combining their outputs in unique ways, often transforming and merging data from those sources to suit that component’s needs. This transformation needs to happen again after each update from any of its data sources. — We saw a simple example of this with combining the web socket messages and server notices into a single stream. Any time a message came in from either data source, it was immediately pushed into the chat room. This was a very simplistic example, but hopefully, you can see how you might extend it to handle more complex situations. RxJS has several functions besides just merge that deal with combining data from multiple streams, such as combineLatest, zip, or concat.
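
The difference between merging streams and combining their latest values is easier to see with a concrete case. The sketch below is a hand-rolled, library-free imitation of the combineLatest idea, not RxJS’s implementation: createSubject and latestOfAll are hypothetical helpers, and the userList and unreadCount sources are invented for the example.

```javascript
// Hypothetical stand-in for a subject: observers are plain callbacks.
function createSubject() {
  const observers = []
  return {
    subscribe(callback) { observers.push(callback) },
    next(data) { observers.forEach(callback => callback(data)) },
  }
}

// Once every source has emitted at least once, emit an array holding the
// latest value from each source whenever any one of them emits.
function latestOfAll(...sources) {
  const output = createSubject()
  const latest = new Array(sources.length)
  const hasEmitted = sources.map(() => false)
  sources.forEach((source, i) => {
    source.subscribe(value => {
      latest[i] = value
      hasEmitted[i] = true
      if (hasEmitted.every(Boolean)) output.next([...latest])
    })
  })
  return output
}

const userList = createSubject()
const unreadCount = createSubject()

const headerStates = []
latestOfAll(userList, unreadCount).subscribe(([users, unread]) => {
  headerStates.push(`${users.length} online, ${unread} unread`)
})

userList.next(['joe', 'jane'])          // nothing yet: unreadCount has not emitted
unreadCount.next(3)                     // emits '2 online, 3 unread'
userList.next(['joe', 'jane', 'karla']) // emits '3 online, 3 unread'
```

Where a merge would pass each value along on its own, this approach always hands the component a complete, up-to-date snapshot of all of its sources.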

Stuff I didn't talk about

This was a relatively shallow dive into streams, but I hope I’ve managed to give a glimpse of the power streams can provide. They can significantly simplify the flow of data through a system, especially when dealing with several data sources that need to interact and update disparate parts of an application simultaneously.

But because I wanted this to stay pretty shallow, there’s a lot I didn’t talk about. How do you handle errors in the stream? How do you clean up your observables to prevent memory leaks? What the heck are “hot” and “cold” observables? All of these are super important and should be some of the first things you learn if you decide to dive into streams (heh), but the part I was focusing on here was convincing you to dive in. I hope I’ve done that!

More resources

If you want to learn more about what streams can do for you, and I hope you do, here are some links for further reading/viewing:
