Introduction
For anyone looking to do big data analytics at scale, Apache Spark is often the best bet. Spark's rich DataFrame API allows intuitive transformations on structured data and helps engineers build fast, optimized data pipelines.
However, developing large data applications involves numerous teams and personas with a diverse set of skills, and not everyone is comfortable writing Spark. SQL tends to be the common denominator in most data teams, and this is where Apache Hive shines: petabyte-scale analytics using HiveQL (a flavor of SQL). Most transformations that are expressed with the DataFrame API on Spark can also be written in SQL. For the remaining, more complex queries, there are always User Defined Functions (UDFs).
User Defined Functions allow end users to write custom business logic that can be applied to each record of a column. UDFs are useful when the equivalent functionality requires multiple complex SQL queries, where a simple Scala UDF can do the same thing in a few lines of code. This article demonstrates how to use the Hive UDF and GenericUDF abstract classes to build user defined functions in Scala (as there are plenty of articles on building Java UDFs).
Simple UDF
Hive defines two approaches to writing UDFs: Simple and Generic. Simple UDFs can be built by extending the UDF abstract class and implementing the evaluate() function:
package HiveUDFs

import org.apache.hadoop.hive.ql.exec.UDF

class Increment extends UDF {
  // Hive looks up this method by name and applies it to each record
  def evaluate(num: Int): Int = num + 1
}
Now, simply create a JAR using sbt:
$ sbt package
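The sbt package step assumes a build definition that puts Hive's classes on the compile classpath. A minimal build.sbt sketch might look like the following. The project name and every version number here are placeholders rather than values from this article, so match them to your own cluster. The provided scope assumes the cluster already supplies the Hive and Hadoop JARs at runtime.

```scala
// build.sbt (sketch: name and versions are assumptions, match them to your cluster)
name := "hive-udfs"
version := "0.1.0"
scalaVersion := "2.12.18"

// Hive and Hadoop already exist on the cluster, so keep them out of the packaged JAR
libraryDependencies ++= Seq(
  "org.apache.hive" % "hive-exec" % "3.1.3" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.1.0" % "provided"
)
```

Note that a UDF which uses Scala library classes (collections, converters and so on) may also need the Scala library JAR on Hive's classpath at runtime.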
Add the JAR to the Hive classpath, create the temporary function increment, and use it in a SELECT statement:
>ADD JAR hdfs:///{path}/{to}/{jar}.jar;
>CREATE TEMPORARY FUNCTION increment AS 'HiveUDFs.Increment';
>SELECT increment(1);
The UDF above is deliberately trivial and could just as easily be written as a plain SQL expression. A practical UDF would be something more complex, such as a proprietary algorithm, which is easier to maintain and read when written in your organization's preferred programming language. That said, simple UDFs can be bad for performance because Hive invokes the evaluate function through reflection, which carries an overhead on every call. Furthermore, while you can overload evaluate to accept and return a number of different primitive types, things get complicated if your Hive table column is an Array, Map or Struct type. Here are the Hive column types and their equivalent Java types for UDFs:
Hive | Java |
---|---|
string | java.lang.String, org.apache.hadoop.io.Text |
int | int, java.lang.Integer, org.apache.hadoop.io.IntWritable |
boolean | boolean, java.lang.Boolean, org.apache.hadoop.io.BooleanWritable |
array | java.util.List |
map | java.util.Map |
While simple UDFs do support arrays with List/ArrayList in Java, writing a UDF to work with arrays in Scala does not always yield expected results. This is where Generic UDFs are more useful. Generic UDFs are the only approach when dealing with a nested array or struct type in a Hive column, or when you want to work with a dynamic number of columns in a UDF.
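To make the reflection overhead and the overloading of evaluate more concrete, here is a rough sketch of reflective dispatch on the JVM. It is not Hive's actual resolver. IncrementSketch and ReflectiveDispatch are hypothetical names, and the class deliberately does not extend Hive's UDF so that the sketch runs without Hive on the classpath.

```scala
import java.lang.reflect.Method

// Hypothetical stand-in for a simple UDF with two evaluate overloads
class IncrementSketch {
  def evaluate(num: java.lang.Integer): java.lang.Integer =
    Integer.valueOf(num.intValue + 1)
  def evaluate(text: String): String = text + "1"
}

object ReflectiveDispatch {
  // Pick the evaluate overload whose single parameter type matches argType
  def resolve(udf: AnyRef, argType: Class[_]): Method =
    udf.getClass.getMethod("evaluate", argType)

  // Each row still goes through Method.invoke, which is slower than a direct call
  def call(udf: AnyRef, arg: AnyRef): AnyRef =
    resolve(udf, arg.getClass).invoke(udf, arg)
}
```

A caller that processes many rows could resolve the Method once and reuse it, but every invocation would still be reflective, which is the cost a Generic UDF avoids.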
Generic UDFs
The second way to write a UDF is with the GenericUDF abstract class. Generic UDFs are faster than simple UDFs because there's no reflective call, the arguments are evaluated lazily, and Hive passes arguments as generic Object types, so there's no need to instantiate and deserialize an Object when it's not needed. Generic UDFs can also deal with complex types such as structs and nested arrays, and can accept a variable number of parameters. The downside is that writing a GenericUDF is a bit more complicated, as it requires implementing three functions rather than one, and there is very little documentation explaining the purpose of these functions, especially in Scala. Let's consider writing a Generic UDF that returns the length of an array of integers. The three functions we need to implement are:
initialize:
Since Hive passes all parameters as Object types, we need an ObjectInspector to interact with each Object (get its value and type, and write an output). When Hive analyzes a query that uses a UDF, it computes the parameter types and calls initialize with the appropriate ObjectInspector for each parameter (in our case, a single ListObjectInspector). We can use this function for type checking and validation of our input. Hive expects the return value to be an ObjectInspector for the return type (in our case, javaIntObjectInspector). We also store the ObjectInspector from the arguments as a class property because we will need it in the evaluate function to interact with the DeferredObjects.
evaluate:
This function contains the core business logic that is applied to every record of the input column(s). Unlike the evaluate function of a simple UDF, it accepts an array of DeferredObjects (the input parameters to the UDF), and we need to call the get() function on each DeferredObject to get the underlying Object. We then use the stored ObjectInspector to retrieve the value(s) of the actual UDF parameter(s). Finally, we can run our algorithm and return the result.
getDisplayString:
Hive uses the string returned by this method to describe the function, for example in EXPLAIN output, so it is useful for displaying troubleshooting information.
Here's how we write our ListLength GenericUDF that takes one input parameter of type array and returns an int:
package HiveUDFs

import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ListObjectInspector, ObjectInspector}
import scala.collection.JavaConverters._

class ListLength extends GenericUDF {
  // Inspector for the array argument: set in initialize, used in evaluate
  var listInputObjectInspector: ListObjectInspector = null

  @throws(classOf[UDFArgumentException])
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector = {
    // Validate the argument count and type before any row is processed
    if (arguments.length != 1)
      throw new UDFArgumentException("ListLength expects exactly one argument")
    if (arguments(0).getCategory() != ObjectInspector.Category.LIST)
      throw new UDFArgumentException("ListLength expects an array argument")
    this.listInputObjectInspector = arguments(0).asInstanceOf[ListObjectInspector]
    // Tell Hive that evaluate returns a java.lang.Integer
    PrimitiveObjectInspectorFactory.javaIntObjectInspector
  }

  override def evaluate(arguments: Array[GenericUDF.DeferredObject]): Integer = {
    if (arguments.length != 1) return null
    val in: Object = arguments(0).get()
    if (in == null) return null
    // Use the stored inspector to read the raw Object as a Java list
    val list: java.util.List[_] = this.listInputObjectInspector.getList(in)
    val sList = list.asScala.toList
    sList.length
  }

  override def getDisplayString(children: Array[String]): String = "Getting size of array"
}
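Before deploying a Generic UDF, it can help to drive it locally in the same initialize-then-evaluate order described above. The following is a minimal sketch, assuming hive-exec and its Hadoop dependencies are on the classpath. LocalUdfHarness and runOnIntList are hypothetical names introduced for illustration and are not part of Hive.

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.{DeferredJavaObject, DeferredObject}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object LocalUdfHarness {
  // Hypothetical helper: runs a one-argument GenericUDF on an in-memory list of integers
  def runOnIntList(udf: GenericUDF, values: java.util.List[Integer]): AnyRef = {
    // Describe the argument as array<int> backed by plain Java objects
    val listInspector: ObjectInspector =
      ObjectInspectorFactory.getStandardListObjectInspector(
        PrimitiveObjectInspectorFactory.javaIntObjectInspector)
    // initialize must run first so the UDF can store its inspector
    udf.initialize(Array[ObjectInspector](listInspector))
    // Wrap the value in a DeferredObject, as evaluate expects
    udf.evaluate(Array[DeferredObject](new DeferredJavaObject(values)))
  }
}
```

With the ListLength class on the classpath, a call such as LocalUdfHarness.runOnIntList(new ListLength, java.util.Arrays.asList[Integer](1, 2, 3)) would be expected to return the length of the list.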
While Generic UDFs can be complicated for trivial functions such as this, they can be powerful tools for applying custom logic to the millions of rows of a complex column in Hive, enabling a more diverse group of data users to reuse abstracted business logic on all sorts of structured data.