In this blog post, we'll guide you through the specifics of building a Kedro project that uses managed Delta tables in Databricks, with the help of the newly released `ManagedTableDataSet`.
Kedro is a toolbox for production-ready data science. It's an open-source Python framework that enables the development of clean data science code, borrowing concepts from software engineering and applying them to machine-learning projects. A Kedro project provides scaffolding for complex data and machine-learning pipelines. It enables developers to spend less time on tedious "plumbing" and focus on solving new problems.
Databricks is a unified data analytics platform designed to simplify big data processing and free-form data exploration at any scale. Built on Apache Spark, an open-source distributed computing system, Databricks provides a collaborative cloud-based environment where users can process large amounts of data.
The platform provides collaborative workspaces (notebooks) and computational resources (clusters) to run code with. Clusters are groups of nodes that run Apache Spark. Notebooks are collaborative web-based interfaces where users can write and execute code on an attached cluster.
As we've described, Kedro offers a framework for building modular and scalable data pipelines, while Databricks provides a platform for running Spark jobs and managing data. You can combine Kedro and Databricks to build and deploy data pipelines and get the best of both worlds: Kedro's open-source framework helps you build well-organised and maintainable pipelines, while the Databricks platform provides the scalability you need to run your pipeline in production. Check out the recently updated Kedro documentation for a set of workflow options for integrating Kedro projects and Databricks. (Additionally, the third-party kedro-mlflow plugin integrates MLflow capabilities into Kedro projects to enhance reproducibility for machine-learning experimentation.)
Kedro datasets are abstractions for loading and saving data, designed to decouple these operations from your business logic. These datasets manage reading and writing data from a variety of sources, while also ensuring consistency, tracking, and versioning. They allow users to focus on core data processing, leaving data I/O tasks to Kedro.
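To make the decoupling concrete, here is a deliberately simplified sketch in plain Python. `InMemoryDataset`, `hottest_city` and `run_node` are hypothetical names invented for illustration; Kedro's real dataset base class (`AbstractDataSet` in Kedro 0.18) has more machinery, but the idea is the same: the function holding the business logic only ever sees data, while dataset objects decide where that data is loaded from and saved to.

```python
from typing import Any, Callable, Dict, List


class InMemoryDataset:
    """Hypothetical toy dataset: hides where data lives behind load() and save()."""

    def __init__(self, store: Dict[str, Any], key: str) -> None:
        self._store = store  # stands in for a file, table or other storage backend
        self._key = key

    def load(self) -> Any:
        return self._store[self._key]

    def save(self, data: Any) -> None:
        self._store[self._key] = data


def hottest_city(readings: List[Dict[str, Any]]) -> str:
    """Pure business logic: knows nothing about where the readings are stored."""
    return max(readings, key=lambda row: row["temperature"])["location"]


def run_node(func: Callable[[Any], Any], source: InMemoryDataset, target: InMemoryDataset) -> None:
    """Hypothetical runner: load the input, apply the function, save the output."""
    target.save(func(source.load()))


store = {
    "readings": [
        {"location": "London", "temperature": 27},
        {"location": "Bucharest", "temperature": 32},
    ]
}
run_node(hottest_city, InMemoryDataset(store, "readings"), InMemoryDataset(store, "hottest"))
print(store["hottest"])  # Bucharest
```

Swapping `InMemoryDataset` for a dataset backed by a Delta table would not require any change to `hottest_city`, which is the point of the abstraction.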
To understand the concept of managed data in Databricks, it is first necessary to outline how Databricks organises data. At the highest level, Databricks uses metastores to store the metadata associated with data objects. Databricks Unity Catalog is one such metastore. It provides data governance and management across multiple Databricks workspaces. The metastore organises tables (where your data is stored) in a hierarchical structure.
The highest level of organisation in this hierarchy is the catalog. A catalog is a collection of databases (also referred to as schemas in Databricks terminology). A database, the second level of organisation in the Unity Catalog namespacing model, is a collection of tables. Tables are the third level of organisation in this hierarchy.
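The three levels combine into a single dotted name, `catalog.database.table`, when you refer to a table in SQL. The helper below is a hypothetical illustration (it is not part of Kedro or Databricks), and the catalog name `main` is an assumed example; it wraps each part in backticks, which is how Spark SQL quotes identifiers.

```python
def qualified_table_name(catalog: str, database: str, table: str) -> str:
    """Join the three namespace levels as catalog.database.table.

    Each part is wrapped in backticks, and any backtick inside a name is
    doubled, following Spark SQL's quoted-identifier convention.
    """
    parts = (catalog, database, table)
    if any(not part for part in parts):
        raise ValueError("catalog, database and table must all be non-empty")
    return ".".join("`" + part.replace("`", "``") + "`" for part in parts)


print(qualified_table_name("main", "weather", "2023_06_22"))
# `main`.`weather`.`2023_06_22`
```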
A table is structured data, stored as a directory of files on cloud object storage. By default, Databricks creates tables as Delta tables, which store data using the Delta Lake format. Delta Lake is an open-source storage format that offers ACID transactions, time travel and audit history.
Databricks tables belong to one of two categories: managed and unmanaged (external) tables. Databricks manages both the data and associated metadata of managed tables. If you drop a managed table, you will delete the underlying data. The data of a managed table resides in the location of the database to which it is registered.
On the other hand, for unmanaged tables, Databricks only manages the metadata. If you drop an unmanaged table, you will not delete the underlying data. These tables require a specified location during creation.
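In Databricks SQL, the difference shows up as whether a `LOCATION` clause is given when the table is created. The helper `create_table_ddl` below is a hypothetical sketch that only builds the statement strings (in a notebook you would pass them to `spark.sql`); the table names and the `s3://example-bucket/...` path are made-up examples. Dropping the first table would delete its data, whereas dropping the second would leave the files at the given path in place.

```python
from typing import Optional


def create_table_ddl(database: str, table: str, columns: str,
                     location: Optional[str] = None) -> str:
    """Build a CREATE TABLE statement for a Delta table (hypothetical helper).

    Without `location`, the data goes into the database's storage location,
    so the table is managed. With `location`, the data lives at that path,
    so the table is unmanaged (external).
    """
    ddl = f"CREATE TABLE IF NOT EXISTS {database}.`{table}` ({columns}) USING DELTA"
    if location is not None:
        ddl += f" LOCATION '{location}'"
    return ddl


columns = "temperature INT, humidity INT"
print(create_table_ddl("weather", "managed_readings", columns))
print(create_table_ddl("weather", "external_readings", columns,
                       location="s3://example-bucket/weather/readings"))
```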
Let's demonstrate how to use the ManagedTableDataSet with a simple example on Databricks. You'll need to open a new Databricks notebook and attach it to a cluster to follow along with the rest of this example, which runs on a workspace using a Hive metastore. We'll create a dataset containing weather readings, save it to a managed Delta table on Databricks, append some data, and access a specific table version to showcase Delta Lake's time travel capabilities.
Run each code snippet in this section in its own notebook cell.
The first steps are to set up your workspace by creating a `weather` database in your metastore and installing Kedro. Run the following SQL code to create the database:
```
%sql
CREATE DATABASE IF NOT EXISTS weather;
```
To install Kedro and the `ManagedTableDataSet`, run the following `%pip` command:

```
%pip install kedro kedro-datasets[databricks.ManagedTableDataSet]
```
The first part of our program will create some weather data. We'll create a Spark DataFrame with four columns, `date`, `location`, `temperature` and `humidity`, to store our weather data. Then, we'll use a new instance of `ManagedTableDataSet` to save our DataFrame to a Delta table called `2023_06_22` (the day of the readings) in the `weather` database:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)
from kedro_datasets.databricks import ManagedTableDataSet

spark_session = SparkSession.builder.getOrCreate()

# Define schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("location", StringType(), True),
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True),
])

# Create DataFrame
data = [
    ('2023-06-22', 'London', 27, 39),
    ('2023-06-22', 'Warsaw', 28, 40),
    ('2023-06-22', 'Bucharest', 32, 38),
]
spark_df = spark_session.createDataFrame(data, schema)

# Create a ManagedTableDataSet instance using a new table named '2023_06_22';
# write_mode="overwrite" makes the save replace any existing table contents
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    write_mode="overwrite"
)

# Save the DataFrame to the table
weather.save(spark_df)
```
To load our data back into a DataFrame, we use the `load` method on `weather`:
```python
# Load the table data into a DataFrame
reloaded = weather.load()

# Display the first 3 rows of the DataFrame
# (display() is the Databricks notebook rendering helper)
display(reloaded.limit(3))
```
This code loads the data from the `2023_06_22` table in the `weather` database back into a Spark DataFrame and displays the first three rows of the data:
| date | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
| 2023-06-22 | Bucharest | 32 | 38 |
| 2023-06-22 | London | 27 | 39 |
| 2023-06-22 | Warsaw | 28 | 40 |
Let's say we take some more weather readings later in the day and want to add them to our Delta table. To do this, we can write to it using a new instance of `ManagedTableDataSet` initialised with `"append"` passed as the `write_mode` argument:
```python
# Append new rows to the data
new_rows = [
    ('2023-06-22', 'Cairo', 35, 25),
    ('2023-06-22', 'Lisbon', 28, 44),
]

spark_df = spark_session.createDataFrame(new_rows, schema)

weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    write_mode="append"
)
weather.save(spark_df)
```
The code above adds new rows for Cairo and Lisbon to our Delta table, which creates a new version of the table.
The `ManagedTableDataSet` class allows for saving data with three different write modes:

- `overwrite` mode completely replaces the current data in the table with the new data.
- `append` mode adds new data to the existing table.
- `upsert` mode updates existing rows and inserts new rows, based on a specified primary key. Notably, if the table doesn't exist at save time, `upsert` mode behaves like `append`, inserting the data into a new table.
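The semantics of the three modes can be modelled on plain Python lists of row dictionaries. This is a simplified sketch, not how `ManagedTableDataSet` is implemented (on Databricks the writes are carried out by Delta Lake): `apply_write_mode` is a hypothetical name, `existing=None` stands for a table that does not exist yet, and `location` is used as the primary key purely for illustration.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def apply_write_mode(existing: Optional[List[Row]], new: List[Row], mode: str,
                     primary_key: Optional[str] = None) -> List[Row]:
    """Return the table contents after writing `new` with the given mode.

    `existing=None` models a table that does not exist yet.
    """
    if mode not in ("overwrite", "append", "upsert"):
        raise ValueError(f"unknown write mode: {mode!r}")
    if mode == "upsert" and primary_key is None:
        raise ValueError("upsert mode needs a primary key")
    if mode == "overwrite" or existing is None:
        # Overwrite always replaces; on a missing table every mode just writes the new rows.
        return list(new)
    if mode == "append":
        return existing + list(new)
    # Upsert: a row whose key already exists replaces the old row in place,
    # and rows with unseen keys are added at the end.
    merged = {row[primary_key]: row for row in existing}
    for row in new:
        merged[row[primary_key]] = row
    return list(merged.values())


existing = [{"location": "London", "temperature": 27},
            {"location": "Warsaw", "temperature": 28}]
new = [{"location": "London", "temperature": 29},
       {"location": "Cairo", "temperature": 35}]
print(len(apply_write_mode(existing, new, "append")))  # 4
# London is updated to 29, Warsaw is kept and Cairo is inserted
print(apply_write_mode(existing, new, "upsert", primary_key="location"))
```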
Suppose we later want to access our data as it appeared earlier in the day, when we had taken only three readings. The `ManagedTableDataSet` class supports accessing different versions of the Delta table. We can access a specific version by defining a Kedro `Version` and passing it into a new instance of `ManagedTableDataSet`:
```python
from kedro.io import Version

# Load version 0 of the table
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    version=Version(load=0, save=None)
)
reloaded = weather.load()
display(reloaded)

# Load version 1 of the table
weather = ManagedTableDataSet(
    database="weather",
    table="2023_06_22",
    version=Version(load=1, save=None)
)
reloaded = weather.load()
display(reloaded)
```
You will see two rendered tables as the output of running this code. The first corresponds to version 0 of the `2023_06_22` table, while the second corresponds to version 1:
| date | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
| 2023-06-22 | Bucharest | 32 | 38 |
| 2023-06-22 | London | 27 | 39 |
| 2023-06-22 | Warsaw | 28 | 40 |

| date | location | temperature | humidity |
|:--------:|:--------:|:-----------:|:--------:|
| 2023-06-22 | Bucharest | 32 | 38 |
| 2023-06-22 | London | 27 | 39 |
| 2023-06-22 | Warsaw | 28 | 40 |
| 2023-06-22 | Lisbon | 28 | 44 |
| 2023-06-22 | Cairo | 35 | 25 |
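Conceptually, each successful write adds a new entry to the table's history, and loading an older version number reads the table as it was after that write. The `ToyVersionedTable` class below is a hypothetical, in-memory illustration of that idea only: Delta Lake actually tracks versions in a transaction log rather than keeping a full copy of the data per version, and in Databricks SQL the same kind of lookup is written with a `VERSION AS OF` clause.

```python
from copy import deepcopy
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


class ToyVersionedTable:
    """Hypothetical in-memory table that keeps one snapshot per write (illustration only)."""

    def __init__(self) -> None:
        self._snapshots: List[List[Row]] = []

    def save(self, rows: List[Row], mode: str = "overwrite") -> int:
        """Write rows and return the new version number (0 for the first write)."""
        if mode not in ("overwrite", "append"):
            raise ValueError(f"unsupported mode: {mode!r}")
        previous = self._snapshots[-1] if self._snapshots else []
        if mode == "overwrite":
            current = deepcopy(rows)
        else:
            current = deepcopy(previous) + deepcopy(rows)
        self._snapshots.append(current)
        return len(self._snapshots) - 1

    def load(self, version: Optional[int] = None) -> List[Row]:
        """Return the latest snapshot, or the snapshot for a specific version."""
        if not self._snapshots:
            raise LookupError("table has no versions yet")
        if version is None:
            version = len(self._snapshots) - 1
        if not 0 <= version < len(self._snapshots):
            raise LookupError(f"version {version} does not exist")
        return deepcopy(self._snapshots[version])


table = ToyVersionedTable()
table.save([{"location": "London"}, {"location": "Warsaw"}, {"location": "Bucharest"}])
table.save([{"location": "Cairo"}, {"location": "Lisbon"}], mode="append")
print(len(table.load(0)), len(table.load(1)))  # 3 5
```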
And that's it! We've put together a simple program to show some of the usual tasks that `ManagedTableDataSet` facilitates, making it easy to save, load, and manage versions of your data in Delta tables on Databricks.
Databricks is a fast-growing deployment target for Kedro projects. This blog post has demonstrated how to combine the power of Kedro and Databricks with the open-source `ManagedTableDataSet`, which streamlines data I/O operations when deploying a Kedro project on Databricks.
The `ManagedTableDataSet` lets you spend more time implementing the business logic of your data pipeline or machine-learning workflow and less time manually handling data.