Full source code available here, look inside the lambda folder.
This is part two of my series on indexing the works of Shakespeare in ElasticSearch. In part one I set up the infrastructure as code, creating all the necessary AWS resources, including the lambda that indexes data in bulk. But in that post I didn't explain how the lambda works.
Indexing in bulk is more reliable and scalable than indexing documents one at a time, because a high volume of individual requests can easily overwhelm a small ElasticSearch instance.
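To make the idea concrete, here is a minimal sketch of splitting a large set of documents into fixed-size batches so that each bulk request stays small. The `chunk` helper and the batch size of 500 are hypothetical choices for illustration, not something taken from the lambda in this series.

```javascript
// Hypothetical helper: split an array into batches of at most `size` items,
// so each bulk request sent to ElasticSearch stays a manageable size.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Example: 1,200 placeholder documents become three bulk requests (500, 500, 200)
const docs = Array.from({ length: 1200 }, (_, i) => ({ Id: i + 1 }));
const batches = chunk(docs, 500);
console.log(batches.map(b => b.length)); // [ 500, 500, 200 ]
```

Fewer, larger requests cut per-request overhead; the right batch size is an assumption you would tune for your document size and instance type.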
The Lambda
The code provided is based on an example from Elastic Co.
I add the AWS SDK and an ElasticSearch connector. The connector makes it possible to use IAM authentication when calling ElasticSearch.
```javascript
'use strict';

const AWS = require('aws-sdk');
// Polyfill Array.prototype.flatMap for Node.js runtimes older than 11
require('array.prototype.flatmap').shim();
const { Client } = require('@elastic/elasticsearch');
// see here for more https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

// The connector signs each request using the credentials in AWS.config
// (in Lambda, the function's execution role)
const client = new Client({
  ...elasticSearchConnector(AWS.config),
  node: 'https://' + process.env.esUrl
});
```
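One thing to watch with this setup: if the `esUrl` environment variable is missing, `"https://" + process.env.esUrl` quietly becomes `https://undefined`, and the problem only surfaces on the first request. A minimal sketch of failing fast instead is below; `buildNodeUrl` is a hypothetical helper, not part of the original lambda or either library, and the domain in the example is made up.

```javascript
// Hypothetical helper: build the ElasticSearch node URL from an environment
// variable and fail at startup if it is missing, rather than on the first request.
function buildNodeUrl(env, name = 'esUrl') {
  const host = (env[name] || '').trim();
  if (host === '') {
    throw new Error(`Environment variable ${name} is not set`);
  }
  // Accept either a bare host name or a value that already includes the scheme
  return host.startsWith('https://') ? host : 'https://' + host;
}

// Example with a made-up domain endpoint
console.log(buildNodeUrl({ esUrl: 'search-example.us-east-1.es.amazonaws.com' }));
// https://search-example.us-east-1.es.amazonaws.com
```

In the lambda this would be used as `node: buildNodeUrl(process.env)` when creating the client.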
The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method:
```javascript
exports.handler = async (event) => {
  const dataToIndex = [];
  if (event.Records != null) {
    event.Records.forEach(record => {
      // Kinesis delivers each record's payload base64-encoded;
      // decode as UTF-8 so non-ASCII characters survive
      const rawData = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
      dataToIndex.push(JSON.parse(rawData));
    });
    if (dataToIndex.length > 0) {
      // The index name could also be passed in via the stream data.
      // Awaiting ensures indexing finishes (or fails) before the function returns.
      await indexDataInElasticSearch(dataToIndex, 'shakespeare');
    }
  }
  return 'data indexed';
};
```
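Because Kinesis delivers each record's payload base64-encoded under `record.kinesis.data`, the decoding step is easy to exercise locally with a hand-built event. The sketch below pulls that step into a hypothetical `decodeKinesisRecords` function and feeds it a fake event containing only the fields the handler reads; a real Kinesis event carries more metadata per record, and the `Line` field is a made-up document shape. Decoding as UTF-8 preserves characters such as the curly apostrophe in the second line, which an `'ascii'` decode would mangle.

```javascript
// Hypothetical helper mirroring the handler's loop: decode each Kinesis
// record's base64 payload and parse it as JSON.
function decodeKinesisRecords(event) {
  if (!event || !Array.isArray(event.Records)) {
    return [];
  }
  return event.Records.map(record =>
    JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'))
  );
}

// Build a minimal fake event: only the fields the handler actually reads
function fakeKinesisEvent(docs) {
  return {
    Records: docs.map(doc => ({
      kinesis: { data: Buffer.from(JSON.stringify(doc), 'utf8').toString('base64') }
    }))
  };
}

const event = fakeKinesisEvent([
  { Id: 1, Line: 'To be, or not to be, that is the question' },
  { Id: 2, Line: 'Whether ’tis nobler in the mind to suffer' }
]);
console.log(decodeKinesisRecords(event).map(doc => doc.Id)); // [ 1, 2 ]
```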
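```javascript
async function indexDataInElasticSearch(dataToIndex, indexName) {
  console.log('Seeding...' + dataToIndex[0].Id + ' - ' + dataToIndex[dataToIndex.length - 1].Id);
  // The bulk API expects an action/metadata entry followed by each document
  const body = dataToIndex.flatMap(entry => [
    { index: { _index: indexName, _id: entry.Id, _type: '_doc' } },
    entry
  ]);
  const { body: bulkResponse } = await client.bulk({ refresh: true, body });
  // A bulk request can succeed overall while individual documents fail
  if (bulkResponse.errors) {
    const failed = bulkResponse.items.filter(item => item.index && item.index.error);
    console.error('Failed to index ' + failed.length + ' documents', JSON.stringify(failed));
  }
}
```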
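It can help to see what the bulk request body actually looks like on the wire. The `_bulk` API takes newline-delimited JSON (NDJSON): an action line such as `{"index": {...}}` followed by the document itself, with the whole body ending in a newline. The client serializes the array built with `flatMap` into this format for you; the sketch below does the same by hand with a hypothetical `toBulkNdjson` function so the structure is visible. It relies on the built-in `Array.prototype.flatMap` (Node.js 11+) rather than the shim.

```javascript
// Hypothetical helper: build the newline-delimited JSON (NDJSON) body that the
// _bulk endpoint expects, pairing an action line with each document.
function toBulkNdjson(docs, indexName) {
  const lines = docs.flatMap(doc => [
    { index: { _index: indexName, _id: doc.Id, _type: '_doc' } }, // action/metadata line
    doc                                                          // document source line
  ]);
  // Each entry goes on its own line, and the body must end with a newline
  return lines.map(line => JSON.stringify(line)).join('\n') + '\n';
}

const ndjson = toBulkNdjson([{ Id: 1, Line: 'Now is the winter of our discontent' }], 'shakespeare');
console.log(ndjson);
// {"index":{"_index":"shakespeare","_id":1,"_type":"_doc"}}
// {"Id":1,"Line":"Now is the winter of our discontent"}
```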
That's it. Now all we need is a way of sending data to Kinesis, which will be covered in the next post.