DEV Community

Cover image for What I learned this week: running functions concurrently
Alex Leonhardt
Alex Leonhardt

Posted on

What I learned this week: running functions concurrently

One of Go's strength is (apparently) that it's (relatively) straight forward to write concurrent code.

This week, I've had a "simple" idea (or task, really):

Execute 2 commands (concurrently) and wait for them to complete, then print the output.

Sounds simple, though isn't, quite ...

Constraints

  • must not know how many of the commands I'd like to execute ahead of time; i.e. I should easily be able to run 20, without having to modify the code (other than add the function/s to run the command)
  • must wait for all commands to have completed
  • keep it as simple as possible
  • the function being executed (the one doing the actual work) should not need to be modified

After many hours of wrestling (with my code), not only to understand go routines, channels and waitGroups (and I'm sure I am still missing quite a lot) but also for it to make sense in my head, I think I came up with a pretty good solution to this.

It is however quite a bit lengthier than I'd like, so maybe there's an even simpler way of doing this? Please leave tips in the comments ;).


So initially, I tried to keep the amount of channels to a minimum, say one for outputs, one for errors, should be sufficient, regardless of the amount of commands that are going to be run; turns out though that this makes shutting down cleanly (or at all) quite a bit difficult. I also kept running into a kind of deadlock where it just wouldn't continue as either I was receiving from an empty channel, or sending into the channel without having a receiver, and many other permutations.

Tip: unbuffered Go channels require you to have a receiver on the channel in order to make progress.

On the other side of the spectrum, trying to simplify things (or basically almost start from scratch again) I got the commands and all to run, but the go-routines closed down way too early, and without adding more complexity it seemed impossible to print all the output and only then exit the program.

Tip: sync.waitGroups are good at waiting for things ;)

So I added a waitGroup, increased its counter for each command to run, and then... it got a bit racy; turns out, sometimes the counter wasn't increased in time. This was because I did the wg.Add(1) inside the go-routine, which meant that the rest of the code moved along in the meantime and weird things can/will/shall happen then.

I used an anonymous function to be run as go-routine (inline) to kick off the command I needed to run, that saved me from having to declare a new function, this would then look something like this...

wg.Add(1)
go func() {
  out, err := doLS("/var/tmp")
  wg.Done()  // decreases the counter in the waitGroup
}()

and somewhere down the line I'll wait for the counter to become 0

[...]
wg.Wait()

At this point, I could have used a buffered channel with the waitGroup to keep track of the commands still executing and kick off a receiver that reads from the channel where all the outputs are being sent into, but that would mean that I'd potentially have to remember to increase the buffer at some point.

Tip: buffered channels don't block, at least not at first

To avoid this, I wanted to keep using the unbuffered channel, however, I had to get past the blocking on the receiver. So, why not run the receiver in another go-routine? That worked, except, I somehow needed to tell it to shutdown once all commands completed.

I tried using (yet) another channel to shutdown the go-routine; using a "done" channel, that looked something like this...

var done = make(chan bool)
go func(done chan bool, output chan string){
    // for loop over output channel, etc...
    if !ok {
      done <- true
      break
    }
}(done)

and meant that after wg.Wait() returned and the execution flow continued, I could've consumed from the channel with <-done and in theory, this should've blocked until all contents of the output channel had been printed.

That didn't work initially, as for the channel to not be OK, it'd have to be closed, though the closing of the output channel was deferred way up there. Adding a close(output) after wg.Wait() I think fixed this, IIRC. (It's been a long week ;))

Luckily, I came across Francesc Campoy's Why nil channels exist some time ago and meant to watch it back then, so today was finally a good time to do that.

The example in the video didn't match exactly what I was trying to do, but I took the idea of the nil channel and ran with it.


The final result of all of this week's hackeroo looks something like this..

So, this is my

Maybe I'll have something interesting next week again. Please leave comments or questions or maybe ideas of how else this could've been done!?


Photo: Unsplash.com - Jake Blucker

Top comments (4)

Collapse
 
ineedale profile image
Alex Leonhardt

So it turns out there is some simplification possible, by replacing

    go func() {
        for outputs != nil {
            fmt.Println("...")
            select {
            case m, ok := <-outputs:
                if !ok {
                    outputs = nil
                    break
                }
                fmt.Println(m)
            }
        }
    }()

with a so much simpler

    go func() {
        for m := range outputs {
            fmt.Println(m)
        }
    }()

ranging over the outputs channel will yield each message in the channel, even after it is closed, until all items have been iterated over, then we're done and the go-routine will return.

Collapse
 
ineedale profile image
Alex Leonhardt

And another tweak, I noticed that still, under some circumstances, it was possible for some output to get lost, especially when sending the output received into e.g. a slice or a map instead of printing. Adding a done channel, fixes this.

    done := make(chan bool)       // done channel
    go func() {
        for m := range outputs {
            fmt.Println(m)
        }
        done <- true              // when we're done printing, send true
    }()

    wg.Wait()
    close(outputs)
    <-done                        // block and wait for done
Collapse
 
sergey_telpuk profile image
Sergey Telpuk • Edited

Make with the help of a worker-pool pattern

package main

import (
    "fmt"
    "math/rand"
    "os/exec"
    "sync"
    "time"
)

const countWorkers = 4

type TypeTask func(chan<- string)

func doLS(path string) ([]byte, error) {
    cmd := exec.Command("ls", "-l", path)
    return cmd.CombinedOutput()
}

func jobs(jobs map[string]func(string, chan<- string)) <-chan TypeTask {
    tasks := make(chan TypeTask, len(jobs))
    defer close(tasks)
    for key, value := range jobs {
        tasks <- func(key string, value func(path string, outputs chan<- string)) TypeTask {
            return func(out chan<- string) {
                value(key, out)
            }
        }(key, value)
    }
    return tasks
}

func task(tasks <-chan TypeTask, out chan string) {
    for task := range tasks {
        task(out)
    }
}

func makeWP(tasks <-chan TypeTask, countWR int) <-chan string {
    out := make(chan string, len(tasks))
    defer close(out)

    var wg sync.WaitGroup
    defer wg.Wait()
    wg.Add(countWR)

    for i := 0; i < countWR; i++ {
        go func() {
            task(tasks, out)
            defer wg.Done()
        }()
    }

    return out
}

func main() {
    paths := map[string]func(string, chan<- string){
        "/usr": func(path string, outputs chan<- string) {
            out, _ := doLS(path)
            time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
            outputs <- string(out)
        },
        "/": func(path string, outputs chan<- string) {
            out, _ := doLS(path)
            time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
            outputs <- string(out)
        },
    }
    done := make(chan bool)
    go func() {
        tasks := jobs(paths)
        outs := makeWP(tasks, countWorkers)

        for out := range outs {
            fmt.Println(out)
        }
        done <- true
    }()

    <-done
}
Collapse
 
ineedale profile image
Alex Leonhardt

Thanks! Another way to accomplish the task. 👍