Indexing the Works of Shakespeare in Elasticsearch - Part 2, Bulk Indexing
Full source code available here, look inside the lambda folder.
- Part 1 - Infrastructure as Code
- Part 2 - Bulk Indexing
- Part 3 - Sending the Lines to Kinesis
- Part 4 - Searching via Web API in .Net 5
This is part two of my series on indexing the works of Shakespeare in Elasticsearch. In part one I set up the infrastructure as code and created all the necessary AWS resources, including the Lambda that indexes data in bulk, but I didn't explain how that Lambda works. Indexing in bulk is more reliable and scalable than indexing one document at a time, because a high volume of single-document requests can easily overwhelm a small Elasticsearch instance.
The Lambda
The code provided is based on an example from Elastic Co.
I add the AWS SDK and an Elasticsearch connector. The connector makes it possible to use IAM authentication when calling Elasticsearch.
'use strict';
const AWS = require('aws-sdk');

// Adds Array.prototype.flatMap on Node.js runtimes that lack it
require('array.prototype.flatmap').shim();

// See here for more: https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const { Client } = require("@elastic/elasticsearch");
const elasticSearchConnector = require('aws-elasticsearch-connector');

// The connector signs requests with the Lambda's IAM credentials
const client = new Client({
  ...elasticSearchConnector(AWS.config),
  node: "https://" + process.env.esUrl
});
The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method.
exports.handler = async (event) => {
  const dataToIndex = [];

  if (event.Records != null) {
    event.Records.forEach(record => {
      // Kinesis delivers each payload base64-encoded; decode as UTF-8 so non-ASCII characters survive
      const rawData = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
      const obj = JSON.parse(rawData);
      dataToIndex.push(obj);
    });

    if (dataToIndex.length > 0) {
      // Await the bulk call so the handler does not report success before indexing finishes.
      // The index name could be passed in via the stream data too.
      await indexDataInElasticSearch(dataToIndex, 'shakespeare');
    }
  }
  return "data indexed";
};
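To make the decoding step concrete, here is a minimal sketch that runs without AWS. The sample event is hand-built to mimic the shape of a Kinesis trigger event, and the `text` field and the `decodeRecords` helper are hypothetical names used only for illustration. The payload is decoded as UTF-8 here, which is an assumption that keeps non-ASCII characters intact.

```javascript
// Hypothetical document; only the Id field appears in the post's code.
const line = { Id: 1, text: 'To be, or not to be' };

// Hand-built event mimicking what Lambda receives from a Kinesis trigger:
// each record carries its payload as a base64 string in kinesis.data.
const sampleEvent = {
  Records: [
    { kinesis: { data: Buffer.from(JSON.stringify(line), 'utf8').toString('base64') } }
  ]
};

// Hypothetical helper: decode every base64 payload back into an object.
function decodeRecords(event) {
  return (event.Records || []).map(record =>
    JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'))
  );
}

const decoded = decodeRecords(sampleEvent);
console.log(decoded);
```

An event without a `Records` property yields an empty array, which mirrors the null check in the handler.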
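The bulk indexing method turns each entry into an action line followed by the document itself, then sends everything to Elasticsearch in a single request.
async function indexDataInElasticSearch(dataToIndex, indexName) {
  console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
  // The bulk API expects pairs: an action describing the operation, then the document
  const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
  const { body: bulkResponse } = await client.bulk({ refresh: true, body });
  // The request can succeed overall while individual documents fail
  if (bulkResponse.errors) {
    console.error('Some documents failed to index');
  }
}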
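The shape of the bulk body is easier to see with a small example. The sketch below builds the body the same way as the function above and, following the pattern in Elastic's bulk example, pairs each failed item in a response with the document that caused it. The sample documents, the `buildBulkBody` and `collectFailures` helpers, and the hand-written response are all hypothetical; no Elasticsearch cluster is contacted. It assumes a Node.js version with `Array.prototype.flatMap` built in, or the shim shown earlier.

```javascript
// Hypothetical documents; only the Id field is taken from the post.
const docs = [
  { Id: 1, text: 'Now is the winter of our discontent' },
  { Id: 2, text: 'Made glorious summer by this sun of York' }
];

// Each document becomes two entries: an action line, then the document.
function buildBulkBody(dataToIndex, indexName) {
  return dataToIndex.flatMap(entry => [
    { index: { _index: indexName, _id: entry.Id, _type: '_doc' } },
    entry
  ]);
}

// Pair each failed item in a bulk response with its source document.
// Item i in the response corresponds to body[i * 2] and body[i * 2 + 1].
function collectFailures(bulkResponse, body) {
  const failures = [];
  bulkResponse.items.forEach((action, i) => {
    const operation = Object.keys(action)[0];
    if (action[operation].error) {
      failures.push({
        status: action[operation].status,
        error: action[operation].error,
        document: body[i * 2 + 1]
      });
    }
  });
  return failures;
}

const body = buildBulkBody(docs, 'shakespeare');

// Hand-written stand-in for a bulk response in which the second item fails.
const fakeResponse = {
  errors: true,
  items: [
    { index: { _id: '1', status: 201 } },
    { index: { _id: '2', status: 400, error: { type: 'mapper_parsing_exception', reason: 'example' } } }
  ]
};

const failures = collectFailures(fakeResponse, body);
console.log(failures);
```

Two documents produce a body of four entries, so the array sent to the bulk API is always twice the length of the input.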
That’s it. Now all we need is a way of sending data to Kinesis, which I cover in the next post.