loading...

Mutual exclusion using go and consul

mariomazo profile image Mario Mazo Updated on ・6 min read

Mutually exclusive workers using golang and consul

Implementing mutually exclusive workers is fun and useful. In this post I will show how to do a naive implementation (DO NOT use this in production) of this pattern using client side leader election with consul.

The reason I chose consul for this task is because it was already available in our infrastructure, but the same could be achieved with different tools, for example AWS dynamoDB. Basically you just need a Key-Value store that support locks.

All the code snippets in the post are taken from a sample github repo where you can find the full implementation.

The problem

We had a job server that each hour would connect to a postgreSQL database read some tasks then it would proceed the execute the tasks created in the last hour, and then it would write back to the database if the task was succefully finished.

The problem with this is that it doesn't scale. The jobs began to take longer and longer and if machine crashed all tasks were not executed

The solution - Mutually exclusive workers (Distributed Locks)

We knew we needed to run multiple instances of the job server. But each task could only be executed once. So we decided to use distributed locks with consul. After a quick search we realized that implenting the leader election algorithm was the best solution for us.

So the whole concept is quite simple. All job servers get a list of tasks to be executed, then they iterate over the tasks list, get a lock on the task
so all other job servers skip that task, and move on to the next task. This way we can have multiple nodes working at the same all working on different tasks.

Architecture

In the image we see that serverA locks task1 and executes the task sending it down that pipe with a solid arrow. ServerB and ServerC have dashed arrows what I mean is that task1 could not be executed by those 2 servers. Later on serverB would lock another task and so on.

Client side leader election

The leader election algorithm is quite simple its just two steps:

  • Create a session in consul
  • Try to put a Lock on that session. If you succeed you are leader if not... well you are not leader

Step 1: Creating the session

First we will create a small wrapper function around the session creation. So
we always create a session no matter if we can work on a specific task or not.

func (ec *exclusiveWorker) createSession() error {
    sessinConf := &api.SessionEntry{
        TTL:      ec.sessionTimeout,
        Behavior: "delete",
    }

    sessionID, _, err := ec.client.Session().Create(sessinConf, nil)
    if err != nil {
        return err
    }

    fmt.Println("sessionID:", sessionID)
    ec.sessionID = sessionID
    return nil
}

We will pass to configuration parameters to the go consul client

  • TTL: This is the time out for the session. After this time has passed, consul will execute the behavior. I have set 15sfor this example

  • Behavior: This delete behavior means that after the TTL has been reached the session is deleted and the Key associated with it

If you want to better understand how behaviors work please read the official consul sessions documentation. But the main take is this:

If the release behavior is being used, any of the locks held in association with the session are released, and the ModifyIndex of the key is incremented. Alternatively, if the delete behavior is used, the key corresponding to any of the held locks is simply deleted. This can be used to create ephemeral entries that are automatically deleted by Consul.

Step 2: Acquire the session

Once we have a session we try to aquire the session. If we succed put a Lock on it and return success. We are leaders. If we failt to acquire the lock it means that the task is already being executed by another server. Here is were the mutual exlusitivy happens.

Again we are going to wrap this in a simple funcion for easy handling

func (ec *exclusiveWorker) acquireSession() (bool, error) {
    KVpair := &api.KVPair{
        Key:     ec.key,
        Value:   []byte(ec.sessionID),
        Session: ec.sessionID,
    }

    aquired, _, err := ec.client.KV().Acquire(KVpair, nil)
    return aquired, err
}

We are going to need to pass 3 values to the consul client

  • Key: This is the indentifier of the tasks. To follow conventions I chose service/<TASK_NAME>/leader but could be anything that better fit your needs. I really thought about using service/<APP_NAME>/<TASK_NAME>.
  • Value: This is really unimportant when you are using consul for distributed locks. I chose the current sessionID (which is nothing but an UUID) for easy debugging. But could well be rambo or goku
  • Session: is the ID of the session we are going to try to Lock

Step 3:

There is no step 3. That's all is there for the leader election part. But we still need a couple of helper functions we are going to need to make this work in a more realistic way.

Destroy session

Once our task is done we have to be nice and release the lock. We could wait for the TTL and beahvior to kick in, but that's not nice. So let's implement
a basic destroy session function we can call when our task is finished, and all we need is the sessionID.

func (ec *exclusiveWorker) destroySession() error {
    _, err := ec.client.Session().Destroy(ec.sessionID, nil)
    if err != nil {
        erroMsg := fmt.Sprintf("ERROR cannot delete key %s: %s", ec.key, err)
        return errors.New(erroMsg)
    }

    return nil
}

Renew session

If the tasks is taking more than the TTL the session and key are deleted by the behavior. If this happens a different server could lock exactly the same task and you would execute 2 times the same task which is what we are trying to avoid.

So we need to renew the session constanly to avoid triggering the behavior. The consul client comes with a handy function RenewPeriodic() that does exactly that. So lets write the wrapper:

func (ec *exclusiveWorker) renewSession(doneChan <-chan struct{}) error {
    err := ec.client.Session().RenewPeriodic(ec.sessionTimeout, ec.sessionID, nil, doneChan)
    if err != nil {
        return err
    }
    return nil
}

Here we need 3 things:

  • sessionTimeout: This is the original TTL. The client will use this to refresh the session each TTL/2
  • sessionID: Id of the session we want to renew
  • doneChan: Is a channel we use the signal that we need to keep renewing the session or if we close the channel we mean that we are done with the task and we don't need to renew the session anymore

Demo

In the accompanying github repo There is a fully working implementation of this. It's also very simple and intended for learning purporses.
You will need a work go installation to be able to compile the code and docker with docker-compose to be able to run a consul server. So lets see the demo:
First launch consul

$ docker-compose up

Then open 2 terminals and in the first run the code and you should see something like this:

$ go run main.go
sessionID: bac7cf19-285e-9907-98ad-e8189a07cbd9
I can work. YAY!!!
Starting to work

you now can check the web interface of consul http://localhost:8500/ui/dc1/kv
to verify that the keys are created, locked and destroyed either by TTL, finishing the task or interrupting the task.

in the second one if you run the code you can see the code exiting while the task is executed

$ go run main.go
sessionID: d0f26b95-11cb-236c-bba7-601441f2ae74
I can NOT work. YAY!!!
$

if you interrupt the task by doing Ctrl+C you should see the cleanup happening

$ go run main.go
sessionID: 4685d391-251d-9f6d-1c2c-5ab6fdbd9f98
I can work. YAY!!!
Starting to work
^C2018/09/13 11:16:04 Job interrupted. Cleaning up

if you try to connect right after the task is finised you will notice you cannot conect. This is due to lock-delay which is documented in the sessions section of the consul documentation.

Final thoughts

I invite you to check the github repo as the code there is full of notes about implementation that could be useful for a real implementation.

Also there are things I did not implement like stop() or Discovering the Leader, but implementation should be simple. Feel free to submit a pull requres.

Links

Posted on by:

Discussion

pic
Editor guide