How to import data from Apache Pulsar into Apache Doris quickly and seamlessly

KoP is short for Kafka on Pulsar; as the name implies, it lets you read and write Kafka data on Pulsar. KoP brings a Kafka protocol processing plugin to the Pulsar broker, making Apache Pulsar compatible with the Apache Kafka protocol. By adding the KoP protocol handler to an existing Pulsar cluster, users can migrate existing Kafka applications and services to Pulsar without modifying their code.

The key features of Apache Pulsar are as follows:

  • Streamline operations with enterprise-class multi-tenancy features.

  • Avoid data relocation and simplify operations.

  • Persistently retain event streams with Apache BookKeeper and tiered storage.

  • Leverage Pulsar Functions for serverless event processing.

The KoP architecture is shown in the following diagram: KoP introduces a new protocol handler that reuses existing Pulsar components (e.g. topic discovery, the distributed log store ManagedLedger, cursors) to implement the Kafka transport protocol.

Routine Load Subscribing to Pulsar Data

Apache Doris Routine Load supports ingesting Kafka data into Apache Doris and guarantees transactional semantics during ingestion. Apache Pulsar is positioned as a cloud-native enterprise messaging and publish/subscribe system and is already used by many online services. So how do Apache Pulsar users get their data into Apache Doris? The answer is KoP.

Since KoP provides Kafka compatibility directly inside Pulsar, Pulsar can be used just like Kafka from Apache Doris's point of view. The whole process requires no changes on the Doris side: a Routine Load task can consume Pulsar data and still keep Routine Load's transactional guarantees.

Practical operation

Pulsar installation environment preparation:

  • Download the Pulsar binary package and unzip:
#Download
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#Unzip and enter the installation directory
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0

KoP Compilation and Installation:

  • Download KoP Source Code
git clone https://github.com/streamnative/kop.git
cd kop
  • Compiling KoP:
mvn clean install -DskipTests
  • protocols configuration: Create the protocols folder in the unpacked apache-pulsar directory and copy the compiled nar package to the protocols folder.
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
  • View the results after adding:
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

Add KoP configuration:

  • Add the following configuration to standalone.conf or broker.conf
#Protocols to which KoP is adapted
messagingProtocols=kafka
#KoP's NAR file path
protocolHandlerDirectory=./protocols
#Whether to allow automatic topic creation
allowAutoTopicCreationType=partitioned
  • Add the following service listener configuration
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 
kafkaListeners=PLAINTEXT://127.0.0.1:9092
# This config is not required unless you want to expose another address to the Kafka client.
# If it's not configured, it will be the same as the `kafkaListeners` config by default.
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false

When the following error occurs:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

Add the following configuration to enable the transaction coordinator (transactionCoordinatorEnabled):

kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true

This error must be fixed. Otherwise, producing and consuming data on Pulsar with the tools that ship with Kafka (bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh) will work fine, but Apache Doris will not be able to synchronize the data.
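
As a quick sanity check that KoP itself is working, you can point the stock Kafka console tools at the KoP listener. This sketch assumes a Kafka distribution is unpacked locally and the topic is named test; older Kafka releases use --broker-list instead of --bootstrap-server for the producer.

# produce a few test messages through the Pulsar broker's Kafka-compatible listener
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test

# consume them back in another terminal
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning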

Launch Pulsar
# run in the foreground:
# bin/pulsar standalone
# or start as a background daemon:
bin/pulsar-daemon start standalone
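
To confirm that the Kafka-compatible listener actually came up, a quick check such as the following can help (port 9092 comes from the kafkaListeners setting above; the exact log file names under logs/ may vary in your environment):

# check that the KoP port is listening
ss -lnt | grep 9092
# inspect the Pulsar logs if anything looks wrong
tail -f logs/pulsar-*.log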

Create Doris database and build tables

mysql -u root  -h 127.0.0.1 -P 9030
create database pulsar_doris;
#Switching databases
use pulsar_doris;
#Create clicklog table
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
    `clickTime` DATETIME NOT NULL COMMENT "clickTime",
    `type` String NOT NULL COMMENT "clickType",
    `id`  VARCHAR(100) COMMENT "id",
    `user` VARCHAR(100) COMMENT "user",
    `city` VARCHAR(50) COMMENT "city"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Creating Routine Load Tasks

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "test",
    "property.group.id" = "doris"
 );

The parameters in the above command are explained as follows:

  • pulsar_doris: The database where the Routine Load task is located

  • load_from_pulsar_test: Routine Load task name

  • clicklog: The target table for the Routine Load task

  • strict_mode: Whether the import is in strict mode, set to false here

  • format: The type of data to import, here configured as json

  • kafka_broker_list: Address of the kafka broker service

  • kafka_topic: Kafka topic name, i.e. which topic to sync data from

  • property.group.id: Consumer group id
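
Once the task is created, it can be managed with the standard Routine Load statements. As a sketch, using the job name and database from this example:

-- temporarily pause the job
PAUSE ROUTINE LOAD FOR pulsar_doris.load_from_pulsar_test;
-- resume it later
RESUME ROUTINE LOAD FOR pulsar_doris.load_from_pulsar_test;
-- stop it permanently once it is no longer needed
STOP ROUTINE LOAD FOR pulsar_doris.load_from_pulsar_test;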

Data import and testing

  • Data Import

Construct a ClickLog data structure and use a Kafka producer to send 50 million records to Pulsar.

The ClickLog data structure is as follows:

public class ClickLog {
    private String id;
    private String user;
    private String city;
    private String clickTime;
    private String type;
    ... // getters and setters omitted
}

The core code logic for message construction and delivery is as follows (wrapped here in a method inside the producing service; the method name is illustrative):

    private String strDateFormat = "yyyy-MM-dd HH:mm:ss";

    @Autowired
    private Producer producer;

    public void sendClickLogs() {
        try {
            // 50,000 batches * 1,000 records per batch = 50 million records in total
            for (int j = 0; j < 50000; j++) {
                int batchSize = 1000;
                for (int i = 0; i < batchSize; i++) {
                    ClickLog clickLog = new ClickLog();
                    clickLog.setId(UUID.randomUUID().toString());
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
                    clickLog.setClickTime(simpleDateFormat.format(new Date()));
                    clickLog.setType("webset");
                    clickLog.setUser("user" + new Random().nextInt(1000) + i);
                    // serialize to JSON and send to the topic through the KoP listener
                    producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
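
The Producer injected above is not shown in the original snippet. A minimal sketch of what such a wrapper might look like, assuming the plain Kafka Java client is pointed at the KoP listener (the class name, sendMessage signature, and bootstrap address are assumptions for illustration):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical wrapper around the standard Kafka client; KoP makes the Pulsar broker
// speak the Kafka protocol, so bootstrap.servers points at the kafkaListeners address.
public class Producer {

    private final KafkaProducer<String, String> kafkaProducer;

    public Producer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // the KoP listener configured earlier
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.kafkaProducer = new KafkaProducer<>(props);
    }

    public void sendMessage(String topic, String value) {
        kafkaProducer.send(new ProducerRecord<>(topic, value));
    }
}

Because KoP exposes a standard Kafka endpoint, no Pulsar-specific client code is needed on the producing side.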
  • ROUTINE LOAD Task View

Execute SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G to view the status of the import task.

mysql>  SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
                  Id: 87873
                Name: load_from_pulsar_test
          CreateTime: 2022-05-31 12:03:34
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:pulsar_doris
           TableName: clicklog1
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
    CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
           Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
            Progress: {"0":"51139566"}
                 Lag: {"0":0}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg: 
1 row in set (0.00 sec)
ERROR: 
No query specified

From the above results, we can see that totalRows is 50000000 and errorRows is 0, which means the data was imported into Apache Doris without any loss or duplication.

  • Data Validation

Execute the following command to count the rows in the table; the result is also 50000000, as expected.
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql> 

Conclusion

With KoP, we were able to seamlessly ingest Apache Pulsar data into Apache Doris without any modifications to the Routine Load task, while keeping the transactional guarantees of the data import process. Meanwhile, the Apache Doris community has started designing native import support for Apache Pulsar, so it should soon be possible to subscribe directly to message data in Pulsar and guarantee Exactly-Once semantics during the data import process.

Links

Apache Doris website: http://doris.apache.org

Apache Doris GitHub: https://github.com/apache/doris

Please contact us via: dev@doris.apache.org
