Indexing the Works of Shakespeare in ElasticSearch – Part 4, Searching via Web API in .NET 5

Full source code available here.

This is part four of my four-part series on indexing the works of Shakespeare in ElasticSearch.

In this post I’ll show how to use the ElasticSearch “low level client” to perform the search. In the past I wrote a blog post showing how to perform the search with a HttpClient and Json, and that works fine, but Steve Gordon suggested I try the Elastic client as it supports things like connection pooling and still lets me use Json directly with ElasticSearch.

To go along with the Elastic “low level client” there is a “high level client” called NEST. I have tried both and prefer to stick with Json, but you may find them more useful.

Because I develop in a few languages, Json is the natural choice for me. I use it when querying from Node.js, inside an HTTP client (Fiddler, REST Client, etc.) when figuring out my queries, and I want to use it in .NET too.

But Json and C# don’t go together very well; you have to jump through hoops with escaping to make it work. Or, as I have done, use a creative approach to deserializing via dynamic objects (I know some people won’t like this). I find this much more convenient than converting my Json queries to the Elastic client syntax.

This example shows how to use a Web API application to search for a piece of text, either in isolation or within a specific play.

The Setup
There is very little to do here.

In Startup.cs add the following to the ConfigureServices(..) method –

services.AddSingleton<ElasticLowLevelClient>(new ElasticLowLevelClient(new ConnectionConfiguration(new Uri("http://localhost:9200"))));
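
Hardcoding the URL is fine for a demo; if you would rather pull it from configuration, a small variation (my own, not from the original setup, and the “ElasticSearch:Url” key is just an example) would be –

services.AddSingleton<ElasticLowLevelClient>(_ =>
    new ElasticLowLevelClient(new ConnectionConfiguration(new Uri(Configuration["ElasticSearch:Url"]))));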

In the SearchController add the following to pass the ElasticSearch client in via dependency injection –

public class SearchController : ControllerBase
{
    private readonly ElasticLowLevelClient _lowLevelClient;
    public SearchController(ElasticLowLevelClient lowLevelClient)
    {
        _lowLevelClient = lowLevelClient;
    }
//snip ..

I have two action methods, one to search for a line across all plays, and one to search for a line within a specific play (I know they could be combined into a single action method, but I want to keep things simple) –

[HttpGet("Line")]
public ActionResult Line(string text)
{
    string queryWithParams = GetLineQuery(text);
    var lines = PerformQuery(queryWithParams);
    
    return Ok(lines);
}

[HttpGet("PlayAndLine")]
public ActionResult PlayAndLine(string play, string text)
{
    string queryWithParams = GetPlayAndLineQuery(play, text);
    var lines = PerformQuery(queryWithParams);

    return Ok(lines);
}
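
With the application running, the endpoints can be exercised with simple GET requests, something like the following (assuming the controller uses the default [Route("[controller]")] attribute, which is not shown above) –

GET http://localhost:5000/search/line?text=my kingdom for a horse
GET http://localhost:5000/search/playandline?play=Richard III&text=my kingdom for a horse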

All very straightforward so far, but now comes the “creative” approach to handling the Json problems.

I put my ElasticSearch queries into their own files. The first is Line.json –

{
    "query": {
        "match_phrase_prefix" :{
            "Line": ""
        }
    }
} 

And the second is PlayAndLine.json –

{
    "query":{
        "bool": {
            "must": [
                { "match": { "Play": "" } }
               ,{ "match_phrase_prefix": { "Line": "" } }
            ]
        }
    }
}

These Json queries are loaded into dynamic objects and the relevant values are set in C# – see the lines that assign to Line and Play in the two methods below.

private string GetLineQuery(string text)
{
    string elasticSearchQuery = System.IO.File.ReadAllText($"Queries/Line.json");
    dynamic workableElasticSearchQuery = JsonConvert.DeserializeObject(elasticSearchQuery);
    workableElasticSearchQuery.query.match_phrase_prefix.Line = text;

    return workableElasticSearchQuery.ToString();
}

private string GetPlayAndLineQuery(string play, string text)
{
    string elasticSearchQuery = System.IO.File.ReadAllText($"Queries/PlayAndLine.json");
    dynamic workableElasticSearchQuery = JsonConvert.DeserializeObject(elasticSearchQuery);
    workableElasticSearchQuery.query.@bool.must[0].match.Play = play;
    workableElasticSearchQuery.query.@bool.must[1].match_phrase_prefix.Line = text;

    return workableElasticSearchQuery.ToString();
}

The strings the above methods return are the queries that will be sent to ElasticSearch.
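
For example, searching for the line “my kingdom for a horse” (just an illustrative value) means GetLineQuery returns –

{
    "query": {
        "match_phrase_prefix": {
            "Line": "my kingdom for a horse"
        }
    }
}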

The below method makes the request and deserializes the response into the ESResponse class. That class was generated using https://json2csharp.com/.

private ESResponse PerformQuery(string queryWithParams)
{
    var response = _lowLevelClient.Search<StringResponse>("shakespeare", queryWithParams);
    ESResponse esResponse = System.Text.Json.JsonSerializer.Deserialize<ESResponse>(response.Body);
    return esResponse;
}
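
The full generated class is not shown here, but as a rough idea this is the shape it needs, trimmed to just the parts used for reading results (a hand-written sketch, with the document field names assumed to match the queries above) –

using System.Collections.Generic;
using System.Text.Json.Serialization;

public class ESResponse
{
    [JsonPropertyName("hits")]
    public HitsWrapper Hits { get; set; }
}

public class HitsWrapper
{
    [JsonPropertyName("hits")]
    public List<Hit> Hits { get; set; }
}

public class Hit
{
    [JsonPropertyName("_source")]
    public LineDocument Source { get; set; }
}

public class LineDocument
{
    [JsonPropertyName("Play")]
    public string Play { get; set; }

    [JsonPropertyName("Line")]
    public string Line { get; set; }
}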

You might have noticed that I use both System.Text.Json and Newtonsoft. This is because System.Text.Json does not support dynamic deserialization, see this discussion – https://github.com/dotnet/runtime/issues/29690.
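
A tiny illustration of the difference (not from the original post) –

// Newtonsoft returns a JObject, which supports dynamic member access and assignment
dynamic newtonsoft = Newtonsoft.Json.JsonConvert.DeserializeObject(@"{ ""query"": { } }");
newtonsoft.query.example = "works";

// System.Text.Json returns a JsonElement boxed as object, so the same dynamic
// member access fails at runtime with a RuntimeBinderException
dynamic systemTextJson = System.Text.Json.JsonSerializer.Deserialize<dynamic>(@"{ ""query"": { } }");
// systemTextJson.query.example = "throws";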

That’s it, searching and parsing of ElasticSearch results via a Web API application. It feels a bit messy, but I hope it helps.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 3, Sending the Lines to Kinesis

Full source code available here.

In this, the third part of the series, I show how to read from the Shakespeare CSV file, where each row represents a line from a play (download here), and send those lines to Kinesis. The lambda in AWS will pick up the lines from Kinesis and forward them to ElasticSearch for indexing.

You need to configure the script to point to your ElasticSearch server (the node setting near the top of the script) and to your Kinesis stream (the seed(..) call at the bottom).

The script itself is fairly simple: it checks whether the ElasticSearch index for the plays already exists and, if not, creates one using the mapping document provided.

Next, it reads the CSV file and, row by row, converts each line from the play into a Kinesis record and sends it to Kinesis.

I could have written a script that sends the lines directly to ElasticSearch, but that has a few drawbacks –

  1. If I have a small ElasticSearch server (as is the case if you are following along from part 1, where I used Pulumi to set up the infrastructure), sending tens of thousands of index requests directly to ElasticSearch could overwhelm it; I have managed to do this a few times. To alleviate this I could send in bulk, but I wanted something with one more piece of resilience – retries.
  2. If ElasticSearch is temporarily down or there is a networking issue, Kinesis and the lambda will retry the indexing request to ElasticSearch. This is taken care of out of the box; all I had to do was specify how many retries the lambda should perform.

The lambda and this code are coupled to an ElasticSearch index named “shakespeare”, but it would be simple to break this coupling. In the code below, all you would need to do is add an index name to the kinesisRecord, and in the lambda, pass that name to the bulk indexing function (see part 2 for the code).

Unzip the attached file and run this to install the necessary modules –

npm install

This will look at the package.json file and download what is needed.

Below is the code needed to send the rows of the CSV to Kinesis (this is included in the zip).

const AWS = require('aws-sdk');
const { Client } = require('@elastic/elasticsearch');
const elasticSearchClient = new Client({
    node: "https://your elastic search server"
});

const csv=require('csvtojson');
const awsRegion = 'us-east-1';

let kinesis = new AWS.Kinesis({region: awsRegion});

function readCSVAndSendToKinesis(streamName){
    csv({delimiter:';'})
    .fromFile('./shakespeare_plays_small.csv')
    .then((json) => {
        json.forEach((row) =>
        {   
            sendEntryToKinesisStream(row, streamName);
        });
    })
}

//wrap a single CSV row in a Kinesis record and put it on the stream
function sendEntryToKinesisStream(entry, streamName){
    var kinesisRecord = {
        Data: JSON.stringify(entry),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data){
        if(err){
            console.log('ERROR ' + err);
        }
        else {
            console.log(entry.Id + ' added to Kinesis stream');
        }
    });
}

async function createIndexIfNotExists(indexName) {
    let result = await elasticSearchClient.indices.exists({
        index: indexName
    });
    if (result.body === true) {
        console.log(indexName + ' already exists');
    } else {
        console.log(indexName + ' will be created');
        await createIndex(indexName);
    }
}

async function createIndex(indexName) {
    let result = await elasticSearchClient.indices.create({
        index: indexName,
        body: {
            "mappings": {
                "properties": {
                    "Id": {
                        "type": "integer"
                    },
                    "play": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword" 
                            }
                        }
                    },
                    "characterLineNumber": {
                        "type": "integer"
                    },
                    "actSceneLine": {
                        "type": "text"
                    },
                    "character": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword"
                            }
                        }
                    },
                    "line": {
                        "type": "text"
                    },
                }
            }
        }
    });
    console.log(result.statusCode);
}

async function seed(indexName, streamName) {
    await createIndexIfNotExists(indexName);
    readCSVAndSendToKinesis(streamName);
}

seed("shakespeare", "you kinesis stream name");

That’s it, now you have infrastructure as code, a lambda to bulk index in ElasticSearch, and a small application to send data to Kinesis. All that’s left is to add an API to perform a search. I have done this before using HttpClient, as shown in this post, but in the next post I’m going to use an ElasticSearch client for .NET to perform the search.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 2, Bulk Indexing

Full source code available here, look inside the lambda folder.

This is part two of my series on indexing the works of Shakespeare in ElasticSearch. In part one I set up the infrastructure as code, creating all the necessary AWS resources, including the lambda that is used for indexing data in bulk. But in that post I didn’t explain how the lambda works.
Indexing in bulk is a more reliable and scalable way to index, as it is easy to overwhelm small ElasticSearch instances if you are indexing a high volume of documents one at a time.

The Lambda
The code provided is based on an example from Elastic Co.

I add the AWS SDK and an ElasticSearch connector. The connector makes it possible to use IAM authentication when calling ElasticSearch.

'use strict'
const AWS = require('aws-sdk');

require('array.prototype.flatmap').shim();

const { Client } = require("@elastic/elasticsearch"); // see here for more  https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

const client = new Client({
    ...elasticSearchConnector(AWS.config),
    node: "https://" + process.env.esUrl
});

The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method –

exports.handler = (event, context, callback) => {
    let dataToIndex = [];

    if(event.Records != null){
        event.Records.forEach(record => {
            let rawData = Buffer.from(record.kinesis.data, 'base64').toString("ascii");
            let obj = JSON.parse(rawData);
            dataToIndex.push(obj);
        });

        if(dataToIndex.length > 0) {
            indexDataInElasticSearch(dataToIndex, 'shakespeare'); // the index name could be passed in via the stream data too
        }
    }
    callback(null, "data indexed");
};
async function indexDataInElasticSearch(dataToIndex, indexName) { 
    console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
    const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
    const { body: bulkResponse } = await client.bulk({ refresh: true, body });
}

That’s it, now all we need is a way of sending data to Kinesis, and that will be in the next post.

Full source code available here, look inside the lambda folder.

Indexing the Works of Shakespeare in ElasticSearch – Part 1, Infrastructure as Code

Full source code available here.

WARNING – be careful when using this; Kinesis costs money and is not on the AWS free tier. At the time of writing a couple of ElasticSearch instance types are included with the free tier, but you can only have one instance running at a time. I made a mistake and spun up two ElasticSearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me; very helpful and understanding.

This is part one of a three-part series where I’m going to show how to index the complete works of Shakespeare in ElasticSearch. This first part will set up the infrastructure on AWS. The second will go through the lambda that bulk loads data into ElasticSearch. The third will show how, in Node.js, to create the index on the ElasticSearch domain, read the works of Shakespeare from CSV, and send them to Kinesis.

Introduction
A few weeks ago I wrote a post describing how to get ElasticSearch up and running on AWS using Pulumi. It was a simple approach, with the expectation that the user would send documents directly to ElasticSearch for indexing. This is fine if all you are doing is some experimenting, but if you are loading a lot of data, and especially if you are sending the data one document at a time, you can easily overwhelm a small ElasticSearch instance and get socket timeouts or simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in ElasticSearch – instead of sending one document per request, send 100 or 1,000.
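
For anyone who has not used it, the ElasticSearch bulk API takes newline-delimited Json, alternating an action line with the document to index, something like this (illustrative values) –

{ "index": { "_index": "shakespeare", "_id": "1" } }
{ "Id": 1, "play": "Henry IV", "line": "So shaken as we are, so wan with care" }
{ "index": { "_index": "shakespeare", "_id": "2" } }
{ "Id": 2, "play": "Henry IV", "line": "Find we a time for frighted peace to pant" }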

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to; these documents are picked up in batches by a lambda that in turn sends them in bulk to ElasticSearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.

What Infrastructure is Needed
Quite a lot is needed to get this up and running.

On the IaC side –

    An AWS Role
    An AWS policy attachment
    A Kinesis stream
    An ElasticSearch instance
    An AWS Lambda
    The code the Lambda will execute
    A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts –

    An ElasticSearch mapping document
    A tool to send data to Kinesis for indexing

I also want to limit access to the ElasticSearch service to my IP address; this is easy to figure out with a call to an API like api.ipify.org.

The lambda needs a zip file with all my Node.js code. Normally this would be part of your CI/CD pipeline, but I want to do it all in one place so it’s included here. BEWARE, I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, instead I used Ionic.Zip.

By default, Pulumi suffixes the names of your resources with a random string. I don’t like this, so I explicitly set names on everything. There is a little method at the top of the class to help, adding a prefix like “test-01-”, or whatever you want, to resource names.
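
That helper is not shown in the post, but it does not need to be much more than this (a sketch, with the prefix hardcoded for illustration) –

private string PrefixName(string name) => $"test-01-{name}";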

The Stack

The IaC code starts with a query to a third-party API to get the IP address my computer is using; there doesn’t seem to be an easy way to avoid calling .Result on the httpClient.GetStringAsync call.

public MyStack()
{
    HttpClient httpClient = new HttpClient()
    {
        BaseAddress = new System.Uri("https://api.ipify.org/")
    };
    string myIPAddress  = httpClient.GetStringAsync("?format=text").Result;

I then zip up the Lambda source. This is not what you would normally do when deploying a serious application, but it’s useful for this demo. As mentioned above, I’m using Ionic.Zip because I could not get the zip file created by System.IO.Compression.ZipFile.CreateFromDirectory(..) to work with the lambda.

File.Delete("index.zip");
using (ZipFile zip = new ZipFile())
{
    zip.AddDirectory("lambda");
    zip.Save("index.zip");
}

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is – this creates the role, policy attachment, Kinesis stream, ElasticSearch domain, Lambda, and Kinesis mapping to the Lambda, and outputs the URL of the ElasticSearch domain.

var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
{
    Name = PrefixName("elasticsearch_indexing_role"),
    AssumeRolePolicy = @"{
                            ""Version"": ""2012-10-17"",
                            ""Statement"": [
                                {
                                ""Action"": ""sts:AssumeRole"",
                                ""Principal"": {
                                    ""Service"": ""lambda.amazonaws.com""
                                },
                                ""Effect"": ""Allow"",
                                ""Sid"": """"
                                }
                            ]
                        }",
});

var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
{
    Name = PrefixName("lambdaKinesisPolicyAttachment"),
    Roles =
    {
        elasticsearch_indexing_role.Name
    },
    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
});

var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
{
    Name = PrefixName("elasticsearch_kinesis"),
    RetentionPeriod = 24,
    ShardCount = 1,
    ShardLevelMetrics =
    {
        "IncomingBytes",
        "OutgoingBytes",
    },
});

string esDomainName = PrefixName("elasticsearch");
var config = new Config();
var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
{
    DomainName = esDomainName,
    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
    {
        InstanceType = "t2.small.elasticsearch",
    },
    EbsOptions = new DomainEbsOptionsArgs()
    {
        EbsEnabled = true,
        VolumeSize = 10,
        VolumeType = "gp2"
    },
    ElasticsearchVersion = "7.8",
    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
    {
        var currentRegion = values.Item1;
        var currentCallerIdentity = values.Item2;
        return $@"
        {{
            ""Version"": ""2012-10-17"",
            ""Statement"": [
                {{
                    ""Effect"": ""Allow"",
                    ""Principal"": {{
                        ""AWS"": ""{values.Item3}""
                    }},
                    ""Action"": ""es:*"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*""
                }},
                {{
                    ""Action"": ""es:*"",
                    ""Principal"": {{
                        ""AWS"": ""*""
                    }},
                    ""Effect"": ""Allow"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
                    ""Condition"": {{
                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                    }}
                }}
            ]
        }}
        ";
    }),
});
this.ESDomainEndpoint = esDomain.Endpoint;


var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);

var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
{
    Handler = "index.handler",
    MemorySize = 128,
    Name = PrefixName("elasticsearch_indexing_function"),
    Publish = false,
    ReservedConcurrentExecutions = -1,
    Role = elasticsearch_indexing_role.Arn,
    Runtime = "nodejs12.x",
    Timeout = 4,
    Code = new FileArchive("index.zip"),
    Environment = lambdaEnvironmentVariables
});

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
});

Finally I want to print out the URL of the ElasticSearch domain.

[Output]
public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for ElasticSearch to start up, sometimes as much as 20 minutes.

To deploy run –

pulumi up

And now you wait.
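
Once the deployment completes you can list the stack outputs, including the ElasticSearch endpoint, with –

pulumi stack output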

Part two coming soon.

Full source code available here.

DynamoDb, Reading and Writing Data with .Net Core – Part 2

Full source code available here.

A few weeks ago I posted about reading and writing data to DynamoDb. I gave instructions on how to create tables on localstack and how to use the AWS Document Model approach. I also pointed out that I was not a big fan of this; reading data looked like –

[HttpGet("{personId}")]
public async Task<IActionResult> Get(int personId)
{
    Table people = Table.LoadTable(_amazonDynamoDbClient, "People");
    var person = JsonSerializer.Deserialize<Person> ((await people.GetItemAsync(personId)).ToJson());
    return Ok(person);
}

You have to convert to Json, then deserialize. I think you should be able to do something more like people.GetItemAsync(personId), but you can’t.

And writing data looked like –

[HttpPost]
public async Task<IActionResult> Post(Person person)
{
    Table people = Table.LoadTable(_amazonDynamoDbClient, "People");
    
    var document = new Document();
    document["PersonId"] = person.PersonId;
    document["State"] = person.State;
    document["FirstName"] = person.FirstName;
    document["LastName"] = person.LastName;
    await people.PutItemAsync(document);
    
    return Created("", document.ToJson());
}

For me this feels even worse; having to name the keys in the document as strings is error-prone and hard to maintain.

Luckily there is another approach that is a little better. You have to create a class with attributes that indicate what table the class represents and what properties represent the keys in the table.

using Amazon.DynamoDBv2.DataModel;

namespace DynamoDbApiObjectApproach.Models
{
    
    [DynamoDBTable("People")]
    public class PersonDynamoDb
    {

        [DynamoDBHashKey]
        public int PersonId {get;set;}
        public string State {get;set;}
        public string FirstName {get;set;}
        public string LastName {get;set;}
    }
}

Because of these attributes I don’t want to expose this class too broadly, so I create a simple POCO to represent a person.

public class Person
{
    public int PersonId {get;set;}
    public string State {get;set;}
    public string FirstName {get;set;}
    public string LastName {get;set;}
}

I use AutoMapper to map between the two classes, never exposing the PersonDynamoDb to the outside world. If you need help getting started with AutoMapper I wrote a couple of posts recently on this.
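
The profile itself is tiny; a minimal sketch of it (the class name is my own, and I am assuming a simple two-way map is all that is needed) –

using AutoMapper;

public class PersonProfile : Profile
{
    public PersonProfile()
    {
        // map in both directions so the controller can go Person <-> PersonDynamoDb
        CreateMap<Person, PersonDynamoDb>().ReverseMap();
    }
}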

Here’s how reading and writing looks now –

[HttpGet("{personId}")]
public async Task<IActionResult> Get(int personId)
{
    var personDynamoDb = await _dynamoDBContext.LoadAsync<PersonDynamoDb>(personId);
    var person = _mapper.Map<Person>(personDynamoDb);
    return Ok(person);
}

[HttpPost]
public async Task<IActionResult> Post(Person person)
{
    var personDynamoDb = _mapper.Map<PersonDynamoDb>(person);
    await _dynamoDBContext.SaveAsync(personDynamoDb);
    return Created("", person.PersonId);
}
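
The controller gets the DynamoDB context and the mapper via constructor injection. The registrations in ConfigureServices look something like this (a sketch, assuming the AWSSDK.Extensions.NETCore.Setup and AutoMapper.Extensions.Microsoft.DependencyInjection packages; if you are using localstack as in the earlier post, the client would also be pointed at it here) –

services.AddAWSService<IAmazonDynamoDB>();
services.AddScoped<IDynamoDBContext, DynamoDBContext>();
services.AddAutoMapper(typeof(Startup));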

This is an improvement, but I’m still not thrilled with the .NET SDK for DynamoDb.

Full source code available here.