DEV Community

Akmal Chaudhri for SingleStore

Posted on

Quick tip: Using Apache Spark and GraphFrames with SingleStore Notebooks

Abstract

In this article, we'll see how to use the Apache Spark GraphFrames package with SingleStore by using historical data about the London Underground network. We'll store the data as stations (vertices) and line connections (edges) in SingleStore and then load the data into a notebook environment and perform some queries on the data using GraphFrames.

The notebook file used in this article is available on GitHub.

Create a SingleStore Cloud account

A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:

  • Workspace Group Name: Spark Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: spark-demo
  • Size: S-00

Create a new notebook

From the left navigation pane in the cloud portal, we'll select Develop > Notebooks.

In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.

Figure 1. New Notebook.

Figure 1. New Notebook.

We'll call the notebook spark_graphframes_demo, select a Blank notebook template from the available options, and save it in the Personal location.

Fill out the notebook

First, let's install Spark:

!pip cache purge --quiet
!conda install -y --quiet -c conda-forge openjdk pyspark
Enter fullscreen mode Exit fullscreen mode

Next, we'll install some libraries:

!pip install folium --quiet
!pip install graphframes --quiet
Enter fullscreen mode Exit fullscreen mode

In previous articles in our Apache Spark series using notebooks, we've downloaded jar files into a local directory. However, in this article, we'll use spark.jars.packages and create our SparkSession as follows:

from pyspark.sql import SparkSession

# List of Maven coordinates for all required packages
maven_packages = [
    "graphframes:graphframes:0.8.3-spark3.5-s_2.12",
    "org.scala-lang:scala-library:2.12",
    "com.singlestore:singlestore-jdbc-client:1.2.1",
    "com.singlestore:singlestore-spark-connector_2.12:4.1.5-spark-3.5.0",
    "org.apache.commons:commons-dbcp2:2.12.0",
    "org.apache.commons:commons-pool2:2.12.0",
    "io.spray:spray-json_3:1.3.6"
]

# Create Spark session with all required packages
spark = (SparkSession
             .builder
             .config("spark.jars.packages", ",".join(maven_packages))
             .appName("Spark GraphFrames Test")
             .getOrCreate()
        )

spark.sparkContext.setLogLevel("ERROR")
Enter fullscreen mode Exit fullscreen mode

Now we'll create the connection details to SingleStore:

host = "<HOST>"
password = "<PASSWORD>"
port = "3306"
cluster = host + ":" + port
Enter fullscreen mode Exit fullscreen mode

Replace <HOST> and <PASSWORD> with the values for your environment. These values can be obtained from the workspace using Connect > SQL IDE.

We also need to set some configuration parameters:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
Enter fullscreen mode Exit fullscreen mode

A database is required, so we'll create one:

DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;
Enter fullscreen mode Exit fullscreen mode

We'll also create tables to store the data:

USE spark_demo;

DROP TABLE IF EXISTS connections;
CREATE ROWSTORE TABLE IF NOT EXISTS connections (
     src      INT,
     dst      INT,
     line     VARCHAR(32),
     colour   VARCHAR(8),
     time     INT,
     PRIMARY KEY(src, dst, line)
);

DROP TABLE IF EXISTS stations;
CREATE ROWSTORE TABLE IF NOT EXISTS stations (
     id          INT PRIMARY KEY,
     latitude    DOUBLE,
     longitude   DOUBLE,
     name        VARCHAR(32),
     zone        FLOAT,
     total_lines INT,
     rail        INT
);
Enter fullscreen mode Exit fullscreen mode

Now we'll download the historical data for the London Underground into Pandas. We'll also perform some adjustments for GraphFrames and some merging of the data to mirror the schema of the tables we defined above:

import pandas as pd

connections_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_connections.csv"
stations_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_stations.csv"
lines_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_lines.csv"

connections_df = pd.read_csv(connections_url)
connections_df.rename(
    columns = {"station1": "src", "station2": "dst"},
    inplace = True
)

stations_df = pd.read_csv(stations_url)
stations_df.drop(
    "display_name",
    axis = 1,
    inplace = True
)

lines_df = pd.read_csv(lines_url)
lines_df.drop(
    "stripe",
    axis = 1,
    inplace = True
)

connections_df = pd.merge(
    connections_df,
    lines_df,
    on = "line",
    how = "left"
)
connections_df.drop(
    "line",
    axis = 1,
    inplace = True
)
connections_df.rename(
    columns = {"name": "line"},
    inplace = True
)
Enter fullscreen mode Exit fullscreen mode

Just before we save our data into SingleStore, we'll create a map of the London Underground using Folium:

import folium

London = [51.509865, -0.118092]
mymap = folium.Map(location = London, zoom_start = 12)

# Add markers for stations
for idx, row in stations_df.iterrows():
    folium.Marker(
        [row["latitude"], row["longitude"]],
        popup = row["name"]
    ).add_to(mymap)

# Add lines with colours
for idx, row in connections_df.iterrows():
    source = stations_df.loc[stations_df["id"] == row["src"]]
    target = stations_df.loc[stations_df["id"] == row["dst"]]

    # Extract latitude and longitude
    source_coords = (float(source["latitude"].iloc[0]), float(source["longitude"].iloc[0]))
    target_coords = (float(target["latitude"].iloc[0]), float(target["longitude"].iloc[0]))

    folium.PolyLine(
        locations = [source_coords, target_coords],
        color = row["colour"]
    ).add_to(mymap)

