This article guides data practitioners through setting up a Kedro project to use the new SparkStreaming Kedro dataset, with example use cases and a deep dive into some design considerations. It's meant for data practitioners familiar with Kedro, so we won't cover the basics of a project, but you can familiarise yourself with them in the Kedro documentation.
What is Kedro?
Kedro is an open-source Python toolbox that applies software engineering principles to data science code. It makes it easier for a team to follow those principles, which reduces the time spent rewriting data science experiments so that they are fit for production.
Kedro was born at QuantumBlack to solve the challenges faced regularly in data science projects and promote teamwork through standardised team workflows. It is now hosted by the LF AI & Data Foundation as an incubating project.
What are Kedro datasets?
Kedro datasets are abstractions for loading and saving data, designed to decouple these operations from your business logic. These datasets manage reading and writing data from a variety of sources, while also ensuring consistency, tracking, and versioning. They allow users to maintain focus on core data processing, leaving data I/O tasks to Kedro.
What is Spark Structured Streaming?
Spark Structured Streaming is built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data, and the Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive.
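To make the "same code for batch and streaming" idea concrete, here is a minimal sketch rather than a definitive implementation. The function names, the schema string, and the item column are assumptions made for illustration; the path is borrowed from the catalog example later in this article. The caller is assumed to pass in an existing SparkSession.

```python
def count_by_item(df):
    """Count rows per item; works unchanged on batch and streaming DataFrames."""
    return df.groupBy("item").count()


def run_demo(
    spark,
    path="data/01_raw/stream/inventory/",
    schema="item STRING, quantity INT",  # assumed schema, for illustration only
):
    """Run the same transformation once as a batch job and once as a stream."""
    # Batch: read the files that exist right now and compute the result once.
    batch_counts = count_by_item(spark.read.schema(schema).json(path))
    batch_counts.show()

    # Streaming: the same transformation, updated incrementally as files arrive.
    stream_counts = count_by_item(spark.readStream.schema(schema).json(path))
    query = (
        stream_counts.writeStream.outputMode("complete").format("console").start()
    )
    query.awaitTermination(30)  # wait up to 30 seconds for new data
    query.stop()
```

To try it, call run_demo(SparkSession.builder.getOrCreate()) in an environment where PySpark is installed. Only the way the data is read and written differs between the two halves; count_by_item is shared.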
Integrating Kedro and Spark Structured Streaming
Kedro is easily extensible for your own workflows and this article explains one of the ways to add new functionality. To enable Kedro to work with Spark Structured Streaming, a team inside QuantumBlack Labs developed a new Spark Streaming Dataset, as the existing Kedro Spark dataset was not compatible with Spark Streaming use cases. To ensure seamless streaming, the new dataset takes a checkpoint location to avoid data duplication in streaming use cases, and it calls .start() at the end of the _save method to initiate the stream.
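As a rough illustration of that save step, the sketch below shows how a streaming write with a checkpoint location might be assembled and started. It is not the dataset's actual source code: save_stream and its parameter names are hypothetical, and only the PySpark DataStreamWriter calls (format, option, outputMode, options, start) are real API.

```python
from typing import Any


def save_stream(
    data: Any,
    filepath: str,
    file_format: str,
    checkpoint: str,
    output_mode: str = "append",
    **options: Any,
) -> Any:
    """Write a streaming DataFrame to a sink and start the query.

    `data` is expected to be a streaming pyspark.sql.DataFrame.
    """
    return (
        data.writeStream.format(file_format)
        # The checkpoint lets Spark resume after a restart without
        # reprocessing, and so duplicating, data it has already written.
        .option("checkpointLocation", checkpoint)
        .option("path", filepath)
        .outputMode(output_mode)
        .options(**options)
        # Nothing runs until start() is called; it returns a StreamingQuery.
        .start()
    )
```

Calling start() last matters: everything before it only describes the write, and the stream begins processing once start() returns a StreamingQuery.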
Set up a project to integrate Kedro with Spark Structured Streaming
The project uses a Kedro dataset to build a structured data pipeline that can read and write data streams with Spark Structured Streaming and process data streams in real time. You need to add two separate Hooks to the Kedro project to enable it to function as a streaming application.
Integration involves the following steps:
- Create a Kedro project.
- Register the necessary PySpark and streaming related Hooks.
- Configure the custom dataset in the catalog.yml file, defining the streaming sources and sinks.
- Use Kedro’s new dataset for Spark Structured Streaming to store intermediate dataframes generated during the Spark streaming process.
Create a Kedro project
Ensure you have installed Kedro 0.18.9 or later and kedro-datasets 1.4.0 or later. The command below pins both to compatible releases:
pip install "kedro~=0.18.9" "kedro-datasets~=1.4.0"
Create a new Kedro project using the Kedro pyspark starter:
kedro new --starter=pyspark
Register the necessary PySpark and streaming related Hooks
To work with multiple streaming nodes, two Hooks are required. The first integrates PySpark: see Build a Kedro pipeline with PySpark for details. The second keeps the streaming queries running after the pipeline run, so that the application does not terminate unless an exception occurs.
Add the following code to src/$your_kedro_project_name/hooks.py:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from kedro.framework.hooks import hook_impl


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """
        # Load the spark configuration in spark.yml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context._package_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")


class SparkStreamsHook:
    @hook_impl
    def after_pipeline_run(self) -> None:
        """Starts a spark streaming await session
        once the pipeline reaches the last node.
        """
        # Reuse the session created in SparkHooks and block until a stream ends
        spark = SparkSession.builder.getOrCreate()
        spark.streams.awaitAnyTermination()
Register the Hooks in src/$your_kedro_project_name/settings.py:
"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""

from .hooks import SparkHooks, SparkStreamsHook

HOOKS = (SparkHooks(), SparkStreamsHook())

# Instantiated project hooks.
# from streaming.hooks import ProjectHooks
# HOOKS = (ProjectHooks(),)

# Installed plugins for which to disable hook auto-registration.
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)

# Class that manages storing KedroSession data.
# from kedro.framework.session.shelvestore import ShelveStore
# SESSION_STORE_CLASS = ShelveStore
# Keyword arguments to pass to the `SESSION_STORE_CLASS` constructor.
# SESSION_STORE_ARGS = {
#     "path": "./sessions"
# }

# Class that manages Kedro's library components.
# from kedro.framework.context import KedroContext
# CONTEXT_CLASS = KedroContext

# Directory that holds configuration.
# CONF_SOURCE = "conf"

# Class that manages how configuration is loaded.
# CONFIG_LOADER_CLASS = ConfigLoader
# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
# CONFIG_LOADER_ARGS = {
#     "config_patterns": {
#         "spark" : ["spark*/"],
#         "parameters": ["parameters*", "parameters*/**", "**/parameters*"],
#     }
# }

# Class that manages the Data Catalog.
# from kedro.io import DataCatalog
# DATA_CATALOG_CLASS = DataCatalog
How to set up your Kedro project to read data from streaming sources
Once you have set up your project, you can use the new Kedro Spark streaming dataset. Configure the data catalog in conf/base/catalog.yml as follows to read from a stream of JSON files:
raw_json:
  type: spark.SparkStreamingDataSet
  filepath: data/01_raw/stream/inventory/
  file_format: json
Additional options can be configured via the load_args key:
int.new_inventory:
  type: spark.SparkStreamingDataSet
  filepath: data/02_intermediate/inventory/
  file_format: csv
  load_args:
    header: True
How to set up your Kedro project to write data to streaming sinks
All the additional arguments can be kept under the save_args key:
processed.sensor:
  type: spark.SparkStreamingDataSet
  file_format: csv
  filepath: data/03_primary/processed_sensor/
  save_args:
    output_mode: append
    checkpoint: data/04_checkpoint/processed_sensor
    header: True
Note that when you use the Kafka format, the respective packages should be added to the spark.yml configuration as follows:
spark.jars.packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
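To show how catalog entries like those above might be consumed, here is a minimal sketch of a pipeline that reads from one streaming source and writes to one streaming sink. The node function drop_incomplete_rows and the node name are hypothetical; the dataset names raw_json and processed.sensor are reused from the catalog examples purely for illustration.

```python
def drop_incomplete_rows(stream):
    """Drop rows containing nulls from a (streaming) Spark DataFrame."""
    return stream.dropna()


def create_pipeline():
    """Wire the streaming source to the streaming sink through one node."""
    # Imported lazily so the node function above can be unit-tested
    # in an environment where Kedro is not installed.
    from kedro.pipeline import Pipeline, node

    return Pipeline(
        [
            node(
                func=drop_incomplete_rows,
                inputs="raw_json",  # streaming source from the catalog
                outputs="processed.sensor",  # streaming sink from the catalog
                name="drop_incomplete_rows_node",
            )
        ]
    )
```

The node function itself contains no streaming-specific code. Kedro loads the input as a streaming DataFrame and the dataset starts the stream when the output is saved.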
Design considerations
Pipeline design
In order to benefit from Spark's internal query optimisation, we recommend that any interim datasets are stored as memory datasets.
All streams start at the same time, so any nodes that have a dependency on another node that writes to a file sink (i.e. the input to that node is the output of another node) will fail on the first run. This is because there are no files in the file sink for the stream to process when it starts.
We recommend that you either keep intermediate datasets in memory or split out the processing into two pipelines and start by triggering the first pipeline to build up some initial history.
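If you take the two-pipeline route, one way to decide when the second pipeline can safely start is to check whether the first pipeline's file sink already holds data. The helper below is a hypothetical sketch, not part of Kedro. It assumes the sink is a local directory and that Spark writes its output as files whose names start with part-, which is its usual convention.

```python
from pathlib import Path


def sink_has_data(sink_dir: str) -> bool:
    """Return True if the directory holds at least one non-empty part file."""
    root = Path(sink_dir)
    if not root.is_dir():
        return False
    # Assumption: Spark's file sink writes files named part-*.
    return any(
        path.is_file() and path.stat().st_size > 0 for path in root.glob("part-*")
    )
```

For example, you could call sink_has_data("data/02_intermediate/inventory/") before triggering the second pipeline, and wait or skip the run when it returns False.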
Feature creation
Be aware that windowing operations only allow windowing on time columns.
Watermarks must be defined for joins. Only certain types of joins are allowed, and these depend on the kinds of inputs being joined (stream-stream or stream-static), which makes joining multiple tables a little complex at times. For further information or advice about join types and watermarking, take a look at the PySpark documentation or reach out on the Kedro Slack workspace.
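As an illustration of windowing on a time column together with a watermark, here is a minimal sketch. The function name, the column names (event_time, sensor_id, reading), and the durations are assumptions chosen for the example; event_time is assumed to be a timestamp column.

```python
def average_per_window(
    readings,
    time_col="event_time",
    key_col="sensor_id",
    value_col="reading",
):
    """Average a value per key over 5-minute event-time windows."""
    # Imported lazily so this module can be imported without PySpark installed.
    from pyspark.sql import functions as F

    return (
        # Tolerate events arriving up to 10 minutes late, then drop old state.
        readings.withWatermark(time_col, "10 minutes")
        # The window must be built on a timestamp column.
        .groupBy(F.window(time_col, "5 minutes"), key_col)
        .agg(F.avg(value_col).alias("avg_reading"))
    )
```

A function like this can be used directly as a Kedro node, since it takes a DataFrame and returns one. The watermark is declared on the same time column that the window uses, which is what lets Spark discard state for windows that can no longer receive late data.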
Logging
When first initiated, the Kedro pipeline will download the JAR required for the Spark Kafka connector. On later runs, it won't download the file again but will simply reuse the copy stored by the earlier download.
For each node, the logs for the following will be shown: Loading data, Running nodes, Saving data, Completed x out of y tasks.
The completed log doesn't mean that the stream processing in that node has stopped. It means that the Spark plan has been created, and if the output dataset is being saved to a sink, the stream has started.
Once Kedro has run through all the nodes and the full Spark execution plan has been created, you'll see INFO Pipeline execution completed successfully.
This doesn't mean the stream processing has stopped, as the after_pipeline_run Hook keeps the Spark session alive. As new data comes in, new Spark logs will be shown, even after the "Pipeline execution completed" log.
If there is an error in the input data, the Spark error logs will come through and Kedro will shut down the SparkContext and all the streams within it.
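Because the Kedro logs alone don't tell you which streams are still running, it can help to list the active streaming queries yourself. The helper below is a hypothetical sketch. It relies on spark.streams.active and the name, id, and isActive attributes of each StreamingQuery, and assumes the caller passes in the active SparkSession.

```python
from typing import Any, List


def describe_active_streams(spark: Any) -> List[str]:
    """Return one line per streaming query that is still running."""
    return [
        # Unnamed queries have name None, so fall back to the query id.
        f"{query.name or query.id}: active={query.isActive}"
        for query in spark.streams.active
    ]
```

One possible place to call it is inside the after_pipeline_run Hook, logging the returned lines just before awaitAnyTermination() blocks.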
In summary
In this article, we explained how to take advantage of one of the ways to extend Kedro by building a new dataset to create streaming pipelines. We created a new Kedro project using the Kedro pyspark starter and illustrated how to work with Hooks, adding them to the Kedro project to enable it to function as a streaming application. The dataset was then easy to configure through the Kedro data catalog, making it possible to use the new dataset, defining the streaming sources and sinks.
There are currently some limitations: the dataset is not yet ready for use out of the box with a message broker such as Kafka, because an additional JAR package is required.
If you want to find out more about the ways to extend Kedro, take a look at the advanced Kedro documentation for more about Kedro plugins, datasets, and Hooks.
Contributors
This post was created by Tingting Wan, Tom Kurian, and Haris Michailidis, who are all Data Engineers in the London office of QuantumBlack, AI by McKinsey.