Elasticsearch in Java projects – data enrichment
The previous articles showed how to index and query documents using a single index. Let's imagine a more real-life scenario in which the relevant information is kept in several indices and the data has to be put together. One approach is to read documents from each index separately and merge the results in plain Java; however, this may be error-prone. The other solution comes directly with Elasticsearch: it is called data enrichment, where data from source indices enriches ingested documents. This article explains what data enrichment is and how it can be configured and used in Java projects in practice.
Data Enrichment
The enrich processor was introduced in Elasticsearch 7.5.0. It adds data from existing indices to incoming documents during ingestion. In the demo application, the drivers' data is enriched with statistics.
ingest pipeline
A set of processors that can be used to manipulate data upon ingestion without much overhead. Each processor executes a specific task e.g. enrichment.
enrich policy
A set of configuration options used to add the right enrich data to the right incoming documents. It contains:
- A list of one or more source indices which store enrich data as documents
- A match field from the source indices used to match incoming documents
- Enrich fields containing enrich data from the source indices you want to add to incoming documents
Before it can be used with an enrich processor, an enrich policy must be executed. When executed, an enrich policy uses enrich data from the policy's source indices to create a streamlined system index called the enrich index. The processor uses this index to match and enrich incoming documents.
source index
An index which stores enrich data to be added to incoming documents. Multiple source indices may be used in one enrich policy, and the same source index may be used in multiple enrich policies.
enrich index
A special read-only system index tied to a specific enrich policy that always begins with .enrich-*. It is used by the enrich processor to speed up the process and should not be used for other purposes.
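To make these terms more concrete, here is a minimal in-memory sketch of a match-type enrichment in plain Java. It is only a conceptual model and not how Elasticsearch implements the feature: the class EnrichModel, its method names, and the sample fields (wins, podiums, internalNote) are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EnrichModel {

    // Models executing a policy: each source document is reduced to the match field
    // plus the enrich fields, and stored under its match field value.
    public static Map<Object, Map<String, Object>> buildEnrichIndex(
            List<Map<String, Object>> sourceDocs, String matchField, List<String> enrichFields) {
        Map<Object, Map<String, Object>> enrichIndex = new HashMap<>();
        for (Map<String, Object> doc : sourceDocs) {
            Object key = doc.get(matchField);
            if (key == null) {
                continue; // documents without the match field cannot be matched
            }
            Map<String, Object> enrichDoc = new LinkedHashMap<>();
            enrichDoc.put(matchField, key);
            for (String field : enrichFields) {
                if (doc.containsKey(field)) {
                    enrichDoc.put(field, doc.get(field));
                }
            }
            enrichIndex.put(key, enrichDoc);
        }
        return enrichIndex;
    }

    // Models the enrich processor: looks up the incoming document's field value and,
    // on a match, stores the enrich data under the target field.
    public static Map<String, Object> enrich(Map<String, Object> incoming,
            Map<Object, Map<String, Object>> enrichIndex, String field, String targetField) {
        Map<String, Object> result = new LinkedHashMap<>(incoming);
        Map<String, Object> match = enrichIndex.get(incoming.get(field));
        if (match != null) {
            result.put(targetField, match);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical content of the source index
        List<Map<String, Object>> statistics = List.of(
                Map.<String, Object>of("driverId", "d1", "wins", 7, "podiums", 20, "internalNote", "x"));
        Map<Object, Map<String, Object>> enrichIndex =
                buildEnrichIndex(statistics, "driverId", List.of("wins", "podiums"));

        Map<String, Object> driver = Map.<String, Object>of("driverId", "d1", "name", "Example Driver");
        System.out.println(enrich(driver, enrichIndex, "driverId", "statistics"));
    }
}
```

In this model only the match field and the listed enrich fields reach the incoming document; internalNote stays in the source data, which mirrors the role of the enrich fields in a policy.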
Demo project
In the demo application, another set of data is introduced: drivers' career statistics. They are stored in a separate index, db-statistics. When the drivers' documents are indexed, they should be enriched with the statistics.
Setup of the enrich processor in Java API
Setting up the enrich processor takes a few steps, which are described below along with code snippets.
1. create an enrich policy
The first step is to create the enrich policy using the EnrichClient object, which is obtained from the High Level REST Client. Policy names are unique, so it is necessary to check whether a policy with the given name already exists; otherwise, Elasticsearch throws an exception when it detects an attempt to create a duplicate policy. The policy request requires the following information:
- policy name;
- type;
- a list of source indices;
- the match field on which the data is matched;
- a list of enrich fields.
Once the enrich policy has been created, it cannot be updated or changed. If a change is needed, the policy has to be deleted and re-created.
// RestHighLevelClient instance, a field of the surrounding class
private final RestHighLevelClient client;

// ... inside a method of the same class:
EnrichClient enrichClient = client.enrich();
GetPolicyResponse policy = enrichClient.getPolicy(new GetPolicyRequest(policyName), RequestOptions.DEFAULT);
// Create the policy only if no policy with this name exists yet
if (policy.getPolicies().isEmpty()) {
    PutPolicyRequest policyRequest = new PutPolicyRequest(policyName, "match",
            List.of(sourceIndex),
            matchField,
            enrichedFields);
    enrichClient.putPolicy(policyRequest, RequestOptions.DEFAULT);
}
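For orientation, a policy request like this corresponds to a JSON definition in the Elasticsearch REST API, where the policy type is the top-level key. The sketch below renders such a body in plain Java so that its shape is visible. PolicyJson and its methods are hypothetical helpers, the wins and podiums enrich fields are made-up examples, and the simplistic quoting assumes names that need no JSON escaping.

```java
import java.util.List;
import java.util.stream.Collectors;

public class PolicyJson {

    // Renders plain names as a JSON array of strings (no escaping, see the assumptions above).
    static String jsonArray(List<String> values) {
        return values.stream()
                .map(v -> "\"" + v + "\"")
                .collect(Collectors.joining(", ", "[", "]"));
    }

    // Builds the policy definition with the policy type as the top-level key.
    public static String toJson(String type, List<String> indices,
            String matchField, List<String> enrichFields) {
        return "{ \"" + type + "\": { "
                + "\"indices\": " + jsonArray(indices) + ", "
                + "\"match_field\": \"" + matchField + "\", "
                + "\"enrich_fields\": " + jsonArray(enrichFields)
                + " } }";
    }

    public static void main(String[] args) {
        System.out.println(toJson("match", List.of("db-statistics"), "driverId",
                List.of("wins", "podiums")));
        // prints: { "match": { "indices": ["db-statistics"], "match_field": "driverId", "enrich_fields": ["wins", "podiums"] } }
    }
}
```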
2. execute the enrich policy
Once the enrich policy is created, it needs to be executed so that the enrich index is created. This is done with the same EnrichClient object.
enrichClient.executePolicy(new ExecutePolicyRequest(policyName), RequestOptions.DEFAULT);
3. add enrich processor to the ingest pipeline
The next step is to add the enrich processor, together with the previously created policy, to an ingest pipeline. The enrich processor also defines the match field and the target field. In the demo application, the enriched information is stored under the 'statistics' property.
As mentioned before, an ingest pipeline may contain one or more processors. The snippet below contains a second one, remove, which drops the duplicated statistics.driverId field.
{
  "description": "Enrich drivers statistics information",
  "processors": [
    {
      "enrich": {
        "policy_name": "enrich-statistics-policy",
        "field": "driverId",
        "target_field": "statistics"
      }
    },
    {
      "remove": {
        "field": "statistics.driverId"
      }
    }
  ]
}
public void preparePipeline(String pipelineName, String pipelinePath) {
    IngestClient ingestClient = client.ingest();
    // try-with-resources closes the stream once the pipeline definition has been read
    try (InputStream inputStream = new ClassPathResource(pipelinePath).getInputStream()) {
        byte[] bytes = ByteStreams.toByteArray(inputStream);
        PutPipelineRequest putPipelineRequest =
                new PutPipelineRequest(pipelineName, new BytesArray(bytes), XContentType.JSON);
        ingestClient.putPipeline(putPipelineRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("Error during creating pipeline: {}", e.getMessage(), e);
    }
}
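To see what the two processors of the pipeline definition do to a document, the following plain-Java sketch runs stand-ins for them in the listed order. It is a simplified model under stated assumptions: PipelineModel and its methods are hypothetical, the enrich index is a hard-coded map, and the sample driver and wins values are invented.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class PipelineModel {

    // Stand-in for the enrich processor: copies the matching enrich data under targetField.
    static UnaryOperator<Map<String, Object>> enrich(
            Map<String, Map<String, Object>> enrichIndex, String field, String targetField) {
        return doc -> {
            Map<String, Object> match = enrichIndex.get(String.valueOf(doc.get(field)));
            if (match != null) {
                doc.put(targetField, new LinkedHashMap<>(match));
            }
            return doc;
        };
    }

    // Stand-in for the remove processor, limited to a single "parent.child" path.
    static UnaryOperator<Map<String, Object>> remove(String parent, String child) {
        return doc -> {
            Object nested = doc.get(parent);
            if (nested instanceof Map) {
                ((Map<?, ?>) nested).remove(child);
            }
            return doc;
        };
    }

    // Runs the processors one after another on a copy of the incoming document.
    public static Map<String, Object> run(Map<String, Object> incoming,
            List<UnaryOperator<Map<String, Object>>> processors) {
        Map<String, Object> doc = new LinkedHashMap<>(incoming);
        for (UnaryOperator<Map<String, Object>> processor : processors) {
            doc = processor.apply(doc);
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> enrichIndex = Map.of(
                "d1", Map.<String, Object>of("driverId", "d1", "wins", 7));
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("driverId", "d1");
        incoming.put("name", "Example Driver");

        Map<String, Object> enriched = run(incoming, List.of(
                enrich(enrichIndex, "driverId", "statistics"),
                remove("statistics", "driverId")));
        System.out.println(enriched);
        // prints: {driverId=d1, name=Example Driver, statistics={wins=7}}
    }
}
```

One difference is worth keeping in mind: this stand-in silently skips a missing field, whereas the real remove processor reports an error for a missing field unless its ignore_missing option is enabled. That matters for documents that find no match in the enrich index.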
4. set the ingest pipeline when new documents are indexed
To run the pipeline when new documents are stored, it is enough to set the pipeline on the IndexRequest. As a result, new documents are enriched with the required data as they are indexed.
IndexRequest indexRequest = new IndexRequest(indexName).source(source).setPipeline(pipeline);
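At the REST level, setting the pipeline on an index request corresponds to the pipeline query parameter of the index API. The sketch below only builds such a request with the JDK HTTP client (Java 11 or later) and does not send it. The class IndexWithPipeline, the host, the db-drivers index name, and the enrich-statistics-pipeline name are assumptions for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class IndexWithPipeline {

    // Builds (but does not send) an index request that routes the document through a pipeline.
    public static HttpRequest buildIndexRequest(String baseUrl, String indexName,
            String pipeline, String jsonSource) {
        URI uri = URI.create(baseUrl + "/" + indexName + "/_doc?pipeline=" + pipeline);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonSource))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildIndexRequest("http://localhost:9200", "db-drivers",
                "enrich-statistics-pipeline", "{\"driverId\": \"d1\"}");
        System.out.println(request.method() + " " + request.uri());
        // prints: POST http://localhost:9200/db-drivers/_doc?pipeline=enrich-statistics-pipeline
    }
}
```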
Summary
Data enrichment is an extremely useful Elasticsearch feature that adds properties from existing indices to new documents while they are being indexed. The article explains what data enrichment is and how it works, and shows how to introduce it into a Java project using the Java API.