In the previous article, I wrote an introduction to using Parquet files in Java, but I did not include any examples. In this article, I will explain how to do this using the Avro library.
Parquet with Avro is one of the most popular ways to work with Parquet files in Java due to its simplicity and flexibility, and because it is the library with the most examples available.
Both Avro and Parquet allow complex data structures, and there is a mapping between the types of one and the other.
The post will use the same example I used in previous articles talking about serialization. The code will be very similar to the article about Avro. For specific details about Avro, I refer you to that article.
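As a practical note, the examples assume that `parquet-avro` and a Hadoop client are on the classpath. A minimal Maven sketch (the version numbers are illustrative; check for current releases):

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.13.1</version>
  </dependency>
  <!-- Provides the Hadoop Path, Configuration, and FileSystem classes
       used by HadoopOutputFile/HadoopInputFile -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.6</version>
  </dependency>
</dependencies>
```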
In the example, we will work with a collection of Organization objects (`Org`), which also have a list of attributes (`Attr`):
```java
record Org(String name, String category, String country, Type type, List<Attr> attributes) {
}

record Attr(String id, byte quantity, byte amount, boolean active, double percent, short size) {
}

enum Type {
    FOO, BAR, BAZ
}
```
Similar to saving files in Avro format, this version of Parquet with Avro allows writing files using classes generated from the IDL or the `GenericRecord` data structure. This capability is specific to Avro, not Parquet, but is inherited by `parquet-avro`, the library that implements this integration.
Internally, the library transforms the Avro schema into the Parquet schema, so most tools and libraries that know how to work with Avro classes will be able to work indirectly with Parquet with few changes.
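This conversion can be seen directly: `parquet-avro` exposes an `AvroSchemaConverter` class that turns an Avro schema into the equivalent Parquet `MessageType`. A minimal sketch (the two-field `Organization` schema here is a trimmed, illustrative one):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class SchemaConversionDemo {

    // Builds a minimal Avro schema and converts it to its Parquet equivalent.
    static MessageType toParquet() {
        Schema avroSchema = SchemaBuilder.record("Organization")
                .fields()
                .requiredString("name")
                .requiredString("country")
                .endRecord();
        // AvroSchemaConverter is the parquet-avro class that performs the mapping
        return new AvroSchemaConverter().convert(avroSchema);
    }

    public static void main(String[] args) {
        // Prints the resulting Parquet message type definition
        System.out.println(toParquet());
    }
}
```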
Using code generation
The only difference when compared to serializing in Avro format lies in the class used for writing or reading files; otherwise, the logic for building the Avro-generated classes and reading their data remains unchanged.
Serialization
We will need to instantiate a Parquet writer that supports the writing of objects created by Avro:
```java
Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
ParquetWriter<Organization> writer = AvroParquetWriter.<Organization>builder(outputFile)
    .withSchema(new Organization().getSchema())
    .withWriteMode(Mode.OVERWRITE)
    .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
    .build();
```
Parquet defines a class called `ParquetWriter<T>`, and the `parquet-avro` library extends it, implementing in `AvroParquetWriter<T>` the logic of converting Avro objects into calls to the Parquet API. The object we will serialize is `Organization`, which has been generated using the Avro utility and implements the Avro API.
The `Path` class is not the one from `java.nio.file`, but a Hadoop-specific abstraction for referencing file paths, while the `OutputFile` class is Parquet's file abstraction with the capability to write to files.
Therefore:
- `Path` is a Hadoop class, while `OutputFile`, `HadoopOutputFile`, and `ParquetWriter` are classes defined by the Parquet API.
- `AvroParquetWriter` is a class defined by the `parquet-avro` API, a library that encapsulates Parquet with Avro.
- `Organization` and `Attribute` are classes generated by the Avro utility, not related to Parquet.
The way to create an instance of `ParquetWriter` is through a Builder, where you can configure many parameters, both Parquet-specific ones and those of the library we are using (Avro). For example:
- `withSchema`: the schema of the Organization class in Avro, which is internally converted to a Parquet schema.
- `withCompressionCodec`: the compression method to use: SNAPPY, GZIP, LZ4, etc. By default, none is configured.
- `withWriteMode`: CREATE by default, so if the file already exists it is not overwritten and an exception is thrown. To avoid this, use OVERWRITE.
- `withValidation`: whether to validate the data types passed against the defined schema.
- `withBloomFilterEnabled`: whether to enable the creation of bloom filters.
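As an illustration, here is a sketch of a writer with several of these options set at once. The file path and the one-field schema are made up for the example; with generated classes you would pass the class schema and type parameter instead:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;

public class WriterOptionsDemo {

    // Writes a single record with several builder options configured.
    static java.io.File writeSample(String file) throws java.io.IOException {
        Schema schema = SchemaBuilder.record("Sample")
                .fields().requiredString("name").endRecord();
        OutputFile outputFile = HadoopOutputFile.fromPath(new Path(file), new Configuration());
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
                .withSchema(schema)
                .withWriteMode(Mode.OVERWRITE)                     // overwrite instead of failing if the file exists
                .withCompressionCodec(CompressionCodecName.SNAPPY) // compress pages with Snappy
                .withValidation(true)                              // validate records against the schema
                .withBloomFilterEnabled(true)                      // create bloom filters for the columns
                .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("name", "acme");
            writer.write(record);
        }
        return new java.io.File(file);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeSample("/tmp/writer_options_demo.parquet").length());
    }
}
```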
A more generic configuration for both libraries (options not exposed in the builder API) can be passed with the `config(String property, String value)` method. In this case, we configure it to internally use the 3-level structure to represent nested lists.
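For context, this is roughly how the two layouts differ at the Parquet schema level, sketched for a list field named `attributes` based on the Parquet LIST specification (the inner fields are abbreviated to a single column):

```
// 3-level (standard) representation, WRITE_OLD_LIST_STRUCTURE = "false"
optional group attributes (LIST) {
  repeated group list {
    required group element {
      required binary id (STRING);
    }
  }
}

// legacy 2-level representation (the parquet-avro default)
optional group attributes (LIST) {
  repeated group array {
    required binary id (STRING);
  }
}
```

The 3-level form is the one other ecosystems (Spark, Hive, etc.) expect, which is why the article enables it.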
Once the `ParquetWriter` class is instantiated, the greatest complexity lies in transforming your POJOs into the `Organization` classes generated from Avro's IDL. The complete code would be as follows:
```java
Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<Organization> writer = AvroParquetWriter.<Organization>builder(outputFile)
        .withSchema(new Organization().getSchema())
        .withWriteMode(Mode.OVERWRITE)
        .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
        .build()) {
    for (var org : organizations) {
        List<Attribute> attrs = org.attributes().stream()
            .map(a -> Attribute.newBuilder()
                .setId(a.id())
                .setQuantity(a.quantity())
                .setAmount(a.amount())
                .setSize(a.size())
                .setPercent(a.percent())
                .setActive(a.active())
                .build())
            .toList();
        Organization organization = Organization.newBuilder()
            .setName(org.name())
            .setCategory(org.category())
            .setCountry(org.country())
            .setOrganizationType(OrganizationType.valueOf(org.type().name()))
            .setAttributes(attrs)
            .build();
        writer.write(organization);
    }
}
```
Instead of converting the entire collection of organizations and then writing it, we can convert and persist each `Organization` one by one.
You can find the code on GitHub.
Deserialization
Deserialization is very straightforward if we agree to work with the classes generated by Avro.
To read the file, we will need to instantiate a Parquet reader:
```java
Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile).build();
```
Parquet defines a class called `ParquetReader<T>`, and the `parquet-avro` library extends it by implementing in `AvroParquetReader` the logic of converting Parquet's internal data structures into the classes generated by Avro.
In this case, `InputFile` is Parquet's file abstraction with the capability to read from files.
Therefore:
- `Path` is a Hadoop class, while `InputFile`, `HadoopInputFile`, and `ParquetReader` are classes defined by the Parquet API.
- `AvroParquetReader` implements `ParquetReader` and is defined in `parquet-avro`, a library that encapsulates Parquet with Avro.
- `Organization` (and `Attribute`) are classes generated by the Avro utility, not related to Parquet.
The instantiation of the `ParquetReader` class is also done with a Builder, although there are far fewer options to configure, as all its configuration is determined by the file we are going to read. The reader does not need to know whether the file uses dictionary encoding or whether it is compressed, so it is not necessary to configure any of this; it discovers it by reading the file.
```java
Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile).build()) {
    List<Organization> organizations = new ArrayList<>();
    Organization next = null;
    while ((next = reader.read()) != null) {
        organizations.add(next);
    }
    return organizations;
}
```
If the IDL used to generate the code contains only a subset of the attributes persisted in the file, reading it ignores all the columns not present in the IDL, saving us disk reads and the deserialization/decoding of that data.
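A hedged sketch of this idea: the requested projection can also be passed explicitly through the `AvroReadSupport.AVRO_REQUESTED_PROJECTION` configuration property, using the schema of the generated class (here assuming an `Organization` class generated from a trimmed IDL, and an `inputFile` created as above):

```java
Configuration conf = new Configuration();
// Only the columns present in the generated class's schema will be read and decoded
conf.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, Organization.getClassSchema().toString());
try (ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile)
        .withConf(conf)
        .build()) {
    Organization org;
    while ((org = reader.read()) != null) {
        // process the projected records
    }
}
```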
You can find the code on GitHub.
Using GenericRecord
Here, it will not be necessary to generate any code, and we will work with the GenericRecord
class provided by Avro, but the code will be a bit more verbose.
Serialization
As we do not have generated files containing the embedded schema, we need to programmatically define the Avro schema we are going to use. The code is the same as in the article about Avro:
```java
Schema attrSchema = SchemaBuilder.record("Attribute")
    .fields()
    .requiredString("id")
    .requiredInt("quantity")
    .requiredInt("amount")
    .requiredInt("size")
    .requiredDouble("percent")
    .requiredBoolean("active")
    .endRecord();

var enumSymbols = Stream.of(Type.values()).map(Type::name).toArray(String[]::new);
Schema orgsSchema = SchemaBuilder.record("Organizations")
    .fields()
    .requiredString("name")
    .requiredString("category")
    .requiredString("country")
    .name("organizationType").type().enumeration("organizationType")
        .symbols(enumSymbols).noDefault()
    .name("attributes").type().array().items(attrSchema).noDefault()
    .endRecord();

var typeField = orgsSchema.getField("organizationType").schema();
EnumMap<Type, EnumSymbol> enums = new EnumMap<>(Type.class);
enums.put(Type.BAR, new EnumSymbol(typeField, Type.BAR));
enums.put(Type.BAZ, new EnumSymbol(typeField, Type.BAZ));
enums.put(Type.FOO, new EnumSymbol(typeField, Type.FOO));
```
Instead of using an `AvroParquetWriter` of type `Organization`, we create one of type `GenericRecord` and construct instances of it as if it were a `Map`:
```java
Path path = new Path(filePath);
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
        .withSchema(orgsSchema)
        .withWriteMode(Mode.OVERWRITE)
        .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
        .build()) {
    for (var org : organizations) {
        List<GenericRecord> attrs = new ArrayList<>();
        for (var attr : org.attributes()) {
            GenericRecord attrRecord = new GenericData.Record(attrSchema);
            attrRecord.put("id", attr.id());
            attrRecord.put("quantity", attr.quantity());
            attrRecord.put("amount", attr.amount());
            attrRecord.put("size", attr.size());
            attrRecord.put("percent", attr.percent());
            attrRecord.put("active", attr.active());
            attrs.add(attrRecord);
        }
        GenericRecord orgRecord = new GenericData.Record(orgsSchema);
        orgRecord.put("name", org.name());
        orgRecord.put("category", org.category());
        orgRecord.put("country", org.country());
        orgRecord.put("organizationType", enums.get(org.type()));
        orgRecord.put("attributes", attrs);
        writer.write(orgRecord);
    }
}
```
You can find the code on GitHub.
Deserialization
As in the original Avro version, most of the work consists of converting the `GenericRecord` into our data structure. Because it behaves like a `Map`, we will have to cast the types of the values:
```java
Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
    List<Org> organizations = new ArrayList<>();
    GenericRecord record = null;
    while ((record = reader.read()) != null) {
        List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
        var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get("id").toString(),
                ((Integer) attr.get("quantity")).byteValue(),
                ((Integer) attr.get("amount")).byteValue(),
                (boolean) attr.get("active"),
                (double) attr.get("percent"),
                ((Integer) attr.get("size")).shortValue())).toList();
        Utf8 name = (Utf8) record.get("name");
        Utf8 category = (Utf8) record.get("category");
        Utf8 country = (Utf8) record.get("country");
        Type type = Type.valueOf(record.get("organizationType").toString());
        organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
    }
    return organizations;
}
```
Because we are using the Avro interface, it keeps its convention of encoding strings in the `Utf8` class, and it will be necessary to extract their values.
You can find the code on GitHub.
By default, when reading the file, it deserializes all fields of each row because it does not know which fields you need, and processes everything. If you want a projection of the fields, you must pass it in the form of an Avro schema when creating the `ParquetReader`:
```java
Schema projection = SchemaBuilder.record("Organizations")
    .fields()
    .requiredString("name")
    .requiredString("category")
    .requiredString("country")
    .endRecord();

Configuration configuration = new Configuration();
configuration.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, projection.toString());
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
        .withConf(configuration)
        .build()) {
    ....
```
The rest of the process would be the same, but with fewer fields. You can see the entire source code of the example here.
Performance
What performance does Parquet with Avro offer when serializing and deserializing a large volume of data? How much do the different compression options influence it? Should we choose Snappy compression or no compression at all? And what about enabling the dictionary or not?
Taking advantage of the analyses I did previously on different serialization formats, we can get an idea of its strengths and weaknesses. The benchmarks were all run on the same computer, so they are comparable and give us a rough idea.
File Size
Whether using code generation or `GenericRecord`, the result is the same, as they are two ways of defining the same schema and persisting the same data:
|  | Uncompressed | Snappy |
| --- | --- | --- |
| Dictionary False | 1,034 MB | 508 MB |
| Dictionary True | 289 MB | 281 MB |
Given the difference in sizes, we can see that in my synthetic example, the use of dictionaries compresses the information significantly, even more than the Snappy algorithm itself. The decision to activate compression or not will depend on the performance penalty it entails.
Serialization Time
Using code generation:
|  | Uncompressed | Snappy |
| --- | --- | --- |
| Dictionary False | 14,386 ms | 14,920 ms |
| Dictionary True | 15,110 ms | 15,381 ms |
Using GenericRecord:
|  | Uncompressed | Snappy |
| --- | --- | --- |
| Dictionary False | 15,287 ms | 15,809 ms |
| Dictionary True | 16,119 ms | 16,432 ms |
The time is very similar in all cases, and we can say that the different compression techniques do not significantly affect the time spent.
There are no notable time differences between generated code and the use of `GenericRecord`, so performance should not be a determining factor when choosing between them.
Compared to other serialization formats, it takes between 40% (Jackson) and 300% (Protocol Buffers/Avro) more time, but in return achieves files that are between 70% (Protocol Buffers/Avro) and 90% (Jackson) smaller.
Deserialization Time
Using code generation:
|  | Uncompressed | Snappy |
| --- | --- | --- |
| Dictionary False | 10,722 ms | 10,736 ms |
| Dictionary True | 7,707 ms | 7,665 ms |
Using GenericRecord:
|  | Uncompressed | Snappy |
| --- | --- | --- |
| Dictionary False | 12,089 ms | 11,931 ms |
| Dictionary True | 8,374 ms | 8,451 ms |
In this case, the use of the dictionary has a significant impact on time, as it saves decoding information that is repeated. There is definitely no reason to disable this functionality.
If we compare with other formats, it is twice as slow as Protocol Buffers and on par with Avro, but more than twice as fast as Jackson.
To put the performance into perspective, on my laptop it reads 50,000 `Organization` instances per second, which in turn contain almost 3 million instances of type `Attribute`.
Deserialization Time Using a Projection
What is the performance like if we use a projection and read only three fields of the Organization object, ignoring its collection of attributes?
|  | Uncompressed | Snappy |
| --- | --- | --- |
| Dictionary False | 289 ms | 304 ms |
| Dictionary True | 195 ms | 203 ms |
We confirm the promise that if we access a subset of columns, we will read and decode much less information. In this case, it takes only 2.5% of the time, or in other words, it is 40 times faster at processing the same file.
This is where Parquet shows its full power, by allowing us to read and decode a subset of the data quickly, taking advantage of how the data is laid out in the file.
Conclusion
If you are already using Avro or are familiar with it, most of the code and nuances related to Avro will be familiar to you. If you are not, it increases the entry barrier, as you have to learn about two different technologies, and it may not be clear what corresponds to each.
The major change compared to using only Avro is how the writer and reader objects are created, where we will have to deal with all the configuration and particularities specific to Parquet.
If I had to choose between using only Avro or Parquet with Avro, I would choose the latter, as it produces more compact files and we have the opportunity to take advantage of the columnar format.
The data I have used in the example are synthetic, and the results may vary depending on the characteristics of your data. I recommend doing tests, but unless all your values are very random, the compression rates will be high.
In environments where you write once and read multiple times, the time spent serializing should not be decisive. More important, for example, are the consumption of your storage, the file transfer time, or the processing speed (especially if you can filter the columns you access).
Despite using different compression and encoding techniques, the file processing time is quite fast. Along with its ability to work with a typed schema, this makes it a data interchange format to be considered in projects with a heavy load of data.