Indexing the Works of Shakespeare in ElasticSearch – Part 3, Sending the Lines to Kinesis

Full source code available here.

In this, the third part of the series, I show how to read from the Shakespeare CSV file, where each row represents a line from a play (download here), and send these lines to Kinesis. The lambda in AWS will pick up the lines from Kinesis and forward them to ElasticSearch for indexing.

You need to configure the script to point to your ElasticSearch server (line 4) and to the Kinesis stream (line 97).

The script itself is fairly simple: it checks whether the ElasticSearch index for the plays already exists and, if not, creates one using the mapping document provided.

Next, it reads the CSV file and, row by row, converts each line from the play into a Kinesis record and sends it to Kinesis.

I could have written a script that sends the lines directly to ElasticSearch, but there are a few drawbacks –

  1. If I have a small ElasticSearch server (as is the case if you are following along from part 1, where I used Pulumi to set up the infrastructure), sending tens of thousands of index requests directly to ElasticSearch could overwhelm it – I have managed to do this a few times. To alleviate this, I could send in bulk, but I wanted something with one more piece of resilience – retries.
  2. If ElasticSearch is temporarily down or there is a networking issue, Kinesis and the lambda will retry the indexing request to ElasticSearch. This is taken care of out of the box; all I had to do was specify how many retries the lambda should perform (see the sketch after this list).
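
The retry count lives on the Kinesis-to-lambda event source mapping. Here is a minimal sketch of how the Pulumi mapping from part 1 could specify it – the MaximumRetryAttempts line is my addition, it is not in the listing in part 1, so treat it as something to verify against your provider version.

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
    MaximumRetryAttempts = 3, // how many times a failed batch is re-delivered to the lambda
});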

The lambda and this code are coupled to an ElasticSearch index named “shakespeare”, but it would be a simple thing to break this. In the code below, all you would need to do is add an index name to the kinesisRecord, and in the lambda, pass this name to the bulk indexing function (see part 2 for the code).
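
Something like this would do it – the indexName property is one I am introducing here for illustration, it is not in the original code:

// in this script: include the target index name in each record sent to Kinesis
// (readCSVAndSendToKinesis would pass the index name through as a third argument)
function sendEntryToKinesisStream(entry, streamName, indexName){
    var kinesisRecord = {
        Data: JSON.stringify({ indexName: indexName, ...entry }),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data){
        if(err){
            console.log('ERROR ' + err);
        }
    });
}

// in the lambda: read the index name back out of the first decoded record
// (assuming every record in the batch targets the same index)
if(dataToIndex.length > 0) {
    indexDataInElasticSearch(dataToIndex, dataToIndex[0].indexName);
}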

Unzip the attached file and run this to install the necessary modules –

npm install

This will look at the package.json file and download what is needed.
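
For reference, the modules the script uses are the AWS SDK, the ElasticSearch client, and csvtojson, so the dependencies section of package.json looks roughly like this (the version ranges below are placeholders – use whatever the included package.json specifies):

{
  "dependencies": {
    "aws-sdk": "*",
    "@elastic/elasticsearch": "*",
    "csvtojson": "*"
  }
}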

Below is the code needed to send the rows of the CSV to Kinesis (this is included in the zip).

const AWS = require('aws-sdk');
const { Client } = require('@elastic/elasticsearch');
const elasticSearchClient = new Client({
    node: "https://your elastic search server"
});

const csv = require('csvtojson');
const awsRegion = 'us-east-1';

let kinesis = new AWS.Kinesis({region: awsRegion});

function readCSVAndSendToKinesis(streamName){
    csv({delimiter:';'})
    .fromFile('./shakespeare_plays_small.csv')
    .then((json) => {
        json.forEach((row) =>
        {   
            sendEntryToKinesisStream(row, streamName);
        });
    })
}

// wrap a single CSV row in a Kinesis record and put it on the stream
function sendEntryToKinesisStream(entry, streamName){
    var kinesisRecord = {
        Data: JSON.stringify(entry),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data){
        if(err){
            console.log('ERROR ' + err);
        }
        else {
            console.log(entry.Id + ' added to Kinesis stream');
        }
    });
}

async function createIndexIfNotExists(indexName) {
    let result = await elasticSearchClient.indices.exists({
        index: indexName
    });
    if (result.body === true) {
        console.log(indexName + ' already exists');
    } else {
        console.log(indexName + ' will be created');
        await createIndex(indexName);
    }
}

async function createIndex(indexName) {
    let result = await elasticSearchClient.indices.create({
        index: indexName,
        body: {
            "mappings": {
                "properties": {
                    "Id": {
                        "type": "integer"
                    },
                    "play": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword" 
                            }
                        }
                    },
                    "characterLineNumber": {
                        "type": "integer"
                    },
                    "actSceneLine": {
                        "type": "text"
                    },
                    "character": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword"
                            }
                        }
                    },
                    "line": {
                        "type": "text"
                    },
                }
            }
        }
    });
    console.log(result.statusCode);
}

async function seed(indexName, streamName) {
    await createIndexIfNotExists(indexName);
    readCSVAndSendToKinesis(streamName);
}

seed("shakespeare", "you kinesis stream name");

That’s it: now you have infrastructure as code, a lambda to bulk index in ElasticSearch, and a small application to send data to Kinesis. All that’s left is to add an API to perform a search. I have done this before using HttpClient, as shown in this post, but in the next post I’m going to use an ElasticSearch client for .NET to perform the search.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 2, Bulk Indexing

Full source code available here, look inside the lambda folder.

This is part two of my series on indexing the works of Shakespeare in ElasticSearch. In part one I setup the infrastructure as code where I created all the necessary AWS resources, including the lambda that is used for indexing data in bulk. But in that post I didn’t explain how the lambda works.
Indexing in bulk is a more reliable and scalable way to index, as it is easy to overwhelm a small ElasticSearch instance if you are indexing a high volume of documents one at a time.

The Lambda
The code provided is based on an example from Elastic Co.

I add the AWS SDK and an ElasticSearch connector. The connector makes it possible to use IAM authentication when calling ElasticSearch.

'use strict'
const AWS = require('aws-sdk');

require('array.prototype.flatmap').shim();

const { Client } = require("@elastic/elasticsearch"); // see here for more  https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

const client = new Client({
    ...elasticSearchConnector(AWS.config),
    node: "https://" + process.env.esUrl
});

The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method –

exports.handler = (event, context, callback) => {
    let dataToIndex = [];

    if(event.Records != null){
        event.Records.forEach(record => {
            let rawData = Buffer.from(record.kinesis.data, 'base64').toString("ascii");
            let obj = JSON.parse(rawData);
            dataToIndex.push(obj);
        });

        if(dataToIndex.length > 0) {
            indexDataInElasticSearch(dataToIndex, 'shakespeare'); // this could be passed in via the stream data too
        }
    }
    callback(null, "data indexed");
};
async function indexDataInElasticSearch(dataToIndex, indexName) { 
    console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
    const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
    const { body: bulkResponse } = await client.bulk({ refresh: true, body });
}
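
The Elastic example this is based on also checks the bulk response for per-document failures – the bulk call can succeed overall while individual documents are rejected. A small sketch of that check, which could go at the end of indexDataInElasticSearch:

    if (bulkResponse.errors) {
        // the bulk request itself worked, but some documents were rejected,
        // so log each failed item so it can be investigated
        bulkResponse.items
            .filter(item => item.index && item.index.error)
            .forEach(item => console.log('Failed to index ' + item.index._id + ': ' + JSON.stringify(item.index.error)));
    }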

That’s it, now all we need is a way of sending data to Kinesis, and that will be in the next post.

Full source code available here, look inside the lambda folder.

Indexing the Works of Shakespeare in ElasticSearch – Part 1, Infrastructure as Code

Full source code available here.

WARNING – be careful when using this: Kinesis costs money and is not on the AWS free tier. At the time of writing, a couple of ElasticSearch instance types are included in the free tier, but you can only have one instance running at a time. I made a mistake and spun up two ElasticSearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me – very helpful and understanding.

This is part one of a three-parter where I’m going to show how to index the complete works of Shakespeare in ElasticSearch. This first part will set up the infrastructure on AWS. The second will go through the lambda that bulk loads data into ElasticSearch. The third will show how to, in Node.js, create the index on the ElasticSearch domain, read the works of Shakespeare from CSV, and send them to Kinesis.

Introduction
A few weeks ago I wrote a post describing how to get ElasticSearch up and running on AWS using Pulumi. It was a simple approach, with the expectation that the user would send documents directly to ElasticSearch for indexing. This is fine if all you are doing is running some experiments, but if you are loading a lot of data, and especially if you are sending the data one document at a time, you can easily overwhelm a small ElasticSearch instance and get socket timeouts or simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in ElasticSearch – instead of sending one document per request, send 100 or 1,000.

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to; these documents are picked up in batches by a lambda that in turn sends them in bulk to ElasticSearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.

What Infrastructure is Needed
Quite a lot is needed to get this up and running.

On the IaC side –

    An AWS Role
    An AWS policy attachment
    A Kinesis stream
    An ElasticSearch instance
    An AWS Lambda
    The code the Lambda will execute
    A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts –

    An ElasticSearch mapping document
    A tool to send data to Kinesis for indexing

I also want to limit access to the ElasticSearch service to my IP address; this is easy to figure out with a call to an API like api.ipify.org.

The lambda needs a zip file with all my Node.js code. Normally building this would be part of your CI/CD pipeline, but I want to do it all in one place, so it’s included here. BEWARE: I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, so instead I used Ionic.Zip.

By default, Pulumi suffixes the names of your resources with a random string; I don’t like this, so I explicitly set names on everything. There is a little method at the top of the class to help add a prefix to resource names, like “test-01-” or whatever you want.
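
The helper is not shown in the listings below, but it is nothing more complicated than something like this (the prefix value is whatever you want):

private string PrefixName(string name)
{
    return "test-01-" + name; // e.g. "test-01-elasticsearch_kinesis"
}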

The Stack

The IaC code starts with a query to a third-party API to get the IP address my computer is using; there doesn’t seem to be an easy way to avoid calling .Result on the httpClient.GetStringAsync call.

public MyStack()
{
    HttpClient httpClient = new HttpClient()
    {
        BaseAddress = new System.Uri("https://api.ipify.org/")
    };
    string myIPAddress  = httpClient.GetStringAsync("?format=text").Result;

I then zip up the Lambda source. This is not what you would normally do when deploying a serious application, but it’s useful for this demo. As mentioned above, I’m using Ionic.Zip because I could not get a zip file created by System.IO.Compression.ZipFile.CreateFromDirectory(..) to work with the AWS lambda.

File.Delete("index.zip");
using (ZipFile zip = new ZipFile())
{
    zip.AddDirectory("lambda");
    zip.Save("index.zip");
}

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is – this creates the role, policy attachment, Kinesis stream, ElasticSearch domain, lambda, and Kinesis mapping to the lambda, and outputs the URL of the ElasticSearch domain.

var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
{
    Name = PrefixName("elasticsearch_indexing_role"),
    AssumeRolePolicy = @"{
                            ""Version"": ""2012-10-17"",
                            ""Statement"": [
                                {
                                ""Action"": ""sts:AssumeRole"",
                                ""Principal"": {
                                    ""Service"": ""lambda.amazonaws.com""
                                },
                                ""Effect"": ""Allow"",
                                ""Sid"": """"
                                }
                            ]
                        }",
});

var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
{
    Name = PrefixName("lambdaKinesisPolicyAttachment"),
    Roles =
    {
        elasticsearch_indexing_role.Name
    },
    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
});

var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
{
    Name = PrefixName("elasticsearch_kinesis"),
    RetentionPeriod = 24,
    ShardCount = 1,
    ShardLevelMetrics =
    {
        "IncomingBytes",
        "OutgoingBytes",
    },
});

string esDomainName = PrefixName("elasticsearch");
var config = new Config();
var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
{
    DomainName = esDomainName,
    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
    {
        InstanceType = "t2.small.elasticsearch",
    },
    EbsOptions = new DomainEbsOptionsArgs()
    {
        EbsEnabled = true,
        VolumeSize = 10,
        VolumeType = "gp2"
    },
    ElasticsearchVersion = "7.8",
    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
    {
        var region = values.Item1;
        var callerIdentity = values.Item2;
        return $@"
        {{
            ""Version"": ""2012-10-17"",
            ""Statement"": [
                {{
                    ""Effect"": ""Allow"",
                    ""Principal"": {{
                        ""AWS"": ""{values.Item3}""
                    }},
                    ""Action"": ""es:*"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*""
                }},
                {{
                    ""Action"": ""es:*"",
                    ""Principal"": {{
                        ""AWS"": ""*""
                    }},
                    ""Effect"": ""Allow"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
                    ""Condition"": {{
                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                    }}
                }}
            ]
        }}
        ";
    }),
});
this.ESDomainEndpoint = esDomain.Endpoint;


var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);

var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
{
    Handler = "index.handler",
    MemorySize = 128,
    Name = PrefixName("elasticsearch_indexing_function"),
    Publish = false,
    ReservedConcurrentExecutions = -1,
    Role = elasticsearch_indexing_role.Arn,
    Runtime = "nodejs12.x",
    Timeout = 4,
    Code = new FileArchive("index.zip"),
    Environment = lambdaEnvironmentVariables
});

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
});

Finally I want to print out the URL of the ElasticSearch domain.

[Output]
public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for ElasticSearch to start up, sometimes as much as 20 minutes.

To deploy run –

pulumi up

And now you wait.

Part two coming soon.

Full source code available here.