Indexing the Works of Shakespeare in ElasticSearch – Part 1, Infrastructure as Code

Full source code available here.

WARNING – be careful when using this, Kinesis costs money and is not on the AWS free tier. At the time of writing a couple of ElasticSearch instance types are included in the free tier, but you can only have one instance running at a time. I made a mistake and spun up two ElasticSearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me; very helpful and understanding.

This is part one of a three-part series where I’m going to show how to index the complete works of Shakespeare in ElasticSearch. This first part will set up the infrastructure on AWS. The second will go through the lambda that bulk loads data into ElasticSearch. The third will show how to, in Node.js, create the index on the ElasticSearch domain, read the works of Shakespeare from a CSV file, and send them to Kinesis.

Introduction
A few weeks ago I wrote a post describing how to get ElasticSearch up and running on AWS using Pulumi. It was a simple approach, with the expectation that the user would send documents directly to ElasticSearch for indexing. This is fine if all you are doing is some experimenting, but if you are loading a lot of data, and especially if you are sending the data in one document at a time, you can easily overwhelm a small ElasticSearch instance and get socket timeouts, or simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in ElasticSearch – instead of sending one document per request, send 100 or 1,000.
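To make the bulk idea concrete, here is a small sketch of what a `_bulk` request body looks like. The index name and document shape here are illustrative only, not the mapping used later in this series. The body is newline-delimited JSON: an action line naming the index and id, then the document it refers to.

```csharp
using System;
using System.Text;

class BulkBodyDemo
{
    // Builds an Elasticsearch _bulk request body: for each document, one
    // action line, then one source line. The body must end with a newline.
    public static string BuildBulkBody((string Id, string Line)[] docs)
    {
        var body = new StringBuilder();
        foreach (var doc in docs)
        {
            body.Append($"{{\"index\":{{\"_index\":\"shakespeare\",\"_id\":\"{doc.Id}\"}}}}\n");
            body.Append($"{{\"line\":\"{doc.Line}\"}}\n");
        }
        return body.ToString(); // POST this to https://<your-domain>/_bulk
    }

    static void Main()
    {
        Console.Write(BuildBulkBody(new[] { ("1", "To be, or not to be") }));
        // prints:
        // {"index":{"_index":"shakespeare","_id":"1"}}
        // {"line":"To be, or not to be"}
    }
}
```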

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to; these documents are picked up in batches by a lambda that in turn sends them in bulk to ElasticSearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.
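One detail worth knowing before part two: the record data Kinesis hands to a lambda is base64-encoded, so whatever language the handler is written in (part two uses Node.js), the first step is decoding each record back into the original document. A minimal C# sketch of just that decode step, with a made-up payload:

```csharp
using System;
using System.Text;

class KinesisDecodeDemo
{
    // Kinesis delivers each record's data to the lambda as a base64 string;
    // decoding it gives back the original document that was sent.
    public static string DecodeRecord(string base64Data) =>
        Encoding.UTF8.GetString(Convert.FromBase64String(base64Data));

    static void Main()
    {
        // simulate what a producer would have put on the stream
        string payload = Convert.ToBase64String(Encoding.UTF8.GetBytes("{\"line\":\"To be\"}"));
        Console.WriteLine(DecodeRecord(payload));
        // prints {"line":"To be"}
    }
}
```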

What Infrastructure is Needed
Quite a lot is needed to get this up and running.

On the IaC side –

    An AWS Role
    An AWS policy attachment
    A Kinesis stream
    An ElasticSearch instance
    An AWS Lambda
    The code the Lambda will execute
    A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts –

    An ElasticSearch mapping document
    A tool to send data to Kinesis for indexing

I also want to limit access to the ElasticSearch service to my IP address; this is easy to figure out with a call to an API like api.ipify.org.

The lambda needs a zip file with all my Node.js code. Normally this would be part of your CI/CD pipeline, but I want to do this all in one place so it’s included here. BEWARE – I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, so instead I used Ionic.Zip.

By default, Pulumi suffixes the names of your resources with a random string. I don’t like this, so I explicitly set names on everything. There is a little method at the top of the class to help add a prefix to resource names, like “test-01-“, or whatever you want.
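That helper is not shown in the listing below, but a minimal sketch of it might look like this; “test-01-” is just the example prefix mentioned above.

```csharp
using System;

class NamingDemo
{
    // Prepend a fixed prefix so every resource in the stack is easy to spot
    // in the AWS console; change "test-01-" to whatever you want.
    const string Prefix = "test-01-";

    public static string PrefixName(string resourceName) => $"{Prefix}{resourceName}";

    static void Main()
    {
        Console.WriteLine(PrefixName("elasticsearch_kinesis"));
        // prints test-01-elasticsearch_kinesis
    }
}
```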

The Stack

The IaC code starts with a query to a third-party API to get the IP address my computer is using. There doesn’t seem to be an easy way to avoid calling .Result on the httpClient.GetStringAsync call here, since a stack constructor cannot be async.

public MyStack()
{
    HttpClient httpClient = new HttpClient()
    {
        BaseAddress = new System.Uri("https://api.ipify.org/")
    };
    string myIPAddress  = httpClient.GetStringAsync("?format=text").Result;

I then zip up the Lambda source. This is not what you would normally do when deploying a serious application, but it’s useful for this demo. As mentioned above, I’m using Ionic.Zip because I could not get a valid zip file out of System.IO.Compression.ZipFile.CreateFromDirectory(..).

// requires: using System.IO; and using Ionic.Zip;
File.Delete("index.zip");
using (ZipFile zip = new ZipFile())
{
    zip.AddDirectory("lambda");
    zip.Save("index.zip");
}

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is – this creates the role, policy attachment, Kinesis stream, ElasticSearch domain, Lambda, and Kinesis mapping to the Lambda, and outputs the URL of the ElasticSearch domain.

var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
{
    Name = PrefixName("elasticsearch_indexing_role"),
    AssumeRolePolicy = @"{
                            ""Version"": ""2012-10-17"",
                            ""Statement"": [
                                {
                                ""Action"": ""sts:AssumeRole"",
                                ""Principal"": {
                                    ""Service"": ""lambda.amazonaws.com""
                                },
                                ""Effect"": ""Allow"",
                                ""Sid"": """"
                                }
                            ]
                        }",
});

var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
{
    Name = PrefixName("lambdaKinesisPolicyAttachment"),
    Roles =
    {
        elasticsearch_indexing_role.Name
    },
    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
});

var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
{
    Name = PrefixName("elasticsearch_kinesis"),
    RetentionPeriod = 24,
    ShardCount = 1,
    ShardLevelMetrics =
    {
        "IncomingBytes",
        "OutgoingBytes",
    },
});

string esDomainName = PrefixName("elasticsearch");
var config = new Config();
var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
{
    DomainName = esDomainName,
    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
    {
        InstanceType = "t2.small.elasticsearch",
    },
    EbsOptions = new DomainEbsOptionsArgs()
    {
        EbsEnabled = true,
        VolumeSize = 10,
        VolumeType = "gp2"
    },
    ElasticsearchVersion = "7.8",
    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
    {
        var currentRegion = values.Item1;
        var currentCallerIdentity = values.Item2;
        return $@"
        {{
            ""Version"": ""2012-10-17"",
            ""Statement"": [
                {{
                    ""Effect"": ""Allow"",
                    ""Principal"": {{
                        ""AWS"": ""{values.Item3}""
                    }},
                    ""Action"": ""es:*"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*""
                }},
                {{
                    ""Action"": ""es:*"",
                    ""Principal"": {{
                        ""AWS"": ""*""
                    }},
                    ""Effect"": ""Allow"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
                    ""Condition"": {{
                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                    }}
                }}
            ]
        }}
        ";
    }),
});
this.ESDomainEndpoint = esDomain.Endpoint;


var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);

var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
{
    Handler = "index.handler",
    MemorySize = 128,
    Name = PrefixName("elasticsearch_indexing_function"),
    Publish = false,
    ReservedConcurrentExecutions = -1,
    Role = elasticsearch_indexing_role.Arn,
    Runtime = "nodejs12.x",
    Timeout = 4,
    Code = new FileArchive("index.zip"),
    Environment = lambdaEnvironmentVariables
});

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
});

Finally I want to print out the URL of the ElasticSearch domain.

[Output]
public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for ElasticSearch to start up, sometimes as much as 20 minutes.

To deploy run –

pulumi up

And now you wait.

Part two coming soon.

Full source code available here.

Working with JSON in .NET, Infrastructure as Code with Pulumi

Full source code available here.

This is a follow up to my previous post where I used dynamic and JSON files to make querying ElasticSearch with a HttpClient much easier.

To deploy my ElasticSearch domain on AWS I used Pulumi. ElasticSearch requires a JSON policy to define the permissions. In the post linked above I used a heavily escaped version of that policy inside my C#. The policy can be complex and needs values substituted into it. In the example below I need to pass in the region, account id, domain name, and allowed IP address.

Here is a very simple policy with four substitutions –

"{{
""Version"": ""2012-10-17"",
""Statement"": [
    {{
        ""Action"": ""es:*"",
        ""Principal"": {{
            ""AWS"": ""*""
        }},
        ""Effect"": ""Allow"",
        ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
        ""Condition"": {{
            ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
        }}
    }}
]
}}"

Just escaping this is not easy, and it is very prone to error. A more realistic policy would be significantly longer and would need more substitutions.

Using a JSON file
Here is what I think is an easier way. As in the previous post, the JSON file becomes part of my source code. It is deserialized into a dynamic object and the required values are set.

Here is the AWS policy as it appears in my JSON file. The resource (made up of region, account, and domain name) and IpAddress are left blank, but the structure of the policy is the same as you would paste into the AWS console.

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "AWS": "*"
        },
        "Action": "es:*",
        "Resource": "",
        "Condition": {
          "IpAddress": {
            "aws:SourceIp": ""
          }
        }
      }
    ]
}

In my C# I read the file, deserialize it, and set the values I need.

Here is an example –

private string GetAWSElasticSearchPolicy(string region, string account, string elasticSearchDomainName, string allowedIPAddress)
{
    string blankPolicy = File.ReadAllText("AWSPolicy.json");
    dynamic awsElasticSearchPolicy = JsonConvert.DeserializeObject(blankPolicy);

    awsElasticSearchPolicy.Statement[0].Resource = $"arn:aws:es:{region}:{account}:domain/{elasticSearchDomainName}/*";
    awsElasticSearchPolicy.Statement[0].Condition.IpAddress = new JObject(new JProperty("aws:SourceIp", allowedIPAddress));

    return awsElasticSearchPolicy.ToString(); // this is correctly formatted JSON that can be used with Pulumi.
}

Line 3 reads the JSON file into a string.
Line 4 turns the string into a dynamic object.
Lines 6 & 7 set the values I want.
Line 9 returns a nice JSON string that can be used with Pulumi.
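If you would rather avoid the Newtonsoft.Json dependency altogether, the same trick works with the BCL’s System.Text.Json.Nodes (available since .NET 6). This is a sketch of that alternative, not the code used above; it inlines the policy template instead of reading it from a file, just to stay self-contained, and the region, account, and domain values are illustrative.

```csharp
using System;
using System.Text.Json.Nodes;

class PolicyBuilder
{
    // Same blank policy as the AWSPolicy.json file above, inlined for the demo.
    const string BlankPolicy = @"{
        ""Version"": ""2012-10-17"",
        ""Statement"": [
          {
            ""Effect"": ""Allow"",
            ""Principal"": { ""AWS"": ""*"" },
            ""Action"": ""es:*"",
            ""Resource"": """",
            ""Condition"": { ""IpAddress"": { ""aws:SourceIp"": """" } }
          }
        ]
    }";

    public static string Build(string region, string account, string domain, string ip)
    {
        // parse the template, then fill in the two blank values
        JsonNode policy = JsonNode.Parse(BlankPolicy)!;
        JsonNode statement = policy["Statement"]![0]!;
        statement["Resource"] = $"arn:aws:es:{region}:{account}:domain/{domain}/*";
        statement["Condition"]!["IpAddress"]!["aws:SourceIp"] = ip;
        return policy.ToJsonString();
    }

    static void Main() =>
        Console.WriteLine(Build("us-east-1", "123456789012", "mydomain", "x.x.x.x"));
}
```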

This is much cleaner than the heavily escaped version in this post.

Full source code available here.

Getting Started with ElasticSearch, Part 3 – Deploying to AWS with Pulumi

Full source code available here.

This is part 3 of my short introduction to ElasticSearch. In the first part I showed how to create an ElasticSearch index and mapping, and seeded it with data. In the second I used HttpClientFactory and a typed client to query the index. In this part I’m going to show you how to set up ElasticSearch in AWS using infrastructure as code. Be careful, AWS charges for these things.

A few months ago Pulumi added C# to their list of supported languages. If you haven’t heard of them, they are building a tool that lets you write IaC in a familiar programming language; at the time of writing they support TypeScript, JavaScript, Python, Go, and C#. Writing in a programming language makes it easy to work with things like loops and conditionals; if you are unfamiliar with IaC, those two simple things can be extremely challenging or impossible with other tools.

I’m going to write my IaC in C#.

I’m not going to walk you through installing Pulumi, their site has all the info you need for that.

The IaC Project
Once you have installed Pulumi and tested that the command works, create a new directory called ElasticSearchDeploy.

Change to that directory and run –

pulumi new aws-csharp

Follow the instructions and open the project in VS Code or Visual Studio.

Delete the MyStack.cs file.
Create a file named MyElasticSearchStack.cs.

Paste in the below code –

using Pulumi;
using ElasticSearch = Pulumi.Aws.ElasticSearch;
using Aws = Pulumi.Aws;
using Pulumi.Aws.ElasticSearch.Inputs;

class MyElasticSearchStack : Stack
{
    public MyElasticSearchStack()
    {
        string myIPAddress = "x.x.x.x"; // you need to put your IP address here
        string esDomainName = "myelasticesearch";
        var config = new Config();
        var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
        var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
        var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
        {
            DomainName = esDomainName,
            ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
            {
                InstanceType = "t2.small.elasticsearch",
            },
            EbsOptions = new DomainEbsOptionsArgs()
            {
                EbsEnabled = true,
                VolumeSize = 10,
                VolumeType = "gp2"
            },
            ElasticsearchVersion = "7.7",
            AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity).Apply(values =>
            {
                var currentRegion = values.Item1;
                var currentCallerIdentity = values.Item2;
                return @$"
                {{
                    ""Version"": ""2012-10-17"",
                    ""Statement"": [
                        {{
                            ""Action"": ""es:*"",
                            ""Principal"": {{
                                ""AWS"": ""*""
                            }},
                            ""Effect"": ""Allow"",
                            ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
                            ""Condition"": {{
                                ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                            }}
                        }}
                    ]
                    }}
                ";
            }),
        });
        this.ESDomainEndpoint =  esDomain.Endpoint;
    }
    [Output]
    public Output<string> ESDomainEndpoint { get; set; }
}

Note on line 10, you need to put in the IP address you are using. You can check this with a site like https://ipstack.com/.

In Program.cs change the reference from MyStack to MyElasticSearchStack.

That’s it.

Deploying
Go to the command line, run –

pulumi up

Select ‘yes’ and then wait about 10 to 15 minutes as AWS gets your ElasticSearch domain up and running. In the output of the command you will see the URL of the ElasticSearch domain you just created; use that in the scripts from part 1 of this series.

You can also go to the AWS console, where you should see something like –

There you go – ElasticSearch index creating, seeding, querying, and infrastructure as code.

In a follow up post I’ll show you how to deploy ElasticSearch with Terraform.

The JSON Problem
For those of you who, like me, dislike horribly escaped blocks of JSON inside C#, I am working on a post that will make this much nicer to look at, and to work with.

Full source code available here.