C# and AWS Lambdas, Part 3 – Pulumi IaC for Web API and an API Gateway

Full source code available here.

In this, the third in a series of posts on using .NET in AWS lambdas, I build on the previous post where I connected an HTTP Gateway to a lambda running a Web API application. In that post I built the infrastructure by hand, i.e. via the GUI, pointing and clicking.

In this post I will show how to build everything with Pulumi. I’ve written a few posts on Pulumi before, you can find them here.

As with all IaC, the hard part isn’t the IaC itself, it’s knowing what to do on the platform you are deploying to.
In the previous post I worked out all the required steps and components and got everything working. But the UI takes care of some things that you have to do explicitly in IaC, and finding those gaps is the challenge as you get familiar with the platform you are working on.

What’s needed
When you add up all the components needed, it feels like a lot, but about eighty lines of C# code set it all up.

An IAM role for the lambda.
A policy attachment for the above role letting it execute the lambda.
The lambda function. This is where the Web API code runs.
An HTTP Gateway.
An integration between the HTTP Gateway and the lambda.
A route on the HTTP Gateway that forwards to the integration.
A stage on the HTTP Gateway.
Permission for the HTTP Gateway to execute the lambda.

That last one was not obvious; thank you to Piers Karsenbarg at Pulumi for his assistance.

The code
You need to have Pulumi installed; check their site for instructions.

Here is the stack to set up all the components listed above.

using Pulumi;
using Aws = Pulumi.Aws;

class MyStack : Stack
{
    public MyStack()
    {

        var lambdaRole = new Aws.Iam.Role("PulumiWebApiGateway_LambdaRole", new Aws.Iam.RoleArgs
        {
            AssumeRolePolicy = 
@"{
    ""Version"": ""2012-10-17"",
    ""Statement"": [
        {
        ""Action"": ""sts:AssumeRole"",
        ""Principal"": {
            ""Service"": ""lambda.amazonaws.com""
        },
        ""Effect"": ""Allow"",
        ""Sid"": """"
        }
    ]
}",
        });

        var lambdaPolicyAttachment = new Aws.Iam.PolicyAttachment("PulumiWebApiGateway_LambdaPolicyAttachment", new Aws.Iam.PolicyAttachmentArgs
        {
            Roles =
            {
                lambdaRole.Name
            },
            PolicyArn = Aws.Iam.ManagedPolicy.AWSLambdaBasicExecutionRole.ToString(), 
        });

        var lambdaFunction = new Aws.Lambda.Function("PulumiWebApiGateway_LambdaFunction", new Aws.Lambda.FunctionArgs
        {
            Handler = "WebAPILambda::WebAPILambda.LambdaEntryPoint::FunctionHandlerAsync",
            MemorySize = 128,
            Publish = false,
            ReservedConcurrentExecutions = -1,
            Role = lambdaRole.Arn,
            Runtime = Aws.Lambda.Runtime.DotnetCore3d1,
            Timeout = 4,
            Code = new FileArchive("WebAPILambda.zip"), // I put the zip file in the same dir as this code for this demo, but you should not do this.
        });

        var httpApiGateway = new Pulumi.Aws.ApiGatewayV2.Api("PulumiWebApiGateway_ApiGateway", new Pulumi.Aws.ApiGatewayV2.ApiArgs
        {
            ProtocolType = "HTTP",
            RouteSelectionExpression = "${request.method} ${request.path}",
        });

        var httpApiGateway_LambdaIntegration = new Pulumi.Aws.ApiGatewayV2.Integration("PulumiWebApiGateway_ApiGatewayIntegration", new Pulumi.Aws.ApiGatewayV2.IntegrationArgs
        {
            ApiId = httpApiGateway.Id,
            IntegrationType = "AWS_PROXY",
            IntegrationMethod = "POST",
            IntegrationUri = lambdaFunction.Arn,
            PayloadFormatVersion = "2.0",
            TimeoutMilliseconds = 30000,
        });

        var httpApiGatewayRoute = new Pulumi.Aws.ApiGatewayV2.Route("PulumiWebApiGateway_ApiGatewayRoute", new Pulumi.Aws.ApiGatewayV2.RouteArgs
        {
            ApiId = httpApiGateway.Id,
            RouteKey = "$default",
            Target = httpApiGateway_LambdaIntegration.Id.Apply(id => $"integrations/{id}"),
        });

        var httpApiGatewayStage = new Pulumi.Aws.ApiGatewayV2.Stage("PulumiWebApiGateway_ApiGatewayStage", new Pulumi.Aws.ApiGatewayV2.StageArgs
        {
            ApiId = httpApiGateway.Id,
            AutoDeploy = true,
            Name = "$default",
        });

        var lambdaPermissionsForApiGateway = new Aws.Lambda.Permission("PulumiWebApiGateway_LambdaPermission", new Aws.Lambda.PermissionArgs
        {
            Action = "lambda:InvokeFunction",
            Function = lambdaFunction.Name,
            Principal = "apigateway.amazonaws.com",
            SourceArn = Output.Format($"{httpApiGateway.ExecutionArn}/*") // note it's the ExecutionArn.
            // SourceArn = httpApiGateway.ExecutionArn.Apply(arn => $"{arn}/*") // this is another way of doing the same thing
        });

        this.ApiEndpoint = httpApiGateway.ApiEndpoint.Apply(endpoint =>  $"{endpoint}/api/values");
    }

    [Output]
    public Output<string> ApiEndpoint { get; set; }
}

Run –

pulumi up

This will show you what is going to be deployed; if it looks OK, select yes.

After a short wait you should see something like the below image, indicating that everything has been set up.

Now you can click on the ApiEndpoint URL and it should execute your lambda and return the hello world message.
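
If you lose the URL, you can always get it back from the stack outputs –

pulumi stack output ApiEndpoint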

Full source code available here.

C# and AWS Lambdas, Part 2 – Web API and an API Gateway

Full source code available here.

In the previous post I created a lambda that executed a C# console application. In this post I walk through creating a .NET Core 3.1 Web API application inside a lambda and making it reachable from the web, just like a normal Web API application.

First, update the lambda templates; the bug in the csproj file of the template that I described in the first post has been fixed, and some new templates have been added.

dotnet new -i Amazon.Lambda.Templates::5.1.0

Another useful thing to install is the Amazon.Lambda.Tools package, but I don’t use it in this post.

dotnet tool install -g Amazon.Lambda.Tools 

Or update it if you have it installed already.

dotnet tool update -g Amazon.Lambda.Tools

Create a new application.

dotnet new serverless.AspNetCoreWebAPI --name WebAPILambda

In LambdaEntryPoint.cs change the class it inherits from to Amazon.Lambda.AspNetCoreServer.APIGatewayHttpApiV2ProxyFunction.
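
After the change the class looks roughly like this; the Init override shown is what the template generates –

public class LambdaEntryPoint : Amazon.Lambda.AspNetCoreServer.APIGatewayHttpApiV2ProxyFunction
{
    protected override void Init(IWebHostBuilder builder)
    {
        builder.UseStartup<Startup>();
    }
}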

For the sake of a demo I’m going to change very little, and show a GET and a POST call.

Add a Person class.

namespace WebAPILambda.Models
{
    public class Person
    {
        public string FirstName { get; set; }
        public string LastName { get; set; }

        public override string ToString()
        {
            return $"{FirstName} {LastName}";
        }
    }
}

Replace the ValuesController class with this –

[Route("api/[controller]")]
public class ValuesController : ControllerBase
{

    [HttpGet]
    public string Get()
    {
        return $"Hello World from inside a lambda {DateTime.Now}";
    }

    [HttpGet("{id}")]
    public string Get(int id)
    {
        return $"You asked for {id}";
    }

    [HttpPost]
    public IActionResult Post([FromBody] Person person)
    {
        return Ok($"You sent {person.ToString()}");
    }
}

Build it. You can try it out by starting the application locally and using it like a normal Web API application.

Zip the contents of the WebAPILambda/bin/Debug/netcoreapp3.1 directory into a file named WebAPILambda.zip.

We’re done with the code.

Create the Lambda

I’m not going to show the detailed steps because they are described in part 1 of this series of posts.

But there is one difference – make this the handler WebAPILambda::WebAPILambda.LambdaEntryPoint::FunctionHandlerAsync.

Upload the zip as shown in part 1.

You might expect to be able to paste the content of test/WebAPILambda.Tests/SampleRequests/ValuesController-Get.json from the source code into the lambda test feature, but the Json generated by the template does NOT work with a LambdaEntryPoint that inherits from APIGatewayHttpApiV2ProxyFunction.
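
If you do want to test from the console, the event has to use the HTTP API payload format version 2.0. A minimal sketch of such an event follows; a real one contains many more fields –

{
    "version": "2.0",
    "routeKey": "$default",
    "rawPath": "/api/values",
    "rawQueryString": "",
    "headers": {},
    "requestContext": {
        "http": {
            "method": "GET",
            "path": "/api/values"
        }
    }
}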

That’s the lambda setup.

Now we need to send some web requests to it.

API Gateway
There are a few choices here; I am going to show how to use the HTTP API because it is simpler.

Click Create API.

Top of the list should be HTTP API, click Build.

Hit Add integration, choose Lambda, and pick the lambda function defined in the previous step.
Select Version 2.0.
Give the API a name.
Hit next.

For route configuration leave the method as ANY.
Change the resource path to /api/values – this matches the route on the Web API controller in the application.

In Configure Stages don’t change anything for this simple example.
Hit next.

In Review and Create confirm that it looks correct and hit Create in the bottom right.

You should now have an API Gateway setup that looks something like –

Click on the Invoke URL and you should get a response like –

{
    "message": "Not Found"
}

Add /api/values to the url to match the route of the controller and you will get –

That is hitting the GET method. In the attached source code there is a file named test.http; it has an example of a POST method.
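
It looks something like this; the host here is a placeholder, use your own Invoke URL –

POST https://your-api-id.execute-api.us-east-1.amazonaws.com/api/values
Content-Type: application/json

{
    "firstName": "Jane",
    "lastName": "Doe"
}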

There you go, an API inside a lambda reachable from the web! Not so hard after you’ve done it once.

Full source code available here.

C# and AWS Lambdas, Part 1 – Hello World

Full source code available here.

This is the first in a series of posts on using .NET with AWS Lambdas. It will start with the simplest example, converting a lowercase string to an uppercase string, but by the end you will be running a .NET Web API powered by a lambda, fronted by an API gateway, where all the infrastructure is set up by Pulumi; this will take a few posts over the next while.

Getting Started
Install the AWS Lambda Templates.

dotnet new -i Amazon.Lambda.Templates

Now you can create projects based on these templates.

Here is the full list of available templates.

Templates                                                 Short Name                                        Language          Tags                  
----------------------------------------------------      --------------------------------------------      ------------      ----------------------
Order Flowers Chatbot Tutorial                            lambda.OrderFlowersChatbot                        [C#]              AWS/Lambda/Function   
Lambda Custom Runtime Function (.NET 5.0)                 lambda.CustomRuntimeFunction                      [C#], F#          AWS/Lambda/Function   
Lambda Detect Image Labels                                lambda.DetectImageLabels                          [C#], F#          AWS/Lambda/Function   
Lambda Empty Function                                     lambda.EmptyFunction                              [C#], F#          AWS/Lambda/Function   
Lex Book Trip Sample                                      lambda.LexBookTripSample                          [C#]              AWS/Lambda/Function   
Lambda Simple Application Load Balancer Function          lambda.SimpleApplicationLoadBalancerFunction      [C#]              AWS/Lambda/Function   
Lambda Simple DynamoDB Function                           lambda.DynamoDB                                   [C#], F#          AWS/Lambda/Function   
Lambda Simple Kinesis Firehose Function                   lambda.KinesisFirehose                            [C#]              AWS/Lambda/Function   
Lambda Simple Kinesis Function                            lambda.Kinesis                                    [C#], F#          AWS/Lambda/Function   
Lambda Simple S3 Function                                 lambda.S3                                         [C#], F#          AWS/Lambda/Function   
Lambda Simple SNS Function                                lambda.SNS                                        [C#]              AWS/Lambda/Function   
Lambda Simple SQS Function                                lambda.SQS                                        [C#]              AWS/Lambda/Function   
Lambda ASP.NET Core Web API                               serverless.AspNetCoreWebAPI                       [C#], F#          AWS/Lambda/Serverless 
Lambda ASP.NET Core Web Application with Razor Pages      serverless.AspNetCoreWebApp                       [C#]              AWS/Lambda/Serverless 
Serverless Detect Image Labels                            serverless.DetectImageLabels                      [C#], F#          AWS/Lambda/Serverless 
Lambda DynamoDB Blog API                                  serverless.DynamoDBBlogAPI                        [C#]              AWS/Lambda/Serverless 
Lambda Empty Serverless                                   serverless.EmptyServerless                        [C#], F#          AWS/Lambda/Serverless 
Lambda Giraffe Web App                                    serverless.Giraffe                                F#                AWS/Lambda/Serverless 
Serverless Simple S3 Function                             serverless.S3                                     [C#], F#          AWS/Lambda/Serverless 
Step Functions Hello World                                serverless.StepFunctionsHelloWorld                [C#], F#          AWS/Lambda/Serverless 
Serverless WebSocket API                                  serverless.WebSocketAPI                           [C#]              AWS/Lambda/Serverless 

The Application
For this example we are going to use lambda.EmptyFunction.

dotnet new lambda.EmptyFunction --name HelloWorldLambda

This sets up two new projects, one for the lambda, and one for the tests.

There seems to be a bug in the template: the generated csproj is missing the CopyLocalLockFileAssemblies element (line 6 of the csproj shown below), and this is very important. Without it, you will get errors like – An assembly specified in the application dependencies manifest (HelloWorldLambda.deps.json) was not found: package: 'Amazon.Lambda.Core', version: '1.1.0'

Be sure to add

<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>

to HelloWorldLambda.csproj.

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
    <AWSProjectType>Lambda</AWSProjectType>
    <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Amazon.Lambda.Core" Version="1.1.0" />
    <PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.0.1" />
  </ItemGroup>
</Project>

The code of the lambda can stay the same; it takes an input string, converts it to uppercase, and returns the result.

using Amazon.Lambda.Core;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace HelloWorldLambda
{
    public class Function
    {
        
        /// <summary>
        /// A simple function that takes a string and does a ToUpper
        /// </summary>
        /// <param name="input"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public string FunctionHandler(string input, ILambdaContext context)
        {
            return input?.ToUpper();
        }
    }
}

Build this, and zip up all the files in the bin/Debug/netcoreapp3.1 directory. This zip will be uploaded to AWS soon.
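
On macOS or Linux that can be done from the project directory with something like the following; on Windows, Compress-Archive or the Explorer does the same job –

dotnet build
cd bin/Debug/netcoreapp3.1
zip -r HelloWorldLambda.zip .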

The Lambda Function
For this post I’m going to show how to create the lambda using the AWS UI. In later post I’ll create it with Pulumi.

Open the AWS Lambda page – https://console.aws.amazon.com/lambda.

Click Create Function.

Give the function a name – HelloWorld is a good choice.
Change the runtime to .NET Core 3.1 (C#/PowerShell).

Click Create Function in the bottom right (not shown in the image above).

Upload the zip created above by clicking Action in the Function Code section of the screen.

Edit the Handler to read

HelloWorldLambda::HelloWorldLambda.Function::FunctionHandler 

This matches the assembly, namespace, and class in the application we created; the handler format is Assembly::Namespace.ClassName::MethodName.

Open the test tool in the top right of the screen and click “Configure test events”.

Set the event name, and replace the body with "hello world".

That’s everything setup. Let’s run it.

Running the Lambda
Hit the “Test” button in the top right.

You should see something like "Execution result: succeeded (logs)". Expand the Details and you will see "HELLO WORLD" and a bunch of other information about the execution of the lambda.

That’s it, a C# .NET Core 3.1 Hello World lambda up and running.

In the next post I’ll do something a little more interesting, I’ll put an API Gateway in front of a lambda, so the lambda can be called from the web.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 4, Searching via Web API in .NET 5

Full source code available here.

This is part four of my four part series on indexing the works of Shakespeare in ElasticSearch.

In this I’ll show how to use the ElasticSearch “low level client” to perform the search. In the past I wrote a blog showing how to use a HttpClient to perform the search using Json, and this works fine, but Steve Gordon suggested I try to the Elastic client as it supports things like connection pooling and still lets me use Json directly with ElasticSearch.

To go along with the Elastic “low level client” there is a “high level client” called NEST. I have tried both and prefer to stick with Json, but you may find them more useful.

Because I develop in a few languages, Json is the natural choice for me. I use it when querying from Node.js, and inside an HTTP client (Fiddler, REST Client, etc.) when figuring out my queries, and I want to use it in .NET too.

But Json and C# don’t go together very well; you have to jump through hoops with escaping to make it work. Or, as I have done, use a creative approach to deserializing via dynamic objects (I know some people won’t like this); I find this much more convenient than converting my Json queries to the Elastic client syntaxes.

This example shows how to use a Web API application to search for a piece of text, either in isolation or within a specific play.

The setup
There is very little to do here.

In Startup.cs add the following to the ConfigureServices(..) method –

services.AddSingleton<ElasticLowLevelClient>(new ElasticLowLevelClient(new ConnectionConfiguration(new Uri("http://localhost:9200"))));
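
The ElasticLowLevelClient and ConnectionConfiguration types live in the Elasticsearch.Net package (and namespace), so add that first –

dotnet add package Elasticsearch.Net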

In the SearchController add the following to pass the ElasticSearch client in via dependency injection –

public class SearchController : ControllerBase
{
    private readonly ElasticLowLevelClient _lowLevelClient;
    public SearchController(ElasticLowLevelClient lowLevelClient)
    {
        _lowLevelClient = lowLevelClient;
    }
//snip ..

I have two action methods, one to search for a play and line, and one to search for a line across all plays (I know they could be combined into a single action method, but I want to keep things simple) –

[HttpGet("Line")]
public ActionResult Line(string text)
{
    string queryWithParams = GetLineQuery(text);
    var lines = PerformQuery(queryWithParams);
    
    return Ok(lines);
}

[HttpGet("PlayAndLine")]
public ActionResult PlayAndLine(string play, string text)
{
    string queryWithParams = GetPlayAndLineQuery(play, text);
    var lines = PerformQuery(queryWithParams);

    return Ok(lines);
}

All very straightforward so far, but now comes the “creative” approach to handling the Json problems.

I put my ElasticSearch queries into their own files. The first is Line.json –

{
    "query": {
        "match_phrase_prefix" :{
            "Line": ""
        }
    }
} 

And the second is PlayAndLine.json –

{
    "query":{
        "bool": {
            "must": [
                { "match": { "Play": "" } }
               ,{ "match_phrase_prefix": { "Line": "" } }
            ]
        }
    }
}

These Json queries are loaded into dynamic objects and the relevant values are set in C# – see the workableElasticSearchQuery assignments in the two methods below.

private string GetLineQuery(string text)
{
    string elasticSearchQuery = System.IO.File.ReadAllText($"Queries/Line.json");
    dynamic workableElasticSearchQuery = JsonConvert.DeserializeObject(elasticSearchQuery);
    workableElasticSearchQuery.query.match_phrase_prefix.Line = text;

    return workableElasticSearchQuery.ToString();
}

private string GetPlayAndLineQuery(string play, string text)
{
    string elasticSearchQuery = System.IO.File.ReadAllText($"Queries/PlayAndLine.json");
    dynamic workableElasticSearchQuery = JsonConvert.DeserializeObject(elasticSearchQuery);
    workableElasticSearchQuery.query.@bool.must[0].match.Play = play;
    workableElasticSearchQuery.query.@bool.must[1].match_phrase_prefix.Line = text;

    return workableElasticSearchQuery.ToString();
}

The strings the above methods return are the queries that will be sent to ElasticSearch.

The below method makes the request and deserializes the response into the ESResponse class, a class generated by https://json2csharp.com/.

private ESResponse PerformQuery(string queryWithParams)
{
    var response = _lowLevelClient.Search<StringResponse>("shakespeare", queryWithParams);
    ESResponse esResponse = System.Text.Json.JsonSerializer.Deserialize<ESResponse>(response.Body);
    return esResponse;
}
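
The generated class is long, so here is a heavily trimmed sketch of its shape; the property names mirror the ElasticSearch response Json, the rest is illustrative –

using System.Collections.Generic;

public class ESResponse
{
    public HitsContainer hits { get; set; }
}

public class HitsContainer
{
    public List<Hit> hits { get; set; }
}

public class Hit
{
    // _source holds the indexed document itself
    public object _source { get; set; }
}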

You might have noticed that I use both System.Text.Json and Newtonsoft Json; this is because System.Text.Json does not support deserializing to dynamic objects, see this discussion – https://github.com/dotnet/runtime/issues/29690.

That’s it, searching and parsing of ElasticSearch results via a Web API application. It feels a bit messy, but I hope it helps.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 3, Sending the Lines to Kinesis

Full source code available here.

In this, the third part of the series, I show how to read from the Shakespeare CSV file, where each row represents a line from a play (download here), and send these lines to Kinesis. The lambda in AWS will pick up the lines from Kinesis and forward them to ElasticSearch for indexing.

You need to configure the script to point to your ElasticSearch server (the node setting at the top of the script) and to the Kinesis stream (the seed call at the bottom).

The script itself is fairly simple: it checks if the ElasticSearch index for the plays already exists, and if not, it creates one using the mapping document provided.

Next, it reads from the CSV file and, row by row, converts the lines from the play to Kinesis records and sends them to Kinesis.

I could have written a script that sends the lines directly to ElasticSearch, but there are a few drawbacks –

  1. If I have a small ElasticSearch server (as is the case if you are following along from part 1, where I used Pulumi to set up the infrastructure), sending tens of thousands of index requests directly to ElasticSearch could overwhelm it; I have managed to do this a few times. To alleviate this, I could send in bulk, but I wanted something with one more piece of resilience: retries.
  2. If ElasticSearch is temporarily down or there is a networking issue, Kinesis and the lambda will retry the indexing request to ElasticSearch. This is taken care of out of the box; all I had to do was specify how many retries the lambda should perform.

The lambda and this code are coupled to an ElasticSearch index named “shakespeare”, but it would be simple to break this coupling. In the code below, all you would need to do is add an index name to the kinesisRecord, and in the lambda, pass this name to the bulk indexing function (see part 2 for the code).

Unzip the attached file and run this to install the necessary modules –

npm install

This will look at the package.json file and download what is needed.

Below is the code needed to send the rows of the CSV to Kinesis (this is included in the zip).

const AWS = require('aws-sdk');
const { Client } = require('@elastic/elasticsearch');
const elasticSearchClient = new Client({
    node: "https://your elastic search server"
});

const csv=require('csvtojson');
const awsRegion = 'us-east-1';

let kinesis = new AWS.Kinesis({region: awsRegion});

function readCSVAndSendToKinesis(streamName){
    csv({delimiter:';'})
    .fromFile('./shakespeare_plays_small.csv')
    .then((json) => {
        json.forEach((row) =>
        {   
            sendEntryToKinesisStream(row, streamName);
        });
    })
}

//wrap a row from the CSV in a Kinesis record and send it to the stream
function sendEntryToKinesisStream(entry, streamName){
    var kinesisRecord = {
        Data: JSON.stringify(entry),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data){
        if(err){
            console.log('ERROR ' + err);
        }
        else {
            console.log(entry.Id + ' added to Kinesis stream');
        }
    });
}

async function createIndexIfNotExists(indexName) {
    let result = await elasticSearchClient.indices.exists({
        index: indexName
    });
    if (result.body === true) {
        console.log(indexName + ' already exists');
    } else {
        console.log(indexName + ' will be created');
        await createIndex(indexName);
    }
}

async function createIndex(indexName) {
    let result = await elasticSearchClient.indices.create({
        index: indexName,
        body: {
            "mappings": {
                "properties": {
                    "Id": {
                        "type": "integer"
                    },
                    "play": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword" 
                            }
                        }
                    },
                    "characterLineNumber": {
                        "type": "integer"
                    },
                    "actSceneLine": {
                        "type": "text"
                    },
                    "character": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword"
                            }
                        }
                    },
                    "line": {
                        "type": "text"
                    },
                }
            }
        }
    });
    console.log(result.statusCode);
}

async function seed(indexName, streamName) {
    await createIndexIfNotExists(indexName);
    readCSVAndSendToKinesis(streamName);
}

seed("shakespeare", "you kinesis stream name");

That’s it, now you have infrastructure as code, a lambda to bulk index in ElasticSearch, and a small application to send data to Kinesis. All that’s left is to add an API to perform a search; I have done this before using HttpClient, as shown in this post, but in the next post I’m going to use an ElasticSearch client for .NET to perform the search.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 2, Bulk Indexing

Full source code available here, look inside the lambda folder.

This is part two of my series on indexing the works of Shakespeare in ElasticSearch. In part one I set up the infrastructure as code, creating all the necessary AWS resources, including the lambda that is used for indexing data in bulk. But in that post I didn’t explain how the lambda works.
Indexing in bulk is a more reliable and scalable way to index, as it is easy to overwhelm small ElasticSearch instances if you are indexing a high volume of documents one at a time.

The Lambda
The code provided is based on an example from Elastic Co.

I add the AWS SDK and an ElasticSearch connector. The connector makes it possible to use IAM authentication when calling ElasticSearch.

'use strict'
const AWS = require('aws-sdk');

require('array.prototype.flatmap').shim();

const { Client } = require("@elastic/elasticsearch"); // see here for more  https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

const client = new Client({
    ...elasticSearchConnector(AWS.config),
    node: "https://" + process.env.esUrl
});

The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and awaits the bulk indexing method –

exports.handler = async (event) => {
    let dataToIndex = [];

    if(event.Records != null){
        event.Records.forEach(record => {
            let rawData = Buffer.from(record.kinesis.data, 'base64').toString("ascii");
            let obj = JSON.parse(rawData);
            dataToIndex.push(obj);
        });

        if(dataToIndex.length > 0) {
            // await the bulk request so the lambda does not return before indexing completes
            await indexDataInElasticSearch(dataToIndex, 'shakespeare'); // this could be passed in via the stream data too
        }
    }
    return "data indexed";
};
async function indexDataInElasticSearch(dataToIndex, indexName) {
    console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
    const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
    const { body: bulkResponse } = await client.bulk({ refresh: true, body });
}

That’s it, now all we need is a way of sending data to Kinesis, and that will be in the next post.

Full source code available here, look inside the lambda folder.

Indexing the Works of Shakespeare in ElasticSearch – Part 1, Infrastructure as Code

Full source code available here.

WARNING – be careful when using this; Kinesis costs money and is not on the AWS free tier. At the time of writing, a couple of ElasticSearch instance types are included in the free tier, but you can only have one instance running at a time. I made a mistake and spun up two ElasticSearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me, very helpful and understanding.

This is part one of a three parter where I’m going to show how to index the complete works of Shakespeare in ElasticSearch. This first part will set up the infrastructure on AWS. The second will go through the lambda that bulk loads data into ElasticSearch. The third will show how to, in Node.js, create the index on the ElasticSearch domain, read the works of Shakespeare from CSV, and send the lines to Kinesis.

Introduction
A few weeks ago I wrote a post describing how to get ElasticSearch up and running on AWS using Pulumi. It was a simple approach, with the expectation that the user would send documents directly to ElasticSearch for indexing. This is fine if all you are doing is some experiments, but if you are loading a lot of data, especially if you are sending the data one document at a time, you can easily overwhelm a small ElasticSearch instance and get socket timeouts or simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in ElasticSearch – instead of sending one document per request, send 100 or 1,000.

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to; these documents are picked up in batches by a lambda that in turn sends them in bulk to ElasticSearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.

What Infrastructure is Needed
Quite a lot is needed to get this up and running.

On the IaC side –

    An AWS Role
    An AWS policy attachment
    A Kinesis stream
    An ElasticSearch instance
    An AWS Lambda
    The code the Lambda will execute
    A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts –

    An ElasticSearch mapping document
    A tool to send data to Kinesis for indexing

I also want to limit access to the ElasticSearch service to my IP address; this is easy to figure out with a call to an API like api.ipify.org.

The lambda needs a zip file with all my Node.js code. Normally this would be part of your CI/CD pipeline, but I want to do this all in one place, so it’s included here. BEWARE, I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, so instead I used Ionic.Zip.

By default, Pulumi suffixes the names of your resources with a random string. I don’t like this, so I explicitly set names on everything. There is a little method at the top of the class to help add a prefix to resource names, like “test-01-”, or whatever you want.
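
It’s nothing more than a one-liner along these lines; the prefix value here is just an example –

private string PrefixName(string name) => $"test-01-{name}";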

The Stack

The IaC code starts with a query to a third party API to get the IP address my computer is using; there doesn’t seem to be an easy way to avoid calling .Result on httpClient.GetStringAsync here, as a stack constructor can’t be async.

public MyStack()
{
    HttpClient httpClient = new HttpClient()
    {
        BaseAddress = new System.Uri("https://api.ipify.org/")
    };
    string myIPAddress  = httpClient.GetStringAsync("?format=text").Result;

I then zip up the lambda source. This is not what you would normally do when deploying a serious application, but it’s useful for this demo. As mentioned above, I’m using Ionic.Zip because I could not get a zip file created by System.IO.Compression.ZipFile.CreateFromDirectory(..) to work.

File.Delete("index.zip");
using (ZipFile zip = new ZipFile())
{
    zip.AddDirectory("lambda");
    zip.Save("index.zip");
}

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is – this creates the role, policy attachment, Kinesis stream, ElasticSearch domain, lambda, and Kinesis-to-lambda mapping, and outputs the URL of the ElasticSearch domain.

var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
{
    Name = PrefixName("elasticsearch_indexing_role"),
    AssumeRolePolicy = @"{
                            ""Version"": ""2012-10-17"",
                            ""Statement"": [
                                {
                                ""Action"": ""sts:AssumeRole"",
                                ""Principal"": {
                                    ""Service"": ""lambda.amazonaws.com""
                                },
                                ""Effect"": ""Allow"",
                                ""Sid"": """"
                                }
                            ]
                        }",
});

var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
{
    Name = PrefixName("lambdaKinesisPolicyAttachment"),
    Roles =
    {
        elasticsearch_indexing_role.Name
    },
    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
});

var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
{
    Name = PrefixName("elasticsearch_kinesis"),
    RetentionPeriod = 24,
    ShardCount = 1,
    ShardLevelMetrics =
    {
        "IncomingBytes",
        "OutgoingBytes",
    },
});

string esDomainName = PrefixName("elasticsearch");
var config = new Config();
var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
{
    DomainName = esDomainName,
    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
    {
        InstanceType = "t2.small.elasticsearch",
    },
    EbsOptions = new DomainEbsOptionsArgs()
    {
        EbsEnabled = true,
        VolumeSize = 10,
        VolumeType = "gp2"
    },
    ElasticsearchVersion = "7.8",
    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
    {
        // locals renamed - reusing currentRegion/currentCallerIdentity here would shadow the variables above and not compile
        var region = values.Item1;
        var callerIdentity = values.Item2;
        var roleArn = values.Item3;
        return $@"
        {{
            ""Version"": ""2012-10-17"",
            ""Statement"": [
                {{
                    ""Effect"": ""Allow"",
                    ""Principal"": {{
                        ""AWS"": ""{roleArn}""
                    }},
                    ""Action"": ""es:*"",
                    ""Resource"": ""arn:aws:es:{region.Name}:{callerIdentity.AccountId}:domain/{esDomainName}/*""
                }},
                {{
                    ""Action"": ""es:*"",
                    ""Principal"": {{
                        ""AWS"": ""*""
                    }},
                    ""Effect"": ""Allow"",
                    ""Resource"": ""arn:aws:es:{region.Name}:{callerIdentity.AccountId}:domain/{esDomainName}/*"",
                    ""Condition"": {{
                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                    }}
                }}
            ]
        }}
        ";
    }),
});
this.ESDomainEndpoint = esDomain.Endpoint;


var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);

var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
{
    Handler = "index.handler",
    MemorySize = 128,
    Name = PrefixName("elasticsearch_indexing_function"),
    Publish = false,
    ReservedConcurrentExecutions = -1,
    Role = elasticsearch_indexing_role.Arn,
    Runtime = "nodejs12.x",
    Timeout = 4,
    Code = new FileArchive("index.zip"),
    Environment = lambdaEnvironmentVariables
});

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
});

Finally I want to print out the URL of the ElasticSearch domain.

[Output]
public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for ElasticSearch to start up, sometimes as much as 20 minutes.

To deploy run –

pulumi up

And now you wait.

Part two coming soon.

Full source code available here.

DynamoDb, Reading and Writing Data with .Net Core – Part 2

Full source code available here.

A few weeks ago I posted about reading and writing data to DynamoDb. I gave instructions on how to create tables on localstack and how to use the AWS Document Model approach. I also pointed out that I was not a big fan of this; reading data looked like –

[HttpGet("{personId}")]
public async Task<IActionResult> Get(int personId)
{
    Table people = Table.LoadTable(_amazonDynamoDbClient, "People");
    var person = JsonSerializer.Deserialize<Person> ((await people.GetItemAsync(personId)).ToJson());
    return Ok(person);
}

You have to convert to Json, then deserialize. I think you should be able to do something more like people.GetItemAsync(personId), but you can’t.

And writing data looked like –

[HttpPost]
public async Task<IActionResult> Post(Person person)
{
    Table people = Table.LoadTable(_amazonDynamoDbClient, "People");
    
    var document = new Document();
    document["PersonId"] = person.PersonId;
    document["State"] = person.State;
    document["FirstName"] = person.FirstName;
    document["LastName"] = person.LastName;
    await people.PutItemAsync(document);
    
    return Created("", document.ToJson());
}

For me this feels even worse; having to name the keys in the document is very error prone and hard to maintain.

Luckily there is another approach that is a little better. You create a class with attributes that indicate which table the class represents and which properties represent the keys in the table.

using Amazon.DynamoDBv2.DataModel;

namespace DynamoDbApiObjectApproach.Models
{
    
    [DynamoDBTable("People")]
    public class PersonDynamoDb
    {

        [DynamoDBHashKey]
        public int PersonId {get;set;}
        public string State {get;set;}
        public string FirstName {get;set;}
        public string LastName {get;set;}
    }
}

Because of these attributes I don’t want to expose this class too broadly, so I create a simple POCO to represent a person.

public class Person
{
    public int PersonId {get;set;}
    public string State {get;set;}
    public string FirstName {get;set;}
    public string LastName {get;set;}
}

I use AutoMapper to map between the two classes, never exposing PersonDynamoDb to the outside world. If you need help getting started with AutoMapper, I wrote a couple of posts on it recently.
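
A minimal AutoMapper profile for this pair of classes might look like the following; the profile name is my own choice –

using AutoMapper;

public class PersonProfile : Profile
{
    public PersonProfile()
    {
        // map in both directions between the POCO and the DynamoDb class
        CreateMap<Person, PersonDynamoDb>();
        CreateMap<PersonDynamoDb, Person>();
    }
}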

Here’s how reading and writing looks now –

[HttpGet("{personId}")]
public async Task<IActionResult> Get(int personId)
{
    var personDynamoDb = await _dynamoDBContext.LoadAsync<PersonDynamoDb>(personId);
    var person = _mapper.Map<Person>(personDynamoDb);
    return Ok(person);
}

[HttpPost]
public async Task<IActionResult> Post(Person person)
{
    var personDynamoDb = _mapper.Map<PersonDynamoDb>(person);
    await _dynamoDBContext.SaveAsync(personDynamoDb);
    return Created("", person.PersonId);
}

This is an improvement, but I’m still not thrilled with the .NET SDK for DynamoDb.

Full source code available here.