DEV Community

Aceld

(Part 10) Golang Framework Hands-on - Prometheus Metrics Statistics

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics

To be continued.


Before diving into this chapter, let's look at how to start the Prometheus metrics service. If you are unfamiliar with Prometheus, it is worth reading its documentation first. In short, Prometheus is an open-source monitoring system that collects metrics by scraping HTTP endpoints and stores them as time series.

As KisFlow is a stream computing framework, metrics such as function scheduling time, total data volume, and algorithm speed are crucial for developers and project teams. These metrics can be recorded using Prometheus Metrics through KisFlow.

Next, we will configure the framework globally, allowing developers to enable Prometheus metrics collection if needed.

10.1 Prometheus Metrics Service

10.1.1 Prometheus Client SDK

First, add the necessary dependency in the kis-flow/go.mod file:

module kis-flow

go 1.18

require (
    github.com/google/uuid v1.5.0
    github.com/patrickmn/go-cache v2.1.0+incompatible
    github.com/prometheus/client_golang v1.14.0  //++++++++
    gopkg.in/yaml.v3 v3.0.1
)

We use the official Prometheus Golang client SDK. More details can be found in its README at https://github.com/prometheus/client_golang.

Next, let's write a simple Prometheus service that allows external access to KisFlow service metrics. Create a new directory kis-flow/metrics/ for the KisFlow metrics code.

kis-flow/metrics/kis_metrics.go

package metrics

import (
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "kis-flow/common"
    "kis-flow/log"
    "net/http"
)

// RunMetricsService starts the Prometheus monitoring service
func RunMetricsService(serverAddr string) error {

    // Register the Prometheus monitoring route path
    http.Handle(common.METRICS_ROUTE, promhttp.Handler())

    // Start the HTTP server
    err := http.ListenAndServe(serverAddr, nil) // Multiple processes cannot listen on the same port
    if err != nil {
        log.Logger().ErrorF("RunMetricsService err = %s\n", err)
    }

    return err
}

Define METRICS_ROUTE as the monitoring service HTTP route path in kis-flow/common/const.go:

kis-flow/common/const.go

// ... ...

// metrics
const (
    METRICS_ROUTE string = "/metrics"
)

// ... ...

A brief explanation of the code above: RunMetricsService() starts an HTTP server that exposes the metrics of the current KisFlow process. We have not collected any KisFlow-specific metrics yet, but the Prometheus client already exports default Go runtime metrics, such as the Go version, garbage-collection pause durations, and memory allocation statistics.

  • serverAddr parameter: This is the address for the Prometheus monitoring service, usually a local address with a port number like "0.0.0.0:20004".
http.Handle(common.METRICS_ROUTE, promhttp.Handler())

This line registers the /metrics route, so with the address above the metrics entry point is http://0.0.0.0:20004/metrics.

After writing the code above, pull the Prometheus Golang client SDK (https://github.com/prometheus/client_golang) and its dependencies:

$ go mod tidy

After pulling, the current go.mod dependencies will look something like this (with version differences):

kis-flow/go.mod

module kis-flow

go 1.18

require (
    github.com/google/uuid v1.5.0
    github.com/patrickmn/go-cache v2.1.0+incompatible
    github.com/prometheus/client_golang v1.14.0
    gopkg.in/yaml.v3 v3.0.1
)

require (
    github.com/beorn7/perks v1.0.1 // indirect
    github.com/cespare/xxhash/v2 v2.1.2 // indirect
    github.com/golang/protobuf v1.5.2 // indirect
    github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
    github.com/prometheus/client_model v0.3.0 // indirect
    github.com/prometheus/common v0.37.0 // indirect
    github.com/prometheus/procfs v0.8.0 // indirect
    golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
    google.golang.org/protobuf v1.28.1 // indirect
)

10.1.2 Unit Testing for Prometheus Server Service Startup

Next, let's run a simple test to check whether the service starts.

Create a file named prometheus_server_test.go in the kis-flow/test/ directory:

kis-flow/test/prometheus_server_test.go

package test

import (
    "kis-flow/metrics"
    "testing"
)

func TestPrometheusServer(t *testing.T) {
    err := metrics.RunMetricsService("0.0.0.0:20004")
    if err != nil {
        panic(err)
    }
}

Here, the monitoring address is "0.0.0.0:20004". Next, start this unit test case by opening terminal A and navigating to the kis-flow/test/ directory:

$ cd kis-flow/test/
$ go test -test.v -test.paniconexit0 -test.run TestPrometheusServer
=== RUN   TestPrometheusServer

Then, open another terminal B and enter the following command to simulate an HTTP client request:

$ curl http://0.0.0.0:20004/metrics

After that, we should see the monitoring metrics result in terminal B as follows:

# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0
go_gc_duration_seconds{quantile="0.5"} 0
go_gc_duration_seconds{quantile="0.75"} 0
go_gc_duration_seconds{quantile="1"} 0
go_gc_duration_seconds_sum 0
go_gc_duration_seconds_count 0
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 8
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.18.8"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 3.2364e+06
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 3.2364e+06
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.446507e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 0
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 3.561224e+06
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 3.2364e+06
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 4.636672e+06
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 3.260416e+06
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 21294
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 4.636672e+06
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 7.897088e+06
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 0
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 21294
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 9600
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 15600
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 46376
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 48960
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 4.194304e+06
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 1.171301e+06
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 491520
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 491520
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 1.46322e+07
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 7
# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
# TYPE promhttp_metric_handler_requests_in_flight gauge
promhttp_metric_handler_requests_in_flight 1
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 1
promhttp_metric_handler_requests_total{code="500"} 0
promhttp_metric_handler_requests_total{code="503"} 0

KisFlow already has Function, Flow, and Connector configurations, distinguished by kistype. Next, we will add a global configuration with kistype set to global, which contains the switches for Prometheus metrics collection and for the metrics listener.

Let's proceed to add global configuration properties to KisFlow.

10.2 KisFlow Global Configuration

10.2.1 Loading Global Configuration Files

The global configuration in YAML format is as follows:

# kistype: global marks this file as the KisFlow global configuration
kistype: global
# Whether to enable Prometheus monitoring
prometheus_enable: true
# Whether KisFlow needs to start a separate port listener
prometheus_listen: true
# The address for Prometheus to listen for metrics
prometheus_serve: 0.0.0.0:20004

10.2.2 Struct Definition

Next, based on the configuration format above, we define the global configuration struct for KisFlow. Create a file named kis_global_config.go in the kis-flow/config/ directory with the following definitions:

kis-flow/config/kis_global_config.go

package config

type KisGlobalConfig struct {
    // kistype: global marks the KisFlow global configuration
    KisType string `yaml:"kistype"`
    // Whether to enable Prometheus monitoring
    EnableProm bool `yaml:"prometheus_enable"`
    // Whether KisFlow needs to start a separate port listener
    PrometheusListen bool `yaml:"prometheus_listen"`
    // The address for Prometheus to listen for metrics
    PrometheusServe string `yaml:"prometheus_serve"`
}

// GlobalConfig holds the global configuration; its zero value disables all features
var GlobalConfig = new(KisGlobalConfig)

Here, we provide a global GlobalConfig object, which is a public variable, making it convenient for other modules to share the global configuration.

10.2.3 Configuration File Parsing

Next, we'll parse the global configuration and import it. Add the following function in kis-flow/file/config_import.go:

kis-flow/file/config_import.go

// kisTypeGlobalConfigure parses the Global configuration file in YAML format
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
    // Global configuration
    if err := yaml.Unmarshal(confData, config.GlobalConfig); err != nil {
        return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
    }

    // TODO Initialize Prometheus metrics

    // TODO Start Prometheus metrics service

    return nil
}

This function loads the global YAML configuration file. After loading, it determines whether to initialize Prometheus metrics monitoring, which we will add later.

Where is kisTypeGlobalConfigure() called? It is invoked during the loading and scanning of local configuration files, similar to other configuration files:

kis-flow/file/config_import.go

// parseConfigWalkYaml parses all configuration files in YAML format and loads the configuration information into allConfig
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
    // ... ...

    err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
        // ... ... 

        // Check if kistype exists
        if kisType, ok := confMap["kistype"]; !ok {
            return fmt.Errorf("yaml file %s has no field [kistype]!", filePath)
        } else {
            switch kisType {
            case common.KisIdTypeFlow:
                return kisTypeFlowConfigure(all, confData, filePath, kisType)

            case common.KisIdTypeFunction:
                return kisTypeFuncConfigure(all, confData, filePath, kisType)

            case common.KisIdTypeConnector:
                return kisTypeConnConfigure(all, confData, filePath, kisType)

            // +++++++++++++++++++++++++++++++++
            case common.KisIdTypeGlobal:
                return kisTypeGlobalConfigure(confData, filePath, kisType)
            // +++++++++++++++++++++++++++++++++

            default:
                return fmt.Errorf("%s sets wrong kistype %s", filePath, kisType)
            }
        }
    })

    if err != nil {
        return nil, err
    }

    return all, nil
}

Here, we add a case for kistype: KisIdTypeGlobal to call kisTypeGlobalConfigure().

Next, we will create the Metrics module. In this section, we will start by tracking a simple metric: the total amount of data processed by KisFlow (based on the number of source data processed).

10.3 Metrics - DataTotal Metric

10.3.1 KisMetrics

First, extend the KisMetrics module in kis-flow/metrics/kis_metrics.go (the directory created in Section 10.1.1):

kis-flow/metrics/kis_metrics.go

package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "kis-flow/common"
    "kis-flow/log"
    "net/http"
)

// kisMetrics defines the Prometheus metrics for KisFlow
type kisMetrics struct {
    // Total data count
    DataTotal prometheus.Counter
}

var Metrics *kisMetrics

// RunMetricsService starts the Prometheus monitoring service
func RunMetricsService(serverAddr string) error {
    // Register Prometheus monitoring route
    http.Handle(common.METRICS_ROUTE, promhttp.Handler())

    // Start HTTP server
    err := http.ListenAndServe(serverAddr, nil) // Multiple processes cannot listen on the same port
    if err != nil {
        log.Logger().ErrorF("RunMetricsService err = %s\n", err)
    }

    return err
}

// InitMetrics initializes the metrics
func InitMetrics() {
    Metrics = new(kisMetrics)

    // Initialize the DataTotal counter
    Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
        Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
    })

    // Register the metrics
    prometheus.MustRegister(Metrics.DataTotal)
}
  • kisMetrics struct: This struct defines the metrics that KisFlow needs to track. Currently, it only includes one metric, DataTotal, which is of type prometheus.Counter (for more information on the prometheus.Counter type, please refer to the Prometheus documentation).

  • Metrics *kisMetrics: This is a global metrics tracking object for KisFlow, making it publicly accessible for other modules.

  • RunMetricsService(serverAddr string): This function starts the Prometheus service listener. It is unchanged from Section 10.1 and was unit tested in Section 10.1.2.

  • InitMetrics(): This function initializes the global object and sets up the metrics. It calls prometheus.MustRegister to register the metrics with Prometheus, which is a necessary step in Prometheus metrics programming.

There are two constants representing the metric display name and description. These are defined as follows:

kis-flow/common/const.go

// metrics
const (
    METRICS_ROUTE string = "/metrics"

    COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
    COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data amount of all Flows"
)

10.3.2 DataTotal Metric Tracking

To track the total data processed by KisFlow, we add the tracking code to the commitSrcData() method. This method submits the Flow's buffered source data as the first batch of the current run, so each source row is counted once when it enters the Flow. The updated code is as follows:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) commitSrcData(ctx context.Context) error {
    // Create a batch of data
    dataCnt := len(flow.buffer)
    batch := make(common.KisRowArr, 0, dataCnt)

    for _, row := range flow.buffer {
        batch = append(batch, row)
    }

    // Clear previous data
    flow.clearData(flow.data)

    // Record the original data for the first submission
    // Since this is the first submission, PrevFunctionId is FirstVirtual because there is no previous Function
    flow.data[common.FunctionIdFirstVirtual] = batch

    // Clear the buffer
    flow.buffer = flow.buffer[0:0]

    // +++++++++++++++++++++++++++++++
    // Track the total data count on the first submission
    if config.GlobalConfig.EnableProm == true {
        // Increment the DataTotal metric by the data count
        metrics.Metrics.DataTotal.Add(float64(dataCnt))
    }
    // ++++++++++++++++++++++++++++++

    log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

First, it checks the global configuration to determine if metrics tracking is enabled. If true, it increments the total data count metric with the following code:

metrics.Metrics.DataTotal.Add(float64(dataCnt))

Here, dataCnt is the number of data items being added to the total count.

10.3.3 Starting the Metrics Service

After the global configuration is imported, the metrics service needs to be started. kisTypeGlobalConfigure() in config_import.go is updated as follows:

kis-flow/file/config_import.go

// kisTypeGlobalConfigure parses the Global configuration file in YAML format
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
    // Global configuration
    if err := yaml.Unmarshal(confData, config.GlobalConfig); err != nil {
        return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
    }

    // ++++++++++++++++++++
    // Start the Metrics service
    metrics.RunMetrics()

    return nil
}

The RunMetrics() function is implemented as follows:

kis-flow/metrics/kis_metrics.go

// RunMetrics starts the Prometheus metrics service
func RunMetrics() {
    // Initialize Prometheus metrics
    InitMetrics()

    if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true {
        // Start the Prometheus metrics service
        go RunMetricsService(config.GlobalConfig.PrometheusServe)
    }
}

With this setup, importing the global configuration always initializes and registers the metrics. If both prometheus_enable and prometheus_listen are true, a new goroutine also starts the Prometheus HTTP server on the address given by prometheus_serve.

Next, we will create a unit test for the DataTotal metric to validate our implementation.

10.4 KisMetrics Unit Testing

10.4.1 Create Global Configuration File

Create a global configuration file kis-flow.yml under kis-flow/test/load_conf/ with the following content:

kis-flow/test/load_conf/kis-flow.yml

# kistype: global marks this file as the KisFlow global configuration
kistype: global
# Enable prometheus monitoring
prometheus_enable: true
# Enable separate kisflow port listening
prometheus_listen: true
# Prometheus endpoint listening address
prometheus_serve: 0.0.0.0:20004

10.4.2 Create Unit Test

Next, create the test file kis_metrics_test.go in kis-flow/test/:

kis-flow/test/kis_metrics_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
    "time"
)

func TestMetricsDataTotal(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callback business logic
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callback business logic
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration files and build the Flow
    // (the path is relative to kis-flow/test/, where go test runs)
    if err := file.ConfigImportYaml("load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    n := 0

    for n < 10 {
        // 3. Submit raw data
        _ = flow1.CommitRow("This is Data1 from Test")

        // 4. Execute flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }

        time.Sleep(1 * time.Second)
        n++
    }

    select {}
}

This test works like a normal KisFlow startup, except that it loops 10 times, committing one row and running the Flow once per second. Afterwards, the total data count can be read from the Prometheus monitoring service. The final select{} blocks the test goroutine forever, so the process, and with it the Prometheus server goroutine, keeps running until you stop it.

Run the unit test by navigating to kis-flow/test/ and executing:

go test -test.v -test.paniconexit0 -test.run TestMetricsDataTotal

You will see a lot of log output. Wait for 10 seconds, then open another terminal and enter the following command:

$ curl http://0.0.0.0:20004/metrics 

The result will be:

# ... ...
# HELP kisflow_data_total KisFlow total data amount of all Flows
# TYPE kisflow_data_total counter
kisflow_data_total 10
# ... ...

Here, you'll find that the kisflow_data_total metric appears with a result of 10, indicating that our metrics are correctly tracked. Next, we can add more complex metrics for KisFlow.
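Instead of reading the curl output by eye, the scraped text can be checked in code. A minimal sketch that only handles simple sample lines like the ones shown above (no timestamps, no spaces inside label values); sampleValue is a hypothetical helper. For anything more robust, the text parser in github.com/prometheus/common/expfmt would be the usual choice.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// sampleValue (hypothetical) returns the value of the first sample whose series
// text (metric name plus optional {labels}) equals series exactly.
// Comment lines such as "# HELP" and "# TYPE" are skipped.
func sampleValue(exposition, series string) (float64, bool) {
	sc := bufio.NewScanner(strings.NewReader(exposition))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) < 2 || fields[0] != series {
			continue
		}
		v, err := strconv.ParseFloat(fields[1], 64)
		if err != nil {
			return 0, false
		}
		return v, true
	}
	return 0, false
}

func main() {
	body := `# HELP kisflow_data_total KisFlow total data amount of all Flows
# TYPE kisflow_data_total counter
kisflow_data_total 10
`
	v, ok := sampleValue(body, "kisflow_data_total")
	fmt.Println(v, ok) // 10 true
}
```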

10.5 Additional Metrics

10.5.1 Metric: Flow Data Total

(1) Define Metric

First, define the metric type as follows:

kis-flow/metrics/kis_metrics.go

// kisMetrics Prometheus monitoring metrics for kisFlow
type kisMetrics struct {
    // Total data amount
    DataTotal prometheus.Counter
    // Data total per Flow
    FlowDataTotal *prometheus.GaugeVec
}

FlowDataTotal is a prometheus.GaugeVec partitioned by the flow_name label, so each Flow gets its own series and you can tell which Flow produced the data.

(2) Initialize and Register Metrics

kis-flow/metrics/kis_metrics.go

func InitMetrics() {
    Metrics = new(kisMetrics)

    // Initialize DataTotal Counter
    Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
        Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
    })

    // +++++++++++
    // Initialize FlowDataTotal GaugeVec
    Metrics.FlowDataTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
            Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
        },
        // Label names
        []string{common.LABEL_FLOW_NAME},
    )

    // Register Metrics
    prometheus.MustRegister(Metrics.DataTotal)
    prometheus.MustRegister(Metrics.FlowDataTotal) // +++
}

Related constant definitions:

kis-flow/common/const.go


// metrics
const (
    METRICS_ROUTE string = "/metrics"

    // ++++++++
    LABEL_FLOW_NAME     string = "flow_name"
    LABEL_FLOW_ID       string = "flow_id"
    LABEL_FUNCTION_NAME string = "func_name"
    LABEL_FUNCTION_MODE string = "func_mode"

    COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
    COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data amount of all Flows"

    // +++++++ 
    GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
    GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow data total amount per Flow"
)


(3) Add Metric Tracking

We should track the total data amount when submitting raw data.

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) commitSrcData(ctx context.Context) error {

    // Create batch data
    dataCnt := len(flow.buffer)
    batch := make(common.KisRowArr, 0, dataCnt)

    for _, row := range flow.buffer {
        batch = append(batch, row)
    }

    // Clear previous data
    flow.clearData(flow.data)

    // First submission, record flow raw data
    flow.data[common.FunctionIdFirstVirtual] = batch

    // Clear buffer
    flow.buffer = flow.buffer[0:0]

    // First submission, track data total
    if config.GlobalConfig.EnableProm == true {
        // Track data total Metrics.DataTotal
        metrics.Metrics.DataTotal.Add(float64(dataCnt))

        // ++++++++
        // Track current Flow data total
        metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
    }

    log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

The tracking point is the same as before; the only difference is that the count is also accumulated under the flow.Name label.

10.5.2 Metric: Flow Scheduling Count

(1) Metric Definition

First, define the metric type as follows:

kis-flow/metrics/kis_metrics.go

// kisMetrics represents the Prometheus monitoring metrics for kisFlow
type kisMetrics struct {
    // Total data count
    DataTotal prometheus.Counter
    // Total data processed by each Flow
    FlowDataTotal *prometheus.GaugeVec
    // Total Flow scheduling count
    FlowScheduleCntsTotal *prometheus.GaugeVec //++++
}

FlowScheduleCntsTotal is also a prometheus.GaugeVec labeled by flow_name, so the scheduling count is tracked separately for each Flow.

(2) Metric Initialization and Registration

kis-flow/metrics/kis_metrics.go

func InitMetrics() {
    Metrics = new(kisMetrics)

    // Initialize DataTotal as a Counter
    Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
        Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
    })

    // Initialize FlowDataTotal as a GaugeVec
    Metrics.FlowDataTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
            Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // +++++++++++++
    // Initialize FlowScheduleCntsTotal as a GaugeVec
    Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
            Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // Register metrics
    prometheus.MustRegister(Metrics.DataTotal)
    prometheus.MustRegister(Metrics.FlowDataTotal) 
    // +++++
    prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
}

Define the relevant constants:

kis-flow/common/const.go


// metrics
const (
    METRICS_ROUTE string = "/metrics"

    LABEL_FLOW_NAME     string = "flow_name"
    LABEL_FLOW_ID       string = "flow_id"
    LABEL_FUNCTION_NAME string = "func_name"
    LABEL_FUNCTION_MODE string = "func_mode"

    COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
    COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data amount of all Flows"

    GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
    GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow data total amount per Flow"

    // +++++++
    GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
    GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow scheduling count per Flow"
)

(3) Metric Data Collection

To collect the scheduling count for each Flow, we should collect data in the main entry point flow.Run(), as follows:

kis-flow/flow/kis_flow.go

// Run starts the stream computation of KisFlow and executes the flow from the initial Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        // Flow is disabled in configuration
        return nil
    }

    // Since no Function has been executed yet, PrevFunctionId is set to FirstVirtual because there is no previous Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Commit the original stream data
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    // +++++++++++ Metrics
    if config.GlobalConfig.EnableProm == true {
        // Collect scheduling count for Flow
        metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
    }
    // ++++++++++++++++++++

    // Chain-style stream invocation
    for fn != nil && flow.abort == false {

        // Record the current Function being executed in the Flow
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // Obtain the source data for the current Function to process
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }

        }
    }

    return nil
}

Here, the Flow's schedule count is collected once per Run(), after the source data has been committed and before the first Function's fn.Call() is invoked. The counter for each Flow is incremented under its flow.Name label.
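Conceptually, `WithLabelValues(flow.Name).Inc()` looks up (or lazily creates) the child series for that label value and adds 1 to it. The stdlib-only sketch below models that behavior with a mutex-protected map. The `labeledCounter` type and its methods are hypothetical names for illustration, not part of client_golang, which additionally handles label validation, collection, and exposition.

```go
package main

import (
	"fmt"
	"sync"
)

// labeledCounter is a hypothetical, simplified model of a single-label metric
// vector such as FlowScheduleCntsTotal: one float64 value per label value.
type labeledCounter struct {
	mu     sync.Mutex
	values map[string]float64
}

// newLabeledCounter returns an empty vector with no series yet.
func newLabeledCounter() *labeledCounter {
	return &labeledCounter{values: make(map[string]float64)}
}

// Inc adds 1 to the series for label, creating it on first use, similar in
// spirit to WithLabelValues(label).Inc().
func (c *labeledCounter) Inc(label string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[label]++
}

// Value reports the current value for label (0 if it was never incremented).
func (c *labeledCounter) Value(label string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.values[label]
}

func main() {
	flowSchedule := newLabeledCounter()
	var wg sync.WaitGroup
	// Simulate 10 concurrent Run() calls of the same Flow.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			flowSchedule.Inc("flowName1")
		}()
	}
	wg.Wait()
	fmt.Println(flowSchedule.Value("flowName1")) // 10
}
```

Since this value never decreases, a counter is the conventional Prometheus type for it (prometheus.CounterVec in client_golang, whose `WithLabelValues(...).Inc()` call looks the same), and PromQL's rate() and increase() are intended for counters. The GaugeVec used in KisFlow still works for reading raw totals.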

10.5.3 Metric: Function Scheduling Count

(1) Metric Definition

First, define the metric type as follows:

kis-flow/metrics/kis_metrics.go

// kisMetrics represents the Prometheus monitoring metrics for kisFlow
type kisMetrics struct {
    // Total data count
    DataTotal prometheus.Counter
    // Total data processed by each Flow
    FlowDataTotal *prometheus.GaugeVec
    // Total Flow scheduling count
    FlowScheduleCntsTotal *prometheus.GaugeVec 
    // Total Function scheduling count
    FuncScheduleCntsTotal *prometheus.GaugeVec //++++
}

FuncScheduleCntsTotal also uses the prometheus.GaugeVec type; its two labels, func_name and func_mode, record which Function (and in which mode) each count belongs to.

(2) Metric Initialization and Registration

kis-flow/metrics/kis_metrics.go

func InitMetrics() {
    Metrics = new(kisMetrics)

    // Initialize DataTotal as a Counter
    Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
        Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
    })

    // Initialize FlowDataTotal as a GaugeVec
    Metrics.FlowDataTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
            Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // Initialize FlowScheduleCntsTotal as a GaugeVec
    Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
            Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // ++++++++++
    // Initialize FuncScheduleCntsTotal as a GaugeVec
    Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
            Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
        },
        // Label names
        []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
    )

    // Register metrics
    prometheus.MustRegister(Metrics.DataTotal)
    prometheus.MustRegister(Metrics.FlowDataTotal) 
    prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
    // +++++++
    prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
}

Define the relevant constants:

kis-flow/common/const.go


// metrics
const (
    METRICS_ROUTE string = "/metrics"

    LABEL_FLOW_NAME     string = "flow_name"
    LABEL_FLOW_ID       string = "flow_id"
    LABEL_FUNCTION_NAME string = "func_name"
    LABEL_FUNCTION_MODE string = "func_mode"

    COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
    COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data count"

    GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
    GANGE_FLOW_DATA_TOTAL_HELP string = "Total data count for each FlowID in KisFlow"

    GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
    GANGE_FLOW_SCHE_CNTS_HELP string = "Scheduling count for each FlowID in KisFlow"

    // +++++++++ 
    GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
    GANGE_FUNC_SCHE_CNTS_HELP string = "Scheduling count for each Function in KisFlow"
)

(3) Metric Data Collection

To count how many times each Function is scheduled, we again instrument the Flow's main entry point, flow.Run(), as follows:

kis-flow/flow/kis_flow.go

// Run starts the stream computation of KisFlow and executes the flow from the initial Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        // Flow is disabled in configuration
        return nil
    }

    // Since no Function has been executed yet, PrevFunctionId is set to FirstVirtual because there is no previous Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Commit the original stream data
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    if config.GlobalConfig.EnableProm == true {
        // Collect scheduling count for Flow
        metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
    }

    // Chain-style stream invocation
    for fn != nil && flow.abort == false {

        // Record the current Function being executed in the Flow
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // ++++++++++++
        fName := fn.GetConfig().FName
        fMode := fn.GetConfig().FMode

        // +++++++++++++++++++++++++++
        if config.GlobalConfig.EnableProm == true {
            // Collect scheduling count for Function
            metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
        }
        // ++++++++++++++++++++++++++++

        // Obtain the source data for the current Function to process
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }
        }
    }

    return nil
}

Here, the metric collection occurs before calling the fn.Call() method. Each time a Function is scheduled, the counter is incremented, grouped by fName and fMode.
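With two labels, each distinct (func_name, func_mode) pair is tracked as its own time series. The plain-Go sketch below simulates the counting done inside the Run() loop for a three-Function chain executed 10 times. `seriesKey`, `funcConf`, and `runChain` are hypothetical names, and the Function names and modes are borrowed from the test output later in this chapter.

```go
package main

import "fmt"

// seriesKey is a hypothetical composite key: one entry per unique
// combination of label values, as with WithLabelValues(fName, fMode).
type seriesKey struct {
	FuncName string
	FuncMode string
}

// funcConf stands in for the FName/FMode fields of a Function's config.
type funcConf struct {
	FName string
	FMode string
}

// runChain simulates `runs` executions of a Flow and increments the
// per-Function schedule count once for every Function visited.
func runChain(chain []funcConf, runs int) map[seriesKey]float64 {
	counts := make(map[seriesKey]float64)
	for r := 0; r < runs; r++ {
		for _, fn := range chain {
			counts[seriesKey{fn.FName, fn.FMode}]++
		}
	}
	return counts
}

func main() {
	chain := []funcConf{
		{"funcName1", "Verify"},
		{"funcName2", "Save"},
		{"funcName3", "Calculate"},
	}
	counts := runChain(chain, 10)
	// Three series, each scheduled 10 times.
	fmt.Println(len(counts), counts[seriesKey{"funcName2", "Save"}]) // 3 10
}
```

Because every new label combination creates a new series, label values should come from a small, fixed set such as configured Function names and modes, never from per-record data.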

10.5.4 Metric: Function Execution Time

(1) Metric Definition

Define the metric type as follows:

kis-flow/metrics/kis_metrics.go

// kisMetrics defines the Prometheus metrics for kisFlow
type kisMetrics struct {
    // Total data count
    DataTotal prometheus.Counter
    // Total data processed by each Flow
    FlowDataTotal *prometheus.GaugeVec
    // Flow schedule count
    FlowScheduleCntsTotal *prometheus.GaugeVec 
    // Function schedule count
    FuncScheduleCntsTotal *prometheus.GaugeVec 
    // Function execution time
    FunctionDuration *prometheus.HistogramVec //++++
}

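FunctionDuration uses the prometheus.HistogramVec type. A histogram sorts each observation into configurable buckets, where each bucket counts the observations less than or equal to its upper bound (le), and it also tracks their sum and count. This shows how execution times are distributed rather than just their average; the Vec variant keeps a separate histogram for each func_name/func_mode label pair.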
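The stdlib-only sketch below models a single histogram series to make the cumulative bucket counts concrete, including the implicit +Inf bucket that always equals the total count. The `histogram` type is a hypothetical simplification, not client_golang's implementation.

```go
package main

import (
	"fmt"
	"math"
)

// histogram is a simplified model of one Prometheus histogram series.
type histogram struct {
	upperBounds []float64 // strictly increasing bounds, with +Inf appended last
	counts      []uint64  // cumulative count of observations <= each bound
	sum         float64
	count       uint64
}

// newHistogram copies bounds and appends the implicit +Inf bucket.
func newHistogram(bounds []float64) *histogram {
	b := append(append([]float64{}, bounds...), math.Inf(1))
	return &histogram{upperBounds: b, counts: make([]uint64, len(b))}
}

// Observe records v in every bucket whose upper bound is >= v,
// which is what makes the exposed bucket counts cumulative.
func (h *histogram) Observe(v float64) {
	for i, ub := range h.upperBounds {
		if v <= ub {
			h.counts[i]++
		}
	}
	h.sum += v
	h.count++
}

func main() {
	h := newHistogram([]float64{0.5, 1, 5, 10}) // bounds in ms
	for _, ms := range []float64{0.7, 2.9, 3.1, 8.4} {
		h.Observe(ms)
	}
	for i, ub := range h.upperBounds {
		fmt.Printf("le=%v %d\n", ub, h.counts[i])
	}
	fmt.Printf("sum=%.1f count=%d\n", h.sum, h.count)
}
```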

(2) Metric Initialization and Registration

kis-flow/metrics/kis_metrics.go

func InitMetrics() {
    Metrics = new(kisMetrics)

    // Initialize DataTotal Counter
    Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
        Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
    })

    // Initialize FlowDataTotal GaugeVec
    Metrics.FlowDataTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GAUGE_FLOW_DATA_TOTAL_NAME,
            Help: common.GAUGE_FLOW_DATA_TOTAL_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // Initialize FlowScheduleCntsTotal GaugeVec
    Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GAUGE_FLOW_SCHE_CNTS_NAME,
            Help: common.GAUGE_FLOW_SCHE_CNTS_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // Initialize FuncScheduleCntsTotal GaugeVec
    Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GAUGE_FUNC_SCHE_CNTS_NAME,
            Help: common.GAUGE_FUNC_SCHE_CNTS_HELP,
        },
        // Label names
        []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
    )

    // ++++++++++++++++++++++++++
    // Initialize FunctionDuration HistogramVec
    Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    common.HISTOGRAM_FUNCTION_DURATION_NAME,
        Help:    common.HISTOGRAM_FUNCTION_DURATION_HELP,
        Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, // unit ms, max half a minute
    },
        []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
    )

    // Register Metrics
    prometheus.MustRegister(Metrics.DataTotal)
    prometheus.MustRegister(Metrics.FlowDataTotal) 
    prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
    prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
    // +++++++
    prometheus.MustRegister(Metrics.FunctionDuration)
}

Related constant definitions:

kis-flow/common/const.go


// metrics
const (
    METRICS_ROUTE string = "/metrics"

    LABEL_FLOW_NAME     string = "flow_name"
    LABEL_FLOW_ID       string = "flow_id"
    LABEL_FUNCTION_NAME string = "func_name"
    LABEL_FUNCTION_MODE string = "func_mode"

    COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
    COUNTER_KISFLOW_DATA_TOTAL_HELP string = "Total data processed by KisFlow"

    GAUGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
    GAUGE_FLOW_DATA_TOTAL_HELP string = "Total data processed by each FlowID in KisFlow"

    GAUGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
    GAUGE_FLOW_SCHE_CNTS_HELP string = "Flow schedule counts for each FlowID in KisFlow"

    GAUGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
    GAUGE_FUNC_SCHE_CNTS_HELP string = "Function schedule counts for each Function in KisFlow"

    HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
    HISTOGRAM_FUNCTION_DURATION_HELP string = "Function execution duration"
)


(3) Metric Instrumentation

To measure the execution time of each Function, we should instrument the main entry point of the Flow, flow.Run(), as follows:

kis-flow/flow/kis_flow.go

// Run starts the stream processing of KisFlow, executing from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        // Flow is configured to be disabled
        return nil
    }

    // ++++++++++ Metrics +++++++++
    var funcStart time.Time

    // Since no Function has been executed yet, set PrevFunctionId to FirstVirtual as there is no previous Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Commit the original stream data
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    if config.GlobalConfig.EnableProm == true {
        // Record Flow schedule count
        metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
    }

    // Chain-style stream invocation
    for fn != nil && flow.abort == false {

        // Record the current Function being executed in the Flow
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        fName := fn.GetConfig().FName
        fMode := fn.GetConfig().FMode

        if config.GlobalConfig.EnableProm == true {
            // Record Function schedule count
            metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()

            // +++++++++++++++
            // Record Function execution time start
            funcStart = time.Now()
        }

        // Obtain the source data for the current Function to process
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }

            // +++++++++++++++
            // Record Function execution duration
            if config.GlobalConfig.EnableProm == true {
                // Function execution duration
                duration := time.Since(funcStart)

                // Record the current Function execution time metric
                metrics.Metrics.FunctionDuration.With(
                    prometheus.Labels{
                        common.LABEL_FUNCTION_NAME: fName,
                        common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
            }
            // +++++++++++++++

        }
    }

    return nil
}

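The instrumentation records the start time just before the Function's input data is fetched and its Call() method is invoked, and computes the duration after Call() succeeds and flow.dealAction() has processed any Action. The duration, in milliseconds, is then observed into the matching buckets of the Function's HistogramVec. If Call() or dealAction() returns an error, Run() exits before the observation, so failed executions are not recorded.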

10.5.5 Metric: Flow Execution Time

(1) Metric Definition

Define the metric type as follows:

kis-flow/metrics/kis_metrics.go

// kisMetrics defines the Prometheus metrics for kisFlow
type kisMetrics struct {
    // Total data count
    DataTotal prometheus.Counter
    // Total data processed by each Flow
    FlowDataTotal *prometheus.GaugeVec
    // Flow schedule count
    FlowScheduleCntsTotal *prometheus.GaugeVec 
    // Function schedule count
    FuncScheduleCntsTotal *prometheus.GaugeVec 
    // Function execution time
    FunctionDuration *prometheus.HistogramVec
    // Flow execution time
    FlowDuration *prometheus.HistogramVec // ++++
}

FlowDuration is also a prometheus.HistogramVec, but it is labeled only by flow_name, so each Flow gets its own distribution of end-to-end run times.

(2) Metric Initialization and Registration

kis-flow/metrics/kis_metrics.go

func InitMetrics() {
    Metrics = new(kisMetrics)

    // Initialize DataTotal Counter
    Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
        Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
    })

    // Initialize FlowDataTotal GaugeVec
    Metrics.FlowDataTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GAUGE_FLOW_DATA_TOTAL_NAME,
            Help: common.GAUGE_FLOW_DATA_TOTAL_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // Initialize FlowScheduleCntsTotal GaugeVec
    Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GAUGE_FLOW_SCHE_CNTS_NAME,
            Help: common.GAUGE_FLOW_SCHE_CNTS_HELP,
        },
        // Label name
        []string{common.LABEL_FLOW_NAME},
    )

    // Initialize FuncScheduleCntsTotal GaugeVec
    Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: common.GAUGE_FUNC_SCHE_CNTS_NAME,
            Help: common.GAUGE_FUNC_SCHE_CNTS_HELP,
        },
        // Label names
        []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
    )

    // Initialize FunctionDuration HistogramVec
    Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    common.HISTOGRAM_FUNCTION_DURATION_NAME,
        Help:    common.HISTOGRAM_FUNCTION_DURATION_HELP,
        Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, // unit ms, max half a minute
    },
        []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
    )

    // ++++++++++++++++++++++++++
    // Initialize FlowDuration HistogramVec
    Metrics.FlowDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    common.HISTOGRAM_FLOW_DURATION_NAME,
        Help:    common.HISTOGRAM_FLOW_DURATION_HELP,
        Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000, 60000}, // unit ms, max one minute
    },
        []string{common.LABEL_FLOW_NAME},
    )

    // Register Metrics
    prometheus.MustRegister(Metrics.DataTotal)
    prometheus.MustRegister(Metrics.FlowDataTotal) 
    prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
    prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
    prometheus.MustRegister(Metrics.FunctionDuration)
    // +++++++
    prometheus.MustRegister(Metrics.FlowDuration)
}

Related constant definitions:

kis-flow/common/const.go


// metrics
const (
    METRICS_ROUTE string = "/metrics"

    LABEL_FLOW_NAME     string = "flow_name"
    LABEL_FLOW_ID       string = "flow_id"
    LABEL_FUNCTION_NAME string = "func_name"
    LABEL_FUNCTION_MODE string = "func_mode"

    COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
    COUNTER_KISFLOW_DATA_TOTAL_HELP string = "Total data processed by KisFlow"

    GAUGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
    GAUGE_FLOW_DATA_TOTAL_HELP string = "Total data processed by each FlowID in KisFlow"

    GAUGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
    GAUGE_FLOW_SCHE_CNTS_HELP string = "Flow schedule counts for each FlowID in KisFlow"

    GAUGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
    GAUGE_FUNC_SCHE_CNTS_HELP string = "Function schedule counts for each Function in KisFlow"

    HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
    HISTOGRAM_FUNCTION_DURATION_HELP string = "Function execution duration"

    HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
    HISTOGRAM_FLOW_DURATION_HELP string = "Flow execution duration"
)

(3) Metric Instrumentation

To measure the execution time of each Flow, we should instrument the main entry point of the Flow, flow.Run(), as follows:

kis-flow/flow/kis_flow.go

// Run starts the stream processing of KisFlow, executing from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        // Flow is configured to be disabled
        return nil
    }

    // ++++++++++ Metrics +++++++++
    var funcStart, flowStart time.Time

    // Since no Function has been executed yet, set PrevFunctionId to FirstVirtual as there is no previous Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Commit the original stream data
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    if config.GlobalConfig.EnableProm == true {
        // Record Flow schedule count
        metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
        // Record Flow execution time start
        flowStart = time.Now()
    }

    // Chain-style stream invocation
    for fn != nil && flow.abort == false {

        // Record the current Function being executed in the Flow
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        fName := fn.GetConfig().FName
        fMode := fn.GetConfig().FMode

        if config.GlobalConfig.EnableProm == true {
            // Record Function schedule count
            metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()

            // Record Function execution time start
            funcStart = time.Now()
        }

        // Obtain the source data for the current Function to process
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }

            // Record Function execution duration
            if config.GlobalConfig.EnableProm == true {
                // Function execution duration
                duration := time.Since(funcStart)

                // Record the current Function execution time metric
                metrics.Metrics.FunctionDuration.With(
                    prometheus.Labels{
                        common.LABEL_FUNCTION_NAME: fName,
                        common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
            }
        }
    }

    // Record Flow execution duration
    if config.GlobalConfig.EnableProm == true {
        // Flow execution duration
        duration := time.Since(flowStart)

        // Record the Flow execution time metric
        metrics.Metrics.FlowDuration.With(
            prometheus.Labels{
                common.LABEL_FLOW_NAME: flow.Name}).Observe(duration.Seconds() * 1000)
    }

    return nil
}

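The instrumentation records the start time after the source data is committed, just before the first Function runs, and computes the duration once the Function chain finishes, including when an Action aborts the Flow early. This duration, in milliseconds, is then observed into the Flow's HistogramVec. A Run() that returns an error exits before this point, so failed runs are not recorded.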

10.6 KisMetrics Unit Testing (Other Metrics)

10.6.1 Creating Unit Tests

We can reuse the TestMetricsDataTotal() test case from the previous section, as shown below:

kis-flow/test/kis_metrics_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
    "time"
)

func TestMetricsDataTotal(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHandler1)

    // 1. Load configuration files and build Flow
    // Path is relative to kis-flow/test/, the working directory used by go test
    if err := file.ConfigImportYaml("load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    n := 0

    for n < 10 {
        // 3. Submit raw data
        _ = flow1.CommitRow("This is Data1 from Test")

        // 4. Execute flow1
        if err := flow1.Run(ctx); err != nil {
            panic(err)
        }

        time.Sleep(1 * time.Second)
        n++
    }

    // Block forever so the Prometheus /metrics endpoint stays reachable for curl
    select {}
}

Execute the unit test by navigating to kis-flow/test/ and running:

go test -test.v -test.paniconexit0 -test.run TestMetricsDataTotal

The test prints a lot of log output. After about 10 seconds (ten runs, one second apart), open another terminal and run the following command:

curl http://0.0.0.0:20004/metrics

You should see output similar to the following (durations will vary between runs):

# HELP flow_data_total Total data processed by each FlowID in KisFlow
# TYPE flow_data_total gauge
flow_data_total{flow_name="flowName1"} 10
# HELP flow_run_duration Flow execution duration
# TYPE flow_run_duration histogram
flow_run_duration_bucket{flow_name="flowName1",le="0.005"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.01"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.03"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.08"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.1"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.5"} 0
flow_run_duration_bucket{flow_name="flowName1",le="1"} 0
flow_run_duration_bucket{flow_name="flowName1",le="5"} 9
flow_run_duration_bucket{flow_name="flowName1",le="10"} 10
flow_run_duration_bucket{flow_name="flowName1",le="100"} 10
flow_run_duration_bucket{flow_name="flowName1",le="1000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="5000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="30000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="60000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="+Inf"} 10
flow_run_duration_sum{flow_name="flowName1"} 29.135023
flow_run_duration_count{flow_name="flowName1"} 10
# HELP flow_schedule_cnts Flow schedule counts for each FlowID in KisFlow
# TYPE flow_schedule_cnts gauge
flow_schedule_cnts{flow_name="flowName1"} 10
# HELP func_run_duration Function execution duration
# TYPE func_run_duration histogram
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.005"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.01"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.03"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.08"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.1"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.5"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5"} 9
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="10"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="100"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1000"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5000"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="30000"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="+Inf"} 10
func_run_duration_sum{func_mode="Calculate",func_name="funcName3"} 20.925857
func_run_duration_count{func_mode="Calculate",func_name="funcName3"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.005"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.01"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.03"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.08"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.1"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.5"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1"} 1
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="10"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="100"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1000"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5000"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="30000"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="+Inf"} 10
func_run_duration_sum{func_mode="Save",func_name="funcName2"} 27.026124
func_run_duration_count{func_mode="Save",func_name="funcName2"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.005"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.01"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.03"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.08"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.1"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.5"} 5
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="10"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="100"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1000"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5000"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="30000"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="+Inf"} 10
func_run_duration_sum{func_mode="Verify",func_name="funcName1"} 13.858197
func_run_duration_count{func_mode="Verify",func_name="funcName1"} 10
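Every histogram also exposes `_sum` and `_count` series, so the mean duration is sum divided by count: for flowName1 above, 29.135023 ms / 10 runs gives about 2.91 ms per run. In PromQL this is usually written as `rate(flow_run_duration_sum[5m]) / rate(flow_run_duration_count[5m])`. The sketch below computes the same mean from the text output with simple string parsing; `parseLine` and `meanFromExposition` are hypothetical helpers that understand only plain `name{labels} value` lines, not the full exposition format.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseLine splits a simplified exposition line `name{labels} value [ts]`
// into its metric name and value.
func parseLine(line string) (string, float64, error) {
	name, rest := line, ""
	if i := strings.IndexByte(line, '{'); i >= 0 {
		name = line[:i]
		if j := strings.LastIndexByte(line, '}'); j > i {
			rest = line[j+1:]
		}
	} else if i := strings.IndexByte(line, ' '); i >= 0 {
		name, rest = line[:i], line[i+1:]
	}
	fields := strings.Fields(rest)
	if len(fields) == 0 {
		return "", 0, errors.New("no value in line: " + line)
	}
	v, err := strconv.ParseFloat(fields[0], 64)
	return name, v, err
}

// meanFromExposition returns <name>_sum / <name>_count, using only the first
// matching series of each (enough for a single-Flow test like the one above).
func meanFromExposition(text, name string) (float64, error) {
	var sum, count float64
	var haveSum, haveCount bool
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		metric, v, err := parseLine(line)
		if err != nil {
			return 0, err
		}
		switch {
		case metric == name+"_sum" && !haveSum:
			sum, haveSum = v, true
		case metric == name+"_count" && !haveCount:
			count, haveCount = v, true
		}
	}
	if err := sc.Err(); err != nil {
		return 0, err
	}
	if !haveSum || !haveCount || count == 0 {
		return 0, fmt.Errorf("no usable %s_sum/%s_count series", name, name)
	}
	return sum / count, nil
}

func main() {
	out := `# TYPE flow_run_duration histogram
flow_run_duration_sum{flow_name="flowName1"} 29.135023
flow_run_duration_count{flow_name="flowName1"} 10`
	mean, err := meanFromExposition(out, "flow_run_duration")
	if err != nil {
		panic(err)
	}
	fmt.Printf("mean flow_run_duration: %.4f ms\n", mean)
}
```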

This concludes the KisMetrics unit testing for the other metrics.

10.7 Grafana Dashboard Display for KisFlow Metrics

With Prometheus metrics collected, we can integrate Grafana to display dashboards for KisFlow stream processing programs. Since each developer's project metrics and dashboard requirements may vary, this document does not provide specific Grafana dashboard configuration files. Instead, here is a sample dashboard for a KisFlow project for demonstration purposes, as shown below:

[Figures 1-3: sample Grafana dashboard for a KisFlow project]

10.8 [V0.9] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.9


Author: Aceld
GitHub: https://github.com/aceld
