DEV Community

Aceld
Aceld

Posted on • Updated on

(Part 2.1) Golang Framework Hands-on - KisFlow Streaming Computing Framework - Project Construction / Basic Modules

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Twitter: https://twitter.com/Aceld_


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling

To be continued.


[V0.1]-Project Construction and Basic Module Definition

First, let's create our project. The main directory of the project is called KisFlow, and the corresponding repository is created on Github: https://github.com/aceld/kis-flow. Then clone the project code to your local machine.

2.0 Project Construction

(If you are developing according to this tutorial, you need to create a new project in your repository and clone it locally for development.)

2.0.1 Create Project Directory

Next, we will create the necessary file directories for the project. The directory structure of the project is as follows:

 kis-flow /
.
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/
Enter fullscreen mode Exit fullscreen mode

Here, we create six folders:

  • common/: To store some common basic constants, enumeration parameters, and some utility methods.
  • flow/: To store the core code of KisFlow.
  • function/: To store the core code of KisFunction.
  • conn/: To store the core code of KisConnector.
  • config/: To store configuration information for flow, function, connector, etc.
  • example/: To store some test cases and unit test cases for KisFlow to verify the project effect in a timely manner.
  • kis/: To store the abstraction layer of all modules.

2.0.2 Create go.mod

cd to the root directory of the kis-flow project and execute the following command:

go mod init kis-flow
Enter fullscreen mode Exit fullscreen mode

You will get the go.mod file, which is the package management file for the current project:

module kis-flow
go 1.18
Enter fullscreen mode Exit fullscreen mode

First, because there will be a lot of debugging logs to print later, we integrate the log module. KisFlow provides a default standard output Logger object, and opens a SetLogger() method for developers to reset their own Logger module.

2.1 KisLogger

2.1.1 Logger Abstract Interface

Define Logger in the kis-flow/log/ directory and create the kis_log.go file:

kis-flow/log/kis_log.go

package log

import "context"

type KisLogger interface {
    // InfoFX Info level log interface with context, formatted string
    InfoFX(ctx context.Context, str string, v ...interface{})
    // ErrorFX Error level log interface with context, formatted string
    ErrorFX(ctx context.Context, str string, v ...interface{})
    // DebugFX Debug level log interface with context, formatted string
    DebugFX(ctx context.Context, str string, v ...interface{})

    // InfoF Info level log interface without context, formatted string
    InfoF(str string, v ...interface{})
    // ErrorF Error level log interface without context, formatted string
    ErrorF(str string, v ...interface{})
    // DebugF Debug level log interface without context, formatted string
    DebugF(str string, v ...interface{})
}

// kisLog Default KisLog object
var kisLog KisLogger

// SetLogger Set KisLog object, can be a user-defined Logger object
func SetLogger(newlog KisLogger) {
    kisLog = newlog
}

// Logger Get the kisLog object
func Logger() KisLogger {
    return kisLog
}
Enter fullscreen mode Exit fullscreen mode

KisLogger provides three levels of logs: Info, Error, and Debug. It also provides two sets of log interfaces with and without context parameters.

Provide a global object kisLog, the default KisLog object. And methods SetLogger() and Logger() are provided for developers to set their own Logger object and get the Logger object.

2.1.2 Default Log Object KisDefaultLogger

If the developer does not define a custom log object, KisFlow will provide a default log object kisDefaultLogger. This class implements all the interfaces of KisLogger, and all logs are printed in the default standard output format. It is defined in the kis-flow/log/ directory, create the kis_default_log.go file.

kis-flow/log/kis_default_log.go

package log

import (
    "context"
    "fmt"
)

// kisDefaultLog Default provided log object
type kisDefaultLog struct{}

func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
    fmt.Println(ctx)
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
    fmt.Println(ctx)
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
    fmt.Println(ctx)
    fmt.Printf(str, v...)
}

func init() {
    // If Logger is not set, use kisDefaultLog object by default at startup
    if Logger() == nil {
        SetLogger(&kisDefaultLog{})
    }
}
Enter fullscreen mode Exit fullscreen mode

