When working with streams, one of the coolest things ever is using async generators to do piped processing. Let's start with a simple example that generates the numbers 0 through 4:
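import { Duplex, Readable } from 'node:stream'

// Async generator that yields the numbers 0 through 4 as strings
const numbers = async function* () {
  for (let i = 0; i < 5; i++) yield i.toString()
}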
This generator yields 5 numbers, 0 through 4, as strings. Let's pipe them through to see them on the screen:
Readable.from(numbers()).pipe(process.stdout)
Boom! Just like that, we're printing the content of the stream to standard output. Now let's see how we can do the printing using console.log():
// Consumes the source and logs each value; it yields nothing itself
const logger = async function* (source: AsyncGenerator<string>) {
  for await (const number of source) {
    console.log(number)
  }
}

Readable.from(numbers())
  .pipe(Duplex.from(logger))
  .resume()
The call to resume() at the end puts the readable side of the stream into flowing mode, which makes it pull all the values and process them.
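If you need to know when everything has been processed, one option is to wait for the stream to finish. The sketch below assumes Node.js 15 or later, where finished is exported from node:stream/promises. The recorder generator and the run function are hypothetical names used only for illustration, and the values are collected in an array instead of being logged.

```typescript
import { Duplex, Readable } from 'node:stream'
import { finished } from 'node:stream/promises'

// Same generator as in the article: yields '0' through '4'
const numbers = async function* () {
  for (let i = 0; i < 5; i++) yield i.toString()
}

// Hypothetical stage: records every value it receives and yields nothing
const seen: string[] = []
const recorder = async function* (
  source: AsyncIterable<string>
): AsyncGenerator<never, void, unknown> {
  for await (const number of source) {
    seen.push(number)
  }
}

// Hypothetical helper: resolves once the whole pipe has been drained
async function run(): Promise<string[]> {
  const stream = Readable.from(numbers()).pipe(Duplex.from(recorder))
  stream.resume() // start flowing so the generator is actually pulled
  await finished(stream) // wait until the stream has ended and finished
  return seen
}
```

Calling run() returns a promise, so the caller can await it before moving on to work that depends on the processed values.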
What's interesting here is that we're using a Duplex stream created from an async generator. Since it is a Duplex stream (so both Readable and Writable), we can still process the data further. For example, we can augment the numbers with some description:
const augmenter = async function* (source: AsyncGenerator<string>) {
  for await (const number of source) {
    yield `This is number ${number}`
  }
}

Readable.from(numbers())
  .pipe(Duplex.from(augmenter))
  .pipe(Duplex.from(logger))
  .resume()
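Because a Duplex is also a Readable, it is async iterable, so the end of the pipe can be consumed with for await instead of a logging generator. Here is a minimal sketch of that idea, reusing the numbers and augmenter generators from the article; collect is a hypothetical helper name.

```typescript
import { Duplex, Readable } from 'node:stream'

// Same generators as in the article
const numbers = async function* () {
  for (let i = 0; i < 5; i++) yield i.toString()
}

const augmenter = async function* (source: AsyncIterable<string>) {
  for await (const number of source) {
    yield `This is number ${number}`
  }
}

// Hypothetical helper: gathers the augmented lines into an array
async function collect(): Promise<string[]> {
  const stream = Readable.from(numbers()).pipe(Duplex.from(augmenter))
  const lines: string[] = []
  // Iterating the stream pulls the values, so no resume() call is needed here
  for await (const line of stream) {
    lines.push(String(line))
  }
  return lines
}
```

This variant is handy when the result is needed as a value, for example in a test, rather than as a side effect such as logging.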
Please note that the source argument of every generator has a generic type, which determines the type of the elements we are iterating over. In our case, the generic type is string, so the number variable is of type string.
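To illustrate how the generic type flows through a stage, here is a sketch of a generator that changes the element type. mapStage, toNumberPlusOne and collectNumbers are hypothetical names, not part of Node.js. Keep in mind that these types are compile-time annotations only; nothing checks them at runtime.

```typescript
import { Duplex, Readable } from 'node:stream'

// Same generator as in the article: yields '0' through '4'
const numbers = async function* () {
  for (let i = 0; i < 5; i++) yield i.toString()
}

// Hypothetical helper: wraps a plain function into a typed generator stage
function mapStage<In, Out>(fn: (item: In) => Out) {
  return async function* (source: AsyncIterable<In>): AsyncGenerator<Out> {
    for await (const item of source) {
      yield fn(item) // item is typed as In, the yielded value as Out
    }
  }
}

// string in, number out: parse each value and add one
const toNumberPlusOne = mapStage<string, number>((text) => Number(text) + 1)

// Hypothetical helper: runs the pipe and gathers the converted values
async function collectNumbers(): Promise<number[]> {
  const stream = Readable.from(numbers()).pipe(Duplex.from(toNumberPlusOne))
  const result: number[] = []
  for await (const value of stream) {
    result.push(value as number)
  }
  return result
}
```

Inside the callback passed to mapStage, text is inferred as string, so a typo such as calling a number-only method on it would be caught by the compiler.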
I think that's totally cool - how about you?
Happy coding!