Akmal Chaudhri for SingleStore

Quick tip: Using SingleStore with PyIceberg

Abstract

In a previous article, we implemented an Iceberg catalog using SingleStore and JDBC. Another way to create the catalog is with PyIceberg. In this article, we'll see how.

The notebook file used in this article is available on GitHub.

Introduction

PyIceberg is a Python library that provides a native interface for working with Apache Iceberg. It lets us read and manage table metadata, evolve schemas, and handle data partitioning through a Python API, independent of any particular processing engine.

Create a SingleStoreDB Cloud account

A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:

  • Workspace Group Name: PyIceberg Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: pyiceberg-demo
  • Size: S-00

We'll make a note of the password and store it in the secrets vault using the name password.

Import the notebook

We'll download the notebook from GitHub.

From the left navigation pane in the SingleStore cloud portal, we'll select DEVELOP > Data Studio.

In the top right of the web page, we'll select New Notebook > Import From File. We'll use the wizard to locate and import the notebook we downloaded from GitHub.

Run the notebook

After checking that we are connected to our SingleStore workspace, we'll run the cells one by one.

We'll use PyIceberg to create a tiny Iceberg Lakehouse in the SingleStore portal for testing purposes.

For production environments, please use a robust file system for your Lakehouse.

In the Iceberg Lakehouse, we'll store the Iris flower data set. We'll first download the Iris CSV file into a pandas DataFrame.

We'll need to create a SingleStore database to use with Iceberg:

DROP DATABASE IF EXISTS iris_db;
CREATE DATABASE IF NOT EXISTS iris_db;

And a table to store some of our Iris data:

DROP TABLE IF EXISTS iris;

CREATE TABLE IF NOT EXISTS iris (
    sepal_length FLOAT,
    sepal_width FLOAT,
    petal_length FLOAT,
    petal_width FLOAT,
    species VARCHAR(20)
);

A quick and easy way to find the connection details for the database is to use the following:

from sqlalchemy import create_engine

# connection_url is supplied by the notebook once it is connected to a workspace
db_connection = create_engine(connection_url)
url = db_connection.url

The url will contain the host, the port, and the database name. We'll use these details for storing the catalog in SingleStore.
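
As a rough illustration of which parts of a connection URL those attributes correspond to, the sketch below splits a made-up URL using only Python's standard library. The host name, the credentials, and the helper name `describe_url` are placeholders invented for this example; in the notebook, the SQLAlchemy `url` object exposes `host`, `port`, and `database` directly.

```python
from urllib.parse import urlsplit


def describe_url(connection_url: str) -> dict:
    """Split a database URL into the parts used for the catalog URI."""
    parts = urlsplit(connection_url)
    return {
        "host": parts.hostname,
        "port": parts.port,
        # The path starts with "/", so strip it to get the database name.
        "database": parts.path.lstrip("/"),
    }


# Placeholder URL for illustration only; not a real workspace.
example = "singlestoredb://admin:secret@svc-example.invalid:3306/iris_db"
print(describe_url(example))
```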

Next, from the pandas DataFrame, we'll save all Iris-virginica records in SingleStore:

pandas_df[pandas_df["species"] == "Iris-virginica"].to_sql(
    "iris",
    con = db_connection,
    if_exists = "append",
    index = False
)

We'll then create the Iceberg catalog and namespace in SingleStore:

from pyiceberg.catalog.sql import SqlCatalog

config = {
    "uri": f"singlestoredb://admin:{password}@{url.host}:{url.port}/{url.database}",
    "warehouse": "warehouse",
}

catalog = SqlCatalog(
    name = "s2_catalog",
    **config
)

catalog.create_namespace("default")
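
The f-string above places the password directly into the URI, which can break if the password contains characters such as `@` or `/`. A minimal sketch of one way to guard against that is to percent-encode the credentials first, since SQLAlchemy-style URLs expect special characters in credentials to be encoded. The helper name `build_catalog_uri` and all values shown are hypothetical and are not part of PyIceberg or SingleStore.

```python
from urllib.parse import quote


def build_catalog_uri(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build a singlestoredb:// URI with percent-encoded credentials."""
    # safe="" also encodes "/" so it cannot be mistaken for a path separator.
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"singlestoredb://{user_enc}:{password_enc}@{host}:{port}/{database}"


# Placeholder values for illustration only.
uri = build_catalog_uri("admin", "p@ss/word", "svc-example.invalid", 3306, "iris_db")
print(uri)
```

The resulting string could then be used as the "uri" entry in config.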

Next, we'll create an Iceberg table from the pandas DataFrame:

import pyarrow as pa

df = pa.Table.from_pandas(pandas_df)

# table_identifier names the table as "namespace.table", e.g. "default.iris" (example name)
table = catalog.create_table(
    table_identifier,
    schema = df.schema
)

and store it in our Lakehouse:

table.append(df)

len(table.scan().to_arrow())

Now, we'll retrieve all records, except Iris-virginica:

df = table.scan(row_filter = "species != 'Iris-virginica'").to_arrow()

and overwrite the existing table:

table.overwrite(df)

len(table.scan().to_arrow())

Because the overwrite replaced the table's contents with the filtered rows, the Lakehouse no longer contains any Iris-virginica records. We can check this as follows:

arrow_table = table.scan().to_arrow()

species_counts = arrow_table["species"].value_counts()

print(species_counts.to_pandas())

Example output:

0        {'values': 'Iris-setosa', 'counts': 50}
1    {'values': 'Iris-versicolor', 'counts': 50}
dtype: object
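
The printed Series holds one dictionary per species, which is awkward to compare against expected values. As a small sketch, assuming the same records are available as a list of dictionaries (for example, via `species_counts.to_pylist()`), they can be flattened into a plain mapping. The function name `flatten_counts` is made up for this example.

```python
def flatten_counts(records: list[dict]) -> dict:
    """Turn [{'values': label, 'counts': n}, ...] into {label: n}."""
    return {record["values"]: record["counts"] for record in records}


# Records copied from the example output above.
records = [
    {"values": "Iris-setosa", "counts": 50},
    {"values": "Iris-versicolor", "counts": 50},
]

counts = flatten_counts(records)
print(counts)
```

With a mapping like this, a check such as `"Iris-virginica" not in counts` confirms that the overwrite removed the species.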

From SingleStore, we'll retrieve the previously saved Iris-virginica records:

new_df = pd.read_sql(
    "SELECT * FROM iris WHERE species = 'Iris-virginica'",
    con = db_connection
)

and append these to the Iceberg Lakehouse:

table.append(pa.Table.from_pandas(new_df))

len(table.scan().to_arrow())

This restores the Iceberg Lakehouse to the full Iris data set. We can check this as follows:

arrow_table = table.scan().to_arrow()

species_counts = arrow_table["species"].value_counts()

print(species_counts.to_pandas())

Example output:

0     {'values': 'Iris-virginica', 'counts': 50}
1        {'values': 'Iris-setosa', 'counts': 50}
2    {'values': 'Iris-versicolor', 'counts': 50}
dtype: object
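
To make the sequence of steps easier to follow, here is a plain-Python model of the round trip. It does not use PyIceberg or SingleStore: ordinary lists stand in for the Iceberg table and for the SingleStore iris table, and the figure of 50 rows per species is taken from the example outputs above.

```python
from collections import Counter

SPECIES = ["Iris-setosa", "Iris-versicolor", "Iris-virginica"]

# Stand-in for the full data set: 50 rows per species.
full_data = [{"species": name} for name in SPECIES for _ in range(50)]

# Step 1: copy the Iris-virginica rows aside (the role of the SingleStore table).
backup = [row for row in full_data if row["species"] == "Iris-virginica"]

# Step 2: "overwrite" the lakehouse table with everything except those rows.
lakehouse = [row for row in full_data if row["species"] != "Iris-virginica"]
after_overwrite = Counter(row["species"] for row in lakehouse)

# Step 3: "append" the backed-up rows to restore the full data set.
lakehouse = lakehouse + backup
after_append = Counter(row["species"] for row in lakehouse)

print(len(backup), sum(after_overwrite.values()), sum(after_append.values()))
```

The three totals correspond to the rows saved in SingleStore, the rows left after the overwrite, and the rows present after the final append.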

Summary

In this short article, we've seen how to store an Iceberg catalog in SingleStore using PyIceberg. We've also stored some data in SingleStore and used that to restore an Iceberg Lakehouse. In a future article, we'll look at snapshots and time travel.
