Indexing the Works of Shakespeare in Elasticsearch - Part 2, Bulk Indexing

Full source code available here; look inside the lambda folder.

This is part two of my series on indexing the works of Shakespeare in Elasticsearch. In part one I set up the infrastructure as code, creating all the necessary AWS resources, including the lambda that indexes data in bulk, but I didn’t explain how the lambda works. Bulk indexing is more reliable and scalable than indexing documents one at a time, because a high volume of single-document requests can easily overwhelm a small Elasticsearch instance.

The Lambda

The code provided is based on an example from Elastic Co.

I add the AWS SDK and an Elasticsearch connector. The connector makes it possible to use IAM authentication when calling Elasticsearch.

'use strict'
const AWS = require('aws-sdk');

require('array.prototype.flatmap').shim();

const { Client } = require("@elastic/elasticsearch"); // see here for more  https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

const client = new Client({
    ...elasticSearchConnector(AWS.config),
    node: "https://" + process.env.esUrl
});

The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method.

exports.handler = async (event) => {
    const dataToIndex = [];

    if (event.Records != null) {
        event.Records.forEach(record => {
            // Kinesis delivers each payload base64-encoded; decode as UTF-8 so non-ASCII characters survive
            const rawData = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
            dataToIndex.push(JSON.parse(rawData));
        });

        if (dataToIndex.length > 0) {
            // wait for the bulk request to finish so that failures surface as a Lambda error
            await indexDataInElasticSearch(dataToIndex, 'shakespeare'); // the index name could also be passed in via the stream data
        }
    }
    return "data indexed";
};
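
To make the decoding step concrete, here is a minimal sketch of what a Kinesis-triggered event looks like and how its records turn back into documents. The helper name `decodeKinesisRecords` and the `Text` field are made up for illustration; only the `Id` field and the `record.kinesis.data` path come from the lambda above.

```javascript
// Hypothetical helper: turns a Kinesis-triggered Lambda event into an array of parsed documents.
function decodeKinesisRecords(event) {
    if (!event || !Array.isArray(event.Records)) {
        return [];
    }
    return event.Records.map(record =>
        JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'))
    );
}

// Made-up sample document; the Text field is an assumption for illustration only.
const sampleDoc = { Id: 1, Text: 'To be, or not to be' };

// Kinesis hands each payload to Lambda as a base64 string, so the sample is encoded the same way.
const sampleEvent = {
    Records: [
        { kinesis: { data: Buffer.from(JSON.stringify(sampleDoc), 'utf8').toString('base64') } }
    ]
};

const decoded = decodeKinesisRecords(sampleEvent);
console.log(decoded);
```

An event without a `Records` array simply yields an empty list, which mirrors the null check in the handler.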

// Build the bulk body (one action line followed by one document per entry) and send it in a single request
async function indexDataInElasticSearch(dataToIndex, indexName) {
    console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
    const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
    const { body: bulkResponse } = await client.bulk({ refresh: true, body });
    if (bulkResponse.errors) {
        console.error('Some documents failed to index');
    }
}
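
The bulk body is a flat array that alternates an action line with the document it applies to, and the bulk API reports per-document failures inside the response rather than failing the whole request. The sketch below shows both ideas without calling Elasticsearch. The helper names `buildBulkBody` and `collectBulkErrors`, the sample documents, and the hand-written response are assumptions for illustration, and it assumes a Node runtime with native `flatMap` (Node 11 or later), so no shim is loaded.

```javascript
// Hypothetical helper: produces the same action/document pairs as the flatMap in the lambda.
function buildBulkBody(docs, indexName) {
    return docs.flatMap(doc => [{ index: { _index: indexName, _id: doc.Id, _type: '_doc' } }, doc]);
}

// Hypothetical helper: lists the items of a bulk response that failed,
// paired with the document that was sent for each of them.
function collectBulkErrors(bulkResponse, body) {
    if (!bulkResponse.errors) {
        return [];
    }
    const failed = [];
    bulkResponse.items.forEach((item, i) => {
        const operation = Object.keys(item)[0]; // 'index', 'create', 'update' or 'delete'
        if (item[operation].error) {
            failed.push({
                status: item[operation].status,
                error: item[operation].error,
                document: body[i * 2 + 1] // the document sits right after its action line
            });
        }
    });
    return failed;
}

// Made-up documents; only the Id field is taken from the lambda code.
const docs = [{ Id: 1, Text: 'first line' }, { Id: 2, Text: 'second line' }];
const body = buildBulkBody(docs, 'shakespeare');

// Hand-written stand-in for a bulk response in which the second document failed.
const sampleResponse = {
    errors: true,
    items: [
        { index: { _id: '1', status: 201 } },
        { index: { _id: '2', status: 400, error: { type: 'mapper_parsing_exception', reason: 'failed to parse' } } }
    ]
};

const failures = collectBulkErrors(sampleResponse, body);
console.log(JSON.stringify(failures, null, 2));
```

In the lambda, a check like this could run on `bulkResponse` after `client.bulk` returns, so that rejected documents are logged or retried instead of being silently dropped.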

That’s it. Now all we need is a way of sending data to Kinesis, and that will be covered in the next post.
