Working with nested structures in Spark

Saša Zejnilović

Intro

I want to introduce you to a library called spark-hats, full name Spark Helpers for Array Transformations. Do not let the name fool you: it works with structs as well. This library saves me a lot of time and energy when developing new Spark applications that have to work with nested structures. I hope it will help you too.

The core of the library consists of three methods: add a column, map a column, and drop a column. All of them are engineered so that you can turn this:

val dfOut = df.select(col("id"), transform(col("my_array"), c => {
  struct(c.getField("a").as("a"),
  c.getField("b").as("b"),
  (c.getField("a") + 1).as("c"))
}).as("my_array"))

into this:

val dfOut = df.nestedMapColumn("my_array.a","c", a => a + 1)

Let's get started with imports and the structure that will be used for examples.

I will use spark-shell and load the package with this command:

$> spark-shell --packages za.co.absa:spark-hats_2.11:0.2.1

and then in the spark-shell:

scala> import za.co.absa.spark.hats.Extensions._
import za.co.absa.spark.hats.Extensions._

scala> df.printSchema()
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)

scala> df.show(false)
+---+------------------------------+
|id |my_array                      |
+---+------------------------------+
|1  |[[1, foo]]                    |
|2  |[[1, bar], [2, baz], [3, foz]]|
+---+------------------------------+
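
The post does not show how df was created. One way to get a DataFrame with the same schema and rows is to let Spark infer it from JSON strings. This is only a sketch: the JSON literals and the application name are my own assumptions, chosen to reproduce the printSchema and show output above.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// In spark-shell, getOrCreate() returns the session that is already running.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-hats-example") // hypothetical name, any will do
  .getOrCreate()

// One JSON document per row; integers are inferred as long, strings as string.
val json = Seq(
  """{"id": 1, "my_array": [{"a": 1, "b": "foo"}]}""",
  """{"id": 2, "my_array": [{"a": 1, "b": "bar"}, {"a": 2, "b": "baz"}, {"a": 3, "b": "foz"}]}"""
)

// Passing the encoder explicitly avoids relying on spark.implicits._
val df = spark.read.json(spark.createDataset(json)(Encoders.STRING))

df.printSchema()
df.show(false)
```

JSON inference marks every field as nullable, which is why it matches the schema printed above. Building the data from case classes would also work, but primitive fields would then show up as non-nullable.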

Now let's move to the methods.

Add Column

Add column comes in two variants: simple and extended. The simple variant adds a new field to a nested structure. The extended variant does the same while also letting you reference other elements.

The simple one is pretty straightforward. You take your DataFrame, and instead of calling withColumn, you call nestedWithColumn. Let's add a literal to a struct.

scala> df.nestedWithColumn("my_array.c", lit("hello")).printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = false)

scala> df.nestedWithColumn("my_array.c", lit("hello")).show(false)
+---+---------------------------------------------------+
|id |my_array                                           |
+---+---------------------------------------------------+
|1  |[[1, foo, hello]]                                  |
|2  |[[1, bar, hello], [2, baz, hello], [3, foz, hello]]|
+---+---------------------------------------------------+

The extended version can also use other elements of the array, and its API differs. The method nestedWithColumnExtended expects a function returning a column as its second parameter. That function takes a single argument, which is itself a function: getField(). You can call getField() inside the transformation to reference other columns in the DataFrame by their fully qualified names.

scala> val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
         concat(col("id").cast("string"), getField("my_array.b"))
       )

scala> dfOut.printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)

scala> dfOut.show(false)
+---+------------------------------------------------+
|id |my_array                                        |
+---+------------------------------------------------+
|1  |[[1, foo, 1foo]]                                |
|2  |[[1, bar, 2bar], [2, baz, 2baz], [3, foz, 2foz]]|
+---+------------------------------------------------+

Notice that for root-level columns it is enough to use col, but getField would still be fine.
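
To illustrate that last remark, here is a sketch of the same transformation with getField used for the root-level id column as well. It assumes the behaviour described above (getField resolving any fully qualified name, including root-level ones), and it rebuilds the example data from JSON, which is my own assumption about how df could be created.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.concat
import za.co.absa.spark.hats.Extensions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Same example data as in the article, inferred from JSON (an assumption).
val df = spark.read.json(spark.createDataset(Seq(
  """{"id": 1, "my_array": [{"a": 1, "b": "foo"}]}""",
  """{"id": 2, "my_array": [{"a": 1, "b": "bar"}, {"a": 2, "b": "baz"}, {"a": 3, "b": "foz"}]}"""
))(Encoders.STRING))

// getField is used for both the root-level column and the nested field.
val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
  concat(getField("id").cast("string"), getField("my_array.b"))
)

dfOut.printSchema()
dfOut.show(false)
```

If getField behaves as described, the result should match the col("id") version shown earlier.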

Drop Column

By the second method, you might have already caught on to the naming convention. This method is called nestedDropColumn and is the most straightforward of the three. Just provide a fully qualified name.

scala> df.nestedDropColumn("my_array.b").printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)

scala> df.nestedDropColumn("my_array.b").show(false)
+---+---------------+
|id |my_array       |
+---+---------------+
|1  |[[1]]          |
|2  |[[1], [2], [3]]|
+---+---------------+
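
For comparison, this is roughly what dropping the field looks like without the library, using Spark's built-in transform higher-order function through a SQL expression (available from Spark 2.4). It is a sketch, not how spark-hats implements the drop internally, and the JSON-based df is again my own assumption. Note that every field you want to keep has to be listed by hand.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Same example data as in the article, inferred from JSON (an assumption).
val df = spark.read.json(spark.createDataset(Seq(
  """{"id": 1, "my_array": [{"a": 1, "b": "foo"}]}""",
  """{"id": 2, "my_array": [{"a": 1, "b": "bar"}, {"a": 2, "b": "baz"}, {"a": 3, "b": "foz"}]}"""
))(Encoders.STRING))

// Rebuild each array element as a struct that keeps only field a.
val dfPlain = df.withColumn(
  "my_array",
  expr("transform(my_array, x -> named_struct('a', x.a))")
)

dfPlain.printSchema()
dfPlain.show(false)
```

With only two fields this is manageable, but the expression grows with every field in the struct and every level of nesting, which is the boilerplate nestedDropColumn removes.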

Map Column

Map column is probably the one with the most use cases. It applies a function to each element of the nested column and, by default, puts the output on the same level as the input, or somewhere else if you specify it.

If the input column is a primitive, then a simple function will suffice. If it is a struct, then you will have to use getField again.

scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).show(false)
+---+---------------------------------------+
|id |my_array                               |
+---+---------------------------------------+
|1  |[[1, foo, 2]]                          |
|2  |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+
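
The same call works for other primitive fields. As a further sketch, the snippet below maps the string field b to upper case. The output name b_upper is a hypothetical name I picked for illustration, and the JSON-based df is my own assumption about how the example data could be created.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.upper
import za.co.absa.spark.hats.Extensions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Same example data as in the article, inferred from JSON (an assumption).
val df = spark.read.json(spark.createDataset(Seq(
  """{"id": 1, "my_array": [{"a": 1, "b": "foo"}]}""",
  """{"id": 2, "my_array": [{"a": 1, "b": "bar"}, {"a": 2, "b": "baz"}, {"a": 3, "b": "foz"}]}"""
))(Encoders.STRING))

// Upper-case every my_array.b and store it next to b as b_upper.
val dfUpper = df.nestedMapColumn(
  inputColumnName = "my_array.b",
  outputColumnName = "b_upper", // hypothetical output name
  expression = b => upper(b)
)

dfUpper.printSchema()
dfUpper.show(false)
```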

Afterword

I hope these methods and the library will help you as much as they helped me. They make working with structures a lot easier and keep my code more concise, which in my head means less error-prone.

For more info go to https://github.com/AbsaOSS/spark-hats

Good luck and happy coding!
