Aerospike

Using Aerospike Connect For Spark

Ken Tune ・5 min read

Aerospike is a highly scalable key-value database offering best-in-class performance. It is typically deployed in real-time environments managing terabyte- to petabyte-scale data volumes.

Aerospike is typically run alongside other scalable distributed software, such as Kafka for system coupling or Spark for analytics. The Aerospike Connect product line is designed to make these integrations as easy as possible.

This article shows how Aerospike Spark Connect works in practice through a comprehensive, easily reproducible end-to-end example using aerospike-ansible.

Database Cluster Setup

First, take a look at Ansible for Aerospike, which explains how to use aerospike-ansible.

In this example I set cluster_instance_type to c5d.18xlarge in vars/cluster-config.yml.

Follow the instructions up to and including one-touch setup. You will get as far as running:

ansible-playbook aws-setup-plus-aerospike-install.yml
ansible-playbook aerospike-java-client-setup.yml

This gives you a 3-node Aerospike cluster by default, plus a client instance with the relevant software installed.

Spark Cluster Setup

This is done via

ansible-playbook spark-cluster-setup.yml

For this example, prior to running, I set spark_instance_type to c5d.4xlarge in vars/cluster-config.yml.

This playbook creates a 3-node Spark cluster of the given instance type, with Spark installed and running. It also installs Aerospike Spark Connect.

Note that you will need to set enterprise: true and provide a path to a valid Aerospike feature key using feature_key: /your/path/to/key in vars/cluster-config.yml. You must therefore either be a licensed Aerospike customer or be running an Aerospike trial.

Near the end of the process you will see:

TASK [Spark master IP & master internal url] ************************************************************************************************************************************************************************
ok: [localhost] => {
    "msg": "Spark master is 3.88.237.103. Spark master internal url is spark://10.0.2.122:7077."
}

Make a note of the Spark master internal URL, as it is needed later.

Load Data

Our example uses 20m records from the 1bn-record NYC Taxi ride corpus, available in compressed form at https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz. We load it into Aerospike using aerospike-loader, which is installed on the client machine set up above. First, we get the addresses of the hosts in the Aerospike cluster, as these are needed later.

source ./scripts/cluster-ip-address-list.sh

Sample output

Adds cluster ips to this array- AERO_CLUSTER_IPS
Use as ${ AERO_CLUSTER_IPS[index]}
There are 3 entries

##########################################################

cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234
cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95
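Both the loader command and the Spark configuration later in this article use one of the private addresses (10.0.0.234). If you capture the script's output as text and want to pull out the private addresses programmatically, here is a minimal Scala sketch. It assumes only the line format shown in the sample output above; the ClusterIps object and privateIps method are hypothetical names, not part of aerospike-ansible.

```scala
object ClusterIps {
  // Matches lines such as:
  // cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
  // (format assumed from the sample output; group 1 = public, group 2 = private)
  private val NodeLine =
    """cluster IP addresses : Public : ([0-9.]+), Private : ([0-9.]+)""".r

  /** Returns the private IP of every node line found in the script output, in order. */
  def privateIps(output: String): List[String] =
    NodeLine.findAllMatchIn(output).map(_.group(2)).toList

  def main(args: Array[String]): Unit = {
    val sample =
      """cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
        |cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234
        |cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95""".stripMargin
    // Any one of these can serve as the host passed to the loader and to Spark
    println(privateIps(sample).mkString(","))
  }
}
```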

aerospike-loader requires a config file to load the data into Aerospike. The file maps CSV column positions to named, typed bins. A sample entry looks like this:

{
    "name": "pkup_datetime",
    "value": {
        "column_position": 3,
        "type": "timestamp",
        "encoding": "yyyy-MM-dd HH:mm:ss",
        "dst_type": "integer"
    }
}

The full file is provided in the repo at recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json. We upload it to the client instance:

source ./scripts/client-ip-address-list.sh 
scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[0]}:~
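To make the timestamp entry concrete, the following sketch parses a pickup time and converts it to an integer. It rests on two assumptions: that aerospike-loader parses timestamp columns with java.text.SimpleDateFormat using the encoding string, and that the stored integer is whole seconds since the Unix epoch (which the getYearFromSeconds function used later implies, since it multiplies the value by 1000). It also shows why the pattern letter matters: in SimpleDateFormat, HH is the 0-23 hour of day and hh is the 1-12 am/pm hour, so with hh and no AM/PM marker an hour of 12 is read as 12 AM. PickupTimestamp and toEpochSeconds are hypothetical names.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object PickupTimestamp {
  // Parse a CSV timestamp with the given SimpleDateFormat pattern and return
  // whole seconds since the Unix epoch (assumed storage unit for dst_type integer).
  def toEpochSeconds(text: String, pattern: String): Long = {
    val fmt = new SimpleDateFormat(pattern)
    // Pinned to UTC for repeatability; a real loader JVM would use its own default zone.
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
    fmt.parse(text).getTime / 1000
  }

  def main(args: Array[String]): Unit = {
    val sample = "2014-01-01 12:30:00"
    val with24h = toEpochSeconds(sample, "yyyy-MM-dd HH:mm:ss") // HH: hour of day, 0-23
    val with12h = toEpochSeconds(sample, "yyyy-MM-dd hh:mm:ss") // hh: am/pm hour, 1-12
    println(s"HH: $with24h") // 1388579400, i.e. 2014-01-01T12:30:00Z
    println(s"hh: $with12h") // differs: the 12 o'clock hour is read as 12 AM
  }
}
```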

Next, get the data onto the client machine. There is more than one way to do this, but plan ahead, as the dataset is 7.6GB uncompressed. I used the commands below; the specifics will depend on your drives and filesystem. Note that mkfs.ext4 formats the target device, erasing anything already on it.

./scripts/client-quick-ssh.sh # to log in, followed by

sudo mkfs.ext4 /dev/nvme1n1
sudo mkdir /data
sudo mount -t ext4 /dev/nvme1n1 /data
sudo chmod 777 /data

wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz
gunzip /data/trips_xaa.csv.gz
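As a rough planning aid, the figures above imply an average uncompressed row size of about 380 bytes, reading 7.6GB as 7.6 × 10^9 bytes. If, as an assumption, rows in the rest of the 1bn-record corpus are of similar size, the full uncompressed corpus would need on the order of 380GB of local disk:

```latex
\frac{7.6 \times 10^{9}\ \text{bytes}}{2 \times 10^{7}\ \text{rows}} \approx 380\ \text{bytes/row},
\qquad
10^{9}\ \text{rows} \times 380\ \text{bytes/row} \approx 3.8 \times 10^{11}\ \text{bytes} \approx 380\ \text{GB}
```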

Finally, we load the data using the config file we uploaded:

cd ~/aerospike-loader
./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv 

Note that we are using one of the private cluster IP addresses recorded earlier.

Using Spark

Log into one of the Spark nodes. aerospike-ansible provides a utility script for this:

./scripts/spark-quick-ssh.sh 

Start a Spark shell, using the Spark master internal URL reported when running the Spark cluster setup playbook:

/spark/bin/spark-shell --master spark://10.0.2.122:7077

Import the relevant libraries:

import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import java.util.Date
import java.text.SimpleDateFormat

Supply the Aerospike configuration. Note that we use the same cluster IP as before as the seed host:

spark.conf.set("aerospike.seedhost", "10.0.0.234")
spark.conf.set("aerospike.namespace", "test")

Register a function we will use in the queries, then load the Aerospike set as a DataFrame and define a view over it:

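val sqlContext = spark.sqlContext
// UDF: convert epoch seconds (as stored in pkup_datetime) to a four-digit year string
sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds))
// Load the nyc-taxi-data set from the configured namespace as a DataFrame
val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load
// Expose the DataFrame to Spark SQL under the name "taxi"
taxi.createOrReplaceTempView("taxi")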
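getYearFromSeconds formats the timestamp with SimpleDateFormat, which uses the default time zone of the executor JVM. Creating a new formatter per call, as the UDF does, avoids sharing a SimpleDateFormat across threads (it is not thread-safe), but the year assigned to a trip starting near midnight on 31 December can depend on that zone. The standalone sketch below makes the zone explicit and shows a java.time equivalent; YearFromSeconds and its methods are hypothetical names, not part of Spark or the connector.

```scala
import java.text.SimpleDateFormat
import java.time.{Instant, ZoneId}
import java.util.{Date, TimeZone}

object YearFromSeconds {
  // Same idea as the getYearFromSeconds UDF, but the time zone is passed in
  // explicitly instead of being taken from the JVM default.
  def yearViaSimpleDateFormat(seconds: Long, zone: String): String = {
    val fmt = new SimpleDateFormat("yyyy") // new instance per call: not thread-safe to share
    fmt.setTimeZone(TimeZone.getTimeZone(zone))
    fmt.format(new Date(1000L * seconds)) // epoch seconds -> epoch milliseconds
  }

  // java.time alternative: immutable, thread-safe types and no pattern string.
  def yearViaJavaTime(seconds: Long, zone: String): Int =
    Instant.ofEpochSecond(seconds).atZone(ZoneId.of(zone)).getYear

  def main(args: Array[String]): Unit = {
    val newYear2014Utc = 1388534400L // 2014-01-01T00:00:00Z
    println(yearViaSimpleDateFormat(newYear2014Utc, "UTC"))              // 2014
    println(yearViaSimpleDateFormat(newYear2014Utc, "America/New_York")) // 2013 (still 31 Dec locally)
    println(yearViaJavaTime(newYear2014Utc, "UTC"))                      // 2014
  }
}
```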

Finally, we run our queries:

// Journeys grouped by cab type
val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type")
result.show()

+--------+--------+                                                             
|cab_type|   count|
+--------+--------+
|   green|20000000|
+--------+--------+

// Average fare based on different passenger count
val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt")
result.show()

+-------------+----------+                                                      
|passenger_cnt|avg_amount|
+-------------+----------+
|            0|     10.86|
|            1|     14.63|
|            2|     15.75|
|            3|     15.87|
|            4|     15.85|
|            5|     14.76|
|            6|     15.42|
|            7|     23.74|
|            8|     19.52|
|            9|      34.9|
+-------------+----------+

// Number of journeys by passenger count and year
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) ORDER BY passenger_cnt")
result.show()

+-------------+---------+--------+                                              
|passenger_cnt|trip_year|   count|
+-------------+---------+--------+
|            0|     2014|    4106|
|            1|     2014|16557518|
|            2|     2014| 1473578|
|            3|     2014|  507862|
|            4|     2014|  160714|
|            5|     2014|  939276|
|            6|     2014|  355846|
|            7|     2014|     492|
|            8|     2014|     494|
|            9|     2014|     114|
+-------------+---------+--------+

// Number of trips for each passenger count/year/rounded distance combination
// Ordered by year, then by trip count descending
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc")
result.show()

+-------------+---------+--------+-------+                                      
|passenger_cnt|trip_year|distance|  trips|
+-------------+---------+--------+-------+
|            1|     2014|     1.0|5321230|
|            1|     2014|     2.0|3500458|
|            1|     2014|     3.0|2166462|
|            1|     2014|     4.0|1418494|
|            1|     2014|     5.0| 918460|
|            1|     2014|     0.0| 868210|
|            1|     2014|     6.0| 653646|
|            1|     2014|     7.0| 488416|
|            2|     2014|     1.0| 433746|
|            1|     2014|     8.0| 345728|
|            2|     2014|     2.0| 305578|
|            5|     2014|     1.0| 302120|
|            1|     2014|     9.0| 226278|
|            5|     2014|     2.0| 199968|
|            2|     2014|     3.0| 199522|
|            1|     2014|    10.0| 163928|
|            3|     2014|     1.0| 145580|
|            2|     2014|     4.0| 137152|
|            5|     2014|     3.0| 122714|
|            1|     2014|    11.0| 117570|
+-------------+---------+--------+-------+
only showing top 20 rows

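For readers less familiar with Spark SQL, the average-fare query amounts to a group-by, a mean and a rounding step. The sketch below expresses the same steps with plain Scala collections over a few made-up rows; the Trip class, its values and the AvgFareByPassengers object are illustrative only and not taken from the dataset. Spark SQL's round uses HALF_UP rounding, which BigDecimal's RoundingMode.HALF_UP mirrors.

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical rows standing in for the taxi set; only the two fields the query uses.
final case class Trip(passengerCnt: Int, totalAmount: Double)

object AvgFareByPassengers {
  // Mirrors: SELECT passenger_cnt, round(avg(total_amount),2) avg_amount
  //          FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt
  def avgFare(trips: Seq[Trip]): Seq[(Int, Double)] =
    trips
      .groupBy(_.passengerCnt) // GROUP BY passenger_cnt
      .map { case (cnt, group) =>
        val avg = group.map(_.totalAmount).sum / group.size // avg(total_amount)
        cnt -> BigDecimal(avg).setScale(2, RoundingMode.HALF_UP).toDouble // round(..., 2)
      }
      .toSeq
      .sortBy(_._1) // ORDER BY passenger_cnt

  def main(args: Array[String]): Unit = {
    val sample = Seq(Trip(1, 10.0), Trip(1, 15.5), Trip(2, 20.0), Trip(2, 12.25), Trip(2, 9.0))
    avgFare(sample).foreach { case (cnt, amt) => println(s"$cnt -> $amt") }
    // prints "1 -> 12.75" then "2 -> 13.75"
  }
}
```

Spark performs the equivalent work in parallel across the cluster, reading the records from Aerospike through the connector.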

Conclusion

This example shows how quickly you can get up and running with a large data corpus using Aerospike, Spark and the aerospike-ansible tooling. It used 20m rows, but the same steps can be extended to the full corpus.
