Indexing the Works of Shakespeare in Elasticsearch - Part 1, Infrastructure as Code

Full source code available here.

WARNING - be careful when using this, Kinesis costs money and is not on the AWS free tier. As I write this post, a couple of Elasticsearch instance types are included with the free tier, but you can only have one instance running at a time. I made a mistake and spun up two Elasticsearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me, very helpful and understanding.

This is part one of a four-parter where I’m going to show how to index the complete works of Shakespeare in Elasticsearch. This first part will set up the infrastructure on AWS. The second will go through the lambda that bulk loads data into Elasticsearch. The third will show how to, in Node.js, create the index on the Elasticsearch domain, read the works of Shakespeare from CSV and send to Kinesis, and the fourth will put a .NET 5 Web Api in front of the Elasticsearch cluster.


A few weeks ago I wrote a post describing how to get Elasticsearch up and running on AWS using Pulumi. It was a simple approach with the expectation that the user would send documents directly to Elasticsearch for indexing. This is fine if all you are doing are some experiments, but if you are loading a lot of data and especially if you are sending the data in one document at a time you can easily overwhelm a small Elasticsearch instance and get socket timeouts or run simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in Elasticsearch - instead of sending one document per request, send 100 or 1,000.

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to, these documents are picked up in batches by a lambda that in turn sends them in bulk to Elasticsearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.

What Infrastructure is Needed

Quite a lot is needed to get this up and running.

On the IaC side -

    An AWS Role An AWS policy attachment A Kinesis stream An Elasticsearch instance An AWS Lambda The code the Lambda will execute A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts -

    An Elasticsearch mapping document A tool to send data to Kinesis for indexing

I also want to limit access to Elasticsearch service to my IP address, this is easy to figure out with a call to an API like

The lambda needs a zip file with all my Node.js code. Normally this would be of your CI/CD pipeline, but I want to do this all in one so it’s included here. BEWARE, I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, instead, I used Ionic.Zip.

By default, Pulumi suffixes the names of your resources with a random string, I don’t like this so I explicitly set names on everything. There is a little method at the top of the class to help with adding a prefix to resource names like “test-01-”, or whatever you want.

The Stack

The IaC code starts with a query to a third-party API to get the IP address my computer is using.

1public MyStack()
3    HttpClient httpClient = new HttpClient()
4    {
5        BaseAddress = new System.Uri("")
6    };
7    string myIPAddress = httpClient.GetStringAsync("?format=text").GetAwaiter().GetResult();

I then zip up the Lambda source, this is not what you would normally do when deploying a serious application but it’s useful for this demo. As mentioned above I’m using Ionic.Zip because I could not get the zip file created by System.IO.Compression.ZipFile.CreateFromDirectory(..).

2using (ZipFile zip = new ZipFile())
4    zip.AddDirectory("lambda");
5    zip.Save("");

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is - this creates the role, policy attachment, Kinesis Stream, Elasticsearch domain, Lambda, Kinesis mapping to Lambda, and output the URL of the Elasticsearch domain.

  1var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
  3    Name = PrefixName("elasticsearch_indexing_role"),
  4    AssumeRolePolicy = @"{
  5                            ""Version"": ""2012-10-17"",
  6                            ""Statement"": [
  7                                {
  8                                ""Action"": ""sts:AssumeRole"",
  9                                ""Principal"": {
 10                                    ""Service"": """"
 11                                },
 12                                ""Effect"": ""Allow"",
 13                                ""Sid"": """"
 14                                }
 15                            ]
 16                        }",
 19var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
 21    Name = PrefixName("lambdaKinesisPolicyAttachment"),
 22    Roles =
 23    {
 24        elasticsearch_indexing_role.Name
 25    },
 26    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
 29var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
 31    Name = PrefixName("elasticsearch_kinesis"),
 32    RetentionPeriod = 24,
 33    ShardCount = 1,
 34    ShardLevelMetrics =
 35    {
 36        "IncomingBytes",
 37        "OutgoingBytes",
 38    },
 41string esDomainName = PrefixName("elasticsearch");
 42var config = new Config();
 43var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
 44var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
 45var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
 47    DomainName = esDomainName,
 48    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
 49    {
 50        InstanceType = "t2.small.elasticsearch",
 51    },
 52    EbsOptions = new DomainEbsOptionsArgs()
 53    {
 54        EbsEnabled = true,
 55        VolumeSize = 10,
 56        VolumeType = "gp2"
 57    },
 58    ElasticsearchVersion = "7.8",
 59    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
 60    {
 61        var currentRegion = values.Item1;
 62        var currentCallerIdentity = values.Item2;
 63        return $@"
 64        {{
 65            ""Version"": ""2012-10-17"",
 66            ""Statement"": [
 67                {{
 68                    ""Effect"": ""Allow"",
 69                    ""Principal"": {{
 70                        ""AWS"": ""{values.Item3}""
 71                    }},
 72                    ""Action"": ""es:*"",
 73                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*""
 74                }},
 75                {{
 76                    ""Action"": ""es:*"",
 77                    ""Principal"": {{
 78                        ""AWS"": ""*""
 79                    }},
 80                    ""Effect"": ""Allow"",
 81                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
 82                    ""Condition"": {{
 83                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
 84                    }}
 85                }}
 86            ]
 87        }}
 88        ";
 89    }),
 91this.ESDomainEndpoint = esDomain.Endpoint;
 94var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
 95lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);
 97var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
 99    Handler = "index.handler",
100    MemorySize = 128,
101    Name = PrefixName("elasticsearch_indexing_function"),
102    Publish = false,
103    ReservedConcurrentExecutions = -1,
104    Role = elasticsearch_indexing_role.Arn,
105    Runtime = "nodejs12.x",
106    Timeout = 4,
107    Code = new FileArchive(""),
108    Environment = lambdaEnvironmentVariables
111var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
113    EventSourceArn = elasticsearch_kinesis.Arn,
114    FunctionName = elasticsearch_indexing_function.Arn,
115    StartingPosition = "LATEST",

Finally, I want to print out the URL of the Elasticsearch domain.

2public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for Elasticsearch to startup, sometimes as much as 20 minutes.

To deploy run -

pulumi up

And now you wait.

Part two coming soon.

Full source code available here.

comments powered by Disqus