Intro
I want to introduce you to a library called spark-hats, full name Spark Helpers for Array Transformations, but do not let the name fool you: it works with structs as well. This library saves me a lot of time and energy when developing new Spark applications that have to work with nested structures. I hope it will help you too.
The core of the library consists of three methods: add a column, map a column, and drop a column. All of them are engineered so you can turn this:
val dfOut = df.select(col("id"), transform(col("my_array"), c => {
struct(c.getField("a").as("a"),
c.getField("b").as("b"),
(c.getField("a") + 1).as("c"))
}).as("my_array"))
into this:
val dfOut = df.nestedMapColumn("my_array.a","c", a => a + 1)
Let's get started with the imports and the structure that will be used in the examples.
I will start spark-shell with the package loaded, using this command:
$> spark-shell --packages za.co.absa:spark-hats_2.11:0.2.1
and then in the spark-shell:
scala> import za.co.absa.spark.hats.Extensions._
import za.co.absa.spark.hats.Extensions._
scala> df.printSchema()
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
scala> df.show(false)
+---+------------------------------+
|id |my_array |
+---+------------------------------+
|1 |[[1, foo]] |
|2 |[[1, bar], [2, baz], [3, foz]]|
+---+------------------------------+
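If you want to follow along, here is one way the example DataFrame could be built. This is only a sketch: the post does not say how `df` was created, so the JSON strings and the app name below are my assumptions. Reading from JSON is convenient here because Spark infers `long` and `string` types and marks every field as nullable, which should line up with the schema printed above.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell a session already exists, so getOrCreate() simply returns it.
// The master and app name only matter when running outside the shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-hats-demo") // hypothetical name
  .getOrCreate()
import spark.implicits._

// Assumed sample data, written to mirror the rows shown by df.show(false)
val json = Seq(
  """{"id": 1, "my_array": [{"a": 1, "b": "foo"}]}""",
  """{"id": 2, "my_array": [{"a": 1, "b": "bar"}, {"a": 2, "b": "baz"}, {"a": 3, "b": "foz"}]}"""
)

// DataFrameReader.json accepts a Dataset[String] with one JSON document per element
val df = spark.read.json(json.toDS())
df.printSchema()
```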
Now let's move to the methods.
Add Column
Add column comes in two variants: simple and extended. The simple variant adds a new field to a nested structure. The extended variant does the same while also allowing you to reference other elements.
The simple one is pretty straightforward. You take your DataFrame, and instead of calling withColumn, you call nestedWithColumn. Let's add a literal to a struct.
scala> df.nestedWithColumn("my_array.c", lit("hello")).printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = false)
scala> df.nestedWithColumn("my_array.c", lit("hello")).show(false)
+---+---------------------------------------------------+
|id |my_array |
+---+---------------------------------------------------+
|1 |[[1, foo, hello]] |
|2 |[[1, bar, hello], [2, baz, hello], [3, foz, hello]]|
+---+---------------------------------------------------+
The extended version can also use other elements of the array, and its API differs slightly. The method nestedWithColumnExtended expects a function returning a column as its second parameter. This function in turn takes one argument that is itself a function, getField(). You can use getField() inside the transformation to reference other columns in the DataFrame by their fully qualified names.
scala> val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
concat(col("id").cast("string"), getField("my_array.b"))
)
scala> dfOut.printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
scala> dfOut.show(false)
+---+------------------------------------------------+
|id |my_array |
+---+------------------------------------------------+
|1 |[[1, foo, 1foo]] |
|2 |[[1, bar, 2bar], [2, baz, 2baz], [3, foz, 2foz]]|
+---+------------------------------------------------+
Notice that for root-level columns it is enough to use col, but getField would still be fine.
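To illustrate that last point, here is a variation of the call above that uses getField for the root-level id column too. It is a sketch that assumes the same spark-shell session and the example `df`; the name `dfOutAlt` is only for illustration. Going by the note above, it should give the same result as the col("id") version.

```scala
import org.apache.spark.sql.functions.concat
import za.co.absa.spark.hats.Extensions._

// Assumes `df` is the example DataFrame (id, my_array[a, b]) used throughout.
// getField is given the fully qualified name in both cases:
// "id" lives at the root, "my_array.b" lives inside the array elements.
val dfOutAlt = df.nestedWithColumnExtended("my_array.c", getField =>
  concat(getField("id").cast("string"), getField("my_array.b"))
)
dfOutAlt.show(false)
```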
Drop Column
By the second method, you might have already caught on to the naming convention. This method is called nestedDropColumn and is the most straightforward of the three. Just provide a fully qualified name.
scala> df.nestedDropColumn("my_array.b").printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
scala> df.nestedDropColumn("my_array.b").show(false)
+---+---------------+
|id |my_array |
+---+---------------+
|1 |[[1]] |
|2 |[[1], [2], [3]]|
+---+---------------+
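Because each of these methods is called on a DataFrame and gives back a DataFrame, the calls can be chained. The snippet below is a small sketch of that idea, assuming the same spark-shell session and example `df`; `dfChained` is an illustrative name. It adds the literal field c and then drops b, so each array element should end up with only a and c.

```scala
import org.apache.spark.sql.functions.lit
import za.co.absa.spark.hats.Extensions._

// Assumes `df` is the example DataFrame (id, my_array[a, b]) used throughout.
val dfChained = df
  .nestedWithColumn("my_array.c", lit("hello")) // add my_array.c to every element
  .nestedDropColumn("my_array.b")               // then remove my_array.b
dfChained.printSchema()
```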
Map Column
Map column is probably the one with the most use cases. It applies a function to the input column and puts the output on the same level by default, or somewhere else if specified.
If the input column is a primitive, then a simple function will suffice. If it is a struct, then you will have to use getField again.
scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: long (nullable = true)
scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).show(false)
+---+---------------------------------------+
|id |my_array |
+---+---------------------------------------+
|1 |[[1, foo, 2]] |
|2 |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+
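The expression is an ordinary function from a column to a column, so the built-in Spark SQL functions can be used as well. As a sketch, assuming the same spark-shell session and example `df`, the call below upper-cases the string field b into a new sibling field. The output name `b_upper` and the value name `dfUpper` are hypothetical.

```scala
import org.apache.spark.sql.functions.upper
import za.co.absa.spark.hats.Extensions._

// Assumes `df` is the example DataFrame (id, my_array[a, b]) used throughout.
// The input is the primitive field my_array.b, so a plain Column => Column
// function is enough; the output is given as a short name next to the input.
val dfUpper = df.nestedMapColumn(
  inputColumnName = "my_array.b",
  outputColumnName = "b_upper",
  expression = b => upper(b)
)
dfUpper.printSchema()
```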
Afterword
I hope these methods and the library will help you as much as they helped me. They make working with structures a lot easier and keep my code more concise, which in my head means less error-prone.
For more info go to https://github.com/AbsaOSS/spark-hats
Good luck and happy coding!