If you use Spark to transform your data and plan to load it directly from Spark into Elasticsearch, then perhaps this short article is for you.
Not so long ago I was in the same situation. I read the Elasticsearch documentation to build my PySpark job, but to be honest, by the end of my reading I still had a lot of questions.
Thanks to several answers on Stack Overflow and other forums, I was able to implement a PySpark job that actually works.
That is why I decided to write this small tutorial and explain which options are available.
If you want to check the full code then you can find it in this Gist.
Connection parameters
These are the base parameters we need to set in order to connect to Elasticsearch:
df.write.format("org.elasticsearch.spark.sql") \
.option("es.nodes", <elastic_host>) \
.option("es.port", <elastic_port>)
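If you are running in a cloud environment such as AWS or Google Cloud, you will usually also need the following options. They tell the connector to talk only to the nodes declared in es.nodes instead of trying to discover the other nodes of the cluster, which are often not reachable from outside the provider's network.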
.option("es.nodes.wan.only", "true") \
.option("es.nodes.discovery", "false")
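To keep these settings in one place, the connection options can be collected in a small helper. This is only a sketch: connection_options and its cloud flag are names I made up for illustration, and 9200 is assumed as the default port.

```python
def connection_options(host, port=9200, cloud=False):
    """Build the connection options for the Elasticsearch Spark connector.

    `connection_options` and the `cloud` flag are hypothetical names,
    they are not part of the connector itself.
    """
    options = {
        "es.nodes": host,
        # Spark options are passed as strings
        "es.port": str(port),
    }
    if cloud:
        # Only talk to the declared node(s) and skip node discovery
        options["es.nodes.wan.only"] = "true"
        options["es.nodes.discovery"] = "false"
    return options


# Example values, replace them with your own host and port
opts = connection_options("my-elastic-host", 9243, cloud=True)
```

The resulting dictionary can then be handed to the writer in a single call, for example df.write.format("org.elasticsearch.spark.sql").options(**opts), instead of chaining one .option() per setting.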
Authentication and SSL
This part depends on how your Elasticsearch instance is configured, but these are the main options:
SSL enabled or disabled:
Setting this option to true enables SSL for the connection.
.option("es.net.ssl", "true") \
Authentication by user name and password:
If you choose to authenticate with a user name and password, then these two options need to be added.
.option("es.net.http.auth.user", <user_name>) \
.option("es.net.http.auth.pass", <password>) \
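Authentication by API key:
A better way to authenticate is to create an API key and send it in the Authorization header using the ApiKey scheme. Any option that starts with es.net.http.header. is sent as an HTTP header with every request to Elasticsearch.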
AUTH_TOKEN = f"ApiKey {API_KEY}"
.option("es.net.http.header.Authorization", AUTH_TOKEN) \
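One detail that is easy to miss: the value after ApiKey is expected to be the Base64 encoding of id:api_key, where both parts come from the response of the create API key call. Here is a minimal sketch of how that header value could be built. The function name api_key_header is hypothetical, and if your Elasticsearch version already returns an encoded value you can use that one directly.

```python
import base64


def api_key_header(key_id, api_key):
    """Return the Authorization header value for an Elasticsearch API key.

    `api_key_header` is a hypothetical helper, `key_id` and `api_key`
    are the two values returned when the API key was created.
    """
    raw = f"{key_id}:{api_key}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return f"ApiKey {token}"


# Placeholder credentials, never hard-code real ones in the job
AUTH_TOKEN = api_key_header("my-key-id", "my-api-key")
```

AUTH_TOKEN can then be passed to the es.net.http.header.Authorization option exactly as shown above.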
Use of an SSL certificate:
This one can be a little bit tricky, since you need to copy the .jks truststore file of your Elasticsearch instance to all of the cluster nodes and then add the following options.
.option("es.net.ssl.truststore.location", <truststore_location>) \
.option("es.net.ssl.truststore.pass", <truststore_pass>) \
.option("es.net.ssl.cert.allow.self.signed", "true")
Remember that this truststore location must be accessible from your Spark application (the driver and every executor), otherwise the connection will fail.
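As far as I understand the connector documentation, the truststore location is normally given as a URL, and a value without a prefix is looked up on the classpath. Under that assumption, a local file can be turned into a file:// URL before it is passed as an option. The helper name truststore_options and the example path are made up for illustration.

```python
from pathlib import Path


def truststore_options(truststore_path, truststore_pass, allow_self_signed=False):
    """Build the SSL truststore options from a local .jks path.

    `truststore_options` is a hypothetical helper. It assumes the file
    lives at the same absolute path on every node of the cluster.
    """
    path = Path(truststore_path)
    if not path.is_absolute():
        raise ValueError("truststore_path must be an absolute path")
    return {
        "es.net.ssl": "true",
        # e.g. /opt/certs/es.jks becomes file:///opt/certs/es.jks
        "es.net.ssl.truststore.location": path.as_uri(),
        "es.net.ssl.truststore.pass": truststore_pass,
        "es.net.ssl.cert.allow.self.signed": "true" if allow_self_signed else "false",
    }


# Example path and password, replace them with your own
ssl_opts = truststore_options("/opt/certs/es.jks", "changeit", allow_self_signed=True)
```

Failing early on a relative path is a deliberate choice here: a path that only resolves on the driver is a common reason for the job to work locally and fail on the cluster.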
Index resource options
WRITE_MODE = "append"
# Comments cannot sit between backslash-continued lines,
# so the chain is wrapped in parentheses here
(
    df.write.format("org.elasticsearch.spark.sql")
    # Create the index if it doesn't exist yet
    .option("es.index.auto.create", "true")
    # The write operation: 'index', 'create', 'update', 'upsert'...
    .option("es.write.operation", "index")
    # This is the actual name of the index
    .option("es.resource", <elastic_index>)
    # DataFrame column used as the document _id in Elasticsearch
    .option("es.mapping.id", "id")
    # Treat empty fields as null (this is a read-side setting)
    .option("es.field.read.empty.as.null", "true")
    # The write mode can be 'append' or 'overwrite'
    .mode(WRITE_MODE)
    # Trigger the actual write
    .save()
)
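Some of these options depend on each other. For example, the update and upsert operations work on a document id, so they only make sense when es.mapping.id is set. A small pre-flight check can catch that before the job starts. This is a sketch under my own assumptions: index_options is a hypothetical helper, and the list of valid operations is the one I know of, so newer connector versions may accept more.

```python
# Operations assumed to be valid for es.write.operation
VALID_OPERATIONS = {"index", "create", "update", "upsert"}


def index_options(index, operation="index", id_field=None, auto_create=True):
    """Build and sanity-check the index options (hypothetical helper)."""
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"unknown write operation: {operation!r}")
    if operation in {"update", "upsert"} and id_field is None:
        # Updates are applied by document id, so an id column is required
        raise ValueError(f"operation {operation!r} requires id_field")
    options = {
        "es.resource": index,
        "es.write.operation": operation,
        "es.index.auto.create": "true" if auto_create else "false",
    }
    if id_field is not None:
        options["es.mapping.id"] = id_field
    return options


# Example index and id column, replace them with your own
write_opts = index_options("my-index", operation="upsert", id_field="id")
```

As with the connection settings, the dictionary can be passed to the writer with .options(**write_opts).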
In general, the configuration options of the Spark Elasticsearch connector are self-explanatory. We just need to find the right ones for our specific case.
Hope this can help you!