Benoit Luttringer

Originally published at medium.zenika.com

Using pipelines to route documents in Elasticsearch indices

Routing documents

When applications need to add documents to Elasticsearch, they first have to know what the destination index is.

This may seem obvious in the trivial case where a given type of document always goes to a specific index, but it gets trickier when your index names can vary according to miscellaneous parameters, whether external to your system (the current date, for instance) or intrinsic to the document you are trying to store (most of the time, the value of one of its fields).
When the index name can vary, it is up to your application to compute the name of the destination index before issuing the indexing request to Elasticsearch.

Moreover, even if it can look like an anti-pattern at first, you may have several applications that need to index the same type of documents in indices whose names are subject to change. You now have to maintain index name computation logic that is duplicated across several components: not good news in terms of maintainability and agility.

Logstash, a well-known member of the Elastic Stack, can help a lot to centralize such logic, but at the cost of maintaining another piece of running software, with its own configuration, required knowledge, etc.

What we present in this article is a way to address this problem by delegating the index name computation to Elasticsearch instead of our applications.

Use Case: Time-Based Indices

This is a well-known use case, usually found when dealing with logs.
The idea is to index documents into indices whose names are composed of a root name and a value computed from the date of the log event. That date is actually a field of the document you want to index.

That is not the point of this article, but there are several advantages to indexing documents this way, including easier data management, enabling a hot/warm architecture, etc.

Let’s take an example.

Let’s say we have to deal with data coming from several sources, IoT devices for instance. Each of our objects sends some data every minute to different back ends (yes, that’s really sad, but our objects do not talk over the same networks, so the choice was made to handle this through several systems).
The data sent by the objects is transformed into JSON documents that look like this:

{
  "objectId": 1234,
  "manufacturer": "SHIELD",
  "payload": "some_data",
  "date": "2019-04-01T12:10:30.000Z"
}

We have a UID for the object transmitting the data, a manufacturer id, a payload and a date field.

Index Name Computation

Let’s say we want to store the object’s data in an index called data-{YYYYMMDD} where the root name is data followed by a date pattern.

Based on the example above, what should the back end do when it receives this document?

First, it has to parse the document to extract the value of the date field; then it has to compute the destination index name from that date; finally, it issues the indexing request to Elasticsearch against the index whose name has just been computed.

document.date = "2019-04-01T12:10:30Z"
index.name = "data-" + "20190401"

In our case, we have several back-ends that have to know how to compute the index name and thus have to be aware of the naming logic of our index.

Wouldn’t it be smarter if the index name computation were done directly by Elasticsearch?

Pipeline power

Starting with version 5 of Elasticsearch, there is a new type of node called ingest.

All nodes of a cluster have the ingest type by default.

Those nodes are able to execute what are called pipelines before indexing a document. A pipeline is a group of processors that can each transform the input document in some specific way.
What is useful here is that not only can a pipeline transform the intrinsic data of a document, but it can also modify the document metadata, specifically its _index property.
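For instance, a minimal pipeline (the pipeline name and field below are made up for illustration) could simply stamp every incoming document with its ingestion time by using the built-in set processor:

PUT _ingest/pipeline/add-ingest-timestamp
{
  "description": "Illustration only: add an 'ingested_at' field to every incoming document",
  "processors": [
    {
      "set": {
        "field": "ingested_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}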

Now let’s go back to our example.

Instead of delegating the index name computation to the application, we propose to define a pipeline that will do the job.

Luckily for us, there is a built-in processor that does exactly that: the Date Index Name Processor!
According to the documentation, this processor allows you to define the name of the field containing the date, a root name (a prefix) for your index and a rounding method to compute the date to append to this prefix.

If we want to add our IoT data to an index whose pattern is data-{YYYYMMDD}, we just have to create a pipeline like the following:

PUT _ingest/pipeline/compute-index-name
{
  "description": "Set the document destination index by appending a prefix and its 'date' field",
  "processors": [
    {
      "date_index_name": {
        "field": "date",
        "index_name_prefix": "data-",
        "date_rounding": "d",
        "index_name_format": "yyyyMMdd"
      }
    }
  ]
}
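
Before wiring this pipeline to anything, we can test it against a sample document with the Simulate Pipeline API (the document below simply reuses our IoT example). The response should show the document’s _index metadata rewritten to a date-math expression that resolves to data-20190401.

POST _ingest/pipeline/compute-index-name/_simulate
{
  "docs": [
    {
      "_source": {
        "objectId": 1234,
        "manufacturer": "SHIELD",
        "payload": "some_data",
        "date": "2019-04-01T12:10:30.000Z"
      }
    }
  ]
}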

One index = one pipeline?

OK, now we know how to define a pipeline that builds the name of a specific destination index. But we can do more by manipulating the document metadata!

Say we have different types of documents, each having a date field but needing to be indexed in different indices.

The logic for computing the destination index name is the same for each document type, but using the above strategy would lead to creating several pipelines.
Let’s try to make something easier and reusable.

Back to our example, we now have two document types: one that needs to be indexed in a data-{YYYYMMDD} index (as before) and one whose destination is an index named new_data-{YYYYMMDD}.

The documents whose destination is new_data have the following structure:

{
  "newObjectId": 1234,
  "source": "HYDRA",
  "payload": "some_data",
  "date": "2019-04-02T13:10:30.000Z"
}

The structure is slightly different from the standard IoT document, but what is important here is that the date field is present in both mappings.

Now we want to define a pipeline that will compute the destination index for both our document types. All we have to do is build the destination index name from the requested destination index, as issued through the Index API.

PUT _ingest/pipeline/compute-index-name
{
  "description": "Set the document destination index by appending the requested index and its 'date' field",
  "processors": [
    {
      "date_index_name": {
        "field": "date",
        "index_name_prefix": "{{ _index }}-",
        "date_rounding": "d",
        "index_name_format": "yyyyMMdd"
      }
    }
  ]
}

Note that the index name prefix is now taken from the indexing metadata field named _index. By using this field, our pipeline becomes generic and can be used with any index, provided the destination index is computed according to the same rules.
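
We can check this substitution with the Simulate Pipeline API again: here we set the _index metadata of the sample document to new_data, and the response should show it rewritten to an expression that resolves to new_data-20190402.

POST _ingest/pipeline/compute-index-name/_simulate
{
  "docs": [
    {
      "_index": "new_data",
      "_source": {
        "newObjectId": 1234,
        "source": "HYDRA",
        "payload": "some_data",
        "date": "2019-04-02T13:10:30.000Z"
      }
    }
  ]
}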

Using our “routing” pipeline

Now that we have a generic pipeline that is able to compute the name of a destination index based on the date field of our documents, let’s see how we will tell Elasticsearch to use it.

There are two ways to tell Elasticsearch to use a pipeline; let’s evaluate them.

Index API call

The first and most straightforward solution is to use the pipeline parameter of the Index API.
In other words: each time you want to index a document, you have to tell Elasticsearch which pipeline to use.

POST data/_doc?pipeline=compute-index-name
{
  "objectId": 1234,
  "manufacturer": "SHIELD",
  "payload": "some_data",
  "date": "2019-04-01T12:10:30.000Z"
}

Now every time we add a document to an index while indicating the compute-index-name pipeline, the document is added to the right time-based index. In this example, the destination index will be data-20190401.
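
The second document type works exactly the same way; only the requested index changes. The following call should send the document to new_data-20190402:

POST new_data/_doc?pipeline=compute-index-name
{
  "newObjectId": 1234,
  "source": "HYDRA",
  "payload": "some_data",
  "date": "2019-04-02T13:10:30.000Z"
}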

What about the data index that we gave to the Index API? It can be seen as a fake index: it is just used to perform the API call and to serve as the root of the real destination index name; it doesn’t even have to exist!
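
If you want to convince yourself, listing the indices after a few documents have been ingested should only show the dated indices (data-20190401 in our example), and no data index at all:

GET _cat/indices/data*?v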

Default pipeline: introducing the “Virtual Index”

The index default pipeline is another useful way to use a pipeline. When you create an index, there is a setting called index.default_pipeline that can be set to the name of a pipeline; this pipeline will be executed whenever you add a document to the corresponding index without specifying a pipeline in the API call. You can also bypass this default pipeline by using the special pipeline named _none when indexing your document. With this feature, you can define what I’ll call a “Virtual Index” and associate it with a default pipeline that will act as the routing pipeline we have seen above.

Let’s apply this to our example.

We suppose that our generic routing pipeline compute-index-name has been created. We can now create an index called data that will use this pipeline as its default one.

PUT data
{
  "settings" : {
    "index" : {
      "number_of_shards" : 3, 
      "number_of_replicas" : 1,
      "default_pipeline" : "compute-index-name"
    }
  }
}

Now every time we ask Elasticsearch to index a document into the data index, the compute-index-name pipeline takes care of the real routing of this document. As a consequence, not a single document will ever be indexed in the data index itself, and we fully delegate to Elasticsearch the responsibility of calling the pipeline.
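
The indexing call no longer needs the pipeline parameter; thanks to the default pipeline of the data index, the document below should end up in data-20190401 (and if you ever need to bypass the routing, you can pass pipeline=_none explicitly):

POST data/_doc
{
  "objectId": 1234,
  "manufacturer": "SHIELD",
  "payload": "some_data",
  "date": "2019-04-01T12:10:30.000Z"
}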

Conclusion

We have just seen how we can leverage the power of pipelines in Elasticsearch to route documents based on their intrinsic properties.

Pipelines have much more to offer than just being a replacement for Logstash filters: you can define complex pipelines using several processors (one specific processor even allows you to call another pipeline), conditionals, etc.
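
As a sketch (the pipeline name below is made up for illustration), a wrapper pipeline could combine the pipeline processor with an if condition to call our routing pipeline only for documents that actually carry a date field:

PUT _ingest/pipeline/conditional-routing
{
  "description": "Illustration: call the routing pipeline only when a 'date' field is present",
  "processors": [
    {
      "pipeline": {
        "if": "ctx.date != null",
        "name": "compute-index-name"
      }
    }
  ]
}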

In my opinion, the “Virtual Index” seen at the end of this article can be very interesting. Creating such an index-that-is-not-really-an-index just to provide an entry point to a routing pipeline could even become a new and useful Elasticsearch feature, why not?

Note: Each API call has been tested using an instance of Elasticsearch 7.2.
