Happy 2024 RisingWave community! The new year brings the launch of RisingWave v1.6, which is filled with fixes, updates, and new features. There are some breaking changes included in this update, so please take note if you are currently running RisingWave. We will go more in-depth on these changes, as well as new connectors, SQL clauses, and more. Read on to learn more about some of the standout features.
If you are interested in the full list of v1.6 updates, see the release note.
Breaking changes
RisingWave v1.6 includes two breaking changes that may affect existing SQL streaming jobs. Please take note of the following changes:
Correctness of SOME, ALL, ANY expressions
This update fixes the correctness of the results generated by SOME, ALL, and ANY expressions. If any of your SQL queries employ one of these expressions, their previous output may contain errors. We also recommend dropping any existing materialized views that include these expressions and recreating them to ensure accurate results.
Data mapping change regarding timestamps for ClickHouse sinks
If you have a sink that sinks timestamp or timestamptz data from RisingWave to ClickHouse, note that the timestamptz type in RisingWave can be sinked to the DateTime64 type in ClickHouse. However, the timestamp type in RisingWave cannot be sinked directly; it must be converted to timestamptz before being sinked to ClickHouse.
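As a sketch of the required conversion (the materialized view, column names, and connection settings here are all hypothetical), the cast can be applied in the sink's SELECT query:

```sql
-- Hypothetical sketch: convert a timestamp column to timestamptz
-- (here by interpreting it as UTC) before sinking to ClickHouse,
-- where it maps to DateTime64. All names and settings are illustrative.
CREATE SINK clickhouse_sink AS
SELECT
    id,
    event_time AT TIME ZONE 'UTC' AS event_time  -- timestamp -> timestamptz
FROM events_mv
WITH (
    connector = 'clickhouse',
    type = 'append-only',
    clickhouse.url = 'http://127.0.0.1:8123',
    clickhouse.user = 'default',
    clickhouse.password = '',
    clickhouse.database = 'default',
    clickhouse.table = 'events'
);
```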
Sinking into tables
You can sink data from sources and tables in RisingWave into another table in RisingWave using the CREATE SINK ... INTO ... command. This streaming job behaves the same as an external sink, and the sinked-to table maintains snapshot consistency with the upstream table. The general syntax is outlined below.
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
[FROM sink_from | AS select_query]
Being able to sink data into tables gives you another way to merge data from different sources into one table. It is easy to add or remove a data source with the CREATE SINK ... INTO ... and DROP SINK ... statements, and the table's results continue to update as new data is streamed in.
For instance, we can join data from two different Kafka topics into the same target table. Here, we sink data from the Kafka sources orders_s0 and orders_s1 into the table orders.
CREATE TABLE orders (
id int primary key,
price int,
item_id int,
customer_id int
);
CREATE SOURCE orders_s0 (
id int primary key,
price int,
item_id int,
customer_id int
) WITH (
connector = 'kafka',
topic = 'topic_0',
...
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE orders_s1 (
id int primary key,
price int,
item_id int,
customer_id int
) WITH (
connector = 'kafka',
topic = 'topic_1',
...
) FORMAT PLAIN ENCODE JSON;
CREATE SINK orders_sink0 FROM orders_s0 INTO orders;
CREATE SINK orders_sink1 FROM orders_s1 INTO orders;
Sink orders_sink0 sinks data from orders_s0 into orders, and sink orders_sink1 sinks data from orders_s1 into orders. If we no longer want to sink data from orders_s0 into orders, we can drop the sink.
DROP SINK orders_sink0;
For more details see:
New connectors
Two new connectors are introduced this month, the GCS source connector and the StarRocks sink connector, providing you with more flexibility in how you build your stream processing ecosystem. If there is a connector you are interested in, check out the Integrations page to see what we currently support and vote on which connectors you would like to see next.
GCS source
Google Cloud Storage is a cloud-based object storage service that allows you to store and retrieve data. As with other source connectors in RisingWave, you can establish a connection between RisingWave and GCS and start consuming data using the CREATE SOURCE or CREATE TABLE command.
CREATE TABLE gcs(
id int,
name varchar
) WITH (
connector = 'gcs',
match_pattern = '%Ring%*.ndjson',
gcs.bucket_name = 'example-source',
gcs.credential = 'xxxxx',
gcs.service_account = 'xxxxx'
) FORMAT PLAIN ENCODE JSON;
StarRocks sink
StarRocks, formerly known as DorisDB, is an open-source analytical database well-suited for real-time analytics and interactive query processing. To sink data from RisingWave to StarRocks, use the CREATE SINK command. Note that the StarRocks sink connector cannot sink struct and json types, as StarRocks’s Stream Load does not support these data types.
CREATE SINK starrocks_sink
FROM mv WITH (
connector = 'starrocks',
type = 'append-only',
starrocks.host = 'starrocks-fe',
starrocks.mysqlport = '9030',
starrocks.httpport = '8030',
starrocks.user = 'users',
starrocks.password = '123456',
starrocks.database = 'demo',
starrocks.table = 'demo_bhv_table',
force_append_only='true'
);
For more details see:
Shared CDC source enhancements
Shared PostgreSQL source for multiple CDC tables
Last month, RisingWave introduced a more convenient method to create multiple CDC tables using the same MySQL source. This method now also works for PostgreSQL CDC sources, allowing you to easily ingest data from multiple tables located within different schemas in the same database without specifying the database credentials for each source table.
Use the CREATE SOURCE command to establish a connection with the target database and then use the CREATE TABLE command to ingest data from individual tables within the database. The database credentials only need to be specified once, when establishing the connection, streamlining the process.
The following SQL query creates a CDC table in RisingWave that ingests data from the tt3 table within the public schema. pg_mydb is the name of the source created in RisingWave that connects to a PostgreSQL database.
CREATE TABLE tt3 (
v1 integer primary key,
v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';
Multi-table transactions
Multi-table transactions for shared MySQL and PostgreSQL sources are now supported. To enable this, set transactional = true in the WITH options when creating the CDC source. This feature atomically applies a multi-table transaction from the upstream database, so materialized views based on those source tables are updated atomically. The following SQL query creates a PostgreSQL source with transactions enabled.
CREATE SOURCE pg_cdc WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot',
transactional = 'true'
);
For more details see:
SSL related configurations
This version update provides two new ways for you to alter SSL settings during the session, granting you more flexibility.
First, we added the properties.ssl.endpoint.identification.algorithm parameter for Kafka sources and sinks, which allows you to bypass the verification of CA certificates. This parameter is worth looking into if you are running into an “SSL handshake failed” error when creating a Kafka source or sink.
We also introduced the RW_SSL_CERT and RW_SSL_KEY environment variables, which are used to specify the locations of the SSL certificate and the key file for frontend nodes, respectively.
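As a hedged illustration of the new Kafka parameter (the broker address, topic, and schema are hypothetical, and the accepted value may differ in your setup), bypassing endpoint verification could look like this:

```sql
-- Hypothetical sketch: skip endpoint (hostname) verification on an SSL
-- connection by setting the identification algorithm to 'none'.
-- All connection details are illustrative.
CREATE SOURCE kafka_ssl_source (
    id int,
    name varchar
) WITH (
    connector = 'kafka',
    topic = 'example-topic',
    properties.bootstrap.server = 'broker:9093',
    properties.security.protocol = 'SSL',
    properties.ssl.endpoint.identification.algorithm = 'none'
) FORMAT PLAIN ENCODE JSON;
```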
For more details see:
Temporal filter enhancements
Temporal filters select or exclude data based on time-related criteria, such as timestamps or date ranges. These filters are invaluable when you want to create small windows over your unbounded data. RisingWave already supports temporal filters, but this update introduces new features that allow you to construct more complex ones.
Previously, only simple comparisons were supported: you could compare the data to a specific date or the current time, but NOW() could not be the upper bound.
Now, temporal filters allow NOW() to be the upper bound condition and let you connect a temporal filter to a normal condition using the OR operator. However, connecting multiple temporal filters together with the OR operator is still not supported.
t > NOW() - INTERVAL '1 hour' OR t IS NULL
t + INTERVAL '5' SECOND < NOW()
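For instance (the table and column names here are hypothetical), the first condition above could drive a materialized view that retains only the last hour of data, plus rows with no timestamp:

```sql
-- Hypothetical sketch: keep only events from the last hour,
-- along with rows whose timestamp is NULL.
CREATE MATERIALIZED VIEW recent_events AS
SELECT *
FROM events
WHERE event_time > NOW() - INTERVAL '1 hour' OR event_time IS NULL;
```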
More temporal filter enhancements are in the works so stay tuned for future releases!
For more details see:
SIMILAR TO clause
The SIMILAR TO clause determines whether an expression matches a given pattern, specified as a SQL regular expression, by returning true or false. Metacharacters can be included within the pattern. Additionally, an ESCAPE character can be specified at the end of the clause to match a metacharacter literally. Optionally, the NOT keyword can be included to determine whether the expression does not match the given pattern. For the full list of supported metacharacters, see the link to the documentation provided below.
The general syntax is as follows.
expression [ NOT ] SIMILAR TO sql_regex_pattern [ ESCAPE <esc_text> ]
Here is an example of how SIMILAR TO works. We check if the string “abc” matches the given SQL regular expression.
'abc' SIMILAR TO '(a|b|c)+' -> true
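And, as a small illustration of the ESCAPE option (following standard SQL pattern-matching semantics), an escape character lets a metacharacter be matched literally:

```sql
-- '#' is designated as the escape character, so '#|' matches a literal '|'.
'a|b' SIMILAR TO 'a#|b' ESCAPE '#' -> true
```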
The SIMILAR TO clause gives you the ability to look for specific strings, allowing for more complex and nuanced querying. You can also ensure the validity of your data by checking whether values match a specific pattern, as in the case of emails, phone numbers, and more.
For more details see:
- SIMILAR TO pattern matching expressions
Conclusion
These are just some of the new features included in the release of RisingWave v1.6. To see the entire list of updates, which includes more SQL features and connector updates, such as changes to the Elasticsearch sink connector, please refer to the detailed release notes.
Look out for next month’s edition to see what new, exciting features will be added. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.