Here, in the init() initialization method, it checks whether a global Logger object has been set. If not, KisFlow will default to using kisDefaultLog as the global Logger logging object.

2.1.3 Unit Testing KisLogger

For now, we won't focus too much on the development of the KisLogger methods. Instead, we'll prioritize getting the existing program up and running and conduct a unit test to test the creation of a KisLogger.

kis-flow/test/kis_log_test.go

package test

import (
    "context"
    "kis-flow/log"
    "testing"
)

func TestKisLogger(t *testing.T) {
    ctx := context.Background()

    log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
    log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
    log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")

    log.Logger().InfoF("TestKisLogger InfoF")
    log.Logger().ErrorF("TestKisLogger ErrorF")
    log.Logger().DebugF("TestKisLogger DebugF")
}

Enter fullscreen mode Exit fullscreen mode

Navigate to the kis-flow/test/ directory and run the unit test command:

go test -test.v -test.paniconexit0 -test.run TestKisLogger
Enter fullscreen mode Exit fullscreen mode

The result is as follows:

=== RUN   TestKisLogger
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok      kis-flow/test   0.509s
Enter fullscreen mode Exit fullscreen mode

2.2 KisConfig

In KisFlow, we define three core modules: KisFunction, KisFlow, and KisConnector. Therefore, KisConfig also needs to define these three modules separately. We will place all the code related to KisConfig in the kis-flow/config/ directory.

➜  kis-flow git:(master) ✗ tree
.
├── LICENSE
├── README.md
├── common/
│   └── 
├── example/
│   └── 
├── config/
│   ├──
├── test/
└── go.mod
Enter fullscreen mode Exit fullscreen mode

2.2.1 KisFuncConfig Definition

In KisFlow, we define three core modules: KisFunction, KisFlow, and KisConnector. Therefore, KisConfig also needs to define these three modules separately. We will place all the code related to KisConfig in the kis-flow/config/ directory.

➜  kis-flow git:(master) ✗ tree
.
├── LICENSE
├── README.md
├── common/
│   └── 
├── example/
│   └── 
├── config/
│   ├──
├── test/
└── go.mod
Enter fullscreen mode Exit fullscreen mode

2.2.1 KisFuncConfig Definition

The design document for KisFuncConfig in YAML format is as follows:

kistype: func
fname: TestKisFunction_S1
fmode: Save
source:
 name: Test data source 1 - user order dimension
 must:
 - userid
 - orderid

option:
 cname: TestKisConnector_1
 retry_times: 3
 retry_duration: 500
 default_params:
 default1: default1_param
 default2: default2_param
Enter fullscreen mode Exit fullscreen mode

Parameter:

Field Required Meaning
kistype Configuration file type
"func" --- KisFunction
"flow" --- KisFlow
"conn" --- KisConnection
fname KisFunction name
fmode Current mode of the KisFunction
Verify: For feature verification KisFunction, mainly for data filtering, validation, field sorting, idempotent preprocessing
Save: For feature storage KisFunction, Save will store the data through KisConnector, and the temporary life cycle of the data is KisWindow
Load: For feature loading KisFunction, Load will load the data through KisConnector, and logically it can be merged with the corresponding Save KisFunction
Calculate: For feature calculation KisFunction, Calculate will calculate the data through KisData in KisFlow, generate new fields, pass the data flow to downstream Save for storage, or it can be stored directly through KisConnector
Expand: For extending features KisFunction, as a custom feature function for streaming computation, such as, Notify scheduler triggers task message sending, deleting some data, resetting status, etc.
source Represents the business source of the current Function
source:name: Data source name
source:must: Fields that must be carried by the current data source (mainly for data validation)
option Optional configurations
cname: Whether the current KisFunction is associated with KisConnection, if associated, fill in the name of the associated KisConnection
retry_times: Number of retries for Function scheduling
retry_duration: Interval time for each retry
default_params: Some custom parameters carried in the Function scheduling through configuration, key and value can be custom named, such as:
default1: default1_param
default2: default2_param

Next, based on the above configuration protocol, we will define the KisFunction strategy configuration structure and provide some corresponding initialization methods. We will create a kis_func_config.go file in the project documentation to implement the required Config definitions.

