As a common data type, Bitmap is used for efficiently compressing large amounts of binary data for storage and fast querying. Apache Doris supports bitmap data type.
We got lots of inquiries about how to write bitmap to Apache Doris when using the Flink-Doris-Connector so we provide this guide for the Doris users.
Prerequisites
- Doris 2.0.1 environment
- Flink 1.16, and put the Flink-Doris-Connector Jar package under /lib
- Create Doris table
CREATE TABLE `page_view_bitmap` (
`dt` int,
`page` varchar(256),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)
Write bitmap data
Flink reads data from MySQL and writes it to Doris, and stores user_id into bitmap.
Data
#Create MySQL table
CREATE TABLE `page_view` (
`id` int NOT NULL,
`dt` int,
`page` varchar(256),
`user_id` int,
PRIMARY KEY (`id`)
);
#Data
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (1, 20230921, 'home', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (2, 20230921, 'home', 1002);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (3, 20230921, 'search', 1003);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (4, 20230922, 'mine', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (5, 20230922, 'home', 1002);
FlinkSQL writes into bitmap
#Read data from MySQL using JDBC
CREATE TABLE page_view (
`dt` int,
`page` string,
`user_id` int
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/test',
'table-name' = 'page_view',
'username' = 'root',
'password' = '123456'
);
#Write data using Doris connector
CREATE TABLE page_view_bitmap (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.page_view_bitmap',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label1',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
);
insert into page_view_bitmap select * from page_view
The implementation of the Flink-Doris-Connector Sink is based on the Stream Load functionality, so the Stream Load functionality is applicable within the Connector. The relevant parameters are encapsulated within the "sink.properties".
In the provided example, the Columns
parameter is included within the With
attribute. In this configuration, a column conversion operation is defined, where the user_id
is converted using the to_bitmap
function and then imported into the Doris table.
Query
mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt | page | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home | 1001,1002 |
| 20230921 | search | 1003 |
| 20230922 | home | 1002 |
| 20230922 | mine | 1001 |
+----------+--------+---------------------------+
4 rows in set (0.00 sec)
Flink DataStream
Use the DataStream API to simulate writing data to the foregoing table.
The DataStream API operates on the Bitmap in the same way as the SQL does above.
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DorisSink.Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("format", "csv");
properties.setProperty("columns", "dt,page,user_id,user_id=to_bitmap(user_id)");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.page_view_bitmap")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("doris_label")
.setStreamLoadProp(properties)
.setDeletable(false);
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisBuilder.build());
//mock data
DataStreamSource<String> stringDataStreamSource = env.fromCollection(
Arrays.asList("20230921,home,1003", "20230921,search,1001", "20230923,home,1001"));
stringDataStreamSource.sinkTo(builder.build());
env.execute("doris bitmap write");
}
Query
mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt | page | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home | 1001,1002,1003 |
| 20230921 | search | 1001,1003 |
| 20230922 | home | 1002 |
| 20230922 | mine | 1001 |
| 20230923 | home | 1001 |
+----------+--------+---------------------------+
5 rows in set (0.00 sec)
Top comments (0)