Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
In this chapter, we will design the Connector module for KisFlow, whose main function and purpose are mainly to mount the logic of third-party storage engines under a specific Function.
5.1 Connector Definition
KisFlow provides the Connector to allow developers to define a custom read/write plugin mode for third-party storage engines. If data in the data flow needs to be temporarily read from or stored to a specific storage engine, developers can write corresponding read/write logic through Connector and mount it on a specific Function in the Flow through configuration. Connectors are also flexibly configurable. This allows storage algorithms with the same logic to be reused in multiple Functions.
5.1.1 Connector Abstraction Layer Definition
Create a connector.go
file in kis-flow/kis/
to define the Connector's abstract interface as follows:
kis-flow/kis/connector.go
package kis
import (
"context"
"kis-flow/config"
)
type Connector interface {
// Init initializes the connection associated with the Connector.
Init() error
// Call invokes the read/write operations of the external storage logic attached to the Connector.
Call(ctx context.Context, flow Flow, args interface{}) error
// GetId gets the ID of the Connector.
GetId() string
// GetName gets the name of the Connector.
GetName() string
}
At this stage, the main interfaces provided by Connector are:
-
Init()
: Mainly for the initialization logic of the third-party storage engine associated with the currentConnector
, such as creatingconnections
.Init()
will only be executed once in the lifecycle of theConnector
instance. -
Call()
: The entry point forConnector
dispatching, where the customread/write
logic of the related storage is triggered through theCall()
method. The specific callback function prototype is defined in theRouter
module.
5.1.2 Connector Related Routing Member Type Definitions
Based on the above interfaces, a Connector instance needs to configure two custom methods, one called through Init()
interface, and the other called through Call()
interface. Below are the definitions for these two types of callback prototypes.
(1) Connector Init
kis-flow/kis/router.go
/*
Connector Init
*/
// ConnInit is the callback function prototype for initializing the third-party mounted storage.
type ConnInit func(conn Connector) error
// connInitRouter is the router managing ConnInit callbacks.
// key: ConnName
type connInitRouter map[string]ConnInit
ConnInit
is the prototype for initialization callback functions, with the Connector
instance pointer as the parameter.
connInitRouter
is the router managing ConnInit
callbacks, where the key is ConnName.
(2) Connector Call
kis-flow/kis/router.go
/*
Connector Call
*/
// CaaS is the callback function prototype for implementing storage read/write operations in the Connector.
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
// connFuncRouter is the router indexing CaaS callback mappings by FunctionName.
// key: Function Name
// value: CaaS callback for storage read/write operations in the Connector
type connFuncRouter map[string]CaaS
CaaS
is the prototype for Connector's storage read/write logic callback functions, with parameters Connector, Function, and Flow pointers. Through these three instances, developers can obtain some parameters required by the business. The last parameter is for custom input by the developer during the Function dispatching of the Connector Call.connFuncRouter
is the router indexingCaaS
callbacks, where the key is the Function Name to which the current Connector is mounted. Therefore, the dispatch of Connector Call must be initiated by Function.
As Connectors
can only be mounted on Functions in Save
or Load
mode, for future convenience in statistics or indexing, it is also necessary to group connFuncRouter into Save and Load groups. Below are the type definitions for the mappings of these two groups.
kis-flow/kis/router.go
// connSL divides connFuncRouter into two sub-trees based on KisMode
// key: Function KisMode S/L
// value: NsConnRouter
type connSL map[common.KisMode]connFuncRouter
// connTree
// key: Connector Name
// value: connSL secondary tree
type connTree map[string]connSL
-
connSL
divides theconnFuncRouter
into two groups based onKisMode
, with members being the previously defined Connector's Call routers. -
connTree
indexes the secondary treeconnSL
by Connector Name. By combining Connector Name + Function Mode + Function Name, the Connector Call function to be dispatched can be determined.
5.2 Connector Routing Management
In the previous section, we defined the types of routing management needed for Connector. Now, we need to manage the addition and scheduling of these routes. Connector's routing management is unified under the KisPool module, just like Function.
5.2.1 KisPool Adds Connector-Related Routing Management Members
(1) Adding Members
kis-flow/kis/pool.go
// kisPool is used to manage all Function and Flow configurations.
type kisPool struct {
fnRouter funcRouter // All Function management routes
fnLock sync.RWMutex // Lock for fnRouter
flowRouter flowRouter // All flow objects
flowLock sync.RWMutex // Lock for flowRouter
// +++++++++++++++++
cInitRouter connInitRouter // All Connector initialization routes
ciLock sync.RWMutex // Lock for cInitRouter
cTree connTree // All Connector management routes
connectors map[string]Connector // All Connector objects
cLock sync.RWMutex // Lock for cTree
// +++++++++++++++++
}
(2) Initialization of Related Map Variables
kis-flow/kis/pool.go
// Pool is the singleton constructor.
func Pool() *kisPool {
_poolOnce.Do(func() {
// Create a kisPool object
_pool = new(kisPool)
// Initialize fnRouter
_pool.fnRouter = make(funcRouter)
// Initialize flowRouter
_pool.flowRouter = make(flowRouter)
// +++++++++++++++++++++++++
// Initialize cTree
_pool.cTree = make(connTree)
_pool.cInitRouter = make(connInitRouter)
_pool.connectors = make(map[string]Connector)
// +++++++++++++++++++++++++
})
return _pool
}
(3) Register Connector Init Method
kis-flow/kis/pool.go
// Register Connector initialization business.
func (pool *kisPool) RegisterConnectorInit(cname string, c ConnInit) {
pool.ciLock.Lock() // Write lock
defer pool.ciLock.Unlock()
if _, ok := pool.cInitRouter[cname]; !ok {
pool.cInitRouter[cname] = c
} else {
errString := fmt.Sprintf("KisPool Reg Connector Init Repeat CName=%s\n", cname)
panic(errString)
}
log.Logger().InfoF("Add KisPool Connector Init CName=%s", cname)
}
(4) Execute Connector Init Method
kis-flow/kis/pool.go
// Dispatch Connector Init
func (pool *kisPool) DispatchConnectorInit(conn Connector) error {
pool.ciLock.RLock() // Read lock
defer pool.ciLock.RUnlock()
init, ok := pool.cInitRouter[conn.GetName()]
if !ok {
panic(errors.New(fmt.Sprintf("init connector cname = %s not registered..", conn.GetName())))
}
return init(conn)
}
The logic is straightforward: acquire a lock for protection, and then add key/value pairs. Since this is a routing action, if adding fails, it will panic() and exit the process.
(5) Register Connector Call Method
kis-flow/kis/pool.go
// Register Connector Call business.
func (pool *kisPool) RegisterConnectorCall(cname string, fname string, mode common.KisMode, c CaaS) {
pool.cLock.Lock() // Write lock
defer pool.cLock.Unlock()
if _, ok := pool.cTree[cname]; !ok {
//cid First registration, does not exist, create a secondary tree NsConnSL
pool.cTree[cname] = make(connSL)
// Initialize various FunctionMode
pool.cTree[cname][common.S] = make(connFuncRouter)
pool.cTree[cname][common.L] = make(connFuncRouter)
}
if _, ok := pool.cTree[cname][mode][fname]; !ok {
pool.cTree[cname][mode][fname] = c
} else {
errString := fmt.Sprintf("Connector Call Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
panic(errString)
}
log.Logger().InfoF("Add KisPool Connector Call CName=%s, FName=%s, Mode =%s", cname, fname, mode)
}
The RegisterConnectorCall
method is used to register the logic processing callback function of your own Connector connector. When registering, it will add the function to the corresponding routing group based on the passed parameters.
(6) Execute Connector Call Method
kis-flow/kis/pool.go
// Dispatch Connector Call
func (pool *kisPool) DispatchConnectorCall(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
fn := flow.GetThisFunction()
fnConf := fn.GetConfig()
mode := common.KisMode(fnConf.FMode)
if callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]; ok {
return callback(ctx, conn, fn, flow, args)
}
log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)
return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}
DispatchConnectorCall
is used to index and find the corresponding CaaS function based on ConnectorName, Function Mode, and Function Name, and then execute it.
5.3 KisConnector
Next, let's define and implement the KisConnector module according to the abstraction layer of Connector. Create a kis_connector.go
file in the kis-flow/conn/
directory.
5.3.1 Definition of KisConnector
package conn
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
"sync"
)
type KisConnector struct {
// Connector ID
CId string
// Connector Name
CName string
// Connector Config
Conf *config.KisConnConfig
// Connector Init
onceInit sync.Once
}
In addition to identifying the instance with a KisID
and a name, KisConnector
also includes the configuration information KisConnConfig
for the current KisConnector
. It contains a sync.Once
member, which is used to control the execution of Connector Init
only once in its lifecycle, as will be discussed later.
5.3.1 Constructor Method for KisConnector
// NewKisConnector creates a KisConnector based on the provided configuration.
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn := new(KisConnector)
conn.CId = id.KisID(common.KisIdTypeConnnector)
conn.CName = config.CName
conn.Conf = config
return conn
}
Creating a KisConnector instance requires having KisConnConfig configuration information.
5.3.2 Implementation of Connector Interface
// Init initializes the storage engine connection associated with the Connector.
func (conn *KisConnector) Init() error {
var err error
// Ensure Init is executed only once for a Connector.
conn.onceInit.Do(func() {
err = kis.Pool().DispatchConnectorInit(conn)
})
return err
}
// Call invokes the read/write operations of the external storage logic for the Connector.
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
if err := kis.Pool().DispatchConnectorCall(ctx, flow, conn, args); err != nil {
return err
}
return nil
}
func (conn *KisConnector) GetName() string {
return conn.CName
}
func (conn *KisConnector) GetConfig() *config.KisConnConfig {
return conn.Conf
}
func (conn *KisConnector) GetId() string {
return conn.CId
}
- The
Init()
method ensures that it is executed only once using sync.Once, and ultimately it is routed and scheduled through KisPool. - Similarly,
Call()
is scheduled through KisPool.
So, when will Init()
and Call()
of KisConnector be called? Next, we need to implement KisConnConfig to associate the hierarchical relationship of Connector with Function and Flow.
5.4 KisConnConfig Configuration
In the NewKisConnector()
function, the parameter is KisConnConfig
, so developers need to create a KisConnConfig, which contains the configuration information for a Connector instance. In version 0.1, we have already implemented the definition and creation of the KisConnConfig module. The code is as follows:
kis-flow/config/kis_conn_config.go
package config
import (
"errors"
"fmt"
"kis-flow/common"
)
// KisConnConfig describes the configuration for a KisConnector
type KisConnConfig struct {
// Configuration type
KisType string `yaml:"kistype"`
// Unique identifier
CName string `yaml:"cname"`
// Base storage medium address
AddrString string `yaml:"addrs"`
// Storage medium engine type: "Mysql", "Redis", "Kafka", etc.
Type common.KisConnType `yaml:"type"`
// Identifier for a single storage: e.g., Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
Key string `yaml:"key"`
// Custom parameters in the configuration information
Params map[string]string `yaml:"params"`
// NsFuncionID bound to storage reading and writing
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}
// NewConnConfig creates a KisConnector strategy configuration object to describe a KisConnector information
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CName = cName
strategy.AddrString = addr
strategy.Type = t
strategy.Key = key
strategy.Params = param
return strategy
}
// WithFunc binds Connector with Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
switch common.KisMode(fConfig.FMode) {
case common.S:
cConfig.Save = append(cConfig.Save, fConfig.FName)
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.FName)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
}
return nil
}
5.4.1 Adding KisConnConfig Member to KisFuncConfig
First, we add a KisConnConfig member to KisFuncConfig.
kis-flow/config/kis_func_config.go
// KisFuncConfig represents a KisFunction strategy configuration
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
FName string `yaml:"fname"`
FMode string `yaml:"fmode"`
Source KisSource `yaml:"source"`
Option KisFuncOption `yaml:"option"`
// ++++++++++
connConf *KisConnConfig
}
Then, we provide methods to add and retrieve information about this member.
kis-flow/config/kis_func_config.go
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
if cConf == nil {
return errors.New("KisConnConfig is nil")
}
// Function needs to be associated with Connector
fConf.connConf = cConf
// Connector needs to be associated with Function
_ = cConf.WithFunc(fConf)
return nil
}
func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
if fConf.connConf == nil {
return nil, errors.New("KisFuncConfig.connConf not set")
}
return fConf.connConf, nil
}
This way, we can retrieve the associated Connector configuration information through the Function's configuration information.
5.5 Function/Flow and Connector Association
5.5.1 Function and Connector Association
kis-flow/kis/function.go
package kis
import (
"context"
"kis-flow/config"
)
// Function represents the basic calculation unit in stream computing. A KisFunction is a basic calculation logic unit in stream computing,
// and any number of KisFunctions can be combined into a KisFlow.
type Function interface {
// Call executes the stream computing logic
Call(ctx context.Context, flow Flow) error
// SetConfig configures the current Function instance
SetConfig(s *config.KisFuncConfig) error
// GetConfig gets the configuration of the current Function instance
GetConfig() *config.KisFuncConfig
// SetFlow sets the Flow instance that the current Function instance depends on
SetFlow(f Flow) error
// GetFlow gets the Flow that the current Function instance depends on
GetFlow() Flow
// ++++++++++++++++++++
// AddConnector adds a Connector to the current Function instance
AddConnector(conn Connector) error
// GetConnector gets the Connector associated with the current Function instance
GetConnector() Connector
// ++++++++++++++++++++
// CreateId generates a random instance KisID for the current Function instance
CreateId()
// GetId gets the FID of the current Function
GetId() string
// GetPrevId gets the FID of the previous Function node on the current Function
GetPrevId() string
// GetNextId gets the FID of the next Function node on the current Function
GetNextId() string
// Next returns the next layer of calculation stream Function, returns nil if the current layer is the last layer
Next() Function
// Prev returns the previous layer of calculation stream Function, returns nil if the current layer is the first layer
Prev() Function
// SetN sets the next Function instance
SetN(f Function)
// SetP sets the previous Function instance
SetP(f Function)
}
Next, we implement this in BaseFunction
:
type BaseFunction struct {
// Id, the instance ID of KisFunction, used to distinguish different instance objects internally in KisFlow
Id string
Config *config.KisFuncConfig
// flow
flow kis.Flow // Context environment KisFlow
// ++++++++++++++
// connector
connector kis.Connector
// ++++++++++++++
// link
N kis.Function // Next stream computing Function
P kis.Function // Previous stream computing Function
}
// ........
// ........
// AddConnector adds a Connector to the current Function instance
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
if conn == nil {
return errors.New("conn is nil")
}
base.connector = conn
return nil
}
// GetConnector gets the Connector associated with the current Function instance
func (base *BaseFunction) GetConnector() kis.Connector {
return base.connector
}
This allows a Function instance to obtain information about the Connector instance.
5.5.2 Flow and Connector Association
Similarly, Flow also needs to obtain information about the Connector. This requires a simple association between Flow and Connector.
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run schedules the Flow, sequentially schedules the Functions in the Flow and executes them
Run(ctx context.Context) error
// Link connects the Functions in the Flow according to the configuration in the configuration file
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow submits Flow data to the upcoming Function layer
CommitRow(row interface{}) error
// Input gets the input source data of the currently executing Function in the Flow
Input() common.KisRowArr
// GetName gets the name of the Flow
GetName() string
// GetThisFunction gets the currently executing Function
GetThisFunction() Function
// GetThisFuncConf gets the configuration of the currently executing Function
GetThisFuncConf() *config.KisFuncConfig
// GetConnector gets the Connector of the currently executing Function
// +++++++++++++++++++++++++++++++++
GetConnector() (Connector, error)
// GetConnConf gets the configuration of the Connector of the currently executing Function
GetConnConf() (*config.KisConnConfig, error)
// +++++++++++++++++++++++++++++++++
}
We add GetConnector()
and GetConnConf()
methods to obtain the Connector instance and Connector configuration. Then, we implement these methods in KisFlow.
// GetConnector gets the Connector of the currently executing Function
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
if conn := flow.ThisFunction.GetConnector(); conn != nil {
return conn, nil
} else {
return nil, errors.New("GetConnector(): Connector is nil")
}
}
// GetConnConf gets the configuration of the Connector of the currently executing Function
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
if conn := flow.ThisFunction.GetConnector(); conn != nil {
return conn.GetConfig(), nil
} else {
return nil, errors.New("GetConnConf(): Connector is nil")
}
}
This allows us to retrieve the associated Connector configuration information through the Function's configuration information.
5.5.3 Linking Function to Connector
According to the previous configuration file definition, the function's YAML configuration file is as follows:
nstype: func
fname: TestFunction_L1
fmode: Load
source:
name: testSource
must:
- stuid
- classid
option:
cname: Test-NsConnector_2
Here, there is an Option
, with one of its members cname
. If the current Function is configured with a Connector, then the cname should be configured in the current Option, and the name of the Connector cname should be filled in.
When a Flow is linked to a Function, after the Function instance is created, if the Function carries a Connector, a Connector instance also needs to be created. This can be achieved through the Function's configuration information.
kis-flow/flow/kis_flow.go
// Link connects the Function to the Flow
// fConf: Current Function strategy
// fParams: Dynamic parameters carried by the current Flow
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
// Create Function instance
f := function.NewKisFunction(flow, fConf)
// ++++++++++++++++++++++++++++++
if fConf.Option.CName != "" {
// The current Function is associated with a Connector and needs to initialize the Connector instance
// Get Connector configuration
connConfig, err := fConf.GetConnConfig()
if err != nil {
panic(err)
}
// Create Connector object
connector := conn.NewKisConnector(connConfig)
// Initialize the Connector, execute the Connector Init method
if err = connector.Init(); err != nil {
panic(err)
}
// Associate the Function instance with the Connector instance
_ = f.AddConnector(connector)
}
// ++++++++++++++++++++++++++++++
// Flow adds Function
if err := flow.appendFunc(f, fParams); err != nil {
return err
}
return nil
}
This creates a Connector instance.
5.6 KisConnector Unit Testing
Next, let's perform unit testing on KisConnector.
5.6.1 Unit Testing
Create the kis-flow/test/kis_connector_test.go
file:
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestNewKisConnector(t *testing.T) {
ctx := context.Background()
// 0. Register Function callback business
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callback business
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Create 3 KisFunction configuration instances, where myFuncConfig2 has Connector configuration
source1 := config.KisSource{
Name: "Public account Douyin mall customer order data",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "User order error rate",
Must: []string{"order_id", "user_id"},
}
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}
option := config.KisFuncOption{
CName: "ConnName1",
}
myFuncConfig2 := config.NewFuncConfig("funcName2", common.S, &source2, &option)
if myFuncConfig2 == nil {
panic("myFuncConfig2 is nil")
}
myFuncConfig3 := config.NewFuncConfig("funcName3", common.E, &source2, nil)
if myFuncConfig3 == nil {
panic("myFuncConfig3 is nil")
}
// 2. Create a KisConnector configuration instance
myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
if myConnConfig1 == nil {
panic("myConnConfig1 is nil")
}
// 3. Bind the KisConnector configuration instance to the KisFunction configuration instance
_ = myFuncConfig2.AddConnConfig(myConnConfig1)
// 4. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 5. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 6. Concatenate Functions to Flow
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
if err := flow1.Link(myFuncConfig2, nil); err != nil {
panic(err)
}
if err := flow1.Link(myFuncConfig3, nil); err != nil {
panic(err)
}
// 7. Submit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 8. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
Note that funcName2 is the Function associated with the Connector. So when creating the Config for Function2, Option information is provided, and the name of the associated Connector is provided.
5.6.2 Function Callback and Connector Callback
For easy management of Callback business, we create the kis-flow/test/faas/
and kis-flow/test/caas/
directories under kis-flow/test/
.
Create a file in each directory, with each file containing one type of custom business.
├── caas
│ ├── caas_demo1.go
│ └── caas_init1.go
├── faas
│ ├── faas_demo1.go
│ ├── faas_demo2.go
│ └── faas_demo3.go
(1) Callback business for FuncName1
kis-flow/test/faas/faas_demo1.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName1Handler ----")
for index, row := range flow.Input() {
// Print data
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// Calculate result data
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// Submit result data
_ = flow.CommitRow(resultStr)
}
return nil
}
This serves as our first Function, printing data and generating some more data.
allback business for FuncName2
kis-flow/test/faas/faas_demo2.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/log"
)
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName2Handler ----")
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
conn, err := flow.GetConnector()
if err != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
return err
}
if conn.Call(ctx, flow, row) != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
return err
}
// Calculate result data
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// Submit result data
_ = flow.CommitRow(resultStr)
}
return nil
}
FuncName2
is a business associated with Connector
. You can get the Connector
instance through flow.GetConnector()
, and then execute the business logic by executing conn.Call()
.
(3) Callback business for FuncName3
kis-flow/test/faas/faas_demo3.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
(4) Init method for ConnName1
kis-flow/test/caas/caas_init1.go
package caas
import (
"fmt"
"kis-flow/kis"
)
func InitConnDemo1(connector kis.Connector) error {
fmt.Println("===> Call Connector InitDemo1")
//config info
connConf := connector.GetConfig()
fmt.Println(connConf)
// init connector, such as initializing database connection, etc.
return nil
}
(5) Callback business for ConnName1
kis-flow/test/caas/caas_demo1.go
package caas
import (
"context"
"fmt"
"kis-flow/kis"
)
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)
fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)
return nil
}
5.6.3 Running
Navigate to kis-flow/test/
directory, and execute the command:
go test -test.v -test.paniconexit0 -test.run TestNewKisConnector
Results:
=== RUN TestNewKisConnector
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{ ConnName1 0.0.0.0:9998 redis redis-key map[] [] [funcName2]}
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionC, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c240 ThisFunctionId:func-f594da0e28da417db6b15ce9c9530f84 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c2a0 ThisFunctionId:func-f0b4bebf87614828a9375d888c54d13b PrevFunctionId:func-f594da0e28da417db6b15ce9c9530f84 funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionE, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c300 ThisFunctionId:func-66e2b0afa4e14d179aa94c357c412cf8 PrevFunctionId:func-f0b4bebf87614828a9375d888c54d13b funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 2
--- PASS: TestNewKisConnector (0.00s)
PASS
Upon careful examination of the logs, it is evident that the Connector's initialization was executed. Furthermore, the Connector was also synchronously executed during the execution of FunctionName2, with logging output, which aligns with our expectations.
5.7 [V0.4] Source Code
You can find the source code for version 0.4 at the following link:
https://github.com/aceld/kis-flow
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Top comments (0)