Authored by Jacob Klegar
Rockset continuously ingests data streams from Kafka, without the need for a fixed schema, and serves fast SQL queries on that data. We created the Kafka Connect Plugin for Rockset to export data from Kafka and send it to a collection of documents in Rockset. Users can then build real-time dashboards or data APIs on top of the data in Rockset. This post covers how we implemented the plugin.
Kafka Connect is the primary way to move data between Kafka and another data storage engine, such as S3, Elasticsearch, or a relational database (through Kafka Connect JDBC), with very little setup required. It does this through plugins: source plugins move data from another data engine into Kafka, and sink plugins move data out of Kafka into another data engine. Many data stacks include a set of Kafka brokers, used as a buffer log, an event stream, or for some other purpose, and Kafka Connect plugins make it very easy to add a source or sink to that Kafka stream.
Confluent, the company commercializing Apache Kafka, lists proven, reliable Kafka Connect plugins in Confluent Hub and integrates those plugins into Confluent Platform, a product that makes it easy to set up, maintain, and monitor Kafka brokers and their associated instances. At Rockset, we built our Kafka Connect sink plugin so that customers with data in Kafka can easily run real-time analytics on it, and we listed it in Confluent Hub to help Confluent's existing user base get actionable insights from their data quickly and with minimal setup.
Kafka Connect runs in a separate instance from your Kafka brokers, and each Kafka Connect plugin must implement a set of methods that Kafka Connect calls. For sink plugins, Kafka Connect calls the put method with a batch of messages; the main job of this method is typically to do some processing of the data and then send it to the input channel of the destination data storage engine. In our case, that channel is our Write API, so we transform the events to raw JSON and send them to our API endpoint. That is the core functionality of a Kafka Connect plugin. The Kafka Connect platform takes care of the rest, including consuming the topics the user lists, passing their events to put, and serializing or deserializing the data in the Kafka stream.
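To make the shape of put concrete, here is a minimal Python sketch of the same control flow. It is not the Rockset plugin, which, like any Kafka Connect plugin, runs on the JVM; SinkRecord, JsonSinkTask, and the injected send callback are hypothetical stand-ins, with send playing the role of the HTTP call to a write endpoint.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class SinkRecord:
    """Hypothetical stand-in for a record Kafka Connect has already deserialized."""
    topic: str
    partition: int
    offset: int
    key: Optional[Any]
    value: dict


class JsonSinkTask:
    """Sketch of a sink task: convert each record to raw JSON and hand the
    whole batch to a sender, such as an HTTP POST to a write endpoint."""

    def __init__(self, send: Callable[[List[str]], None]) -> None:
        # The sender is injected so the sketch runs without a network.
        self.send = send

    def put(self, records: List[SinkRecord]) -> None:
        # The framework calls put() with a batch, which may be empty.
        if not records:
            return
        # sort_keys makes the JSON output deterministic.
        docs = [json.dumps(r.value, sort_keys=True) for r in records]
        self.send(docs)
```

Passing something like `send=sent.append` collects each batch in a list, which lets you test the JSON transformation separately from the network call.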
Anyone setting up Kafka Connect has to modify at least two config files. The first is the general Kafka Connect (worker) config file, where you set the locations of your Kafka brokers, the path to the plugins' jar files, the serializer and deserializer, and a few other settings. The serializer and deserializer, which Kafka Connect calls converters, come with the Kafka Connect platform. For a sink plugin, they deserialize the data in the Kafka stream before passing each event to the plugin, which makes the data processing done in the plugin much simpler.
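The worker config is a Java `.properties` file. As an illustration, the Python sketch below builds a minimal standalone-mode worker config and renders it in that format. The keys are standard Kafka Connect worker settings, but the broker address and paths are placeholders, and the to_properties helper is hypothetical (it does no escaping, so it only suits simple values).

```python
from typing import Dict


def to_properties(config: Dict[str, str]) -> str:
    """Render a flat config dict as Java .properties text (key=value per line).
    No escaping is done, so this only suits simple values."""
    return "\n".join(f"{key}={value}" for key, value in config.items()) + "\n"


# Placeholder values; adjust hosts and paths for your deployment.
worker_config = {
    # Where the Kafka brokers live.
    "bootstrap.servers": "localhost:9092",
    # Directory holding the plugins' jar files.
    "plugin.path": "/usr/share/java",
    # Converters: Kafka Connect's serializer/deserializer pair.
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    # Treat values as plain JSON without an embedded schema envelope.
    "value.converter.schemas.enable": "false",
    # Standalone mode keeps consumed offsets in a local file.
    "offset.storage.file.filename": "/tmp/connect.offsets",
}

worker_properties = to_properties(worker_config)
```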
There is also a config file for each plugin connected to the Kafka Connect instance. Every plugin's config file must include a name and a class for identification, a maximum number of tasks for performance tuning, and a list of topics to follow. Rockset's config file also includes the URL of the Rockset API server, your Rockset API key, the Rockset workspace and collection that the data should flow into, the format of the data, and a number of threads for performance tuning. You can run multiple Rockset sink plugins on the same Kafka Connect platform to send data from different topics in your Kafka stream to different Rockset collections.
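The sketch below builds two sink connector configs that route different topics to different collections. `name`, `connector.class`, `tasks.max`, and `topics` are standard Kafka Connect keys; the connector class, every `rockset.*` key, the `format` key, and all values are hypothetical placeholders, not the plugin's documented property names.

```python
from typing import Dict, List

# Keys every Kafka Connect connector config needs.
REQUIRED_KEYS = ("name", "connector.class", "tasks.max", "topics")


def rockset_sink_config(name: str, topics: List[str], workspace: str,
                        collection: str) -> Dict[str, str]:
    """Build one sink connector config. The class name, the rockset.* keys,
    the format key, and all values are hypothetical placeholders."""
    return {
        "name": name,
        "connector.class": "com.example.RocksetSinkConnector",  # hypothetical
        "tasks.max": "2",
        "topics": ",".join(topics),
        "rockset.apiserver.url": "https://api.example.com",  # hypothetical
        "rockset.apikey": "<YOUR_API_KEY>",
        "rockset.workspace": workspace,
        "rockset.collection": collection,
        "format": "json",
        "rockset.task.threads": "5",
    }


def missing_keys(config: Dict[str, str]) -> List[str]:
    """Return the standard keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


# Two connectors on one Kafka Connect platform, each feeding its own collection.
connectors = [
    rockset_sink_config("rockset-orders", ["orders"], "commons", "orders"),
    rockset_sink_config("rockset-clicks", ["clicks", "views"], "commons", "events"),
]
```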
Once the Rockset Kafka Connect sink plugin sends the raw JSON document to our Write API, the code path merges with that of normal REST API writes, and the server-side architecture, which we've discussed in previous blog posts, quickly indexes the document and makes it available for querying.
There are several requirements beyond basic functionality that a Kafka Connect plugin must meet to be listed in Confluent Hub. The next few sections offer a rough guide to listing your own Kafka Connect plugin and illustrate the design decisions in the Rockset indexing engine that made these requirements easy to satisfy.
The Kafka ecosystem gives special preference to the data serialization format Avro, which addresses the problem of upstream schema changes breaking downstream consumers by enforcing a set schema. This schema is kept in the Schema Registry, a separate instance. Any schema change must be made deliberately and in a way that is backward or forward compatible, depending on the compatibility options set in the Schema Registry. Because the schema is fixed, Avro can also serialize messages without field names, which makes them more compact. Confluent has strong support for Avro serialization, including a Schema Registry that comes with Confluent Platform, and Avro support was necessary to list our Kafka Connect plugin in Confluent Hub.
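To see why dropping field names helps, here is a minimal Python sketch of Avro's binary encoding for a flat record, following the Avro specification: a long is zig-zag encoded and then written as a variable-length integer, a string is its byte length followed by its UTF-8 bytes, and a record is simply its fields concatenated in schema order. The schema and record are made-up examples, and only the long and string types are handled; real code should use an Avro library.

```python
import json
from typing import Any, Dict, List, Tuple


def zigzag_varint(n: int) -> bytes:
    """Encode an Avro long: zig-zag, then 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)  # zig-zag maps small magnitudes to small numbers
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_record(schema: List[Tuple[str, str]], record: Dict[str, Any]) -> bytes:
    """Concatenate field values in schema order; field names are never written."""
    out = bytearray()
    for field_name, field_type in schema:
        value = record[field_name]
        if field_type == "long":
            out += zigzag_varint(value)
        elif field_type == "string":
            data = value.encode("utf-8")
            out += zigzag_varint(len(data)) + data
        else:
            raise ValueError(f"type not handled in this sketch: {field_type}")
    return bytes(out)


# Made-up example schema and record.
SCHEMA = [("id", "long"), ("name", "string")]
RECORD = {"id": 42, "name": "widget"}

avro_size = len(encode_record(SCHEMA, RECORD))                 # 8 bytes
json_size = len(json.dumps(RECORD, separators=(",", ":")))     # 25 bytes
```

The reader can only decode those 8 bytes because it has the same schema, which is exactly why the Schema Registry has to exist.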
Supporting Avro is not too difficult, because an Avro serializer and deserializer are already available for the Kafka Connect platform and can be plugged in through the config file. After the serializer and deserializer do the hard work, transforming the message to JSON is relatively simple, and we were able to find examples in open source Kafka Connect plugins. Our own implementation is here. The hardest part was understanding how Avro works and simply setting up the config files and Schema Registry correctly. Using Confluent Platform helps a great deal here, but you still need to make sure you haven't missed a config file change: if you try to deserialize Avro data with a JSON deserializer, nothing will work.
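The failure mode in that last sentence is easy to see at the byte level. With Confluent's Avro serializer, each Kafka message starts with a magic byte 0 and a 4-byte big-endian Schema Registry ID, followed by the Avro-encoded body; on the Kafka Connect side this is usually handled by setting `value.converter=io.confluent.connect.avro.AvroConverter` along with `value.converter.schema.registry.url`. The Python sketch below parses that framing and shows that such bytes are not JSON at all. The helper names and the schema ID 7 are hypothetical.

```python
import json
import struct
from typing import Tuple


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Schema Registry-framed message into (schema_id, avro_payload):
    magic byte 0, a 4-byte big-endian schema ID, then the Avro body."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not in Confluent Schema Registry wire format")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


def looks_like_json(message: bytes) -> bool:
    """Roughly what a JSON deserializer needs: UTF-8 text that parses as JSON."""
    try:
        json.loads(message.decode("utf-8"))
        return True
    except ValueError:  # covers both UnicodeDecodeError and JSONDecodeError
        return False


# An Avro body for {"id": 42, "name": "widget"}, framed with a made-up schema ID.
frame = b"\x00" + struct.pack(">I", 7) + b"\x54\x0cwidget"
```

Feeding `frame` to a JSON deserializer fails on the very first byte, while an Avro converter uses the embedded ID to fetch the schema and decode the body.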
Another requirement for the Kafka Connect plugin is support for exactly-once semantics: any message sent from the Kafka stream must appear exactly once in the destination Rockset collection. The difficulty lies in handling network errors. If the Kafka Connect plugin does not receive a response from our Write API, it resends the document, and we may end up with duplicate documents in our pipeline. The typical solution, and the one we used, relies on our unique identifier field _id, so that any duplicate simply overwrites the original document with the same information. We map the message's key to our _id field when a key is available, and otherwise default to the combination of topic, partition, and offset, which uniquely identifies a message.
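Here is a minimal Python sketch of that idea. The `+` separator in the fallback ID is an assumption (the exact format Rockset uses is not specified here), and the in-memory Collection class is a hypothetical stand-in for a collection whose writes are upserts keyed by _id.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in for a Kafka message as seen by the sink."""
    topic: str
    partition: int
    offset: int
    key: Optional[Any]
    value: Dict[str, Any]


def document_id(msg: Message) -> str:
    """Prefer the message key; otherwise fall back to topic+partition+offset,
    which is unique per Kafka message. The '+' separator is an assumption."""
    if msg.key is not None:
        return str(msg.key)
    return f"{msg.topic}+{msg.partition}+{msg.offset}"


class Collection:
    """Toy collection keyed by _id: writing an existing _id overwrites it."""

    def __init__(self) -> None:
        self.docs: Dict[str, Dict[str, Any]] = {}

    def write(self, msg: Message) -> None:
        doc = dict(msg.value, _id=document_id(msg))
        self.docs[doc["_id"]] = doc  # a retried write replaces, never duplicates
```

One consequence of this design is that messages sharing a key collapse into a single document, with the latest write winning.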
Confluent Hub also requires Kafka Connect sink plugins to respect the correct ordering of the data. This required no changes on our side: every Rockset document has an _event_time field, and by specifying a field mapping for _event_time when the collection is created, any Rockset user can ensure the data is ordered according to their own specifications.
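The point is that order comes from a field in the data, not from arrival order. The Python sketch below models a hypothetical field mapping that copies a user-chosen timestamp field (here called `ts`, in milliseconds) into _event_time, then orders documents the way an `ORDER BY _event_time` query would. The function names and the `ts` field are illustrative assumptions, not Rockset's field-mapping API.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]


def apply_event_time_mapping(doc: Doc, source_field: str) -> Doc:
    """Hypothetical field mapping: copy the chosen timestamp field into
    _event_time without modifying the original document."""
    mapped = dict(doc)
    mapped["_event_time"] = int(doc[source_field])
    return mapped


def ordered(docs: List[Doc]) -> List[Doc]:
    """Order documents by _event_time, as an ORDER BY _event_time query would."""
    return sorted(docs, key=lambda d: d["_event_time"])


# Documents that arrived out of order (for example, after a retry).
arrivals = [
    {"id": "b", "ts": 2000},
    {"id": "a", "ts": 1000},
    {"id": "c", "ts": 3000},
]
mapped_docs = [apply_event_time_mapping(d, "ts") for d in arrivals]
```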
Confluent also requires a Kafka Connect plugin to validate the config file set by the user, in order to catch typos and other errors. The Rockset sink config file contains, among other things, the URL of the API server to which Write API requests are sent and the format of the messages in the Kafka stream. If the user provides a value that is not among the available options for either setting, the Kafka Connect plugin errors out. We have found that setting up the config file correctly is one of the hardest parts of our Kafka integration setup process, and further improvements to this process and to config validation are in the works.
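In a Java plugin, this kind of check is typically declared with Kafka's `ConfigDef` validators such as `ConfigDef.ValidString.in(...)`. The Python sketch below shows the same fail-fast idea: every allowed-value check runs up front, and all problems are reported in one error. The setting names and allowed values are hypothetical placeholders, not the plugin's real option lists.

```python
from typing import Dict, List, Set

# Hypothetical option lists; the real plugin's settings and values may differ.
ALLOWED_VALUES: Dict[str, Set[str]] = {
    "format": {"json", "avro"},
    "rockset.apiserver.url": {
        "https://api.region-a.example.com",
        "https://api.region-b.example.com",
    },
}


class ConfigError(ValueError):
    """Raised when the user-supplied config fails validation."""


def validate_config(config: Dict[str, str]) -> None:
    """Check every constrained setting and report all problems at once,
    so a typo fails at startup instead of at the first write."""
    errors: List[str] = []
    for key, allowed in ALLOWED_VALUES.items():
        value = config.get(key)
        if value is None:
            errors.append(f"missing required setting '{key}'")
        elif value not in allowed:
            errors.append(
                f"invalid value '{value}' for '{key}'; "
                f"expected one of {sorted(allowed)}"
            )
    if errors:
        raise ConfigError("; ".join(errors))
```

Collecting every error before raising saves users from fixing one typo, restarting, and only then discovering the next.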
The other requirements for listing in Confluent Hub (versioning, packaging, logging, documentation, testing, and graceful error handling) fall under the umbrella of general code quality and usability. Confluent requires a specific, standardized packaging structure for all its listed Kafka Connect plugins, which can be built with a straightforward Maven plugin. The remaining requirements ensure that the code is correct, the setup process is clear, and any errors can be diagnosed quickly and easily.