Implementing Custom Dapr State

Ivan G (aloneguid) ・11 min read

This article was originally published on my blog

Today I'd like to describe how you can add your own implementation of state to Dapr. If you don't know what Dapr is, go ahead and read about it first; the basics are beyond the scope of this post.

Dapr state is where your microservice or application stores its data, and at the time of writing Dapr already supports multiple store providers. However, none of them suited me. I wanted to run Dapr in my own environment, and didn't want to host something heavy like Redis, or pay a lot of money for Cosmos DB (similar reasons apply to other providers). What I wanted was something extra cheap and easy to use. For that reason I thought it would be great to get something like Azure Table Storage to work for me: in development it's very hard to get charged more than £0.01, and it provides low latency, really nice speed and an abundance of tools like Azure Storage Explorer to view the data. That last point was of high importance to me, because tools for Redis, say, are either hard to use or not free (really expensive).

One of the options to get what I want is of course to raise a ticket and wait for someone to do it, either from the Dapr team or another open-source contributor. However, I'd like it now :) Therefore my solution was to create one myself. This is a hard one, because Dapr is written in Go, and I had never written Go. Given the availability of free time I started learning it, using this awesome book.

I didn't expect much from books, as they're declining in quality these days (in my opinion); however, this one was definitely an exception.

How Storage Works

In Dapr, whenever you launch an application with default settings, a folder called components will be created, and when running in local mode the Dapr CLI creates two files in it: redis.yml and redis-messagebus.yml. Those two files declare exactly how storage and messaging work in your application's instance. I'm going to ignore messaging for now, and only look at the first one, redis.yml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

Basically, what this file tells us is our application declares a state of type state.redis and passes connection metadata to it like redisHost and redisPassword. This is fine and works, as Dapr by default creates a Redis docker container so that you can work with state. You can also replace the content in this file with your own, and there are examples of this configuration for different stores.

When your application saves or loads the state, it will simply call Dapr API with key name and value to be saved, and then Dapr process decides how to do it. Internally, each store implements the following interface:

type Store interface {
    Init(metadata Metadata) error
    Delete(req *DeleteRequest) error
    BulkDelete(req []DeleteRequest) error
    Get(req *GetRequest) (*GetResponse, error)
    Set(req *SetRequest) error
    BulkSet(req []SetRequest) error
}

This is quite simple, to be honest. The Init method is called first, before the store does anything else, and receives the metadata from the spec/metadata section of the yaml as a key-value dictionary. Then you have methods for getting, setting, and deleting the state. Therefore, hopefully, all you need to do is create another state store that implements this interface.
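To make the shape of the interface concrete, here is a minimal sketch of a store that keeps everything in a map. The request and response types are simplified stand-ins I made up for illustration (the real ones live in the Dapr state package and carry more fields, such as ETags), and memoryStore is a hypothetical name, not something that exists in components-contrib.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the types in the Dapr state package (assumption:
// the real types have more fields, e.g. ETag and options).
type Metadata struct{ Properties map[string]string }
type GetRequest struct{ Key string }
type GetResponse struct{ Data []byte }
type SetRequest struct {
	Key   string
	Value []byte
}
type DeleteRequest struct{ Key string }

// Store mirrors the interface shown above.
type Store interface {
	Init(metadata Metadata) error
	Delete(req *DeleteRequest) error
	BulkDelete(req []DeleteRequest) error
	Get(req *GetRequest) (*GetResponse, error)
	Set(req *SetRequest) error
	BulkSet(req []SetRequest) error
}

// memoryStore is a hypothetical store backed by a plain map.
type memoryStore struct{ data map[string][]byte }

func (m *memoryStore) Init(metadata Metadata) error {
	m.data = make(map[string][]byte)
	return nil
}

func (m *memoryStore) Set(req *SetRequest) error {
	if req.Key == "" {
		return errors.New("key must not be empty")
	}
	m.data[req.Key] = req.Value
	return nil
}

// BulkSet simply forwards every item to Set and stops at the first error.
func (m *memoryStore) BulkSet(req []SetRequest) error {
	for i := range req {
		if err := m.Set(&req[i]); err != nil {
			return err
		}
	}
	return nil
}

// Get returns an empty response (nil Data) for a missing key, not an error.
func (m *memoryStore) Get(req *GetRequest) (*GetResponse, error) {
	return &GetResponse{Data: m.data[req.Key]}, nil
}

func (m *memoryStore) Delete(req *DeleteRequest) error {
	delete(m.data, req.Key)
	return nil
}

func (m *memoryStore) BulkDelete(req []DeleteRequest) error {
	for i := range req {
		if err := m.Delete(&req[i]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var s Store = &memoryStore{}
	_ = s.Init(Metadata{})
	_ = s.Set(&SetRequest{Key: "order-1", Value: []byte(`{"qty":2}`)})
	resp, _ := s.Get(&GetRequest{Key: "order-1"})
	fmt.Println(string(resp.Data))
}
```

A real store replaces the map with calls to its backend, but the six methods keep the same shape.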

Scaffolding the Project

All the state stores in Dapr are part of components-contrib git repo. Dapr itself references this repo and statically links it into main executable. This is really interesting, and gives us ability to have separate lifetimes of Dapr and components. Therefore, the first thing you need to do is clone this repo, have a look around.

State store implementations are located under the state folder in this repo, and you can see various implementations already, even for the built-in Redis provider that Dapr uses by default. There's also a folder called azure, so it makes sense for me to add my table storage provider there: I'm creating a folder called tablestorage and an implementation file tablestorage.go.

Now, in Go, in order to implement an interface, things are a little weird. You don't have explicit constructs like in C# or Java, but rather create a set of methods in your source code that have the same signature as the interface itself.
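If you come from C# or Java like me, a tiny sketch may help. Greeter and english below are made-up names purely for illustration. The type never declares that it implements anything; having the right method is enough. The `var _ Greeter = ...` line is a common Go idiom that turns a missing method into a compile error.

```go
package main

import "fmt"

// Greeter is a hypothetical interface used only for illustration.
type Greeter interface {
	Greet(name string) string
}

// english never mentions Greeter, yet satisfies it because it has a
// Greet method with a matching signature.
type english struct{}

func (e *english) Greet(name string) string {
	return "Hello, " + name
}

// Compile-time assertion: the build fails here if *english ever stops
// satisfying Greeter.
var _ Greeter = (*english)(nil)

func main() {
	var g Greeter = &english{}
	fmt.Println(g.Greet("Dapr")) // prints "Hello, Dapr"
}
```

For a state store, the equivalent assertion would presumably read `var _ state.Store = (*StateStore)(nil)`, which catches a forgotten method at build time.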

Implementing Init

So the first thing I'm going to do is implement the Init method which is called first.

func (r *StateStore) Init(metadata state.Metadata) error {
    meta, err := getTablesMetadata(metadata.Properties)
    if err != nil {
        return err
    }

    client, err := storage.NewBasicClient(meta.accountName, meta.accountKey)
    if err != nil {
        return err
    }
    tables := client.GetTableService()
    r.table = tables.GetTableReference(meta.tableName)

    // create the table; an "already exists" error is tolerated below
    log.Debugf("using table '%s'", meta.tableName)
    err = r.table.Create(operationTimeout, storage.FullMetadata, nil)
    if err != nil {
        if isTableAlreadyExistsError(err) {
            //error creating table, but it already exists so we're fine
            log.Debugf("table already exists")
        } else {
            return err
        }
    }

    log.Debugf("table initialised, account: %s, table: %s", meta.accountName, meta.tableName)

    return nil
}

This method receives metadata from store definition file, and is using Azure Go SDK to create a client to table storage, get table service, table reference, and create the table if required. It puts the table reference in my own structure called StateStore which is defined like this:

type StateStore struct {
    table *storage.Table
    json  jsoniter.API
}

Every other method has access to this structure, therefore it will be a good idea to store initialised table storage connection in it (table member).

Parsing Metadata

You've noticed I'm using a getTablesMetadata function in Init. This is simply a helper I wrote (shown below), which gives me strongly typed values from the yaml file. In my case, the store definition in yaml looks like the following:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.azure.tablestorage
  metadata:
  - name: accountName
    value: <storage account name>
  - name: accountKey
    value: <key>
  - name: tableName
    value: <table name>

And corresponding method to parse it:

func getTablesMetadata(metadata map[string]string) (*tablesMetadata, error) {
    meta := tablesMetadata{}

    if val, ok := metadata[accountNameKey]; ok && val != "" {
        meta.accountName = val
    } else {
        return nil, errors.New(fmt.Sprintf("missing or empty %s field from metadata", accountNameKey))
    }

    if val, ok := metadata[accountKeyKey]; ok && val != "" {
        meta.accountKey = val
    } else {
        return nil, errors.New(fmt.Sprintf("missing or empty %s field from metadata", accountKeyKey))
    }

    if val, ok := metadata[tableNameKey]; ok && val != "" {
        meta.tableName = val
    } else {
        return nil, errors.New(fmt.Sprintf("missing or empty %s field from metadata", tableNameKey))
    }

    return &meta, nil
}
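The three near-identical checks above could also be folded into a loop. This is only a sketch of an alternative, not what the store uses: requiredProperty and parseTablesMetadata are hypothetical names, and I'm using fmt.Errorf from the standard library to keep the example self-contained.

```go
package main

import "fmt"

type tablesMetadata struct {
	accountName string
	accountKey  string
	tableName   string
}

// requiredProperty is a hypothetical helper: it returns the value for key,
// or an error when the key is absent or empty.
func requiredProperty(props map[string]string, key string) (string, error) {
	if val, ok := props[key]; ok && val != "" {
		return val, nil
	}
	return "", fmt.Errorf("missing or empty %s field from metadata", key)
}

// parseTablesMetadata checks the required keys in a fixed order and fails
// on the first one that is missing or empty.
func parseTablesMetadata(props map[string]string) (*tablesMetadata, error) {
	meta := tablesMetadata{}
	// Pair each required key with the field it populates.
	fields := []struct {
		key    string
		target *string
	}{
		{"accountName", &meta.accountName},
		{"accountKey", &meta.accountKey},
		{"tableName", &meta.tableName},
	}
	for _, f := range fields {
		val, err := requiredProperty(props, f.key)
		if err != nil {
			return nil, err
		}
		*f.target = val
	}
	return &meta, nil
}

func main() {
	_, err := parseTablesMetadata(map[string]string{"accountName": "devaccount"})
	fmt.Println(err) // prints "missing or empty accountKey field from metadata"
}
```

With only three fields the explicit version is perfectly readable; the loop starts to pay off once a store needs more settings.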

Implementing Set and BulkSet

Those two come together. Set method is supposed to set one key and BulkSet multiple in a batch. At the moment of this writing, BulkSet is used exclusively by all state API, and only actors sometimes use Set. To make things simpler, I'm only implementing Set and just forwarding BulkSet to Set:

func (r *StateStore) Set(req *state.SetRequest) error {
    log.Debugf("saving %s", req.Key)

    return r.writeRow(req)
}

func (r *StateStore) BulkSet(req []state.SetRequest) error {
    log.Debugf("bulk set %v key(s)", len(req))

    for _, s := range req {
        err := r.Set(&s)
        if err != nil {
            return err
        }
    }

    return nil
}

writeRow is a helper function that deals with internals of table storage API.

Implementing Get

Similarly, implementing Get is also trivial:

func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
    log.Debugf("fetching %s", req.Key)
    pk, rk := getPartitionAndRowKey(req.Key)
    entity := r.table.GetEntityReference(pk, rk)
    err := entity.Get(operationTimeout, storage.FullMetadata, nil)

    if err != nil {
        if isNotFoundError(err) {
            return &state.GetResponse{}, nil
        }

        return &state.GetResponse{}, err
    }

    data, etag, err := r.unmarshal(entity)
    return &state.GetResponse{
        Data: data,
        ETag: etag,
    }, err
}


I'm not going into the details of the Delete method, as it's trivial too. For your convenience I'm pasting the full source code below:

// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

/*
Azure Table Storage state store.

Sample configuration in yaml:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: statestore
    spec:
      type: state.azure.tablestorage
      metadata:
      - name: accountName
        value: <storage account name>
      - name: accountKey
        value: <key>
      - name: tableName
        value: <table name>

This store uses PartitionKey as service name, and RowKey as the rest of the composite key.

Concurrency is supported with ETags according to https://docs.microsoft.com/en-us/azure/storage/common/storage-concurrency#managing-concurrency-in-table-storage
*/

package tablestorage

import (
    "fmt"
    "github.com/Azure/azure-sdk-for-go/storage"
    "github.com/dapr/components-contrib/state"
    jsoniter "github.com/json-iterator/go"
    "github.com/pkg/errors"
    log "github.com/sirupsen/logrus"
    "strings"
)

const (
    keyDelimiter        = "||"
    valueEntityProperty = "Value"
    operationTimeout    = 1000

    accountNameKey = "accountName"
    accountKeyKey  = "accountKey"
    tableNameKey   = "tableName"
)

type StateStore struct {
    table *storage.Table
    json  jsoniter.API
}

type tablesMetadata struct {
    accountName string
    accountKey  string
    tableName   string
}

// Initialises connection to table storage, optionally creates a table if it doesn't exist.
func (r *StateStore) Init(metadata state.Metadata) error {
    meta, err := getTablesMetadata(metadata.Properties)
    if err != nil {
        return err
    }

    client, err := storage.NewBasicClient(meta.accountName, meta.accountKey)
    if err != nil {
        return err
    }
    tables := client.GetTableService()
    r.table = tables.GetTableReference(meta.tableName)

    // create the table; an "already exists" error is tolerated below
    log.Debugf("using table '%s'", meta.tableName)
    err = r.table.Create(operationTimeout, storage.FullMetadata, nil)
    if err != nil {
        if isTableAlreadyExistsError(err) {
            //error creating table, but it already exists so we're fine
            log.Debugf("table already exists")
        } else {
            return err
        }
    }

    log.Debugf("table initialised, account: %s, table: %s", meta.accountName, meta.tableName)

    return nil
}

func (r *StateStore) Delete(req *state.DeleteRequest) error {
    log.Debugf("delete %s", req.Key)
    return r.deleteRow(req)
}

func (r *StateStore) BulkDelete(req []state.DeleteRequest) error {
    log.Debugf("bulk delete %v key(s)", len(req))
    for _, s := range req {
        err := r.Delete(&s)
        if err != nil {
            return err
        }
    }
    return nil
}

func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
    log.Debugf("fetching %s", req.Key)
    pk, rk := getPartitionAndRowKey(req.Key)
    entity := r.table.GetEntityReference(pk, rk)
    err := entity.Get(operationTimeout, storage.FullMetadata, nil)

    if err != nil {
        if isNotFoundError(err) {
            return &state.GetResponse{}, nil
        }

        return &state.GetResponse{}, err
    }

    data, etag, err := r.unmarshal(entity)
    return &state.GetResponse{
        Data: data,
        ETag: etag,
    }, err
}

func (r *StateStore) Set(req *state.SetRequest) error {
    log.Debugf("saving %s", req.Key)

    return r.writeRow(req)
}

func (r *StateStore) BulkSet(req []state.SetRequest) error {
    log.Debugf("bulk set %v key(s)", len(req))

    for _, s := range req {
        err := r.Set(&s)
        if err != nil {
            return err
        }
    }

    return nil
}

func NewAzureTablesStateStore() *StateStore {
    return &StateStore{
        json: jsoniter.ConfigFastest,
    }
}

func getTablesMetadata(metadata map[string]string) (*tablesMetadata, error) {
    meta := tablesMetadata{}

    if val, ok := metadata[accountNameKey]; ok && val != "" {
        meta.accountName = val
    } else {
        return nil, errors.New(fmt.Sprintf("missing or empty %s field from metadata", accountNameKey))
    }

    if val, ok := metadata[accountKeyKey]; ok && val != "" {
        meta.accountKey = val
    } else {
        return nil, errors.New(fmt.Sprintf("missing or empty %s field from metadata", accountKeyKey))
    }

    if val, ok := metadata[tableNameKey]; ok && val != "" {
        meta.tableName = val
    } else {
        return nil, errors.New(fmt.Sprintf("missing or empty %s field from metadata", tableNameKey))
    }

    return &meta, nil
}

func (r *StateStore) writeRow(req *state.SetRequest) error {
    pk, rk := getPartitionAndRowKey(req.Key)
    entity := r.table.GetEntityReference(pk, rk)
    entity.Properties = map[string]interface{}{
        valueEntityProperty: r.marshal(req),
    }
    entity.OdataEtag = req.ETag

    // InsertOrReplace does not support ETag concurrency, therefore we will try to use Update method first
    // as it's more frequent, and then Insert

    err := entity.Update(false, nil)
    if err != nil {
        if isNotFoundError(err) {
            // When entity is not found (set state first time) create it
            entity.OdataEtag = ""
            return entity.Insert(storage.FullMetadata, nil)
        }
    }
    return err
}

func isNotFoundError(err error) bool {
    azureError, ok := err.(storage.AzureStorageServiceError)
    return ok && azureError.Code == "ResourceNotFound"
}

func isTableAlreadyExistsError(err error) bool {
    azureError, ok := err.(storage.AzureStorageServiceError)
    return ok && azureError.Code == "TableAlreadyExists"
}

func (r *StateStore) deleteRow(req *state.DeleteRequest) error {
    pk, rk := getPartitionAndRowKey(req.Key)
    entity := r.table.GetEntityReference(pk, rk)
    entity.OdataEtag = req.ETag
    return entity.Delete(true, nil)
}

func getPartitionAndRowKey(key string) (string, string) {
    pr := strings.Split(key, keyDelimiter)
    if len(pr) != 2 {
        return pr[0], ""
    }
    return pr[0], pr[1]
}

func (r *StateStore) marshal(req *state.SetRequest) string {
    var v string
    b, ok := req.Value.([]byte)
    if ok {
        v = string(b)
    } else {
        v, _ = jsoniter.MarshalToString(req.Value)
    }
    return v
}

func (r *StateStore) unmarshal(row *storage.Entity) ([]byte, string, error) {
    raw := row.Properties[valueEntityProperty]

    // value column not present
    if raw == nil {
        return nil, "", nil
    }

    // must be a string
    sv, ok := raw.(string)
    if !ok {
        return nil, "", errors.New(fmt.Sprintf("expected string in column '%s'", valueEntityProperty))
    }

    // use native ETag
    etag := row.OdataEtag
    return []byte(sv), etag, nil
}
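One detail in the listing worth a second look is getPartitionAndRowKey: it splits on every "||", so a key with more than one delimiter ends up with an empty row key. Below is a sketch of a variant that splits only on the first delimiter. splitKey is a hypothetical name and the sample keys are made up; whether keys with several delimiters ever occur in practice is an assumption on my part.

```go
package main

import (
	"fmt"
	"strings"
)

const keyDelimiter = "||"

// splitKey splits only on the first delimiter, so the row key keeps any
// later "||" sequences instead of being dropped.
func splitKey(key string) (string, string) {
	parts := strings.SplitN(key, keyDelimiter, 2)
	if len(parts) < 2 {
		// No delimiter at all: the whole key becomes the partition key.
		return parts[0], ""
	}
	return parts[0], parts[1]
}

func main() {
	for _, key := range []string{"orders||cart-42", "orders", "orders||a||b"} {
		pk, rk := splitKey(key)
		fmt.Printf("%q -> partition %q, row %q\n", key, pk, rk)
	}
}
```

For the common two-part key both versions behave the same.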

Thinking Concurrently

As you can see, implementing a state store is a trivial exercise. However, one more important thing I haven't mentioned is concurrency. Dapr concurrency is based on ETags, which should be familiar to most developers. The idea is simple:

  1. When value is retrieved from a store, the store also returns something called an ETag which indicates the version of this value.
  2. When value is saved to a store, the store first checks if the current version of its value (ETag) matches the passed ETag.
  3. When ETags don't match, the value cannot be saved - this means that someone else has changed the value and the service that has issued save request doesn't know about it.
  4. When ETags match, the value is saved with no issues.

Every store must support concurrency, and you can see in my code it is supported. Fortunately, in my case ETag support is already built into Azure Table Storage, therefore all I need to do is return native ETag on request and set it back on update, using OdataEtag property on table entity.

ETag can be anything, as long as you can represent it as a string.
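To illustrate the four steps above without any cloud dependency, here is a minimal in-memory sketch. versionedStore is a hypothetical type that uses an incrementing counter as its ETag. Unlike a real store, it always demands a matching ETag, and an empty ETag is only accepted for the very first write; that strictness is my simplification.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

var errETagMismatch = errors.New("etag mismatch: value was changed by someone else")

type record struct {
	value   string
	version int
}

// versionedStore is a hypothetical in-memory store that uses an
// incrementing counter as its ETag.
type versionedStore struct{ rows map[string]record }

// get returns the value together with its current ETag.
func (s *versionedStore) get(key string) (value, etag string) {
	r, ok := s.rows[key]
	if !ok {
		return "", ""
	}
	return r.value, strconv.Itoa(r.version)
}

// set saves the value only when the supplied ETag matches the stored one.
// A key that does not exist yet accepts only an empty ETag.
func (s *versionedStore) set(key, value, etag string) error {
	r, ok := s.rows[key]
	current := ""
	if ok {
		current = strconv.Itoa(r.version)
	}
	if etag != current {
		return errETagMismatch
	}
	s.rows[key] = record{value: value, version: r.version + 1}
	return nil
}

func main() {
	s := &versionedStore{rows: map[string]record{}}
	_ = s.set("k", "v1", "")
	_, etag := s.get("k") // two writers read the same version

	fmt.Println(s.set("k", "from A", etag)) // prints "<nil>"
	fmt.Println(s.set("k", "from B", etag)) // prints the mismatch error
}
```

The second writer loses because the first write bumped the version; it has to read again and retry with the fresh ETag.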

Running and Debugging

The last thing I'd like to explain is how to debug Dapr with your new store implementation. Repository you have cloned (components-contrib) doesn't actually contain Dapr runtime. The runtime sits in another Git repo - dapr. I'm going to describe the method I've used here and hopefully it will be useful for you too.

First, clone the dapr repo into the same parent folder you cloned the components-contrib repository into. You should end up with a directory containing two subfolders:

  1. components-contrib
  2. dapr

Go to go.mod file in dapr repository, and find the replace section. This section allows us to redirect a Go dependency somewhere else, and it even supports local folders!

Add the following clause to redirects - github.com/dapr/components-contrib => ../components-contrib so your redirects section will look similar to:

replace (
    github.com/Azure/go-autorest => github.com/Azure/go-autorest v13.3.0+incompatible
    github.com/dapr/components-contrib => ../components-contrib
    k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36
)

Now you can build and run the dapr runtime bravely, as it will pick up components-contrib from the sibling folder.

That's not all yet. The Dapr runtime doesn't know about your new store yet, and the way to register it is the following:

  1. Open cmd/daprd/main.go - this is the startup file of Dapr daemon.
  2. Add imports for your new store, in my case state_azure_tablestorage "github.com/dapr/components-contrib/state/azure/tablestorage"
  3. Go below and find where other stores are registered, and add your store registration as well, it will look like this in my case:
state_loader.New("azure.tablestorage", func() state.Store {
    return state_azure_tablestorage.NewAzureTablesStateStore()
}),

Now you're ready to Go :) You need to build daprd.exe (on Windows) and replace the one you've downloaded officially with your own. That's it.

Quick Debug Tip

If you're lazy like me, you'd want to have the shortest cycle between changing code and testing it from your application. For that reason, I've set up my IDE to build to Dapr standard folder where it installs, so that daprd.exe is always replaced. I'm using IntelliJ IDEA with Go plugin and it's trivial to do.

In my case I can launch a debugging session for daprd.exe straight from the IDE, where it also replaces daprd.exe and passes program arguments to specify a hardcoded port number for HTTP and gRPC comms. Easy.

In addition to that, if you are struggling with launching your test application every time, remember that Dapr works via HTTP/gRPC and you can always call the storage with some HTTP client and pass/view JSON. I'm a big fan of the REST Client extension for VS Code, because you can write your requests in text and also commit them into Git, so my testing is very easy too.
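If you'd rather stay in Go, the same kind of request can be sketched with the standard library. Treat the specifics as assumptions: port 3500 is just the hardcoded HTTP port I picked as a placeholder, the /v1.0/state path may differ between Dapr versions (newer ones append the store name), and newSaveRequest and stateItem are hypothetical names. The body is a JSON array of key/value pairs.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// stateItem is one entry in the JSON array sent to the state endpoint.
type stateItem struct {
	Key   string      `json:"key"`
	Value interface{} `json:"value"`
}

// newSaveRequest builds a POST request that saves a single key/value pair.
// stateURL is supplied by the caller because it depends on your setup.
func newSaveRequest(stateURL, key string, value interface{}) (*http.Request, error) {
	body, err := json.Marshal([]stateItem{{Key: key, Value: value}})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, stateURL, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Assumed URL: adjust the port and path to match your running daprd.
	req, err := newSaveRequest("http://localhost:3500/v1.0/state", "order-1", map[string]int{"qty": 2})
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```

After a successful save you can open the table in Azure Storage Explorer and see the row the store has written.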

I'm sure you can think of even more ways to make your life easier. Anyway, leave a comment and tell me what you think.
