Tianshuo Shi | Software Engineer at RisingWave Labs
In the "Deep Dive into the RisingWave Stream Processing Engine" series, we walk through the RisingWave stream processing engine from top to bottom, one article at a time, gradually delving into the technical principles and implementation details of each module. Along the way, we'll discuss the design philosophy behind building this stream processing engine from scratch.
Stream processing is still a relatively young field, and many best practices in design and engineering have yet to be standardized. We are eager to engage in deeper discussions with enthusiasts, researchers, and users interested in stream processing. We welcome discussions in our Slack channel and GitHub repo to collectively explore the future of stream processing technology.
So let's get started!
In simple terms, RisingWave's stream processing engine runs long-running distributed computation tasks. Given a result table defined by a SQL query over a set of source tables, it continuously computes the changes to that result table as the source tables change.
This might sound a bit abstract, so let's illustrate it with an example.
In the following SQL, we define a table called "Stories" to maintain detailed information about articles, while users' likes (votes) are recorded in an append-only table called "Votes."
CREATE TABLE Stories (id int, author int, title text, url text, PRIMARY KEY (id));
CREATE TABLE Votes (user_id int, story_id int);
Based on these two tables, we can use SQL statements to define a materialized view (MView). For example, the MView StoriesWithVC maintains the number of likes (the vote count, vcount) for each article:
CREATE MATERIALIZED VIEW StoriesWithVC AS
SELECT id, author, title, url, vcount
FROM Stories, (
    SELECT story_id, COUNT(DISTINCT user_id) AS vcount
    FROM Votes GROUP BY story_id
) AS VoteCount
WHERE VoteCount.story_id = Stories.id;
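Furthermore, we can define new MViews on top of existing MViews. For example, Top10VotedStories maintains the ten articles with the most votes:
CREATE MATERIALIZED VIEW Top10VotedStories AS
SELECT * FROM StoriesWithVC
ORDER BY vcount DESC
LIMIT 10;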
In this example, RisingWave needs to efficiently maintain the contents of the MViews, and because it supports cascading MViews (MViews defined on other MViews), it must also compute the changes to each MView itself. Once those changes are known, applying them to keep an MView's stored results up to date is straightforward, so the core job of RisingWave's stream processing engine is to compute the corresponding changes to a downstream table from the changes to its upstream tables. As shown in the diagram, when the source tables Stories and Votes receive changes, the stream processing engine calculates the resulting changes to the downstream MViews StoriesWithVC and Top10VotedStories.
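To make "computing downstream changes from upstream changes" concrete, here is a minimal Python sketch of how the VoteCount subquery (COUNT(DISTINCT user_id) grouped by story_id) could be maintained incrementally. It assumes input changes arrive as (op, row) pairs with a hypothetical op of "+" (insert) or "-" (delete), and that an updated result row is emitted as a "U-" (old row) followed by a "U+" (new row). Although Votes is append-only in this example, the sketch also handles deletes, in the spirit of starting from the most general operator. The class and method names are illustrative, not RisingWave's executor API.

```python
from collections import defaultdict


class DistinctCountAgg:
    """Incrementally maintains
    SELECT story_id, COUNT(DISTINCT user_id) AS vcount FROM Votes GROUP BY story_id.
    """

    def __init__(self):
        # (story_id, user_id) -> number of live Votes rows for that pair
        self.pair_counts = defaultdict(int)
        # story_id -> current vcount (groups with no votes are absent)
        self.vcount = {}

    def apply(self, changes):
        """Consume ("+" | "-", (user_id, story_id)) changes; return changes to the result."""
        out = []
        for op, (user_id, story_id) in changes:
            key = (story_id, user_id)
            old = self.vcount.get(story_id, 0)
            if op == "+":
                self.pair_counts[key] += 1
                # Only a user's first vote changes the distinct count.
                new = old + 1 if self.pair_counts[key] == 1 else old
            else:
                self.pair_counts[key] -= 1
                # Only removing a user's last vote changes the distinct count.
                if self.pair_counts[key] == 0:
                    del self.pair_counts[key]
                    new = old - 1
                else:
                    new = old
            if new == old:
                continue  # result row unchanged: emit nothing
            if old == 0:
                out.append(("+", (story_id, new)))  # group appears
            elif new == 0:
                out.append(("-", (story_id, old)))  # group disappears
            else:
                # An update is a retraction of the old row plus the new row.
                out += [("U-", (story_id, old)), ("U+", (story_id, new))]
            if new:
                self.vcount[story_id] = new
            else:
                del self.vcount[story_id]
        return out
```

For example, two votes on story 100 from different users produce an insert followed by an update pair, while a duplicate vote from the same user produces no output at all, because the distinct count does not change.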
Both the input and the output of RisingWave's stream processing engine are changes to relational tables with well-defined schemas. In fact, the internal operators also consume and produce relational changes. This follows from RisingWave's positioning as a streaming database that lets users create stream processing jobs through a standard SQL interface. From day one, RisingWave's stream processing was designed to handle SQL queries and relational data workloads. This sets it apart from some other stream processing systems that emerged from the big data era, which typically first built a general-purpose stream processing framework and then added a SQL engine on top of it to provide a SQL interface.
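Because every operator consumes and produces changes rather than plain rows, even a stateless operator has to translate changes carefully. The Python sketch below shows a filter (a WHERE predicate such as vcount >= 10, chosen only for illustration) over a change stream in which an update is encoded as a "U-" (old row) immediately followed by a "U+" (new row); these op names are an assumption of the sketch. When only one half of an update pair satisfies the predicate, the filter must rewrite it as a plain delete or insert, otherwise downstream operators would see an unmatched retraction.

```python
from typing import Callable, List, Tuple

Row = Tuple
Change = Tuple[str, Row]  # op is one of "+", "-", "U-", "U+"


def filter_changes(changes: List[Change], pred: Callable[[Row], bool]) -> List[Change]:
    """Apply a WHERE predicate to a change stream, keeping update pairs consistent."""
    out: List[Change] = []
    i = 0
    while i < len(changes):
        op, row = changes[i]
        if op == "U-":
            # The matching "U+" carrying the new row is assumed to follow immediately.
            _, new_row = changes[i + 1]
            old_ok, new_ok = pred(row), pred(new_row)
            if old_ok and new_ok:
                out += [("U-", row), ("U+", new_row)]
            elif old_ok:
                out.append(("-", row))      # row leaves the filtered result
            elif new_ok:
                out.append(("+", new_row))  # row enters the filtered result
            i += 2
            continue
        if pred(row):
            out.append((op, row))
        i += 1
    return out
```

For instance, with a predicate on vcount >= 10, an update of a story's count from 9 to 10 becomes a plain insert downstream, and an update from 12 to 8 becomes a plain delete.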
The SQL-native computation model gives RisingWave's stream processing a well-defined problem boundary, which lets us build the entire stream processing engine top-down. When designing support for a SQL feature, we usually start with the most general form of the operator, then specialize and optimize it for different scenarios to achieve the best performance on specific workloads. This design philosophy keeps RisingWave's stream processing model general while still delivering excellent performance in a variety of specific scenarios.
It is worth mentioning that during development, we found that this specialization can happen at several different levels:
Runtime adaptation of executors: We implemented an LRU-based state cache in the operators, so that when the state is small and memory is abundant, the entire state can be kept in memory.
Plan specialization in the optimizer: For example, the optimizer generates specialized operators for append-only streams and for streams with watermarks, which significantly improves performance.
Configuration parameters: For instance, the trade-off between data freshness and execution cost is exposed to users through a small number of configuration parameters.
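The first level, runtime adaptation, can be illustrated with a minimal LRU cache placed in front of an operator's state. In this Python sketch, the persistent state store is modeled as a plain dict, and the class name and capacity parameter are hypothetical. When all keys fit within the capacity, every lookup after the first is served from memory; when the state grows larger, lookups fall back to the store instead of failing.

```python
from collections import OrderedDict


class LruStateCache:
    """Operator state cache: hot keys in memory, everything else in a backing store."""

    def __init__(self, store: dict, capacity: int):
        self.store = store          # stand-in for the persistent state store
        self.capacity = capacity    # max number of keys kept in memory
        self.cache = OrderedDict()  # key -> value, least recently used first
        self.store_reads = 0        # number of cache misses served by the store

    def get(self, key, default=None):
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]
        self.store_reads += 1            # cache miss: fall back to the store
        value = self.store.get(key, default)
        self._put_in_cache(key, value)
        return value

    def put(self, key, value):
        # Write-through is a simplification so evicted keys can always be re-read.
        self.store[key] = value
        self._put_in_cache(key, value)

    def _put_in_cache(self, key, value):
        self.cache[key] = value
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict the least recently used key
```

A real engine would typically buffer writes and persist them at checkpoints rather than writing through on every update. The point here is only that the same operator code adapts at runtime: the cache hit rate depends on state size and available memory, not on a different plan.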
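The second level, plan specialization, is easy to see with an aggregate such as MAX. On a general stream that may contain deletes, the operator must remember every value in each group, because retracting the current maximum requires knowing the next largest value. If the optimizer knows that the input is append-only, a single value per group suffices. The two Python classes below are a hypothetical illustration of that trade-off for a single group, not RisingWave's actual executors.

```python
from collections import Counter


class GeneralMax:
    """MAX over a stream with inserts and deletes: keeps a multiset of all values."""

    def __init__(self):
        self.values = Counter()

    def apply(self, op: str, value: int):
        if op == "+":
            self.values[value] += 1
        else:
            self.values[value] -= 1
            if self.values[value] == 0:
                del self.values[value]
        # A linear scan keeps the sketch short; an ordered structure would be faster.
        return max(self.values) if self.values else None


class AppendOnlyMax:
    """MAX over an append-only stream: a single running value is enough."""

    def __init__(self):
        self.current = None

    def apply(self, op: str, value: int):
        assert op == "+", "append-only input never contains deletes"
        if self.current is None or value > self.current:
            self.current = value
        return self.current
```

The general version's state grows with the number of distinct values, while the append-only version's state stays constant, which is the kind of saving that knowing a stream property at planning time makes possible.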
Beyond these specializations, a few core features run consistently through the entire design. We treat them as first-class citizens of the RisingWave stream processing engine and uphold them in every design and implementation decision.
RisingWave is a distributed system capable of processing large volumes of data in parallel. Its stream processing engine makes full use of multi-node and multi-core computing resources: the execution plans it generates avoid single-point computation as much as possible, distributing work across multiple nodes to reduce potential single-point bottlenecks in the system.
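A common way to spread a GROUP BY across nodes without a single-point bottleneck is to hash-partition the input on the group key, so that each parallel worker owns a disjoint set of groups and can aggregate independently. The Python sketch below assumes a fixed number of workers and uses zlib.crc32 as a stable hash. It illustrates the idea only and does not reproduce RisingWave's actual hash function or data dispatcher.

```python
import zlib
from collections import Counter


def partition_of(key: int, num_workers: int) -> int:
    # Stable hash of the group key; Python's built-in hash() is randomized per
    # process for strings, so it is avoided here.
    return zlib.crc32(str(key).encode()) % num_workers


def parallel_count(votes, num_workers: int) -> dict:
    """COUNT(*) GROUP BY story_id, computed by independent per-partition workers."""
    partitions = [[] for _ in range(num_workers)]
    for user_id, story_id in votes:
        partitions[partition_of(story_id, num_workers)].append(story_id)
    # Each worker aggregates only its own partition; no cross-worker state is needed.
    partials = [Counter(p) for p in partitions]
    result = {}
    for partial in partials:
        result.update(partial)  # group keys never appear in two partitions
    return result
```

Because every row with a given story_id lands on the same worker, the combined result is identical to a single-node aggregation, regardless of the number of workers.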
As a cloud-native streaming database, RisingWave builds on parallel processing to support dynamic scaling of computing resources. Because stream processing tasks are long-running, dynamic cluster scaling is only effective if the stream processing engine supports online scheduling and migration of tasks. This poses significant challenges in the design of abstractions for state storage, data partitioning, and more. The RisingWave stream processing engine exposes scheduling and migration interfaces so that external control components can easily migrate and schedule computing tasks.
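One way to make data partitioning compatible with online scaling is to hash keys into a fixed number of virtual partitions and schedule those partitions, rather than raw keys, onto workers. RisingWave uses virtual nodes (vnodes) for this purpose, but the vnode count, the placement policy, and all function names in this Python sketch are assumptions for illustration. Because a key's vnode never changes, scaling out only reassigns some vnodes, and the state that has to migrate is exactly the state belonging to those vnodes.

```python
import zlib

NUM_VNODES = 16  # illustrative fixed vnode count for this sketch


def vnode_of(key) -> int:
    # A key's vnode depends only on the key, never on the number of workers.
    return zlib.crc32(str(key).encode()) % NUM_VNODES


def worker_for(key, mapping: dict):
    return mapping[vnode_of(key)]


def assign(workers: list) -> dict:
    """Spread vnodes evenly across workers in contiguous ranges."""
    per = -(-NUM_VNODES // len(workers))  # ceiling division
    return {v: workers[v // per] for v in range(NUM_VNODES)}


def rescale(mapping: dict, workers: list) -> dict:
    """Reassign vnodes to a new worker set, keeping as many as possible in place."""
    target = -(-NUM_VNODES // len(workers))  # max vnodes per worker
    load = {w: 0 for w in workers}
    new_mapping, homeless = {}, []
    for v in range(NUM_VNODES):
        w = mapping[v]
        if w in load and load[w] < target:
            new_mapping[v] = w  # vnode (and its state) stays where it is
            load[w] += 1
        else:
            homeless.append(v)  # worker removed or over target: must move
    for v in homeless:
        w = min(workers, key=lambda x: load[x])  # least-loaded worker
        new_mapping[v] = w
        load[w] += 1
    return new_mapping
```

Scaling from two workers to three in this sketch moves only 4 of the 16 vnodes, all to the new worker, and every key outside those vnodes keeps its original owner, so only that subset of state needs to be migrated.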
Fault tolerance is a fundamental capability that every modern stream processing system must have. Stream processing systems typically use a checkpoint mechanism to persist computation state, achieving exactly-once semantics within the system. At the same time, the checkpoint process should compete as little as possible with foreground stream processing tasks for resources. RisingWave has its own state storage engine and performs checkpointing asynchronously throughout the entire process, making checkpoints almost invisible to foreground tasks.
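Asynchronous checkpointing can be sketched as follows: when a checkpoint barrier arrives, the operator takes an in-memory snapshot of its state at that point, hands it to a background uploader, and immediately resumes processing; recovery restores the latest completed snapshot, after which input following that checkpoint would be replayed. In this Python sketch, "durable storage" is a dict and the uploader is a ThreadPoolExecutor; both, along with the class and method names, are hypothetical stand-ins. For simplicity the sketch copies the whole state, whereas a real engine would persist only what changed since the previous checkpoint.

```python
from concurrent.futures import ThreadPoolExecutor


class CheckpointingCounter:
    """A stateful operator whose checkpoints are uploaded in the background."""

    def __init__(self, durable: dict, uploader: ThreadPoolExecutor):
        self.state = {}
        self.durable = durable    # stand-in for remote, durable storage
        self.uploader = uploader
        self.pending = []         # futures of in-flight uploads

    def process(self, key):
        self.state[key] = self.state.get(key, 0) + 1

    def on_barrier(self, epoch: int):
        snapshot = dict(self.state)  # copy of the state exactly at the barrier
        # The upload runs off the processing path; process() can continue at once.
        self.pending.append(self.uploader.submit(self._upload, epoch, snapshot))

    def _upload(self, epoch, snapshot):
        self.durable[epoch] = snapshot

    def wait_uploads(self):
        for f in self.pending:
            f.result()
        self.pending.clear()

    @staticmethod
    def recover(durable: dict) -> dict:
        # Restore the latest completed checkpoint (input after it is replayed).
        return dict(durable[max(durable)]) if durable else {}
```

Because the snapshot is a separate copy, updates processed after the barrier never leak into the checkpoint for that epoch, even while its upload is still in progress.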
As a streaming database, RisingWave must keep the computation results of its internal objects, such as MViews and tables, consistent with one another. More specifically, when a user queries different MViews at the same moment, the results should be based on the same version of the source tables. This requires that changes to an upstream table and the corresponding changes to its downstream tables be committed atomically in the same write transaction.
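A minimal way to picture this requirement is a store in which writes to every table are staged under an epoch and become visible together only when that epoch is committed, while readers see only committed data. The Python sketch below is hypothetical: EpochStore and its methods are illustrative names, and the sketch is single-threaded. It shows why a query can never observe a new Votes row without the matching change to StoriesWithVC.

```python
class EpochStore:
    """Toy multi-table store: all writes tagged with one epoch become visible together."""

    def __init__(self):
        self.committed = {}       # table -> {key: value}, visible to readers
        self.staged = {}          # epoch -> [(table, key, value)], not yet visible
        self.committed_epoch = 0

    def write(self, epoch: int, table: str, key, value):
        # Writes from any operator or table are only staged under their epoch.
        self.staged.setdefault(epoch, []).append((table, key, value))

    def commit(self, epoch: int):
        assert epoch > self.committed_epoch, "epochs commit in order"
        # Publish every table's writes for this epoch together.
        for table, key, value in self.staged.pop(epoch, []):
            self.committed.setdefault(table, {})[key] = value
        self.committed_epoch = epoch

    def read(self, table: str, key):
        # Readers only see committed data, never a partially applied epoch.
        return self.committed.get(table, {}).get(key)
```

In the usage below, a vote and the corresponding vcount change are written in epoch 1; until commit(1) runs, neither is visible, and afterwards both are.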
This means stream operators cannot freely delay data or batch it: each operator should promptly dispatch changes as soon as it receives upstream data. Readers familiar with stream processing may know that batching can improve operator performance, so we made two optimizations. First, we introduced the concept of an "epoch": a single epoch may contain multiple changes to the source tables, which effectively enlarges the write transaction described above and lets operators batch within it. Second, for common stream processing workloads, we designed the "EMIT ON WINDOW CLOSE" query semantics, which batch output based on watermarks (see RFC: The Semantics of EMIT ON WINDOW CLOSE by fuyufjh, Pull Request #30 in risingwavelabs/rfcs on GitHub). This lets users declare the consistency semantics they want between an MView and its upstream.
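The contrast between emitting on every update and EMIT ON WINDOW CLOSE can be sketched for a tumbling-window count. Instead of emitting a changed count for every input row, the operator keeps per-window counts and emits each window exactly once, as a final append-only row, when the watermark passes the window's end. This Python sketch assumes integer timestamps, a window size of 10, and that rows already behind the watermark are dropped; the names are hypothetical, and the precise semantics are those defined in the RFC referenced in the paragraph above.

```python
from collections import defaultdict

WINDOW = 10  # tumbling window size, an assumption of this sketch


class EowcWindowCount:
    """COUNT(*) per tumbling window, emitted once when the watermark closes the window."""

    def __init__(self):
        self.counts = defaultdict(int)  # window_start -> running count
        self.watermark = None

    def on_row(self, ts: int):
        if self.watermark is not None and ts < self.watermark:
            return  # late row behind the watermark: dropped in this sketch
        self.counts[ts - ts % WINDOW] += 1  # nothing is emitted yet

    def on_watermark(self, wm: int):
        """Advance the watermark and emit every window whose end is <= wm."""
        self.watermark = wm
        closed = sorted(s for s in self.counts if s + WINDOW <= wm)
        return [(s, self.counts.pop(s)) for s in closed]
```

Each window produces a single final row instead of a stream of intermediate updates, which is what allows downstream operators and sinks to treat the output as append-only.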
The overall architecture is shown in the diagram below. The leftmost Frontend node connects to users over pgwire (the PostgreSQL wire protocol) and is responsible for converting DDL statements into stream processing execution plans. It optimizes the plans in the optimizer and sends them to the central Meta node for registration. The Meta node persists the execution plans in a globally consistent, highly available MetaStore and dispatches computation tasks to compute nodes according to those plans. Throughout the entire lifecycle of a streaming job, the Meta node can send control instructions to compute nodes over RPC to implement task migration, scheduling, checkpointing, and failover. External systems or users can also control and schedule the computing workloads on compute nodes directly through the interfaces exposed by the Meta node.
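The control flow around the Meta node can be sketched as a small scheduler: it records each registered job in a metastore, places the job's parallel tasks on compute nodes, and, when a node fails, reassigns that node's tasks to the surviving nodes. Everything in this Python sketch (the MetaNode class, round-robin placement, a dict as the metastore, and the absence of real RPCs) is a hypothetical simplification for illustration, not RisingWave's meta service.

```python
import itertools


class MetaNode:
    """Toy control plane: registers jobs and schedules their tasks on compute nodes."""

    def __init__(self, compute_nodes):
        self.compute_nodes = list(compute_nodes)
        self.metastore = {}   # job name -> parallelism (stand-in for a durable store)
        self.placement = {}   # (job name, task index) -> compute node

    def register(self, job: str, parallelism: int):
        self.metastore[job] = parallelism
        nodes = itertools.cycle(self.compute_nodes)
        for i in range(parallelism):
            self.placement[(job, i)] = next(nodes)  # round-robin placement

    def fail_node(self, node):
        """Failover: move each task on the failed node to the least-loaded survivor."""
        self.compute_nodes.remove(node)
        for task, where in sorted(self.placement.items()):
            if where == node:
                load = {n: 0 for n in self.compute_nodes}
                for n in self.placement.values():
                    if n in load:
                        load[n] += 1
                self.placement[task] = min(self.compute_nodes, key=load.__getitem__)
```

Registering a job with six parallel tasks on three nodes places two tasks per node; after one node fails, its two tasks are spread across the remaining nodes so that each ends up with three.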
Starting from practical scenarios, this article introduced the use cases of the RisingWave stream processing engine, described the design philosophy and core features it relies on in architecture design, and provided an overview of the entire stream processing engine’s architecture.
About RisingWave Labs
RisingWave is an open-source distributed SQL database for stream processing. It is designed to reduce the complexity and cost of building real-time applications. RisingWave offers users a PostgreSQL-like experience specifically tailored for distributed stream processing.
Official Website: https://www.risingwave.com/