Indexing the Works of Shakespeare in ElasticSearch – Part 2, Bulk Indexing

Full source code available here, look inside the lambda folder.

This is part two of my series on indexing the works of Shakespeare in ElasticSearch. In part one I set up the infrastructure as code, where I created all the necessary AWS resources, including the lambda that is used for indexing data in bulk. But in that post I didn’t explain how the lambda works.
Indexing in bulk is a more reliable and scalable way to index, as it is easy to overwhelm small ElasticSearch instances if you are indexing a high volume of documents one at a time.

The Lambda
The code provided is based on an example from Elastic Co.

I add the AWS SDK and an ElasticSearch connector. The connector makes it possible to use IAM authentication when calling ElasticSearch.

'use strict'
const AWS = require('aws-sdk');

require('array.prototype.flatmap').shim();

const { Client } = require("@elastic/elasticsearch"); // see here for more  https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

const client = new Client({
    ...elasticSearchConnector(AWS.config),
    node: "https://" + process.env.esUrl
});

The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method –

exports.handler = async (event) => {
    const dataToIndex = [];

    if (event.Records != null) {
        event.Records.forEach(record => {
            // Kinesis delivers each payload base64 encoded; decode as UTF-8 so non-ASCII characters survive
            const rawData = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
            dataToIndex.push(JSON.parse(rawData));
        });

        if (dataToIndex.length > 0) {
            // await so that a failed bulk request fails the invocation instead of being silently dropped
            await indexDataInElasticSearch(dataToIndex, 'shakespeare'); // this could be passed in via the stream data too
        }
    }
    return "data indexed";
};

async function indexDataInElasticSearch(dataToIndex, indexName) {
    console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
    // the bulk API expects an action line followed by the document for every entry
    const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
    const { body: bulkResponse } = await client.bulk({ refresh: true, body });
    if (bulkResponse.errors) {
        // throwing fails the invocation so Kinesis delivers the batch again
        throw new Error('Bulk indexing reported errors');
    }
}
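
To make the shape of the bulk request concrete, here is a minimal sketch that runs locally without AWS or ElasticSearch. The sample event, the Text field, and the helper names decodeKinesisRecords and buildBulkBody are made up for illustration; only the base64 decoding and the pairing of an action line with each document mirror the handler.

```javascript
'use strict';

// Hypothetical helper: turn Kinesis records into plain objects.
function decodeKinesisRecords(records) {
    return records.map(record =>
        JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8')));
}

// Hypothetical helper: the bulk API expects an action line followed by the document itself.
function buildBulkBody(docs, indexName) {
    return docs.flatMap(doc => [{ index: { _index: indexName, _id: doc.Id } }, doc]);
}

// A made-up event with two records, encoded the way Kinesis would deliver them.
const sampleEvent = {
    Records: [
        { Id: 1, Text: 'To be, or not to be' },
        { Id: 2, Text: 'That is the question' }
    ].map(doc => ({ kinesis: { data: Buffer.from(JSON.stringify(doc)).toString('base64') } }))
};

const docs = decodeKinesisRecords(sampleEvent.Records);
const bulkBody = buildBulkBody(docs, 'shakespeare');
console.log(JSON.stringify(bulkBody, null, 2));
```

Two documents produce a body of four entries: an action, a document, an action, a document.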

That’s it, now all we need is a way of sending data to Kinesis, and that will be in the next post.


Indexing the Works of Shakespeare in ElasticSearch – Part 1, Infrastructure as Code

Full source code available here.

WARNING – be careful when using this, Kinesis costs money and is not on the AWS free tier. At time of writing a couple of ElasticSearch instance types are included with the free tier, but you can only have one instance running at a time. I made a mistake and spun up two ElasticSearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me; they were very helpful and understanding.

This is part one of a three parter where I’m going to show how to index the complete works of Shakespeare in ElasticSearch. This first part will set up the infrastructure on AWS. The second will go through the lambda that bulk loads data into ElasticSearch. The third will show how to, in Node.js, create the index on the ElasticSearch domain, read the works of Shakespeare from CSV and send them to Kinesis.

Introduction
A few weeks ago I wrote a post describing how to get ElasticSearch up and running on AWS using Pulumi. It was a simple approach with the expectation that the user would send documents directly to ElasticSearch for indexing. This is fine if all you are doing is running some experiments, but if you are loading a lot of data, and especially if you are sending the data in one document at a time, you can easily overwhelm a small ElasticSearch instance and get socket timeouts or simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in ElasticSearch – instead of sending one document per request, send 100 or 1,000.
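
As a rough illustration of the batching idea, the sketch below splits a list of documents into groups of a fixed size, so each group could go out as one bulk request. The function name toBatches and the placeholder documents are made up for this example, and the batch size of 1,000 is just the figure from the paragraph above, not a tuned value.

```javascript
'use strict';

// Split an array of documents into batches of at most batchSize items.
function toBatches(documents, batchSize) {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
        throw new RangeError('batchSize must be a positive integer');
    }
    const batches = [];
    for (let i = 0; i < documents.length; i += batchSize) {
        batches.push(documents.slice(i, i + batchSize));
    }
    return batches;
}

// 2,500 placeholder documents become three bulk requests instead of 2,500 single ones.
const documents = Array.from({ length: 2500 }, (_, i) => ({ Id: i + 1 }));
const batches = toBatches(documents, 1000);
console.log(batches.map(batch => batch.length)); // [ 1000, 1000, 500 ]
```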

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to. These documents are picked up in batches by a lambda that in turn sends them in bulk to ElasticSearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.
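
Here is a sketch of how a lambda might turn a partially failed bulk request into a retry. It assumes the usual bulk response shape (an errors flag plus one entry per item) and relies on a failed invocation causing the batch to be delivered again. The helper names findFailedItems and throwIfAnyFailed and the sample response are made up for illustration.

```javascript
'use strict';

// Hypothetical helper: collect the items ElasticSearch rejected in a bulk response.
function findFailedItems(bulkResponse) {
    if (!bulkResponse.errors) {
        return [];
    }
    return bulkResponse.items
        .map(item => item.index) // every action in this sketch is an "index" action
        .filter(result => result && result.error)
        .map(result => ({ id: result._id, status: result.status, reason: result.error.reason }));
}

// Throwing fails the invocation, which is what prompts the batch to be delivered again.
function throwIfAnyFailed(bulkResponse) {
    const failed = findFailedItems(bulkResponse);
    if (failed.length > 0) {
        throw new Error(failed.length + ' document(s) failed to index');
    }
}

// A made-up response where the second document was rejected.
const sampleResponse = {
    errors: true,
    items: [
        { index: { _id: '1', status: 201 } },
        { index: { _id: '2', status: 429, error: { type: 'es_rejected_execution_exception', reason: 'queue full' } } }
    ]
};

console.log(findFailedItems(sampleResponse));
```

A retry redelivers the whole batch, including documents that were already indexed. Because each document is indexed with an explicit _id, those are overwritten rather than duplicated.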

What Infrastructure is Needed
Quite a lot is needed to get this up and running.

On the IaC side –

    An AWS Role
    An AWS policy attachment
    A Kinesis stream
    An ElasticSearch instance
    An AWS Lambda
    The code the Lambda will execute
    A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts –

    An ElasticSearch mapping document
    A tool to send data to Kinesis for indexing

I also want to limit access to the ElasticSearch service to my IP address, which is easy to figure out with a call to an API like api.ipify.org.

The lambda needs a zip file with all my Node.js code. Normally this would be part of your CI/CD pipeline, but I want to do this all in one so it’s included here. BEWARE, I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, so I used Ionic.Zip instead.

By default, Pulumi suffixes the names of your resources with a random string. I don’t like this, so I explicitly set names on everything. There is a little method at the top of the class that adds a prefix to resource names, like “test-01-“, or whatever you want.

The Stack

The IaC code starts with a query to a third party API to get the IP address my computer is using. There doesn’t seem to be an easy way to avoid calling .Result on the httpClient.GetStringAsync call.

public MyStack()
{
    HttpClient httpClient = new HttpClient()
    {
        BaseAddress = new System.Uri("https://api.ipify.org/")
    };
    string myIPAddress  = httpClient.GetStringAsync("?format=text").Result;

I then zip up the Lambda source. This is not what you would normally do when deploying a serious application, but it’s useful for this demo. As mentioned above I’m using Ionic.Zip because I could not get the zip file created by System.IO.Compression.ZipFile.CreateFromDirectory(..) to work with the Lambda.

File.Delete("index.zip");
using (ZipFile zip = new ZipFile())
{
    zip.AddDirectory("lambda");
    zip.Save("index.zip");
}

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is – this creates the role, policy attachment, Kinesis stream, ElasticSearch domain, Lambda, and Kinesis mapping to the Lambda, and outputs the URL of the ElasticSearch domain.

var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
{
    Name = PrefixName("elasticsearch_indexing_role"),
    AssumeRolePolicy = @"{
                            ""Version"": ""2012-10-17"",
                            ""Statement"": [
                                {
                                ""Action"": ""sts:AssumeRole"",
                                ""Principal"": {
                                    ""Service"": ""lambda.amazonaws.com""
                                },
                                ""Effect"": ""Allow"",
                                ""Sid"": """"
                                }
                            ]
                        }",
});

var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
{
    Name = PrefixName("lambdaKinesisPolicyAttachment"),
    Roles =
    {
        elasticsearch_indexing_role.Name
    },
    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
});

var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
{
    Name = PrefixName("elasticsearch_kinesis"),
    RetentionPeriod = 24,
    ShardCount = 1,
    ShardLevelMetrics =
    {
        "IncomingBytes",
        "OutgoingBytes",
    },
});

string esDomainName = PrefixName("elasticsearch");
var config = new Config();
var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
{
    DomainName = esDomainName,
    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
    {
        InstanceType = "t2.small.elasticsearch",
    },
    EbsOptions = new DomainEbsOptionsArgs()
    {
        EbsEnabled = true,
        VolumeSize = 10,
        VolumeType = "gp2"
    },
    ElasticsearchVersion = "7.8",
    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
    {
        var currentRegion = values.Item1;
        var currentCallerIdentity = values.Item2;
        return $@"
        {{
            ""Version"": ""2012-10-17"",
            ""Statement"": [
                {{
                    ""Effect"": ""Allow"",
                    ""Principal"": {{
                        ""AWS"": ""{values.Item3}""
                    }},
                    ""Action"": ""es:*"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*""
                }},
                {{
                    ""Action"": ""es:*"",
                    ""Principal"": {{
                        ""AWS"": ""*""
                    }},
                    ""Effect"": ""Allow"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
                    ""Condition"": {{
                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                    }}
                }}
            ]
        }}
        ";
    }),
});
this.ESDomainEndpoint = esDomain.Endpoint;


var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);

var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
{
    Handler = "index.handler",
    MemorySize = 128,
    Name = PrefixName("elasticsearch_indexing_function"),
    Publish = false,
    ReservedConcurrentExecutions = -1,
    Role = elasticsearch_indexing_role.Arn,
    Runtime = "nodejs12.x",
    Timeout = 4,
    Code = new FileArchive("index.zip"),
    Environment = lambdaEnvironmentVariables
});

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
});

Finally I want to print out the URL of the ElasticSearch domain.

[Output]
public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for ElasticSearch to start up, sometimes as much as 20 minutes.

To deploy run –

pulumi up

And now you wait.

Part two coming soon.


DynamoDb, Reading and Writing Data with .Net Core – Part 2

Full source code available here.

A few weeks ago I posted about reading and writing data to DynamoDb. I gave instructions on how to create tables on localstack and how to use the AWS Document Model approach. I also pointed out that I was not a big fan of this. Reading data looked like –

[HttpGet("{personId}")]
public async Task<IActionResult> Get(int personId)
{
    Table people = Table.LoadTable(_amazonDynamoDbClient, "People");
    var person = JsonSerializer.Deserialize<Person> ((await people.GetItemAsync(personId)).ToJson());
    return Ok(person);
}

You have to convert to JSON, then deserialize. I think you should be able to do something more like – people.GetItemAsync(personId), but you can’t.

And writing data looked like –

[HttpPost]
public async Task<IActionResult> Post(Person person)
{
    Table people = Table.LoadTable(_amazonDynamoDbClient, "People");
    
    var document = new Document();
    document["PersonId"] = person.PersonId;
    document["State"] = person.State;
    document["FirstName"] = person.FirstName;
    document["LastName"] = person.LastName;
    await people.PutItemAsync(document);
    
    return Created("", document.ToJson());
}

For me this feels even worse. Having to name the keys in the document by hand is very error prone and hard to work with.

Luckily there is another approach that is a little better. You have to create a class with attributes that indicate what table the class represents and what properties represent the keys in the table.

using Amazon.DynamoDBv2.DataModel;

namespace DynamoDbApiObjectApproach.Models
{
    
    [DynamoDBTable("People")]
    public class PersonDynamoDb
    {

        [DynamoDBHashKey]
        public int PersonId {get;set;}
        public string State {get;set;}
        public string FirstName {get;set;}
        public string LastName {get;set;}
    }
}

Because of these attributes I don’t want to expose this class too broadly, so I create a simple POCO to represent a person.

public class Person
{
    public int PersonId {get;set;}
    public string State {get;set;}
    public string FirstName {get;set;}
    public string LastName {get;set;}
}

I use AutoMapper to map between the two classes, never exposing the PersonDynamoDb to the outside world. If you need help getting started with AutoMapper I wrote a couple of posts recently on this.

Here’s how reading and writing looks now –

[HttpGet("{personId}")]
public async Task<IActionResult> Get(int personId)
{
    var personDynamoDb = await _dynamoDBContext.LoadAsync<PersonDynamoDb>(personId);
    var person = _mapper.Map<Person>(personDynamoDb);
    return Ok(person);
}

[HttpPost]
public async Task<IActionResult> Post(Person person)
{
    var personDynamoDb = _mapper.Map<PersonDynamoDb>(person);
    await _dynamoDBContext.SaveAsync(personDynamoDb);
    return Created("", person.PersonId);
}

This is an improvement, but I’m still not thrilled with the .NET SDK for DynamoDb.
