In this blog post, we'll guide you through the specifics of building a Kedro project that uses managed Delta tables in Databricks using the newly-released ManagedTableDataSet.
What is Kedro?
Kedro is a toolbox for production-ready data science. It's an open-source Python framework that enables the development of clean data science code, borrowing concepts from software engineering and applying them to machine-learning projects. A Kedro project provides scaffolding for complex data and machine-learning pipelines. It enables developers to spend less time on tedious "plumbing" and focus on solving new problems.
What is Databricks?
Databricks is a unified data analytics platform designed for simplifying big data processing and free-form data exploration at any scale. Based on Apache Spark, an open-source distributed computing system, Databricks provides a collaborative cloud-based environment where users can process large amounts of data.
The platform provides collaborative workspaces (notebooks) and computational resources (clusters) to run code with. Clusters are groups of nodes that run Apache Spark. Notebooks are collaborative web-based interfaces where users can write and execute code on an attached cluster.
Why use Kedro on Databricks?
As we've described, Kedro offers a framework for building modular and scalable data pipelines, while Databricks provides a platform for running Spark jobs and managing data. You can combine Kedro and Databricks to build and deploy data pipelines and get the best of both worlds. Kedro's open-source framework will help you to build well-organised and maintainable pipelines, while Databricks' platform will provide you with the scalability you need to run your pipeline in production. Check out the recently-updated Kedro documentation for a set of workflow options for integrating Kedro projects and Databricks. (Additionally, the third-party kedro-mlflow plugin integrates MLflow capabilities inside Kedro projects to enhance reproducibility for machine learning experimentation.)
What are Kedro datasets?
Kedro datasets are abstractions for loading and saving data, designed to decouple these operations from your business logic. These datasets manage reading and writing data from a variety of sources, while also ensuring consistency, tracking, and versioning. They allow users to maintain focus on core data processing, leaving data I/O tasks to Kedro.
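To make the idea concrete, here is a minimal plain-Python sketch of the load/save abstraction. It deliberately does not use Kedro's own classes: JsonFileDataset and add_fahrenheit are hypothetical names invented for illustration, and real Kedro datasets do considerably more than this.

```python
import json
import tempfile
from pathlib import Path


class JsonFileDataset:
    """Hypothetical stand-in for a dataset: it owns all file I/O."""

    def __init__(self, filepath):
        self._filepath = Path(filepath)

    def load(self):
        # Read the stored records back into Python objects.
        with self._filepath.open() as f:
            return json.load(f)

    def save(self, data):
        # Persist the records; callers never touch the file directly.
        with self._filepath.open("w") as f:
            json.dump(data, f)


def add_fahrenheit(readings):
    """Business logic: a pure function, unaware of where the data lives."""
    return [{**r, "temperature_f": r["temperature"] * 9 / 5 + 32} for r in readings]


# The dataset objects handle I/O; the business logic only sees Python data.
workdir = Path(tempfile.mkdtemp())
raw = JsonFileDataset(workdir / "raw.json")
enriched = JsonFileDataset(workdir / "enriched.json")

raw.save([{"location": "London", "temperature": 27}])
enriched.save(add_fahrenheit(raw.load()))
result = enriched.load()
```

Because add_fahrenheit never opens a file, the storage backend could be swapped without changing the function, which is the decoupling described above.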
What is managed data in Databricks?
To understand the concept of managed data in Databricks, it is first necessary to outline how Databricks organises data. At the highest level, Databricks uses metastores to store the metadata associated with data objects. Databricks Unity Catalog is one such metastore. It provides data governance and management across multiple Databricks workspaces. The metastore organises tables (where your data is stored) in a hierarchical structure.
The highest level of organisation in this hierarchy is the catalog. Catalogs are a collection of databases (also referred to as schemas in Databricks' terminology). A database is the second level of organisation in the Unity Catalog namespacing model. Databases are a collection of tables. The tables in a database are the third level of organisation in this hierarchy.
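As a rough illustration of this three-level namespace, the sketch below builds a fully qualified table name of the form catalog.database.table. TableRef is a hypothetical helper, not part of Databricks or Kedro, and the catalog name main is only an example value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableRef:
    """Hypothetical helper modelling catalog -> database -> table."""

    catalog: str
    database: str
    table: str

    def full_name(self) -> str:
        # Backticks are Spark SQL's identifier quoting.
        parts = (self.catalog, self.database, self.table)
        return ".".join(f"`{part}`" for part in parts)


ref = TableRef(catalog="main", database="weather", table="2023_06_22")
print(ref.full_name())  # `main`.`weather`.`2023_06_22`
```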
A table is structured data, stored as a directory of files on cloud object storage. By default, Databricks creates tables as Delta tables, which store data using the Delta Lake format. Delta Lake is an open-source storage format that offers ACID transactions, time travel and audit history.
Databricks tables belong to one of two categories: managed and unmanaged (external) tables. Databricks manages both the data and associated metadata of managed tables. If you drop a managed table, you will delete the underlying data. The data of a managed table resides in the location of the database to which it is registered.
On the other hand, for unmanaged tables, Databricks only manages the metadata. If you drop an unmanaged table, you will not delete the underlying data. These tables require a specified location during creation.
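The difference in drop behaviour can be mimicked with a toy model. This is only a sketch of the semantics described above, not how Databricks is implemented: ToyMetastore, its methods, and the storage paths are all made up for illustration.

```python
class ToyMetastore:
    """Toy model only: mimics drop semantics, nothing more."""

    def __init__(self):
        self.tables = {}   # table name -> (storage path, is_managed)
        self.storage = {}  # storage path -> rows (stands in for cloud files)

    def create_table(self, name, rows, location=None):
        # No explicit location means the table is managed and its data
        # lives under the (pretend) database directory.
        managed = location is None
        path = location or f"/databases/weather/{name}"
        self.storage[path] = list(rows)
        self.tables[name] = (path, managed)

    def drop_table(self, name):
        path, managed = self.tables.pop(name)
        if managed:
            # Managed: metadata and data are both removed.
            del self.storage[path]
        # External: only the metadata entry is removed; the files remain.


store = ToyMetastore()
store.create_table("managed_readings", [("London", 27)])
store.create_table("external_readings", [("Cairo", 35)], location="/mnt/raw/readings")

store.drop_table("managed_readings")
store.drop_table("external_readings")

print(sorted(store.storage))  # ['/mnt/raw/readings']
```

After both drops the metadata is gone for both tables, but only the external table's data is still in storage.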
How to work with managed Delta tables using Kedro
Let's demonstrate how to use the ManagedTableDataSet with a simple example on Databricks. You'll need to open a new Databricks notebook and attach it to a cluster to follow along with the rest of this example, which runs on a workspace using a Hive metastore. We'll create a dataset containing weather readings, save it to a managed Delta table on Databricks, append some data, and access a specific table version to showcase Delta Lake's time travel capabilities.
Run every separate code snippet in this section in a new notebook cell.
The first steps are to set up your workspace by creating a weather database in your metastore and installing Kedro. Run the following SQL code to create the database:
%sql
create database if not exists weather;
To install Kedro and the ManagedTableDataSet, use the %pip magic:
%pip install kedro kedro-datasets[databricks.ManagedTableDataSet]
The first part of our program will create some weather data. We'll create a Spark DataFrame with four columns (date, location, temperature, and humidity) to store our weather data. Then, we'll use a new instance of ManagedTableDataSet to save our DataFrame to a Delta table called 2023_06_22 (the day of the readings) in the weather database.
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)
from kedro_datasets.databricks import ManagedTableDataSet
spark_session = SparkSession.builder.getOrCreate()
# Define schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("location", StringType(), True),
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True),
])
# Create DataFrame
data = [
    ('2023-06-22', 'London', 27, 39),
    ('2023-06-22', 'Warsaw', 28, 40),
    ('2023-06-22', 'Bucharest', 32, 38),
]
spark_df = spark_session.createDataFrame(data, schema)
# Create a ManagedTableDataSet instance using a new table named '2023_06_22'
weather = ManagedTableDataSet(database="weather", table="2023_06_22")
# Save the DataFrame to the table
weather.save(spark_df)
To load our data back into a DataFrame, we use the load method on ManagedTableDataSet:
# Load the table data into a DataFrame
reloaded = weather.load()
# Print the first 3 rows of the DataFrame
display(reloaded.take(3))
This code loads the data from the 2023_06_22 table in the weather database back into a Spark DataFrame and shows the first three rows of the data:
| date | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
|2023-06-22|Bucharest | 32 | 38 |
|2023-06-22| London | 27 | 39 |
|2023-06-22| Warsaw | 28 | 40 |
Let's say we take some more weather readings later in the day and want to add them to our Delta table. To do this, we can write to it using a new instance of ManagedTableDataSet initialised with "append" passed in as an argument to write_mode:
# Append new rows to the data
new_rows = [
    ('2023-06-22', 'Cairo', 35, 25),
    ('2023-06-22', 'Lisbon', 28, 44),
]
spark_df = spark_session.createDataFrame(new_rows, schema)
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    write_mode="append"
)
weather.save(spark_df)
The code above adds new rows for Cairo and Lisbon to our Delta table, which creates a new version of the table.
The ManagedTableDataSet class allows for saving data with three different write modes: overwrite, append, and upsert:
- overwrite mode will completely replace the current data in the table with the new data.
- append mode will add new data to the existing table.
- upsert mode updates existing rows and inserts new rows, based on a specified primary key. Notably, if the table doesn't exist at save, the upsert mode behaves similarly to append, inserting data into a new table.
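The three modes can be sketched in plain Python on lists of dictionaries. This is a toy model of the semantics only, not the dataset's implementation; save_rows and its primary_key argument are hypothetical names used for this sketch.

```python
def save_rows(existing, new_rows, write_mode, primary_key=None):
    """Return the table contents after a save in the given mode (toy model)."""
    if write_mode == "overwrite":
        # Discard whatever was there before.
        return list(new_rows)
    if write_mode == "append":
        # Keep the old rows and add the new ones, duplicates included.
        return list(existing) + list(new_rows)
    if write_mode == "upsert":
        if primary_key is None:
            raise ValueError("upsert needs a primary key")
        # Index rows by key: matching keys are updated, new keys inserted.
        merged = {row[primary_key]: row for row in existing}
        for row in new_rows:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown write mode: {write_mode}")


table = [
    {"location": "London", "temperature": 27},
    {"location": "Warsaw", "temperature": 28},
]
update = [
    {"location": "London", "temperature": 29},  # existing key: updated
    {"location": "Cairo", "temperature": 35},   # new key: inserted
]

overwritten = save_rows(table, update, "overwrite")
appended = save_rows(table, update, "append")
upserted = save_rows(table, update, "upsert", primary_key="location")
```

Here overwritten holds only the two update rows, appended holds four rows (London appears twice), and upserted holds three rows with London's temperature changed to 29.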
Suppose we later want to access our data as it appeared earlier in the day when we had only taken three readings. The ManagedTableDataSet class supports accessing different versions of the Delta table. We can access a specific version by defining a Kedro Version and passing it into a new instance of ManagedTableDataSet:
from kedro.io import Version
# Load version 0 of the table
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    version=Version(load=0, save=None)
)
reloaded = weather.load()
display(reloaded)
# Load version 1 of the table
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    version=Version(load=1, save=None)
)
reloaded = weather.load()
display(reloaded)
You will see two rendered tables as the output of running this code. The first corresponds to version 0 of the 2023_06_22 table, while the second corresponds to version 1:
| date | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
|2023-06-22|Bucharest | 32 | 38 |
|2023-06-22| London | 27 | 39 |
|2023-06-22| Warsaw | 28 | 40 |

| date | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
|2023-06-22|Bucharest | 32 | 38 |
|2023-06-22| London | 27 | 39 |
|2023-06-22| Warsaw | 28 | 40 |
|2023-06-22| Lisbon | 28 | 44 |
|2023-06-22| Cairo | 35 | 25 |
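The version behaviour shown in these two tables can be mimicked with a small toy model in which every save records a new numbered snapshot. This is a simplification for illustration: ToyVersionedTable is a hypothetical class, and Delta Lake itself tracks changes through a transaction log rather than by keeping full copies.

```python
class ToyVersionedTable:
    """Toy model of versioning: every save records a new snapshot."""

    def __init__(self):
        self._snapshots = []  # index in this list == version number

    def save(self, rows, write_mode="overwrite"):
        current = self._snapshots[-1] if self._snapshots else []
        if write_mode == "append":
            rows = current + list(rows)
        self._snapshots.append(list(rows))

    def load(self, version=None):
        # No version requested means "latest".
        if version is None:
            version = len(self._snapshots) - 1
        return self._snapshots[version]


table = ToyVersionedTable()
# First save creates version 0 with three readings.
table.save([("London", 27, 39), ("Warsaw", 28, 40), ("Bucharest", 32, 38)])
# Appending two more readings creates version 1.
table.save([("Cairo", 35, 25), ("Lisbon", 28, 44)], write_mode="append")

print(len(table.load(version=0)))  # 3
print(len(table.load(version=1)))  # 5
print(table.load() == table.load(version=1))  # True
```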
And that's it! We've put together a simple program to show some of the usual tasks that ManagedTableDataSet facilitates, making it easy to save, load, and manage versions of your data in Delta tables on Databricks.
Conclusion
Databricks is a fast-growing deployment vector for Kedro projects. This blog post has demonstrated how to combine the power of both Kedro and Databricks with an open-source ManagedTableDataSet that enables streamlined data I/O operations when deploying a Kedro project on Databricks. ManagedTableDataSet empowers you to spend more time implementing the business logic of your data pipeline or machine learning workflow and less time manually handling data.