Kevin Wallimann

Upgrading ABRiS from version 3 to version 4

With release v4.0.1, a new fluent API was introduced to ABRiS to reduce configuration errors and provide more type safety. While this change is a big improvement going forward, it is a breaking change for users migrating from version 3. This article walks you through upgrading some common ABRiS use cases.

More information can be found on the GitHub page. More usage examples can be found on the documentation pages. Documentation for version 3 can be found under branch 3.2.

Reading records

A common use-case is to read data from a topic with both key and value schema. In ABRiS 3, this could be done like this:

val keyConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
  SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.name",
  SchemaManager.PARAM_KEY_SCHEMA_ID -> "latest"
)

val valueConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
  SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest",
  SchemaManager.PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "record.name",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "record.namespace"
)

import za.co.absa.abris.avro.functions.from_confluent_avro

val result: DataFrame = dataFrame.select(
    from_confluent_avro(col("key"), keyConfig) as 'key,
    from_confluent_avro(col("value"), valueConfig) as 'value)

With ABRiS 4, it looks like this:

import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

val keyConfig: FromAvroConfig = AbrisConfig
    .fromConfluentAvro
    .downloadReaderSchemaByLatestVersion
    .andTopicNameStrategy("example_topic", isKey = true)
    .usingSchemaRegistry("http://localhost:8081")

val valueConfig: FromAvroConfig = AbrisConfig
    .fromConfluentAvro
    .downloadReaderSchemaByLatestVersion
    .andTopicRecordNameStrategy("example_topic", "record.name", "record.namespace")
    .usingSchemaRegistry("http://localhost:8081")

import za.co.absa.abris.avro.functions.from_avro

val result: DataFrame = dataFrame.select(
    from_avro(col("key"), keyConfig) as 'key,
    from_avro(col("value"), valueConfig) as 'value)

First and foremost, a new object was introduced, AbrisConfig. This is the entry point for the new fluent API.

Second, the method from_confluent_avro was removed and should be replaced with from_avro. To use the Confluent format, specify .fromConfluentAvro on AbrisConfig. If you've been using plain vanilla Avro, choose .fromSimpleAvro instead.

Third, notice the second parameter of .andTopicNameStrategy. The default value of isKey is false, which is ok for value schemas. However, in the case of key schemas, isKey must be set to true.
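To see why isKey matters, it helps to look at how the topic name strategy derives the Schema Registry subject. The following is only a minimal sketch of the Confluent naming convention (topic plus a -key or -value suffix); SubjectNames is a hypothetical helper written for this illustration and is not part of ABRiS.

```scala
// Hypothetical helper (not part of ABRiS): mirrors the subject naming
// convention of Confluent's topic name strategy.
object SubjectNames {
  // The subject is the topic name plus "-key" or "-value".
  def topicName(topic: String, isKey: Boolean = false): String =
    if (isKey) s"$topic-key" else s"$topic-value"
}

object SubjectNamesDemo extends App {
  // Key schema: isKey must be set explicitly.
  println(SubjectNames.topicName("example_topic", isKey = true)) // example_topic-key
  // Value schema: the default isKey = false is enough.
  println(SubjectNames.topicName("example_topic"))               // example_topic-value
}
```

If isKey is left at its default for a key column, the lookup targets the value subject instead, so the key would be decoded with the wrong schema.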

Writing records

Using an existing schema

In ABRiS 3, writing records providing an existing schema id could be done like this:

def writeAvro(dataFrame: DataFrame): DataFrame = {
  val config = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "42"
  )

  import za.co.absa.abris.avro.functions.to_confluent_avro
  val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
  dataFrame.select(to_confluent_avro(allColumns, config) as 'value)
}

In ABRiS 4, it's like this:

def writeAvro(dataFrame: DataFrame): DataFrame = {
  val config: ToAvroConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaById(42)
    .usingSchemaRegistry("http://localhost:8081")

  import za.co.absa.abris.avro.functions.to_avro

  val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
  dataFrame.select(to_avro(allColumns, config) as 'value)
}

Here again, the method to_confluent_avro was removed. Use to_avro instead, with a config built from .toConfluentAvro on AbrisConfig.

Generating the schema from the records

In ABRiS 3, it was incredibly easy (too easy!) to simply have ABRiS generate the schema for you from the records if you didn't provide the schema, like this:

def writeAvro(dataFrame: DataFrame): DataFrame = {

  val config = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "example_topic",
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.record.name"
  )

  val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
  dataFrame.select(to_confluent_avro(allColumns, config) as 'value)
}

Generating and registering the schema had to be done during the evaluation of the Spark expression, which was inefficient. Therefore, this functionality was removed in v4. The schema now needs to be registered before the evaluation phase and passed to the ABRiS config.

import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import za.co.absa.abris.avro.registry.SchemaSubject
import za.co.absa.abris.config.AbrisConfig

def writeAvro(dataFrame: DataFrame): DataFrame = {
  // generate the Avro schema from all columns of the DataFrame
  val allColumns = struct(dataFrame.columns.map(c => dataFrame(c)): _*)
  val expression = allColumns.expr
  val schema = toAvroType(expression.dataType, expression.nullable)

  // register the schema before the Spark expression is evaluated
  val schemaRegistryClientConfig = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "http://localhost:8081")
  val schemaManager = SchemaManagerFactory.create(schemaRegistryClientConfig)
  val subject = SchemaSubject.usingTopicNameStrategy("topic", isKey = false)
  val schemaId = schemaManager.register(subject, schema)

  // create the config from the ID of the registered schema
  val config = AbrisConfig
    .toConfluentAvro
    .downloadSchemaById(schemaId)
    .usingSchemaRegistry("http://localhost:8081")

  dataFrame.select(to_avro(allColumns, config) as 'value)
}

Notice that we used the topic name strategy in this example. SchemaSubject also offers methods for the record name strategy (.usingRecordNameStrategy) and the topic record name strategy (.usingTopicRecordNameStrategy).
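As a rough sketch, the other two strategies could be used as follows. The argument order shown here (record name, then record namespace, with the topic first for the topic record name strategy) is an assumption that mirrors .andTopicRecordNameStrategy from the reading example; check it against the ABRiS version you use. The topic, record name and namespace values are placeholders.

```scala
import za.co.absa.abris.avro.registry.SchemaSubject

// Record name strategy: the subject depends only on the record's full name.
// Assumed argument order: (recordName, recordNamespace).
val recordSubject =
  SchemaSubject.usingRecordNameStrategy("record.name", "record.namespace")

// Topic record name strategy: the subject combines the topic and the record's full name.
// Assumed argument order: (topicName, recordName, recordNamespace).
val topicRecordSubject =
  SchemaSubject.usingTopicRecordNameStrategy("example_topic", "record.name", "record.namespace")
```

Either subject can then be passed to schemaManager.register in place of the topic name subject.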

Top comments (1)

Zoli • Edited

Thanks! Great guide!
I struggled a bit with the absence of the auto-generation support.