Lisa Jung

Part 6: Set up Elasticsearch for data transformation and data ingestion

Table of Contents | Read Next: Part 7: Set up the Node.js server to retrieve API data and send the data to Elasticsearch

Resources

Would you rather watch a video to learn this content? Click on the link below!

Set up Elasticsearch for data transformation and data ingestion

In part 5, we figured out:

  1. a plan for transforming data before ingesting it into Elasticsearch
  2. a desired mapping for our data

In this blog, we will accomplish two tasks:

  1. create an ingest pipeline to transform the retrieved data
  2. create an index called earthquakes with the desired mapping


Before we get started, let’s talk about the data journey for our app.

We are building an app where users can search for earthquake data stored in Elasticsearch.

In part 7, we will set up our server to retrieve the data from the USGS API and send the data to an Elasticsearch ingest pipeline.


An ingest pipeline is used for data transformation.

It consists of a series of configurable tasks called processors. Each processor performs a specialized task. For example, it can remove fields, extract values from text, or enrich your data.


The processors run in the order you set them up, each making specific changes to the incoming documents.

After the processors have run, Elasticsearch will add the transformed documents to the earthquakes index we will create.


If you want to delve deeper into ingest pipelines, check out this documentation!
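To make the concept concrete, here is a minimal sketch of what a pipeline definition looks like when created through the ingest API instead of the UI. The pipeline name my_example_pipeline and the field unwanted_field are placeholders for illustration only, not part of this project:

PUT _ingest/pipeline/my_example_pipeline
{
  "description": "A minimal example pipeline with a single processor",
  "processors": [
    {
      "remove": {
        "field": "unwanted_field",
        "ignore_missing": true
      }
    }
  ]
}

In this blog we will build our pipeline through the Kibana UI instead, but the result is the same kind of definition.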

Set up Elasticsearch for data transformation and data ingestion

Step 1: Review the data transformation requirements

Our ingest pipeline will be used to transform the data retrieved from the USGS API.

Before we create an ingest pipeline, let's review what changes we want to make to the data.

  1. remove the unnecessary info from the retrieved data
  2. change the Unix epoch time in the field time to a human readable timestamp
  3. create the fields coordinates.lat and coordinates.lon

Step 2: Create an ingest pipeline

Ingest pipelines can be created and managed via Kibana's Ingest Pipelines feature or the ingest APIs.

We will be using Kibana to create this pipeline.

From the Kibana home page, click on the Stack Management option (red box).

From the Stack Management page, click on the Ingest Pipelines option (red box).

Click on the Create pipeline option (red box) and select the New pipeline option from the drop down menu (blue box).

Name your pipeline whatever makes sense to you.

For this project, I named mine earthquake_data_pipeline (red box).

Step 3: Add the desired processors to the pipeline

Click on the Add a processor option (red box).

You should see the following pop up menu.

Task 1: Remove the fields that we do not need from the retrieved data

Here is an example of an earthquake object from the USGS earthquake API:



   {
     "type":"Feature",
     "properties":{
        "mag":1.13,
        "place":"11km ENE of Coachella, CA",
        "time":1650316843970,
        "updated":1650317059011,
        "tz":null, 
        "url":"https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
        "detail":"https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/ci40240408.geojson",                
        "felt":null,
        "cdi":null,
        "mmi":null,
        "alert":null,
        "status":"automatic",
        "tsunami":0,
        "sig":20,
        "net":"ci",
        "code":"40240408",
        "ids":",ci40240408,",
        "sources":",ci,",
        "types":",nearby-cities,origin,phase-data,scitech-link,",
        "nst":37,
        "dmin":0.07687,
        "rms":0.26,
        "gap":48,
        "magType":"ml",
        "type":"earthquake",
        "title":"M 1.1 - 11km ENE of Coachella, CA"
     },
     "geometry":{
        "type":"Point",
        "coordinates":[
          -116.0736667,
          33.7276667,
          2.09
        ]
     },
     "id":"ci40240408"
   }



The following is a sample document with the desired fields we want to store in Elasticsearch.



{
  "mag": 1.13,
  "place": "11km ENE of Coachella, CA",
  "time": 2022-05-02T20:07:53.266Z,
  "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  "sig": 20,
  "type": "earthquake",
  "depth": 2.09,
  "coordinates": {
    "lat": 33.7276667,
    "lon": -116.0736667
  }
}



As you can see, the API earthquake object has additional info we do not need.

By comparing the two, we can identify the fields that we want to remove from the retrieved data.

Here is the list of fields that we do not need.

  1. updated
  2. tz
  3. detail
  4. felt
  5. cdi
  6. mmi
  7. alert
  8. status
  9. tsunami
  10. net
  11. code
  12. ids
  13. sources
  14. types
  15. nst
  16. dmin
  17. rms
  18. gap
  19. magType
  20. title

To remove these fields, we can use the Remove processor.

Under the processor section, type Remove in the search bar. Click on the Remove processor (red box).


Kibana will display the following menu. In the Fields section (red box), we will add the names of the fields that we wish to remove.


Type in the name of the field you wish to remove, then press enter. You will see that the field name has been added.


In the same box, repeat the same process to specify the names of all the fields we wish to remove.

Your screen should look like the image below (red box).


Activate the Ignore missing option (red box).


Click on the Add button (red box).


You will see that the Remove processor has been added to the earthquake_data_pipeline (red box).
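For reference, the processor we just configured corresponds to the following JSON definition, with the same field list and Ignore missing enabled:

{
  "remove": {
    "field": [
      "updated", "tz", "detail", "felt", "cdi",
      "mmi", "alert", "status", "tsunami", "net",
      "code", "ids", "sources", "types", "nst",
      "dmin", "rms", "gap", "magType", "title"
    ],
    "ignore_missing": true
  }
}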


Task 2: Change the Unix epoch time in the field time to a human readable timestamp


In order to make this change, you should use the date processor.

The date processor converts time from one format to another.

Click on the Add a processor option (red box).


Under the Processor section, type in Date and click on the Date option from the drop down menu (red box).


You should see the following drop down menu.

In the Field section (red box), type the name of the field we wish to convert (time).


In the Formats section (red box), we will specify the format of the incoming value. The field time arrives as Unix epoch time in milliseconds, a format Elasticsearch calls UNIX_MS.

Type UNIX_MS into this section and hit enter. Then, click on the Add button (green box) to add the date processor to the ingest pipeline.


You will see that the Date processor has been added to the earthquake_data_pipeline (red box).


When the data goes through the date processor, the content of the field time will be parsed as UNIX_MS, converted to a human readable timestamp, and stored in a new field called @timestamp.

After this step, we no longer need the original field time. Therefore, we will remove the field time after the data goes through the date processor.

From the Create pipeline page, click on the Add a processor option (red box).


Under the Processor section, type in Remove and hit enter (red box).


Under the Fields section, type in time and hit enter (red box). Activate the Ignore missing option (blue box). Then, click on the Add button (green box).


You will see that the Remove processor for the field time has been added (red box).
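Together, the Date processor and this second Remove processor correspond to the following JSON definitions:

{
  "date": {
    "field": "time",
    "formats": [
      "UNIX_MS"
    ]
  }
},
{
  "remove": {
    "field": "time",
    "ignore_missing": true
  }
}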


Task 3: Create fields called coordinates.lat and coordinates.lon

From the same page, click on the Add a processor option (red box).


Under the processor section, type in Rename and hit enter (red box).


Under the Field section, type in latitude (red box). Under the Target field section, type in coordinates.lat (blue box).


This step will rename the field latitude in the incoming data to coordinates.lat.

Activate the Ignore missing option (yellow box), then click on the Add button (green box) to add this processor to the earthquake_data_pipeline.

You will see that the Rename processor for the field latitude has been added (red box).


Next, we will repeat the same process to add a Rename processor to rename the field longitude from the incoming data to coordinates.lon.

This process is identical to the steps you have performed for the field latitude.

Once this process is done, you will see that a Rename processor for the field longitude has been added to the earthquake_data_pipeline (red box).
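For reference, the two Rename processors correspond to the following JSON definitions:

{
  "rename": {
    "field": "latitude",
    "target_field": "coordinates.lat",
    "ignore_missing": true
  }
},
{
  "rename": {
    "field": "longitude",
    "target_field": "coordinates.lon",
    "ignore_missing": true
  }
}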


We have added all the necessary processors to transform our data.

Before creating the earthquake_data_pipeline, make sure the processors are listed in the order you want them to run.

These processors run sequentially!

Why does the order matter?

Let's say you accidentally reversed the order of the Date processor and the Remove processor for the field time.

When the data goes through the ingest pipeline, it will hit the Remove processor before the Date processor. As a result, the field time will be removed before it ever reaches the Date processor.

The date processor will have nothing to work with!

This is why we double check the order of the processors before creating the ingest pipeline!

Next, we will create the earthquake_data_pipeline by clicking on the Create pipeline button (blue box).


You will see that the earthquake_data_pipeline (red box) has been created. If you scroll down on the Processors section (blue box), you will see the list of processors we have added to this pipeline.
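Now that the pipeline exists, you can also verify its behavior with the simulate API. Here is a sketch you can run in Kibana Dev Tools (which we open in the next step); the sample field values are made up for illustration:

POST _ingest/pipeline/earthquake_data_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "mag": 1.13,
        "time": 1650316843970,
        "latitude": 33.7276667,
        "longitude": -116.0736667,
        "title": "M 1.1 - 11km ENE of Coachella, CA"
      }
    }
  ]
}

In the response, the transformed document should contain an @timestamp field and a coordinates object, and the time and title fields should be gone.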


Step 4: Create an index called earthquakes with the desired mapping

We will accomplish this step using Kibana Dev Tools.

Click on the menu icon (red box).


Scroll down on the drop down menu and click on Dev Tools (red box).


You should see Dev Tools (also known as the Kibana console) on the screen.


In the left panel of the Kibana console, copy and paste the following.



PUT earthquakes
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "coordinates": {
        "type": "geo_point"
      },
      "depth": {
        "type": "float"
      },
      "mag": {
        "type": "float"
      },
      "place": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "sig": {
        "type": "short"
      },
      "type": {
        "type": "keyword"
      },
      "url": {
        "enabled": false
      }
    }
  }
}



Your Kibana console should look like the following:


Click on the green arrow to send the request.

Expected output from Elasticsearch:
Elasticsearch will create an index called earthquakes with the desired mapping we defined above!
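If you want to double check the result, you can retrieve the mapping from the same console:

GET earthquakes/_mapping

Elasticsearch should echo back the mapping we just defined.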


Summary

In this blog, we have created:

  1. an ingest pipeline (earthquake_data_pipeline) to transform the data retrieved from the USGS API
  2. an index called earthquakes with the desired mapping

In the next blog, we will set up the server to retrieve earthquake data from the USGS API and send the data to the earthquake_data_pipeline in Elasticsearch.


Once the data transformation is complete, the transformed data will be ingested into the earthquakes index.
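As a preview of how the pieces fit together, a single document can be sent through the pipeline and into the index by passing the pipeline name as a query parameter. Here is a sketch using the sample values from earlier; in part 7, the server will do this for us programmatically:

POST earthquakes/_doc?pipeline=earthquake_data_pipeline
{
  "mag": 1.13,
  "place": "11km ENE of Coachella, CA",
  "time": 1650316843970,
  "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  "sig": 20,
  "type": "earthquake",
  "depth": 2.09,
  "latitude": 33.7276667,
  "longitude": -116.0736667
}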

Move on to Part 7 to set up the Node.js server to retrieve API data and send the data to Elasticsearch for data transformation and data ingestion!

Top comments (1)

Milco Numan

If you don't fancy typing the entire pipeline or mapping, you can paste mine. I considered creating a PR for the repositories, but I noticed the author has moved on from Elastic, so I decided this was the best alternative.

Pipeline I have used:

[
  {
    "remove": {
      "field": [
        "updated",
        "tz",
        "detail",
        "felt",
        "cdi",
        "mmi",
        "alert",
        "status",
        "tsunami",
        "net",
        "code",
        "ids",
        "sources",
        "types",
        "nst",
        "dmin",
        "rms",
        "gap",
        "magType",
        "title"
      ],
      "ignore_missing": true
    }
  },
  {
    "date": {
      "field": "time",
      "formats": [
        "UNIX_MS"
      ]
    }
  },
  {
    "remove": {
      "field": "time",
      "ignore_missing": true
    }
  },
  {
    "rename": {
      "field": "latitude",
      "target_field": "coordinates.lat",
      "ignore_missing": true
    }
  },
  {
    "rename": {
      "field": "longitude",
      "target_field": "coordinates.lon",
      "ignore_missing": true
    }
  }
]

This is the mapping I used:
PUT earthquakes
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "coordinates": {
        "type": "geo_point"
      },
      "depth": {
        "type": "float"
      },
      "mag": {
        "type": "float"
      },
      "place": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "sig": {
        "type": "short"
      },
      "type": {
        "type": "keyword"
      },
      "url": {
        "enabled": false
      }
    }
  }
}