Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Twitter: https://twitter.com/Aceld_
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
1.1 Why KisFlow is Needed
In many large-scale B2B enterprise projects there is a significant need for business data, much of which requires real-time stream-computing capability. However, many companies cannot yet support a data-warehouse stack such as Flink + Hadoop/HBase. The demand for real-time computation of business data persists nonetheless, so in most enterprises this computational work ends up delegated to business engineers.
Directly querying the business database, however, can degrade the capacity of the business systems themselves, and periodic calculation via scheduled tasks or scripts is not a good solution either. I have personally worked on a large system with roughly a thousand business data fields that needed to be computed. Due to poor early planning, more than 1,000 scheduled scripts ended up running against the data; the scripts interfered with one another, the results became unreliable, and data reporting errors were frequent. For example, suppose the correct value of a computed business field is 100. Because of the complexity of the legacy code, several scripts each tried to patch and correct this value, and their conflicting writes caused the field to fluctuate within a certain time window. One patch would eventually settle on the correct value, but in the meantime the business data was wrong, which is very frustrating for users.
KisFlow is designed for scenarios where an enterprise lacks a data-warehouse computing platform but still needs real-time data processing. It lets business engineers take part in stream-computing work while reusing common, general-purpose computational logic.
1.2 Key Capabilities Supported by KisFlow
Stream Computing
- Distributed batch consumption capability (driven by upstream ODS consumption configurations such as Binlog, Kafka, etc.)
- Stateful Function capability: stateful stream-computing nodes can be spliced together, and stream computation can be scaled both horizontally and vertically.
- Data stream monitoring and repair capabilities, including consumer service monitoring.
- Multi-stream splicing and third-party middleware storage plug-in support.
Distributed Task Scheduling
- Distributed scheduled task scheduling, log monitoring, and task scheduling status.
- Visualized scheduling platform.
1.3 KisFlow System Positioning
KisFlow serves as the business upstream computing layer. It interfaces with the data warehouse/other business-side ODS layers upstream and connects to the local business storage data center downstream.
Levels | Level Explanation | Sub-modules |
---|---|---|
Flowing Computation Layer | The upstream computing layer for KisFlow, directly connected to business storage and the ODS (Operational Data Store) layer of data warehouses. The upstream can be MySQL Binlog, logs, interface data, etc. It supports a passive consumption mode, giving KisFlow real-time computing capability. | KisFlow: distributed batch consumer; a KisFlow is composed of multiple KisFunctions. KisConnectors: connectors and intermediate-state persistence for the computing data stream. KisFunctions: support operator expression splicing, connector integration, strategy configuration, Stateful Function mode, and Slink stream splicing. KisConfig: binds flow-processing policies to KisFunctions so that each Function has a fixed, independent processing capability. KisSource: interface for connecting to ODS data sources. |
Task Scheduling Layer | Timed task scheduling and execution of business logic, including the task scheduling platform, executor management, scheduling logs, and user management. Provides KisFlow's timed-task, statistics, and aggregation computing capabilities. | Task scheduling platform: visual management interface including running reports, scheduling reports, success rate, task management, configuration management, and a GLUE IDE. Executor management (KisJobs): Golang SDK, custom business logic, automatic executor registration, task triggering, termination, and removal. Executor scenes (KisScenes): logical task sets divided according to business needs. Scheduling logs and user management: collection of task scheduling logs, scheduling details, and scheduling process traces. |
A KisFlow can be composed of any KisFunction(s), and the length of a KisFlow can be dynamically adjusted.
A KisFunction can be added to a specific KisFlow dynamically at any time, and the relationships between KisFlows can likewise be adjusted dynamically by adding KisFunction Load and Save nodes, enabling parallel and branching data flows.
In terms of programming model, KisFlow moves developers from data/business-oriented programming to writing single-purpose, function-level computing logic, approaching a FaaS (Function as a Service) style.
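As a rough sketch of what that FaaS style looks like in Go (the signature mirrors the two concrete handlers shown in the quick start below; the struct and function names here are placeholders, not part of the library):

```go
package main

import (
	"context"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

// MyRowIn is a placeholder input type; each FaaS handler declares its own
// input struct and KisFlow deserializes the committed source rows into it.
type MyRowIn struct {
	serialize.DefaultSerialize
	StuId int `json:"stu_id"`
}

// MyFaaS is a placeholder handler: one function that receives a batch of
// rows, performs its single piece of computing logic, and commits results
// for the next KisFunction in the flow.
func MyFaaS(ctx context.Context, flow kis.Flow, rows []*MyRowIn) error {
	for _, row := range rows {
		// ... business computation on row ...
		_ = flow.CommitRow(row) // pass data on to the next function
	}
	return nil
}
```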
1.4 Quick Start with KisFlow
KisFlow Github: https://github.com/aceld/kis-flow
Case Source Code: https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config
Project
├── Makefile
├── conf
│ ├── flow-CalStuAvgScore.yml
│ ├── func-AvgStuScore.yml
│ └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go
Flow: CalStuAvgScore → AvgStuScore → PrintStuAvgScore
Config
(1) Flow Config
conf/flow-CalStuAvgScore.yml
kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
  - fname: AvgStuScore
  - fname: PrintStuAvgScore
(2) Function1 Config
conf/func-AvgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
  name: StudentScore
  must:
    - stu_id
(3) Function2 (Slink) Config
conf/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
  name: StudentScore
  must:
    - stu_id
Main
main.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load the configuration from the conf/ directory
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow by name
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Submit source rows as JSON strings
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}
}
Function1
faas_stu_score_avg.go
package main

import (
	"context"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// AvgStuScore (FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {
		out := AvgStuScoreOut{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}

		// Submit the result data to the next function in the flow
		_ = flow.CommitRow(out)
	}
	return nil
}
Function2
faas_stu_score_avg_print.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

// PrintStuAvgScore (FaaS) prints each student's average score
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}
	return nil
}
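One detail not shown in the listings above: the two FaaS handlers still have to be registered with the KisPool under the fname values used in the configs, which the case source code does in an init() function. A minimal sketch, assuming the kis.Pool().FaaS registration API from the kis-flow repository:

```go
package main

import "github.com/aceld/kis-flow/kis"

func init() {
	// Register the FaaS handlers under the fname values referenced in conf/.
	// Without this step, the flow cannot bind its functions when it runs.
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
```

This registration corresponds to the "Add KisPool FuncName=..." lines in the output below.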
Output
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]