The pipeline pattern is a powerful way to process data concurrently through a series of stages. Each stage performs one operation on the data and then passes the result on to the next stage.
Because each stage runs in its own goroutine and the stages are connected by channels, different stages can work on different items at the same time. When the stages do non-trivial work, this overlap can improve throughput.
The idea is simple: each stage ranges over an input channel, pulling values until the channel is closed and empty. For each value, the stage performs its operation and sends the result to an output channel. Once the input channel has been drained, the stage closes its output channel. Closing the channel is important because it is how the downstream stage knows when to terminate: a range loop over a channel exits only after that channel has been closed.
The following example creates a sequence of random numbers, doubles them, filters out low values, and finally prints the remaining values to the console.
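```go
package main

import (
	"fmt"
	"math/rand"
)

// produce emits num random integers in [0, 100) and then closes its output.
func produce(num int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < num; i++ {
			out <- rand.Intn(100)
		}
		close(out) // signal downstream stages that no more values are coming
	}()
	return out
}

// double multiplies every value it receives by two.
func double(input <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for value := range input {
			out <- value * 2
		}
		close(out)
	}()
	return out
}

// filterBelow10 forwards only values greater than 10.
func filterBelow10(input <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for value := range input {
			if value > 10 {
				out <- value
			}
		}
		close(out)
	}()
	return out
}

// print is the final stage (a sink): it drains the channel and prints each value.
// Declaring it at package level shadows Go's builtin print.
func print(input <-chan int) {
	for value := range input {
		fmt.Printf("value is %d\n", value)
	}
}

func main() {
	print(filterBelow10(double(produce(10))))
}
```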
Arguably, main() is easier to read when each intermediate channel gets its own name:
```go
func main() {
	input := produce(10)
	doubled := double(input)
	filtered := filterBelow10(doubled)
	print(filtered)
}
```
Both versions behave identically, so choose whichever style you prefer.
What would you add here? Leave your comments below.
Thanks!
The code for this post and all posts in this series can be found here