A. Struct Definition

kis-flow/config/kis_func_config.go

package config

import (
    "kis-flow/common"
    "kis-flow/log"
)

// FParam represents the fixed configuration parameters type for Function in the current Flow
type FParam map[string]string

// KisSource represents the business source of the current Function
type KisSource struct {
    Name string   `yaml:"name"` // Description of the data source for this Function
    Must []string `yaml:"must"` // Fields required by the source
}

// KisFuncOption optional configurations
type KisFuncOption struct {
    CName        string `yaml:"cname"`           // Connector name
    RetryTimes   int    `yaml:"retry_times"`     // Optional, maximum number of retries for Function scheduling (excluding normal scheduling)
    RetryDuriton int    `yaml:"return_duration"` // Optional, maximum time interval for each retry of Function scheduling (unit: ms)
    Params       FParam `yaml:"default_params"`  // Optional, fixed configuration parameters for Function in the current Flow
}

// KisFuncConfig a KisFunction strategy configuration
type KisFuncConfig struct {
    KisType string        `yaml:"kistype"`
    FName   string        `yaml:"fname"`
    FMode   string        `yaml:"fmode"`
    Source  KisSource     `yaml:"source"`
    Option  KisFuncOption `yaml:"option"`
}
Enter fullscreen mode Exit fullscreen mode

Here, KisFuncConfig is the related struct, where FParam, KisSource, KisFuncOption are all relevant parameter types.

B. Method Definitions

Below, we first provide a simple constructor for creating KisFuncConfig.

kis-flow/config/kis_func_config.go

// NewFuncConfig creates a Function strategy configuration object, used to describe a KisFunction information
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
     config := new(KisFuncConfig)
     config.FName = funcName

     if source == nil {
         log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
         return nil
     }

     config.Source = *source
     config.FMode = string(mode)

     // Functions S and L require the KisConnector parameter to be passed, because S and L need to establish a streaming relationship through Connector
     if mode == common.S || mode == common.L {
             if option == nil {
                   log.Logger().ErrorF("Function S/L need option->Cid\n")
                   return nil
             } else if option.CName == "" {
                   log.Logger().ErrorF("Function S/L need option->Cid\n")
                   return nil
             }
       }

      if option != nil {
           config.Option = *option
      }

     return config
}
Enter fullscreen mode Exit fullscreen mode

The code above mentions two enum types, common.S and common.L. These are five types of enum values provided by us for KisFunction, which can be defined in the kis-flow/common/const.go file.

kis-flow/common/const.go

package common

type KisMode string

const (
    // V is for feature verification KisFunction,
    // mainly for data filtering, validation, field sorting, idempotent preprocessing
    V KisMode = "Verify"

    // S is for feature storage KisFunction, 
    // Save will store the data through NsConnector, and the temporary life cycle of the data is NsWindow
    S KisMode = "Save"

    // L is for feature loading KisFunction,
    // Load will load the data through KisConnector, and logically it can be merged with the corresponding S Function
    L KisMode = "Load"

    // C is for feature calculation KisFunction, 
    // Calculate will calculate the data through data in KisFlow, generate new fields, pass the data flow to downstream S for storage, or it can be stored directly through KisConnector
    C KisMode = "Calculate"

    // E is for extending features KisFunction,
    // as a custom feature function for streaming computation, such as, Notify scheduler triggers task message sending, deleting some data, resetting status, etc.
    E KisMode = "Expand"
)
Enter fullscreen mode Exit fullscreen mode

If fmode is Save or Load, it means this function has the behavior of querying the library or storing data, then this Function needs to be associated with a KisConnector, so CName needs to be passed in.

C. Create KisFuncConfig Unit Test

Now, we won't do much method development for KisFuncConfig. We'll first run the existing program to do a unit test to test creating a KisFuncConfig.

kis-flow/test/kis_config_test.go

func TestNewFuncConfig(t *testing.T) {
    source := config.KisSource{
        Name: "TikTokOrder",
        Must: []string{"order_id", "user_id"},
    }

    option := config.KisFuncOption{
        CName:        "connectorName1",
        RetryTimes:   3,
        RetryDuriton: 300,

        Params: config.FParam{
            "param1": "value1",
            "param2": "value2",
        },
    }

    myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

    log.Logger().InfoF("funcName1: %+v\n", myFunc1)
}
Enter fullscreen mode Exit fullscreen mode