mymap
Enter fullscreen mode Exit fullscreen mode

This produces a map, as shown in Figure 2. We can scroll and zoom the map. When clicked, a marker will show the station name and the lines are coloured according to the London Underground scheme.

Figure 2. Map using Folium.

Figure 2. Map using Folium.

We'll now prepare the connection to SingleStore:

from sqlalchemy import *

db_connection = create_engine(connection_url)
Enter fullscreen mode Exit fullscreen mode

and write the connections data:

connections_df.to_sql(
    "connections",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)
Enter fullscreen mode Exit fullscreen mode

and stations data:

stations_df.to_sql(
    "stations",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)
Enter fullscreen mode Exit fullscreen mode

We can check the data in the connections table:

SELECT * FROM connections LIMIT 5;
Enter fullscreen mode Exit fullscreen mode

and stations table:

SELECT * FROM stations LIMIT 5;
Enter fullscreen mode Exit fullscreen mode

With the data safely stored in SingleStore, we can now read it back into Spark and use GraphFrames. First the connections data:

connections = (spark.read
    .format("singlestore")
    .load("spark_demo.connections")
)
Enter fullscreen mode Exit fullscreen mode

and then the stations data:

stations = (spark.read
    .format("singlestore")
    .load("spark_demo.stations")
)
Enter fullscreen mode Exit fullscreen mode

Now we'll create a GraphFrame:

from graphframes import GraphFrame

underground = GraphFrame(stations, connections)
Enter fullscreen mode Exit fullscreen mode

We can show the vertices:

underground.vertices.show(5)
Enter fullscreen mode Exit fullscreen mode

Example output:

+---+--------+---------+---------------+----+-----------+----+
| id|latitude|longitude|           name|zone|total_lines|rail|
+---+--------+---------+---------------+----+-----------+----+
| 25|  51.512|  -0.1031|    Blackfriars| 1.0|          2|   0|
| 39| 51.5481|  -0.1188|Caledonian Road| 2.0|          1|   0|
| 43| 51.5147|   0.0082|   Canning Town| 3.0|          2|   0|
| 50| 51.7052|   -0.611|        Chesham|10.0|          1|   0|
| 60| 51.5129|  -0.1243|  Covent Garden| 1.0|          1|   0|
+---+--------+---------+---------------+----+-----------+----+
only showing top 5 rows
Enter fullscreen mode Exit fullscreen mode

We can show the edges:

underground.edges.show(5)
Enter fullscreen mode Exit fullscreen mode

Example output:

+---+---+--------------------+-------+----+
|src|dst|                line| colour|time|
+---+---+--------------------+-------+----+
|  7|145|       Northern Line|#000000|   2|
| 11|163|       Bakerloo Line|#B36305|   1|
| 19| 97|Docklands Light R...|#00A4A7|   2|
| 28|192|        Central Line|#E32017|   1|
| 49|151|       Northern Line|#000000|   2|
+---+---+--------------------+-------+----+
only showing top 5 rows
Enter fullscreen mode Exit fullscreen mode

We can check how many stations are in each London Underground Zone:

(underground
    .vertices
    .groupBy("zone")
    .count()
    .orderBy("count", ascending = False)
    .show()
)
Enter fullscreen mode Exit fullscreen mode

Example output:

+----+-----+
|zone|count|
+----+-----+
| 2.0|   75|
| 1.0|   60|
| 3.0|   47|
| 4.0|   38|
| 5.0|   28|
| 6.0|   18|
| 2.5|   17|
| 3.5|    6|
| 1.5|    4|
| 8.0|    2|
|10.0|    2|
| 7.0|    2|
| 9.0|    1|
| 6.5|    1|
| 5.5|    1|
+----+-----+
Enter fullscreen mode Exit fullscreen mode

It may be useful to find the number of stations by the line name:

(underground
    .edges
    .filter("line = 'District Line'")
    .count()
)
Enter fullscreen mode Exit fullscreen mode

Example output:

59
Enter fullscreen mode Exit fullscreen mode

It could be interesting to know the maximum number of lines running through a station:

(underground
    .vertices
    .groupBy()
    .max("total_lines")
    .show()
)
Enter fullscreen mode Exit fullscreen mode

Example output:

+----------------+
|max(total_lines)|
+----------------+
|               6|
+----------------+
Enter fullscreen mode Exit fullscreen mode

and to find the station with the most lines running through it:

(underground
    .vertices
    .filter("total_lines == 6")
    .show()
)
Enter fullscreen mode Exit fullscreen mode

Example output:

+---+--------+---------+--------------------+----+-----------+----+
| id|latitude|longitude|                name|zone|total_lines|rail|
+---+--------+---------+--------------------+----+-----------+----+
|145| 51.5308|  -0.1238|King's Cross St. ...| 1.0|          6|   1|
+---+--------+---------+--------------------+----+-----------+----+
Enter fullscreen mode Exit fullscreen mode

Many more types of queries are possible. The GraphFrames documentation is a good place to start.

Finally, we can stop Spark:

spark.stop()
Enter fullscreen mode Exit fullscreen mode

Summary

In this short article, we've seen the ease with which we can store graph data in SingleStore and how we can use GraphFrames to perform various queries on the data.

Top comments (0)