DEV Community

L2ncE

How to implement an elegant service discovery extension in the HTTP framework

Foreword

The previous article walked through how Hertz implements service registration. This article focuses on the other half: how Hertz implements service discovery.

Hertz

Hertz is an HTTP framework for ultra-large-scale, enterprise-level microservices. It is easy to use, easy to extend, and has low latency.

By default, Hertz uses Netpoll, a self-developed high-performance network library. In some scenarios, Hertz has advantages in QPS and latency over Go's standard net package.

In internal practice, typical services such as gateways and services in which the framework accounts for a large share of the overhead used significantly fewer resources after migrating from the Gin framework to Hertz: CPU usage dropped by 30%-60%, depending on traffic volume.

For more details, see cloudwego/hertz.

Service discovery extension

Hertz supports custom discovery modules, so users can integrate other registries by implementing the extension interface. The interface is defined under pkg/app/client/discovery.

Extension interface

Service discovery interface definition and implementation

The service discovery interface, Resolver, has three methods.

  1. Resolve is the core method of the Resolver. It takes the target key and returns the service discovery result for it.
  2. Target derives, from the peer TargetInfo provided by Hertz, the unique target that Resolve will use. This target also serves as the unique cache key.
  3. Name returns the unique name of the Resolver. Hertz uses it to cache and reuse the Resolver.

type Resolver interface {
    // Target should return a description for the given target that is suitable for being a key for cache.
    Target(ctx context.Context, target *TargetInfo) string

    // Resolve returns a list of instances for the given description of a target.
    Resolve(ctx context.Context, desc string) (Result, error)

    // Name returns the name of the resolver.
    Name() string
}

discovery.go goes on to implement these three methods with SynthesizedResolver.

// SynthesizedResolver synthesizes a Resolver using a resolve function.
type SynthesizedResolver struct {
    TargetFunc  func(ctx context.Context, target *TargetInfo) string
    ResolveFunc func(ctx context.Context, key string) (Result, error)
    NameFunc    func() string
}

func (sr SynthesizedResolver) Target(ctx context.Context, target *TargetInfo) string {
    if sr.TargetFunc == nil {
        return ""
    }
    return sr.TargetFunc(ctx, target)
}

func (sr SynthesizedResolver) Resolve(ctx context.Context, key string) (Result, error) {
    return sr.ResolveFunc(ctx, key)
}

// Name implements the Resolver interface
func (sr SynthesizedResolver) Name() string {
    if sr.NameFunc == nil {
        return ""
    }
    return sr.NameFunc()
}

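SynthesizedResolver holds three function fields, one behind each of the three interface methods. Target and Name return an empty string when their function is nil, whereas Resolve calls ResolveFunc without a nil check, so ResolveFunc must always be set.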
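
To make the three methods concrete, here is a minimal sketch of a custom resolver. It is not Hertz code: the types are simplified local copies so that the example compiles on its own, Result carries plain address strings instead of []Instance, and staticResolver, lookup and the sample addresses are hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local, simplified copies of the discovery types so the sketch compiles on its own.
type TargetInfo struct {
	Host string
	Tags map[string]string
}

// Result keeps plain address strings here; the real Result holds []Instance.
type Result struct {
	CacheKey  string
	Instances []string
}

type Resolver interface {
	Target(ctx context.Context, target *TargetInfo) string
	Resolve(ctx context.Context, desc string) (Result, error)
	Name() string
}

// staticResolver is a hypothetical Resolver backed by an in-memory table.
type staticResolver struct {
	table map[string][]string // service name -> instance addresses
}

// Target uses the request host as the key that Resolve receives.
func (r *staticResolver) Target(_ context.Context, target *TargetInfo) string {
	return target.Host
}

// Resolve looks the key up in the table and fails for unknown services.
func (r *staticResolver) Resolve(_ context.Context, desc string) (Result, error) {
	addrs, ok := r.table[desc]
	if !ok {
		return Result{}, errors.New("no instances for " + desc)
	}
	return Result{CacheKey: desc, Instances: addrs}, nil
}

// Name identifies this resolver; Hertz uses the name to cache and reuse resolvers.
func (r *staticResolver) Name() string { return "static" }

// lookup chains Target and Resolve: first derive the key, then resolve it.
func lookup(r Resolver, host string) (Result, error) {
	ctx := context.Background()
	return r.Resolve(ctx, r.Target(ctx, &TargetInfo{Host: host}))
}

func main() {
	r := &staticResolver{table: map[string][]string{
		"hertz.demo": {"127.0.0.1:8888", "127.0.0.1:8889"},
	}}
	res, err := lookup(r, "hertz.demo")
	fmt.Println(r.Name(), res.CacheKey, res.Instances, err)
}
```

A resolver for a real registry would replace the table lookup in Resolve with a call to that registry's client, while Target and Name would usually stay this small.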

TargetInfo definition

As mentioned above, the Target method derives the unique target that Resolve needs from TargetInfo.

type TargetInfo struct {
    Host string
    Tags map[string]string
}

Instance interface definition and implementation

Instance holds the information about a single instance of the target service. It has three methods.

  1. Address returns the address of the target service instance.
  2. Weight returns the weight of the target service instance.
  3. Tag returns the tag stored under the given key for the target service instance; tags are key-value pairs.

// Instance contains information of an instance from the target service.
type Instance interface {
    Address() net.Addr
    Weight() int
    Tag(key string) (value string, exist bool)
}

These three methods are also implemented in discovery.go. Note that Weight falls back to registry.DefaultWeight when the stored weight is not positive.

type instance struct {
    addr   net.Addr
    weight int
    tags   map[string]string
}

func (i *instance) Address() net.Addr {
    return i.addr
}

func (i *instance) Weight() int {
    if i.weight > 0 {
        return i.weight
    }
    return registry.DefaultWeight
}

func (i *instance) Tag(key string) (value string, exist bool) {
    value, exist = i.tags[key]
    return
}

NewInstance

NewInstance creates an Instance from the given network, address, weight and tags.

// NewInstance creates an Instance using the given network, address and tags
func NewInstance(network, address string, weight int, tags map[string]string) Instance {
    return &instance{
        addr:   utils.NewNetAddr(network, address),
        weight: weight,
        tags:   tags,
    }
}
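
The following sketch shows how an instance built this way behaves, in particular the weight fallback and the tag lookup. It is a standalone approximation rather than the Hertz source: net.TCPAddr replaces utils.NewNetAddr, newInstance takes an IP and port instead of a network and address string, and defaultWeight is a local constant whose value of 10 is an assumption standing in for registry.DefaultWeight.

```go
package main

import (
	"fmt"
	"net"
)

// defaultWeight stands in for registry.DefaultWeight; the value 10 is an assumption.
const defaultWeight = 10

type instance struct {
	addr   net.Addr
	weight int
	tags   map[string]string
}

func (i *instance) Address() net.Addr { return i.addr }

// Weight falls back to the default when no positive weight was set.
func (i *instance) Weight() int {
	if i.weight > 0 {
		return i.weight
	}
	return defaultWeight
}

// Tag reports the value stored under key and whether it exists.
func (i *instance) Tag(key string) (string, bool) {
	v, ok := i.tags[key]
	return v, ok
}

// newInstance mirrors NewInstance but builds the address with net.TCPAddr.
func newInstance(ip string, port, weight int, tags map[string]string) *instance {
	return &instance{
		addr:   &net.TCPAddr{IP: net.ParseIP(ip), Port: port},
		weight: weight,
		tags:   tags,
	}
}

func main() {
	a := newInstance("127.0.0.1", 8888, 0, map[string]string{"zone": "a"})
	b := newInstance("127.0.0.1", 8889, 20, nil)

	zone, ok := a.Tag("zone")
	fmt.Println(a.Address(), a.Weight(), zone, ok)

	// Reading from a nil map is safe in Go and simply reports a missing key.
	_, ok = b.Tag("zone")
	fmt.Println(b.Address(), b.Weight(), ok)
}
```

The first instance is created with weight 0 and therefore reports the default weight, while the second keeps its explicit weight of 20.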

Result

As mentioned above, the Resolve method returns the service discovery result for the target key. Result holds that result: the instance list, which can and should be cached, and a CacheKey that maps to the instance list in the cache.

// Result contains the result of service discovery process.
// the instance list can/should be cached and CacheKey can be used to map the instance list in cache.
type Result struct {
    CacheKey  string
    Instances []Instance
}

Client middleware

Client middleware is defined under pkg/app/client/middlewares/client.

Discovery

Discovery uses a BalancerFactory to construct a middleware. It first builds the default options and applies the options we passed in through the Apply method; the available options are defined in pkg/app/client/middlewares/client/sd/options.go. It then copies the resolver, the load balancer and the load balancing options into lbConfig, passes lbConfig to NewBalancerFactory, and finally returns an anonymous function of type client.Middleware.

// Discovery will construct a middleware with BalancerFactory.
func Discovery(resolver discovery.Resolver, opts ...ServiceDiscoveryOption) client.Middleware {
    options := &ServiceDiscoveryOptions{
        Balancer: loadbalance.NewWeightedBalancer(),
        LbOpts:   loadbalance.DefaultLbOpts,
        Resolver: resolver,
    }
    options.Apply(opts)

    lbConfig := loadbalance.Config{
        Resolver: options.Resolver,
        Balancer: options.Balancer,
        LbOpts:   options.LbOpts,
    }

    f := loadbalance.NewBalancerFactory(lbConfig)
    return func(next client.Endpoint) client.Endpoint {
        // ...
    }
}

Implementation principle

The core of the service discovery middleware is the final part of Discovery, which we left out above: the middleware rewrites the request's Host. When the request options are not nil and IsSD() returns true, the middleware gets an instance from the BalancerFactory and calls SetHost to replace the Host with that instance's address. If no instance can be obtained, it returns the error without calling next.

return func(ctx context.Context, req *protocol.Request, resp *protocol.Response) (err error) {
  if req.Options() != nil && req.Options().IsSD() {
    ins, err := f.GetInstance(ctx, req)
    if err != nil {
      return err
    }
    req.SetHost(ins.Address().String())
  }
  return next(ctx, req, resp)
}
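
The host rewriting can be tried out in isolation with the sketch below. It only imitates the shape of the middleware: Request, Endpoint, Middleware, picker, tablePick and send are simplified, hypothetical stand-ins (the SD field plays the role of req.Options().IsSD(), and picker plays the role of f.GetInstance), not types from Hertz.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Request is a simplified stand-in for protocol.Request.
type Request struct {
	Host string
	SD   bool // stands in for req.Options().IsSD()
}

type Endpoint func(ctx context.Context, req *Request) error
type Middleware func(next Endpoint) Endpoint

// picker is a hypothetical stand-in for BalancerFactory.GetInstance.
type picker func(ctx context.Context, req *Request) (string, error)

// discovery rewrites the host of requests that opted in to service discovery.
func discovery(pick picker) Middleware {
	return func(next Endpoint) Endpoint {
		return func(ctx context.Context, req *Request) error {
			if req.SD {
				addr, err := pick(ctx, req)
				if err != nil {
					return err // next is not called when no instance is available
				}
				req.Host = addr
			}
			return next(ctx, req)
		}
	}
}

// tablePick builds a picker from a fixed service-to-address table.
func tablePick(table map[string]string) picker {
	return func(_ context.Context, req *Request) (string, error) {
		addr, ok := table[req.Host]
		if !ok {
			return "", errors.New("no instance for " + req.Host)
		}
		return addr, nil
	}
}

// send runs req through the middleware and returns the host the final endpoint saw.
func send(req *Request, pick picker) (string, error) {
	var seen string
	final := func(_ context.Context, r *Request) error {
		seen = r.Host
		return nil
	}
	err := discovery(pick)(final)(context.Background(), req)
	return seen, err
}

func main() {
	pick := tablePick(map[string]string{"hertz.demo": "127.0.0.1:8888"})
	fmt.Println(send(&Request{Host: "hertz.demo", SD: true}, pick))
	fmt.Println(send(&Request{Host: "example.com", SD: false}, pick))
}
```

A request that opts in reaches the final endpoint with the resolved address as its host, while a request that does not opt in passes through untouched.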

Implementation analysis of service discovery

Regular refresh

In practice, service discovery information changes frequently, so Hertz uses the refresh method to update it periodically. refresh iterates over time.Tick with a for range loop, so consecutive iterations are separated by the configured RefreshInterval. In each iteration it walks the key-value pairs in the cache with the Range method of sync.Map and resolves every key again. If resolving a key fails, it logs a warning and moves on to the next key, so the stale entry stays in place.

// refresh is used to update service discovery information periodically.
func (b *BalancerFactory) refresh() {
    for range time.Tick(b.opts.RefreshInterval) {
        b.cache.Range(func(key, value interface{}) bool {
            res, err := b.resolver.Resolve(context.Background(), key.(string))
            if err != nil {
                hlog.SystemLogger().Warnf("resolver refresh failed, key=%s error=%s", key, err.Error())
                return true
            }
            renameResultCacheKey(&res, b.resolver.Name())
            cache := value.(*cacheResult)
            cache.res.Store(res)
            atomic.StoreInt32(&cache.expire, 0)
            b.balancer.Rebalance(res)
            return true
        })
    }
}
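
The same pattern can be reproduced with the standard library alone. The sketch below is an approximation under stated assumptions, not the Hertz implementation: entry, resolveFunc, newCache and cached are hypothetical helpers, results are plain address slices, and the loop uses time.NewTicker with a done channel so that it can be stopped, which the original refresh loop does not do.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// entry is a simplified stand-in for cacheResult.
type entry struct {
	res atomic.Value // holds []string
}

type resolveFunc func(key string) ([]string, error)

// refreshOnce re-resolves every cached key and returns how many entries were updated.
func refreshOnce(cache *sync.Map, resolve resolveFunc) int {
	updated := 0
	cache.Range(func(key, value interface{}) bool {
		addrs, err := resolve(key.(string))
		if err != nil {
			return true // keep the stale entry and continue with the next key
		}
		value.(*entry).res.Store(addrs)
		updated++
		return true
	})
	return updated
}

// refreshLoop calls refreshOnce on every tick until done is closed.
func refreshLoop(cache *sync.Map, resolve resolveFunc, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			refreshOnce(cache, resolve)
		case <-done:
			return
		}
	}
}

// newCache creates a cache with one empty entry per key.
func newCache(keys ...string) *sync.Map {
	cache := &sync.Map{}
	for _, k := range keys {
		cache.Store(k, &entry{})
	}
	return cache
}

// cached returns the addresses stored for key, or nil if there are none yet.
func cached(cache *sync.Map, key string) []string {
	v, ok := cache.Load(key)
	if !ok {
		return nil
	}
	addrs, _ := v.(*entry).res.Load().([]string)
	return addrs
}

func main() {
	cache := newCache("hertz.demo")
	resolve := func(key string) ([]string, error) {
		return []string{"127.0.0.1:8888", "127.0.0.1:8889"}, nil
	}

	done := make(chan struct{})
	go refreshLoop(cache, resolve, 10*time.Millisecond, done)
	time.Sleep(50 * time.Millisecond) // give the loop time for a few ticks
	close(done)

	fmt.Println(cached(cache, "hertz.demo"))
}
```

Splitting the single pass (refreshOnce) from the timing loop is a choice made for this sketch; it makes the refresh logic testable without waiting for a ticker.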

Resolver cache

According to the comments on NewBalancerFactory, a BalancerFactory is fetched from the cache and reused when one with the same key already exists. Let's briefly analyze the implementation. The names of the resolver and the load balancer, together with the load balancing options, are passed to the cacheKey function to build the uniqueKey.

func cacheKey(resolver, balancer string, opts Options) string {
    return fmt.Sprintf("%s|%s|{%s %s}", resolver, balancer, opts.RefreshInterval, opts.ExpireInterval)
}
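
To see what such a key looks like, the sketch below runs the same format string against a local Options type. The Options struct here contains only the two fields used by cacheKey, and the resolver name "nacos", the balancer name "weight_random" and the interval values are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a local stand-in with only the fields that cacheKey reads.
type Options struct {
	RefreshInterval time.Duration
	ExpireInterval  time.Duration
}

// cacheKey uses the same format string as the function shown above.
func cacheKey(resolver, balancer string, opts Options) string {
	return fmt.Sprintf("%s|%s|{%s %s}", resolver, balancer, opts.RefreshInterval, opts.ExpireInterval)
}

func main() {
	a := Options{RefreshInterval: 5 * time.Second, ExpireInterval: 15 * time.Second}
	b := Options{RefreshInterval: 10 * time.Second, ExpireInterval: 15 * time.Second}

	fmt.Println(cacheKey("nacos", "weight_random", a)) // nacos|weight_random|{5s 15s}
	fmt.Println(cacheKey("nacos", "weight_random", b)) // nacos|weight_random|{10s 15s}
}
```

Because the intervals are part of the key, two clients share a BalancerFactory only when the resolver name, the balancer name and both intervals all match.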

Then the Load method checks whether the map already holds this uniqueKey. If it does, the cached BalancerFactory is returned directly. If not, a new one is created, its watcher and refresh goroutines are started, and it is stored in the cache.

func NewBalancerFactory(config Config) *BalancerFactory {
    config.LbOpts.Check()
    uniqueKey := cacheKey(config.Resolver.Name(), config.Balancer.Name(), config.LbOpts)
    val, ok := balancerFactories.Load(uniqueKey)
    if ok {
        return val.(*BalancerFactory)
    }
    val, _, _ = balancerFactoriesSfg.Do(uniqueKey, func() (interface{}, error) {
        b := &BalancerFactory{
            opts:     config.LbOpts,
            resolver: config.Resolver,
            balancer: config.Balancer,
        }
        go b.watcher()
        go b.refresh()
        balancerFactories.Store(uniqueKey, b)
        return b, nil
    })
    return val.(*BalancerFactory)
}

Without this cache there would be a problem. Creating a BalancerFactory starts two goroutines that never exit, so if the user created a new client every time, the goroutines would leak.
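
The effect of the reuse can be illustrated with a small counter. This sketch deliberately swaps in a simpler technique: a mutex-guarded map instead of the sync.Map and singleflight combination used by NewBalancerFactory. The registry and factory types, the get method and the key string are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

type factory struct {
	key  string
	stop chan struct{} // never closed here, like the background loops it imitates
}

// registry caches factories by key and counts the goroutines it has started.
type registry struct {
	mu        sync.Mutex
	factories map[string]*factory
	started   int
}

func newRegistry() *registry {
	return &registry{factories: map[string]*factory{}}
}

// get returns the cached factory for key and creates it only on first use.
func (r *registry) get(key string) *factory {
	r.mu.Lock()
	defer r.mu.Unlock()
	if f, ok := r.factories[key]; ok {
		return f
	}
	f := &factory{key: key, stop: make(chan struct{})}
	for i := 0; i < 2; i++ { // stand-ins for watcher() and refresh()
		r.started++
		go func() { <-f.stop }()
	}
	r.factories[key] = f
	return f
}

// startedCount reports how many background goroutines were launched.
func (r *registry) startedCount() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.started
}

func main() {
	r := newRegistry()
	for i := 0; i < 1000; i++ {
		r.get("static|weight_random|{5s 15s}") // same key for every "new client"
	}
	fmt.Println("goroutines started:", r.startedCount())
}
```

Even after a thousand lookups with the same key, only the two goroutines of the first factory are started. Without the map, every call would add two more.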

Summary

In this article, we looked at the interface definitions for Hertz service discovery, the design of the client middleware, and why and how the implementation uses periodic refresh and caching.

Finally, if this article was helpful to you, please like and share it. That is the greatest encouragement to me!