We cd to the kis-flow/test/ directory and execute the unit test command:

go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig
Enter fullscreen mode Exit fullscreen mode

The result is as follows:

=== RUN   TestNewFuncConfig
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:TikTokOrder Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}

--- PASS: TestNewFuncConfig (0.00s)
Enter fullscreen mode Exit fullscreen mode

Alright, now the basic creation of KisFuncConfig strategy is basically completed.

2.2.2 Definition of KisFlowConfig

The YAML representation of KisFlowConfig in the design document is as follows:

kistype: flow
status: 1
flow_name: MyFlow1
flows:
  - fname: TestPrintInput
    params:
      args1: value1
      args2: value2
  - fname: TestKisFunction_S1
  - fname: TestPrintInput
    params:
      args1: value11
      args2: value22
      default2: newDefault
  - fname: TestPrintInput
  - fname: TestKisFunction_S1
    params:
      my_user_param1: ffffffxxxxxx
  - fname: TestPrintInput
Enter fullscreen mode Exit fullscreen mode

Parameter:

Field Required Meaning Example
kistype Yes Configuration file type "func" --- KisFunction
"flow" --- KisFlow
"conn" --- KisConnection
status Yes Whether the current KisFlow is started 1-Start
0-Not Start
flow_name Yes Name of the current Flow
flows Yes Information about the current Flow
flows:fname Yes Function associated with the current Flow
flows:fname:params No Whether custom parameters are carried when the current Flow is executed to the current Function args1: value1
args2: value2

A. Struct Definition

Next, we define the KisFlow strategy configuration struct based on the aforementioned configuration protocol and provide corresponding initialization methods. We create a kis_flow_config.go file in the project documentation, where we will implement the required Config definitions.

kis-flow/config/kis_flow_config.go

package config

import "kis-flow/common"

// KisFlowFunctionParam represents the ID of a Function and carries fixed configuration parameters within a Flow configuration
type KisFlowFunctionParam struct {
    FuncName string `yaml:"fname"`  // Required
    Params   FParam `yaml:"params"` // Optional, used to customize fixed configuration parameters for the Function within the current Flow
}

// KisFlowConfig represents an object that spans the entire streaming computing context
type KisFlowConfig struct {
    KisType  string                 `yaml:"kistype"`
    Status   int                    `yaml:"status"`
    FlowName string                 `yaml:"flow_name"`
    Flows    []KisFlowFunctionParam `yaml:"flows"`
}
Enter fullscreen mode Exit fullscreen mode

Here, a new parameter type KisFlowFunctionParam is provided. This represents the default parameters passed to the currently scheduled Function when configuring KisFlow. If not needed, this parameter can be omitted.

B. Method Definitions

We provide a constructor for creating a KisFlowConfig.

kis-flow/config/kis_flow_config.go

// NewFlowConfig creates a Flow strategy configuration object to describe a KisFlow information
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
    config := new(KisFlowConfig)
    config.FlowName = flowName
    config.Flows = make([]KisFlowFunctionParam, 0)

    config.Status = int(enable)

    return config
}

// AppendFunctionConfig adds a Function Config to the current Flow
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
    fConfig.Flows = append(fConfig.Flows, params)
}
Enter fullscreen mode Exit fullscreen mode

Regarding the Function configuration carried by the flow, we dynamically add it through AppendFunctionConfig. This is done in anticipation that KisFlow configuration may be extracted from a database/dynamic remote configuration in the future. Thus, configurations need to be dynamically combined.

C. KisFlowConfig Unit Test

Similarly, we create a simple unit test to test the creation of KisFlowConfig.

kis-flow/test/kis_config_test.go

