Set up Elasticsearch for data transformation and data ingestion
In part 5, we figured out:
- a plan for transforming data before ingesting data into Elasticsearch
- a desired mapping for our data
In this blog, we will accomplish two tasks:
- create an ingest pipeline to transform the retrieved data
- create an index called earthquakes with the desired mapping
Before we get started, let’s talk about the data journey for our app.
We are building an app where users can search for earthquake data stored in Elasticsearch.
In part 7, we will set up our server to retrieve the data from the USGS API and send it to an Elasticsearch ingest pipeline.
An ingest pipeline is used to transform data before it is indexed.
It consists of a series of configurable tasks called processors. Each processor performs a specialized task. For example, a processor can remove fields, extract values from text, or enrich your data.
Processors run in the order in which you add them to the pipeline, and each one makes specific changes to the incoming documents.
After the processors have run, Elasticsearch will add the transformed documents to the earthquakes index we will create.
If you want to delve deeper into ingest pipelines, check out this documentation!
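One way to picture an ingest pipeline is as an ordered list of functions, each receiving a document and handing its output to the next. The JavaScript sketch below (JavaScript because the server in this series is Node.js) is only a mental model: runPipeline and makeRemove are hypothetical helpers, and real processors run inside Elasticsearch, not in your app.

```javascript
// Mental model only: an ingest pipeline as an ordered list of processor functions.
// runPipeline and makeRemove are hypothetical helpers, not Elasticsearch APIs.

// Run each processor over the document, strictly in array order.
function runPipeline(processors, doc) {
  // Deep-copy the JSON document so the caller's object is not mutated.
  let current = JSON.parse(JSON.stringify(doc));
  for (const processor of processors) {
    current = processor(current); // each step's output is the next step's input
  }
  return current;
}

// Factory for a toy step that deletes a single field.
function makeRemove(field) {
  return (doc) => {
    delete doc[field];
    return doc;
  };
}

const incoming = { mag: 1.13, tz: null, updated: 1650317059011 };
const transformed = runPipeline([makeRemove("tz"), makeRemove("updated")], incoming);
console.log(JSON.stringify(transformed)); // {"mag":1.13}
```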
Step 1: Review the data transformation requirements
Our ingest pipeline will be used to transform the data retrieved from the USGS API.
Before we create an ingest pipeline, let's review what changes we want to make to the data.
- remove the unnecessary info from the retrieved data
- change the Unix epoch time in the field time to a human-readable timestamp
- create fields coordinates.lat and coordinates.lon as shown below
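The USGS API returns coordinates in GeoJSON order: [longitude, latitude, depth], longitude first. The Rename processors in Task 3 expect incoming documents to carry separate latitude and longitude fields, so the data has to be flattened before it reaches the pipeline. The server that does this is built in part 7; the function below is only a hypothetical sketch of that flattening step (flattenFeature is an illustrative name), assuming the feature shape shown in Task 1.

```javascript
// Hypothetical sketch: flatten a USGS GeoJSON feature into the shape the pipeline expects.
// GeoJSON stores coordinates as [longitude, latitude, depth], longitude first.
function flattenFeature(feature) {
  const [longitude, latitude, depth] = feature.geometry.coordinates;
  return {
    ...feature.properties, // mag, place, time, url, sig, type, plus fields the pipeline removes
    latitude,
    longitude,
    depth,
  };
}

// Trimmed-down version of the sample feature from Task 1.
const feature = {
  type: "Feature",
  properties: { mag: 1.13, place: "11km ENE of Coachella, CA", time: 1650316843970, tz: null },
  geometry: { type: "Point", coordinates: [-116.0736667, 33.7276667, 2.09] },
  id: "ci40240408",
};

console.log(JSON.stringify(flattenFeature(feature)));
```

Swapping latitude and longitude here is an easy mistake, and it would only surface later, when Elasticsearch rejects or misplaces the point.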
Step 2: Create an ingest pipeline
Ingest pipelines can be created and managed via Kibana's Ingest Pipelines feature or the ingest APIs.
We will be using Kibana to create this pipeline.
From the Kibana home page, click on the Stack Management option (red box).
From the Stack Management page, click on the Ingest Pipelines option (red box).
Click on the Create pipeline option (red box) and select the New pipeline option from the drop-down menu (blue box).
Give your pipeline a name that makes sense to you.
For this project, I named mine earthquake_data_pipeline (red box).
Step 3: Add the desired processors to the pipeline
Click on the Add a processor option (red box).
You should see the following pop-up menu.
Task 1: Remove the fields that we do not need from the retrieved data
Here is an example of an earthquake object from the USGS earthquake API:
{
"type":"Feature",
"properties":{
"mag":1.13,
"place":"11km ENE of Coachella, CA",
"time":1650316843970,
"updated":1650317059011,
"tz":null,
"url":"https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
"detail":"https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/ci40240408.geojson",
"felt":null,
"cdi":null,
"mmi":null,
"alert":null,
"status":"automatic",
"tsunami":0,
"sig":20,
"net":"ci",
"code":"40240408",
"ids":",ci40240408,",
"sources":",ci,",
"types":",nearby-cities,origin,phase-data,scitech-link,",
"nst":37,
"dmin":0.07687,
"rms":0.26,
"gap":48,
"magType":"ml",
"type":"earthquake",
"title":"M 1.1 - 11km ENE of Coachella, CA"
},
"geometry":{
"type":"Point",
"coordinates":[
-116.0736667,
33.7276667,
2.09
]
},
"id":"ci40240408"
}
The following is a sample document with the desired fields we want to store in Elasticsearch.
{
"mag": 1.13,
"place": "11km ENE of Coachella, CA",
"time": "2022-05-02T20:07:53.266Z",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
"sig": 20,
"type": "earthquake",
"depth": 2.09,
"coordinates": {
"lat": 33.7276667,
"lon": -116.0736667
}
}
As you can see, the earthquake object from the API has additional info we do not need.
By comparing the two, we can identify the fields that we want to remove from the retrieved data.
Here is the list of fields that we do not need.
updated
tz
detail
felt
cdi
mmi
alert
status
tsunami
net
code
ids
sources
types
nst
dmin
rms
gap
magType
title
To remove these fields, we can use the Remove processor.
Under the Processor section, type Remove in the search bar. Click on the Remove processor (red box).
Kibana will display the following menu. In the Fields section (red box), we will add the names of the fields that we wish to remove.
Type in the name of a field you wish to remove, then press Enter. You will see that the field name has been added.
In the same box, repeat this process for every field we wish to remove.
Your screen should look like the image below (red box).
Activate the Ignore missing option (red box). With this option on, the processor skips any listed field that is not present in a document instead of failing.
Click on the Add button (red box).
You will see that the Remove processor has been added to the earthquake_data_pipeline (red box).
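To preview what the Remove processor will do to a document, here is a local approximation in JavaScript. removeFields is a hypothetical helper, not Elasticsearch code; it mimics the behavior described above with Ignore missing enabled: listed fields that exist are deleted, and missing ones are skipped instead of raising an error.

```javascript
// Local approximation of the Remove processor (not Elasticsearch code).
// With ignoreMissing = false, a missing field throws, like a failing processor.
function removeFields(doc, fields, ignoreMissing = true) {
  const result = { ...doc }; // shallow copy is enough for top-level fields
  for (const field of fields) {
    if (!(field in result)) {
      if (ignoreMissing) continue; // skip quietly, like ignore_missing: true
      throw new Error(`field [${field}] not present`);
    }
    delete result[field];
  }
  return result;
}

const doc = { mag: 1.13, place: "11km ENE of Coachella, CA", tz: null, felt: null, gap: 48 };
// "title" is not in this document; with ignoreMissing it is simply skipped.
console.log(JSON.stringify(removeFields(doc, ["tz", "felt", "gap", "title"])));
// {"mag":1.13,"place":"11km ENE of Coachella, CA"}
```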
Task 2: Change the Unix epoch time in the field time to a human-readable timestamp
To make this change, we will use the Date processor.
The Date processor parses a date from a field using the format(s) you specify and, by default, stores the parsed date in a field called @timestamp.
Click on the Add a processor option (red box).
Under the Processor section, type in Date and click on the Date option from the drop-down menu (red box).
You should see the following drop-down menu.
In the Field section (red box), type the name of the field we wish to convert (time).
In the Formats section (red box), we will specify the format the field is currently stored in. The format shown in the results card (milliseconds since the Unix epoch) is called UNIX_MS.
Type it into this section and hit Enter. Then, click on the Add button (green box) to add the Date processor to the ingest pipeline.
You will see that the Date processor has been added to the earthquake_data_pipeline (red box).
When the data goes through the Date processor, the value of the field time will be parsed as UNIX_MS, and the resulting date will be stored in a new field called @timestamp.
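UNIX_MS means milliseconds since the Unix epoch (1970-01-01T00:00:00Z), which is the same unit JavaScript's Date uses. The snippet below reproduces the conversion locally for the sample time value from Task 1, so you know roughly what to expect in @timestamp. unixMsToIso is a hypothetical helper; Elasticsearch does the real parsing, and the exact string it stores is up to the processor's own output settings.

```javascript
// Reproduce the UNIX_MS -> ISO 8601 conversion locally (illustration only).
// UNIX_MS = milliseconds since 1970-01-01T00:00:00Z, the same unit JavaScript Date uses.
function unixMsToIso(ms) {
  if (!Number.isFinite(ms)) {
    throw new TypeError(`expected epoch milliseconds, got ${ms}`);
  }
  return new Date(ms).toISOString(); // always rendered in UTC with a trailing "Z"
}

// "time" value from the sample USGS object in Task 1.
console.log(unixMsToIso(1650316843970)); // 2022-04-18T21:20:43.970Z
```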
After this step, we no longer need the original field time. Therefore, we will remove the field time after the data goes through the Date processor.
From the Create pipeline page, click on the Add a processor option (red box).
Under the Processor section, type in Remove and hit Enter (red box).
Under the Fields section, type in time and hit Enter (red box). Activate the Ignore missing option (blue box). Then, click on the Add button (green box).
You will see that the Remove processor for the field time has been added (red box).
Task 3: Create fields called coordinates.lat and coordinates.lon
From the same page, click on the Add a processor option (red box).
Under the Processor section, type in Rename and hit Enter (red box).
Under the Field section, type in latitude (red box). Under the Target field section, type in coordinates.lat (blue box).
This step will rename the field latitude in the incoming data to coordinates.lat.
Activate the Ignore missing option (yellow box), then click on the Add button (green box) to add this processor to the earthquake_data_pipeline.
You will see that the Rename processor for the field latitude has been added (red box).
Next, we will repeat the same process to add a Rename processor that renames the field longitude from the incoming data to coordinates.lon.
This process is identical to the steps you performed for the field latitude.
Once this process is done, you will see that a Rename processor for the field longitude has been added to the earthquake_data_pipeline (red box).
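A dotted target field such as coordinates.lat points at a lat field nested inside a coordinates object. The hypothetical renameField helper below is a local approximation of the Rename processor (not Elasticsearch code) that builds that nested object, which is the object form a geo_point field accepts. Like Task 3, it assumes the incoming documents carry flat latitude and longitude fields.

```javascript
// Local approximation of a Rename processor with a dotted target field (not Elasticsearch code).
function renameField(doc, field, targetPath, ignoreMissing = true) {
  const result = JSON.parse(JSON.stringify(doc)); // deep copy of a JSON document
  if (!(field in result)) {
    if (ignoreMissing) return result; // like ignore_missing: true
    throw new Error(`field [${field}] not present`);
  }
  const keys = targetPath.split("."); // "coordinates.lat" -> ["coordinates", "lat"]
  let node = result;
  for (const key of keys.slice(0, -1)) {
    // Walk the intermediate objects, creating them when they do not exist yet.
    if (typeof node[key] !== "object" || node[key] === null) node[key] = {};
    node = node[key];
  }
  node[keys[keys.length - 1]] = result[field];
  delete result[field];
  return result;
}

let doc = { mag: 1.13, latitude: 33.7276667, longitude: -116.0736667, depth: 2.09 };
doc = renameField(doc, "latitude", "coordinates.lat");
doc = renameField(doc, "longitude", "coordinates.lon");
console.log(JSON.stringify(doc));
// {"mag":1.13,"depth":2.09,"coordinates":{"lat":33.7276667,"lon":-116.0736667}}
```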
We have added all the necessary processors to transform our data.
Before creating the earthquake_data_pipeline, make sure the processors are listed in the order you want them to run.
These processors run sequentially!
Why does the order matter?
Let's say you accidentally reversed the order of the Date processor for the field time and the Remove processor for the field time.
When the data goes through the ingest pipeline, it will go through the Remove processor before the Date processor. As a result, the field time will be removed before it gets to the Date processor.
The Date processor will have nothing to work with!
This is why we double-check the order of the processors before creating the ingest pipeline!
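The failure described above can be reproduced with a small local model. parseTime and dropTime are hypothetical stand-ins for the Date and Remove processors (this is not how Elasticsearch implements them). With the steps reversed, time is gone before the date step runs, and in this sketch that step throws.

```javascript
// Stand-in for the Date processor: parse epoch ms in "time" into "@timestamp".
function parseTime(doc) {
  if (!("time" in doc)) {
    throw new Error("date step failed: field [time] is missing");
  }
  return { ...doc, "@timestamp": new Date(doc.time).toISOString() };
}

// Stand-in for the Remove processor on "time" (ignore missing enabled).
function dropTime(doc) {
  const { time, ...rest } = doc; // everything except time
  return rest;
}

// Apply the steps in array order.
function run(steps, doc) {
  return steps.reduce((current, step) => step(current), doc);
}

const incoming = { mag: 1.13, time: 1650316843970 };

// Correct order: parse first, then remove.
console.log(JSON.stringify(run([parseTime, dropTime], incoming)));
// {"mag":1.13,"@timestamp":"2022-04-18T21:20:43.970Z"}

// Reversed order: time is removed before the date step sees it.
try {
  run([dropTime, parseTime], incoming);
} catch (err) {
  console.log(err.message); // date step failed: field [time] is missing
}
```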
Next, we will create the earthquake_data_pipeline by clicking on the Create pipeline button (blue box).
You will see that the earthquake_data_pipeline (red box) has been created. If you scroll down to the Processors section (blue box), you will see the list of processors we have added to this pipeline.
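For reference, the pipeline built above in Kibana can also be expressed as a request body for the ingest APIs (PUT _ingest/pipeline/earthquake_data_pipeline). The sketch below only builds that body in JavaScript and prints it, so it runs offline; the processor list mirrors the Kibana steps in this post, and the description text is an assumption of mine. Sending it, for example from the Kibana console or an Elasticsearch client, is left out.

```javascript
// Sketch: build the body of PUT _ingest/pipeline/earthquake_data_pipeline.
// The processor list mirrors the Kibana steps in this post; the description is made up.
const unneededFields = [
  "updated", "tz", "detail", "felt", "cdi", "mmi", "alert", "status", "tsunami",
  "net", "code", "ids", "sources", "types", "nst", "dmin", "rms", "gap", "magType", "title",
];

const pipelineBody = {
  description: "Transform USGS earthquake data", // hypothetical description
  processors: [
    { remove: { field: unneededFields, ignore_missing: true } },
    // Parse epoch milliseconds; the result goes to @timestamp by default.
    { date: { field: "time", formats: ["UNIX_MS"] } },
    { remove: { field: "time", ignore_missing: true } },
    { rename: { field: "latitude", target_field: "coordinates.lat", ignore_missing: true } },
    { rename: { field: "longitude", target_field: "coordinates.lon", ignore_missing: true } },
  ],
};

// Print the request as it could be pasted into the Kibana console.
console.log("PUT _ingest/pipeline/earthquake_data_pipeline");
console.log(JSON.stringify(pipelineBody, null, 2));
```

Once the pipeline exists, POST _ingest/pipeline/earthquake_data_pipeline/_simulate with a docs array of sample documents lets you dry-run it without indexing anything.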
Step 4: Create an index called earthquakes with the desired mapping
We will accomplish this step using Kibana Dev Tools.
Click on the menu icon (red box).
Scroll down the drop-down menu and click on Dev Tools (red box).
You should see Dev Tools (also known as the Kibana console) on the screen.
In the left panel of the Kibana console, copy and paste the following.
PUT earthquakes
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"coordinates": {
"type": "geo_point"
},
"depth": {
"type": "float"
},
"mag": {
"type": "float"
},
"place": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"sig": {
"type": "short"
},
"type": {
"type": "keyword"
},
"url": {
"enabled": false
}
}
}
}
Your Kibana console should look like the following:
Click on the green arrow to send the request.
Expected output from Elasticsearch:
Elasticsearch will create an index called earthquakes with the desired mapping we defined above!
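Before sending data, it can be handy to sanity-check documents against the constraints this mapping implies: a geo_point given as an object needs numeric lat (between -90 and 90) and lon (between -180 and 180), and a short holds whole numbers from -32,768 to 32,767. checkEarthquakeDoc is a hypothetical client-side helper, not part of Elasticsearch, which performs its own validation at index time.

```javascript
// Hypothetical client-side sanity check for documents bound for the earthquakes index.
// Elasticsearch validates on its own at index time; this only catches obvious mistakes early.
const SHORT_MIN = -32768;
const SHORT_MAX = 32767;

function checkEarthquakeDoc(doc) {
  const problems = [];
  const { coordinates, sig } = doc;
  // geo_point as an object: numeric lat/lon within valid ranges.
  if (!coordinates || typeof coordinates.lat !== "number" || typeof coordinates.lon !== "number") {
    problems.push("coordinates must be an object with numeric lat and lon");
  } else {
    if (coordinates.lat < -90 || coordinates.lat > 90) problems.push("lat out of range");
    if (coordinates.lon < -180 || coordinates.lon > 180) problems.push("lon out of range");
  }
  // short: 16-bit signed integer (stricter than Elasticsearch, which can coerce values).
  if (!Number.isInteger(sig) || sig < SHORT_MIN || sig > SHORT_MAX) {
    problems.push("sig must be an integer between -32768 and 32767");
  }
  // @timestamp should parse as a date.
  if (Number.isNaN(Date.parse(doc["@timestamp"]))) {
    problems.push("@timestamp is not a parseable date");
  }
  return problems;
}

const good = {
  "@timestamp": "2022-04-18T21:20:43.970Z",
  coordinates: { lat: 33.7276667, lon: -116.0736667 },
  sig: 20,
};
// Latitude and longitude swapped by mistake.
const swapped = { ...good, coordinates: { lat: -116.0736667, lon: 33.7276667 } };

console.log(checkEarthquakeDoc(good)); // []
console.log(checkEarthquakeDoc(swapped)); // [ 'lat out of range' ]
```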
Summary
In this blog, we have created:
- an ingest pipeline (earthquake_data_pipeline) to transform the data retrieved from the USGS API
- an index called earthquakes with the desired mapping
In the next blog, we will set up the server to retrieve earthquake data from the USGS API and send the data to the earthquake_data_pipeline in Elasticsearch.
Once the data transformation is complete, the transformed data will be ingested into the earthquakes index.
Move on to Part 7 to set up the Node.js server to retrieve API data and send the data to Elasticsearch for data transformation and data ingestion!
Comments
If you don't fancy typing the entire pipeline or mapping, you can paste mine. I considered creating a PR for the repositories, but I noticed the author has moved on from Elastic, so I decided this was the best alternative.
Pipeline I have used:
[
{
"remove": {
"field": [
"updated",
"tz",
"detail",
"felt",
"cdi",
"mmi",
"alert",
"status",
"tsunami",
"net",
"code",
"ids",
"sources",
"types",
"nst",
"dmin",
"rms",
"gap",
"magType",
"title"
],
"ignore_missing": true
}
},
{
"date": {
"field": "time",
"formats": [
"UNIX_MS"
]
}
},
{
"remove": {
"field": "time",
"ignore_missing": true
}
},
{
"rename": {
"field": "latitude",
"target_field": "coordinates.lat",
"ignore_missing": true
}
},
{
"rename": {
"field": "longitude",
"target_field": "coordinates.lon",
"ignore_missing": true
}
}
]
This is the mapping I used:
PUT earthquakes
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"coordinates": {
"type": "geo_point"
},
"depth": {
"type": "float"
},
"mag": {
"type": "float"
},
"place": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"sig": {
"type": "short"
},
"type": {
"type": "keyword"
},
"url": {
"enabled": false
}
}
}
}