Saša Zejnilović

Working with nested structures in Spark

Intro

I want to introduce you to a library called spark-hats, full name Spark Helpers for Array Transformations, but do not let the name fool you: it works with structs as well. This library saves me a lot of time and energy when developing new Spark applications that have to work with nested structures. I hope it will help you too.

The core of the library consists of three methods: add a column, map a column, and drop a column. All of them are engineered so you can turn this:

val dfOut = df.select(col("id"), transform(col("my_array"), c => {
  struct(c.getField("a").as("a"),
         c.getField("b").as("b"),
         (c.getField("a") + 1).as("c"))
}).as("my_array"))

into this:

val dfOut = df.nestedMapColumn("my_array.a", "c", a => a + 1)

Let's get started with imports and the structure that will be used for examples.

I will use spark-shell, loading the package with the following command:

$> spark-shell --packages za.co.absa:spark-hats_2.11:0.2.1

and then in the spark-shell:

scala> import za.co.absa.spark.hats.Extensions._
import za.co.absa.spark.hats.Extensions._
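
The DataFrame df is assumed to already exist. If you want to follow along, here is one way to build it (my own sketch, not from the library's docs; the exact construction does not matter, and nullability flags may differ slightly):

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> case class Item(a: Long, b: String)
defined class Item

scala> val df = Seq(
         (1L, Seq(Item(1L, "foo"))),
         (2L, Seq(Item(1L, "bar"), Item(2L, "baz"), Item(3L, "foz")))
       ).toDF("id", "my_array")
df: org.apache.spark.sql.DataFrame = [id: bigint, my_array: array<struct<a:bigint,b:string>>]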

scala> df.printSchema()
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)

scala> df.show(false)
+---+------------------------------+
|id |my_array                      |
+---+------------------------------+
|1  |[[1, foo]]                    |
|2  |[[1, bar], [2, baz], [3, foz]]|
+---+------------------------------+

Now let's move to the methods.

Add Column

Add column comes in two variants: simple and extended. Simple allows adding a new field to nested structures. Extended does the same while also letting you reference other elements of the structure.

The simple one is pretty straightforward. You get your DataFrame and, instead of calling withColumn, you call nestedWithColumn. Let's add a literal to a struct.

scala> df.nestedWithColumn("my_array.c", lit("hello")).printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = false)

scala> df.nestedWithColumn("my_array.c", lit("hello")).show(false)
+---+---------------------------------------------------+
|id |my_array                                           |
+---+---------------------------------------------------+
|1  |[[1, foo, hello]]                                  |
|2  |[[1, bar, hello], [2, baz, hello], [3, foz, hello]]|
+---+---------------------------------------------------+
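
For comparison, this is roughly what the same operation looks like in plain Spark, mirroring the snippet from the intro (which assumes Spark 3.0+, where the transform function is available in the Scala API):

// Rebuild every struct element by hand, appending the new field.
val dfPlain = df.select(col("id"), transform(col("my_array"), c => {
  struct(c.getField("a").as("a"),
         c.getField("b").as("b"),
         lit("hello").as("c"))
}).as("my_array"))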

The extended version can also use other elements of the array. The API differs slightly: the method nestedWithColumnExtended expects, as its second parameter, a function that returns a column. Moreover, this function takes an argument that is itself a function, getField(). The getField() function can be used inside the transformation to reference other columns in the DataFrame by their fully qualified names.

scala> val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
         concat(col("id").cast("string"), getField("my_array.b"))
       )

scala> dfOut.printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)

scala> dfOut.show(false)
+---+------------------------------------------------+
|id |my_array                                        |
+---+------------------------------------------------+
|1  |[[1, foo, 1foo]]                                |
|2  |[[1, bar, 2bar], [2, baz, 2baz], [3, foz, 2foz]]|
+---+------------------------------------------------+

Notice that for root-level columns it is enough to use col, although getField works for them as well.
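
In other words, the extended example above could presumably also be written with getField throughout:

scala> val dfOut2 = df.nestedWithColumnExtended("my_array.c", getField =>
         concat(getField("id").cast("string"), getField("my_array.b"))
       )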

Drop Column

By the second method, you might have already caught on to the naming convention. This one is called nestedDropColumn and is the most straightforward of the three: just provide a fully qualified name.

scala> df.nestedDropColumn("my_array.b").printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)

scala> df.nestedDropColumn("my_array.b").show(false)
+---+---------------+
|id |my_array       |
+---+---------------+
|1  |[[1]]          |
|2  |[[1], [2], [3]]|
+---+---------------+
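
Without the library, this again comes down to rebuilding each struct element, this time leaving the dropped field out (same plain-Spark sketch and Spark 3.0+ assumption as above):

val dfPlain = df.select(col("id"), transform(col("my_array"), c =>
  struct(c.getField("a").as("a"))
).as("my_array"))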

Map Column

Map column is probably the one with the most use cases. It applies a function to each element of your structure and puts the output on the same level by default, or somewhere else if specified.

If the input column is a primitive, a simple function will suffice. If it is a struct, you will have to use getField again (see the sketch after the example below).

scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).printSchema
root
 |-- id: long (nullable = true)
 |-- my_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).show(false)
+---+---------------------------------------+
|id |my_array                               |
+---+---------------------------------------+
|1  |[[1, foo, 2]]                          |
|2  |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+
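
The struct case is not shown above; based purely on the description, a hypothetical sketch might look like the following, with the lambda receiving the struct column and its fields read via getField (check the project README for the exact semantics):

scala> df.nestedMapColumn("my_array", "c",
         s => concat(s.getField("b"), (s.getField("a") + 1).cast("string")))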

Afterword

I hope these methods and the library will help you as much as they have helped me. They make working with structures a lot easier and keep my code more concise, which to my mind means less error-prone.

For more info, go to https://github.com/AbsaOSS/spark-hats.

Good luck and happy coding!