func TestNewFlowConfig(t *testing.T) {

    flowFuncParams1 := config.KisFlowFunctionParam{
        FuncName: "funcName1",
        Params: config.FParam{
            "flowSetFunParam1": "value1",
            "flowSetFunParam2": "value2",
        },
    }

    flowFuncParams2 := config.KisFlowFunctionParam{
        FuncName: "funcName2",
        Params: config.FParam{
            "default": "value1",
        },
    }

    myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
    myFlow1.AppendFunctionConfig(flowFuncParams1)
    myFlow1.AppendFunctionConfig(flowFuncParams2)

    log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
}
Enter fullscreen mode Exit fullscreen mode

Navigate to the kis-flow/test/ directory and execute the following command to run the unit test:

$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig
Enter fullscreen mode Exit fullscreen mode

The result is as follows:

=== RUN   TestNewFlowConfig
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}

--- PASS: TestNewFlowConfig (0.00s)
PASS
ok      kis-flow/test   0.251s
Enter fullscreen mode Exit fullscreen mode

2.2.3 KisConnConfig

The KisConnConfig in the design document is formatted as follows in YAML:

kistype: conn
cname: TestKisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
  args1: value1
  args2: value2
load: null
save:
  - 测试KisFunction_S1
Enter fullscreen mode Exit fullscreen mode

A. Struct Definition

Next, we define the KisConnector strategy configuration struct based on the aforementioned configuration protocol and provide corresponding initialization methods. We create a kis_conn_config.go file in the project documentation, where we will implement the required Config definitions.

kis-flow/config/kis_conn_config.go

package config

import (
    "errors"
    "fmt"
    "kis-flow/common"
)

// KisConnConfig represents the KisConnector strategy configuration
type KisConnConfig struct {
    // Configuration type
    KisType    string            `yaml:"kistype"`
    // Unique descriptor
    CName      string            `yaml:"cname"`
    // Basic storage medium address
    AddrString string            `yaml:"addrs"`
    // Storage medium engine type, such as "Mysql", "Redis", "Kafka", etc.
    Type       common.KisConnType `yaml:"type"`
    // Identifier for a single storage: for example, Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
    Key        string            `yaml:"key"`
    // Custom parameters in the configuration information
    Params     map[string]string `yaml:"params"`
    // Function ID bound to storage reading
    Load       []string          `yaml:"load"`
    Save       []string          `yaml:"save"`
}
Enter fullscreen mode Exit fullscreen mode

B. Method Definitions

kis-flow/config/kis_conn_config.go

// NewConnConfig creates a KisConnector strategy configuration object to describe a KisConnector information
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
    strategy := new(KisConnConfig)
    strategy.CName = cName
    strategy.AddrString = addr

    strategy.Type = t
    strategy.Key = key
    strategy.Params = param

    return strategy
}

// WithFunc binds Connector to Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

    switch common.KisMode(fConfig.FMode) {
    case common.S:
        cConfig.Save = append(cConfig.Save, fConfig.FName)
    case common.L:
        cConfig.Load = append(cConfig.Load, fConfig.FName)
    default:
        return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

Here, the WithFunc method is also provided to dynamically add the relationship between Conn and Function.

C. KisConnConfig Unit Test

Similarly, we create a simple unit test to test the creation of KisConnConfig.

kis-flow/test/kis_config_test.go

func TestNewConnConfig(t *testing.T) {

    source := config.KisSource{
        Name: "Public Account TikTok Store User Order Data",
        Must: []string{"order_id", "user_id"},
    }

    option := config.KisFuncOption{
        CName:        "connectorName1",
        RetryTimes:   3,
        RetryDuriton: 300,

        Params: config.FParam{
            "param1": "value1",
            "param2": "value2",
        },
    }

    myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

    connParams := config.FParam{
        "param1": "value1",
        "param2": "value2",
    }

    myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)

    if err := myConnector1.WithFunc(myFunc1); err != nil {
        log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
    }

    log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
}
Enter fullscreen mode Exit fullscreen mode

Navigate to the kis-flow/test/ directory and execute the following command to run the unit test:

$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig
Enter fullscreen mode Exit fullscreen mode

The result is as follows:

=== RUN   TestNewConnConfig
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}

--- PASS: TestNewConnConfig (0.00s)
PASS
ok      kis-flow/test   0.481s
Enter fullscreen mode Exit fullscreen mode

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling


Top comments (0)