DEV Community

Jim Hatcher
Apache Spark job to update CockroachDB data

Apache Spark is a distributed execution framework, which makes it a wonderfully complementary tool for working with distributed systems such as CockroachDB.

When working in Spark, my go-to interface is Spark SQL, since I can leverage my SQL skills to get at the data I want. However, Spark SQL won't run row-level UPDATEs against a JDBC source, so in practice I use it for reads. For writes, you have to use the DataFrame/Dataset or RDD interfaces.

If you're doing an initial import of a table from CSV (or something similar), you can do something like this:

dataFrameCSV.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

This is a great option when you're loading the whole table for the first time.

But when you want to update only some of the records in the table, this write() interface is not great: by default, Spark will complain that the table already exists, and if you tell Spark to overwrite, it will wipe out the records in your table.
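To make those options concrete, here is a minimal sketch of the same JDBC write with the save mode spelled out. The helper name `writeToTable` is hypothetical, and the connection options are the same placeholders used in the example above; treat it as an illustration rather than a recommended setup.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: the same JDBC write as above, with an explicit save mode.
def writeToTable(df: DataFrame, mode: SaveMode): Unit =
  df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .mode(mode)
    .save()

// SaveMode.ErrorIfExists (the default): fails if the table already exists
// SaveMode.Append: inserts the DataFrame's rows as new rows
// SaveMode.Overwrite: replaces the existing table contents
// SaveMode.Ignore: does nothing if the table already exists
```

None of these modes modifies existing rows in place, which is why an update needs a different approach.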

So, for more nuanced mutations, you can drop down into the java.sql library and have Spark execute batches of updates.

Here's how I went about doing this.

First, I needed to install Spark in my environment (my MacBook), so I used brew to install:

  • Scala 2.12 (Spark 3 doesn't like Scala 2.11)
  • Oracle Java 11 (Spark doesn't seem to like more recent OpenJDK versions)
  • Spark (Spark 3.2.1 is the latest as of this writing)

I already had a Postgres driver installed via Maven, but you can install this too, if necessary.

I ran the spark shell using this command:

spark-shell \
  --driver-class-path /Users/jimhatcher/.m2/repository/org/postgresql/postgresql/42.2.19/postgresql-42.2.19.jar \
  --jars /Users/jimhatcher/.m2/repository/org/postgresql/postgresql/42.2.19/postgresql-42.2.19.jar \
  --conf spark.jars=/Users/jimhatcher/.m2/repository/org/postgresql/postgresql/42.2.19/postgresql-42.2.19.jar \
  --conf spark.executor.memory=4g \
  --conf spark.driver.memory=4g

Spark runs as three separate components: the Spark master, the driver, and the executor(s). I passed references to the Postgres jar in several parameters to make sure that the various pieces knew how to reference the Postgres driver.

I also bumped up the memory available to the executors and the driver.
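A quick way to sanity-check the classpath setup from inside the shell is to try loading the driver class. The sketch below is an illustration, not part of the original job: `classAvailable` is a hypothetical helper name, and it only checks the JVM it runs in (the driver, when pasted into spark-shell), not the executors.

```scala
import scala.util.Try

// Hypothetical helper: true if the named class can be loaded in this JVM.
def classAvailable(className: String): Boolean =
  Try(Class.forName(className)).isSuccess

// org.postgresql.Driver is the driver class shipped in the PostgreSQL JDBC jar.
println(s"Postgres driver visible: ${classAvailable("org.postgresql.Driver")}")
```

If this prints false, the jar path passed to spark-shell is the first thing to double-check.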

The next thing I needed to do was to make sure that I had good certs that I could use to access my CockroachDB instance. When CRDB issues keys, it uses PEM format. Over JDBC (which this Spark job uses), the certs work as they are, but the Postgres JDBC driver wants the client key in PKCS#8 DER format. So, I ran the following command to convert my client key to DER:

openssl pkcs8 -topk8 -inform PEM -outform DER -in client.root.key -out client.root.key.der -nocrypt
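To confirm that the converted file is something Java can read, one option is to parse it with the JDK's own key classes before pointing JDBC at it. This is a minimal sketch under two assumptions: that the client key is an RSA key, and that the file name matches the one produced by the openssl command above. `parsePkcs8Der` is a hypothetical helper name.

```scala
import java.nio.file.{Files, Paths}
import java.security.{KeyFactory, PrivateKey}
import java.security.spec.PKCS8EncodedKeySpec

// Hypothetical helper: parse PKCS#8 DER bytes into a private key.
// Throws InvalidKeySpecException if the bytes are not valid PKCS#8
// for the given algorithm (assumed to be RSA here).
def parsePkcs8Der(der: Array[Byte], algorithm: String = "RSA"): PrivateKey =
  KeyFactory.getInstance(algorithm).generatePrivate(new PKCS8EncodedKeySpec(der))

// Usage against the converted key file (path is an assumption):
// val key = parsePkcs8Der(Files.readAllBytes(Paths.get("client.root.key.der")))
// println(s"${key.getAlgorithm} / ${key.getFormat}")
```

If the parse fails, the key was most likely left in PEM format or converted with different options.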

I wrote the following program to read a table from Cockroach, find the records where a certain field was null and then update that field.

After some trial and error, this is what I ended up with:

import java.sql._
import org.apache.spark.sql._

val url = "jdbc:postgresql://<host_name_here>:26257/test?sslmode=require&sslcert=/Users/jimhatcher/spark-cluster-certs/client.root.crt&sslkey=/Users/jimhatcher/spark-cluster-certs/client.root.key.der&sslrootcert=/Users/jimhatcher/spark-cluster-certs/ca.crt&reWriteBatchedInserts=true"
val sql = "( select id from test.test_table where f1 IS NULL) t1"
val df = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", sql)
  .option("user", "root")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "30000000")
  .option("numPartitions", "10")
  .load

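df.coalesce(10).foreachPartition(
  (partition: Iterator[Row]) => {

    // One connection per partition; this code runs on the executors
    val dbc: Connection = DriverManager.getConnection(url, "root", "")
    try {
      val batchSize = 10000
      val st: PreparedStatement = dbc.prepareStatement("UPDATE test.test_table SET f1 = 1 WHERE id = ?")
      try {
        // Send the updates to the database in batches of batchSize rows
        partition.grouped(batchSize).foreach (
          batch => {
            batch.foreach (
              row => {
                st.setLong(1, row.getLong(0))
                st.addBatch()
              }
            )
            st.executeBatch()
          }
        )
      } finally {
        st.close()
      }
    } finally {
      // Close the connection even if a batch fails
      dbc.close()
    }

  }
)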

I ran this in the Spark shell by copying/pasting. (The ":paste" command is nice for this.)

This program wasn't particularly fast, and I think there's some optimization I could do on it. I also didn't run it on a real Spark cluster, which matters because one super cool thing about Spark is how easy it is to parallelize jobs like this by throwing more hardware resources at them.
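One thing that affects how evenly that parallelism is spread is how the partitionColumn, lowerBound, upperBound, and numPartitions options split the read. The sketch below is a simplified model of that split, not Spark's actual code: `partitionPredicates` is a hypothetical helper, and the exact boundaries Spark generates vary by version. The point it illustrates is that the bounds only set the stride; they do not filter rows, so the first and last ranges are open-ended.

```scala
// Simplified model of how a JDBC read is cut into ranges.
// Exact boundaries differ between Spark versions; this only shows the idea.
def partitionPredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  require(numPartitions > 1 && upper > lower)
  val stride = upper / numPartitions - lower / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi OR $column IS NULL"      // first range is open below
    else if (i == numPartitions - 1) s"$column >= $lo"   // last range is open above
    else s"$column >= $lo AND $column < $hi"
  }
}

// Same values as the read options used in the job above
partitionPredicates("id", 1L, 30000000L, 10).foreach(println)
```

If the ids are skewed, or many fall outside the 1 to 30000000 range, some partitions end up with far more rows than others, which is one place to look when tuning.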

I'd like to go back and try to optimize this further. But in the meantime, hopefully this is a decent example that folks can build on for doing Spark jobs against CockroachDB.
