DEV Community

milktea02
milktea02

Posted on • Edited on

Consuming and deserializing avro messages with Apache kafka-console-consumer and AWS Glue as the Schema Registry

I had to do this recently and found little to no documentation on what was required to consume avro logs from kafka and deserialize them with the Glue schema registry so I'll document my findings here. For reference, Confluent has one that just works out of the box.

Requirements/Pre-requisites/Assumptions/?:

  • Apache Kafka
    • I think I was using 3.3.1 with Scala 2.13
  • Kafka cluster with topics that have avro messages
    • We are using an AWS MSK cluster with kafka v2.6.x
    • This also assumes the messages are being serialized using the following headers
  • AWS Glue Registry with avro schema you're using
    • You can find documentation on how to set this up with some schemas
  • jars from https://repo1.maven.org/maven2/org/apache/
  • this also assumes you have your iam role permissions/policies configured

Caveat: This is what worked with our setup and the jars we needed. I don't have any examples or screenshots.

Download required jars into <location of kafka>/libs/:

List of jars:

Group Id Artifact Id Version
software.amazon.glue schema-registry-common 1.1.14
software.amazon.glue schema-registry-serde 1.1.14
software.amazon.awssdk glue 2.17.12
software.amazon.awssdk arns 2.17.12
software.amazon.awssdk url-connection-client 2.17.12
org.apache.avro avro 1.11.0
com.google.guava guava 30.0-jre
com.google.guava failureaccess 1.0.1

Configuring kafka-console-consumer.sh:

You'll need to supply some properties to the glue deserializer; I've used a wrapper script kafka-glue-avro-console-consumer.sh:

#!/bin/sh

kafka-console-consumer.sh --value-deserializer com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer \
        --property value.deserializer.region=<hardcoded-aws-region> \
        --property value.deserializer.dataFormat=AVRO \
        --property value.deserializer.avroRecordType=GENERIC_RECORD "$@"
Enter fullscreen mode Exit fullscreen mode

You'll need to supply other properties such as --bootstrap-server <list of servers:port>, --topic <topic-name>, --consumer.config <config-file>.

Example:

$ ./kafka-glue-avro-console-consumer.sh --bootstrap-server broker-01:9098 --topic my-topic --consumer.config my-config --max-message 1 --from-beginning
Enter fullscreen mode Exit fullscreen mode

If you want to specify the glue registry and schema you can do that with:

--property value.deserializer.registry.name=<registryName>
--property value.deserializer.schemaName=<schemaName>
Enter fullscreen mode Exit fullscreen mode

You can check the source code for additional properties you can use: https://github.com/awslabs/aws-glue-schema-registry/blob/v1.1.14/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java

Here is a snippet of ansible code to grab all the jars:

- name: install artifacts from maven to consume AVRO records
  maven_artifact:
    group_id: "{{ item.group_id }}"
    artifact_id: "{{ item.artifact_id }}"
    version: "{{ item.version }}"
    dest: /home/ubuntu/kafka_{{ kafka_client_version }}/libs/{{ item.artifact_id }}-{{ item.version }}.jar
    group: ubuntu
    owner: ubuntu
    mode: 0664
  loop:
    - { group_id: 'software.amazon.glue', artifact_id: 'schema-registry-common', version: 1.1.14 }
    - { group_id: 'software.amazon.glue', artifact_id: 'schema-registry-serde', version: 1.1.14 }
    - { group_id: 'software.amazon.awssdk', artifact_id: 'glue', version: 2.17.122 }
    - { group_id: 'software.amazon.awssdk', artifact_id: 'arns', version: 2.17.122 }
    - { group_id: 'software.amazon.awssdk', artifact_id: 'url-connection-client', version: 2.17.122 }
    - { group_id: 'org.apache.avro', artifact_id: 'avro', version: 1.11.0 }
    - { group_id: 'com.google.guava', artifact_id: 'guava', version: 30.0-jre }
    - { group_id: 'com.google.guava', artifact_id: 'failureaccess', version: 1.0.1 }
Enter fullscreen mode Exit fullscreen mode

(we are testing some things which is why everything is in the user's home directory!)

Top comments (1)

Collapse
 
aitor_mars_efd5412c813b15 profile image
Aitor Mars

Hi, I got below error. I tried to add software.amazon.awssdk.utils package to libs, but it didn't solve the issue.
Exception in thread "main" java.lang.NoClassDefFoundError: software/amazon/awssdk/utils/internal/EnumUtils
at software.amazon.awssdk.services.glue.model.Compatibility.(Compatibility.java:42)
at com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.(AWSSchemaRegistryConstants.java:114)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.validateAndSetCompatibility(GlueSchemaRegistryConfiguration.java:168)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildSchemaRegistryConfigs(GlueSchemaRegistryConfiguration.java:93)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildConfigs(GlueSchemaRegistryConfiguration.java:82)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.(GlueSchemaRegistryConfiguration.java:74)
at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer.configure(GlueSchemaRegistryKafkaDeserializer.java:89)
at kafka.tools.DefaultMessageFormatter.getDeserializerProperty(ConsoleConsumer.scala:586)
at kafka.tools.DefaultMessageFormatter.$anonfun$configure$22(ConsoleConsumer.scala:492)
at kafka.tools.DefaultMessageFormatter.configure(ConsoleConsumer.scala:592)
at kafka.tools.ConsoleConsumer$ConsumerConfig.(ConsoleConsumer.scala:317)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.utils.internal.EnumUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more