Previously, I have introduced the evolutionary history of data infrastructure, and I have also introduced real-time data analysis in detail.
Both of these articles mention a key player, Debezium. In fact, Debezium has had a place in the modern infrastructure. Let's use a diagram to understand why.
This architecture diagram should be consistent with the data ingest path of most data infrastructures. In order to store and analyze data in a unified way, centralizing the data in a data warehouse is a general solution.
Debezium captures data changes from various source databases and then writes them to the data warehouse via Kafka and Kafka's consumers. In other words, Debezium is also a Kafka producer.
This architecture looks fine, once there are changes in the data will be captured by Debezium and eventually written to the data warehouse, but there is actually a difficult problem behind needs to be solved, that is, the database schema. In order to provide data analytics, the data warehouse is "schemaful", but how to make the schema of the data warehouse align with the schema of each source database?
The good news is Debezium supports schema registry, it can capture DDL changes from source databases and synchronize them to the schema registry so that Kafka consumers can get schema from the schema registry and synchronize the schema in the data warehouse.
Let's make the architecture mentioned above even more complete.
The most common format for describing schema in this scenario is Apache Avro.
Unfortunately, a schemaless database like MongoDB doesn't have DDL at all, so of course Debezium can't capture DDL, then how does Debezium upload the schema in this scenario? The answer is to guess, by getting the data to determine the data type, and write the type into the schema registry.
As you can imagine, this is very risky, and the data warehouse schema often faces compatibility challenges due to such guessing. Therefore, there needs to be a way to proactively upload the schema so that the Kafka consumer can correctly recognize the data type and correct the schema of the data warehouse.
Knowing it intellectually is one thing, but knowing it emotionally is quite another.
The story is this: our microservice is developed in Golang and uses mgm as the ORM for MongoDB.
In Golang, we use struct
as a Data Transfer Object (DTO), and in the mgm
scenario, these DTOs can be marshaled as JSON. So, what we need to do is to generate the corresponding Avro schema based on the definition of struct
.
https://github.com/wirelessr/avroschema
The use of this go module is quite simple.
If we have a struct
as follows.
type Entity struct {
AStrField string `json:"a_str_field"`
AIntField int `json:"a_int_field"`
ABoolField bool `json:"a_bool_field"`
AFloatField float32 `json:"a_float_field"`
ADoubleField float64 `json:"a_double_field"`
}
Then just import
the module to generate the corresponding Avro.
import "github.com/wirelessr/avroschema"
avroschema.Reflect(&Entity{})
The result will be an Avro JSON.
{
"name": "Entity",
"type": "record",
"fields": [
{"name": "a_str_field", "type": "string"},
{"name": "a_int_field", "type": "int"},
{"name": "a_bool_field", "type": "boolean"},
{"name": "a_float_field", "type": "float"},
{"name": "a_double_field", "type": "double"}
]
}
In addition, the module also supports various complex types and nested types as follows.
-
array
andslice
map
-
struct
instruct
time.Time
-
array
ofstruct
- etc.
Of course, the omitempty
tag used by JSON in Golang is also supported.
Fields marked with omitempty become Avro's union type, which is an optional field. Let's take a more complicated example.
type Entity struct {
UnionArrayField []int `json:"union_array_field,omitempty"`
}
If it is an optional int array, then it will become a union type with null when converted to Avro.
{
"name": "Entity",
"type": "record",
"fields": [
{"name": "union_array_field", "type": ["null", {
"type": "array", "items": "int"
}]}
]
}
More detailed tests can be found in the unit test file: reflect_test.go.
However, this is not enough for us, because we need to support various special types in mgm
, such as primitive.DateTime
or primitive.ObjectID
, and most especially mgm.DefaultModel
.
Therefore, it is necessary for this module to provide customization capabilities to extend support for more special types, and so a plugin is provided for this module to be used. To support mgm
, you can use the already written submodule.
The following is an example of a struct
that uses the mgm
type.
type Book struct {
mgm.DefaultModel `bson:",inline"`
Name string `json:"name" bson:"name"`
Pages int `json:"pages" bson:"pages"`
ObjId primitive.ObjectID `json:"obj_id" bson:"obj_id"`
ArrivedAt primitive.DateTime `json:"arrived_at" bson:"arrived_at"`
RefData bson.M `json:"ref_data" bson:"ref_data"`
Author []string `json:"author" bson:"author"`
}
To support these special types, you can use the built-in custom MgmExtension
. This works just like the original Reflect
, except it requires the use of a customized instance.
import (
"github.com/wirelessr/avroschema"
"github.com/wirelessr/avroschema/mongo"
)
reflector := new(avroschema.Reflector)
reflector.Mapper = MgmExtension
reflector.Reflect(&Book{})
There are a couple of important points to note here.
-
DefaultModel
is a type that directly generates three fields,_id
,created_at
andupdated_at
. - If
primitive.ObjectID
then it is treated as astring
, since the UUID mentioned in Avro specification is also treated as astring
. - If
primitive.DateTime
, then it is treated astime.Time
. - A special case is
bson.M
, which is treated as a JSON string because there is no way to be sure what is in it.
According to the above rules, the final Avro schema is as follows.
{
"name": "Book",
"type": "record",
"fields": [
{ "name": "_id", "type": "string" },
{ "name": "created_at", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "updated_at", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "name", "type": "string" },
{ "name": "pages", "type": "int" },
{ "name": "obj_id", "type": "string" },
{ "name": "arrived_at", "type": "long", "logicalType": "timestamp-millis" },
{ "name": "ref_data", "type": "string" },
{ "name": "author", "type": "array", "items": "string" }
]
}
Conclusion
When implementing the requirement to convert Golang struct
to Avro schema I first tried to look for existing packages, but unfortunately I couldn't find any. However, there is a similar project that aims to convert Golang struct
to JSON schema, so I referred to his practice and made a version of Avro schema.
In fact, there are still some specifications in the Avro schema that I don't support, such as the enum
type, and there are also many optional parameters that I don't support because they don't correspond to struct
, such as namespace
, aliases
, etc.
But for now, this implementation satisfies my need for data ingest, and allows Kafka consumers to have the correct schema to work with. Maybe I'll continue to improve this project sometime in the future so that it can really support all kinds of Avro specifications. But I have to say, in terms of functionality, it's pretty useful right now.
The project has full unit testing and linter, and I'm sure the code structure isn't too complicated, so feel free to contribute.
Top comments (0)