Logging to DataDog with Serilog and .Net 5

Full source code available here.

This is a quick post showing how to set up a .Net 5 Web Api application with logging to DataDog from your local computer.

Getting Datadog up and running

The first step is to create an account with Datadog and then install the Datadog agent on your computer; in my case I am using Ubuntu.

There are a few small configuration steps to get the agent up and running.

In the datadog.yaml file look for an entry called logs_enabled and make sure it is set to true.

##################################
## Log collection Configuration ##
##################################

## @param logs_enabled - boolean - optional - default: false
## Enable Datadog Agent log collection by setting logs_enabled to true.
#
logs_enabled: true

In the /etc/datadog-agent/conf.d/ directory create a subdirectory called mydotnet5app.d.

Inside that directory create a file called conf.yaml and paste in the below.

#Log section
logs:

    # - type : (mandatory) type of log input source (tcp / udp / file)
    #   port / path : (mandatory) Set port if type is tcp or udp. Set path if type is file
    #   service : (mandatory) name of the service owning the log
    #   source : (mandatory) attribute that defines which integration is sending the log
    #   sourcecategory : (optional) Multiple value attribute. Can be used to refine the source attribute
    #   tags: (optional) add tags to each log collected

  - type: file
    path: /path/to/my/logfile/application.log
    service: csharp
    source: csharp

The .Net 5 Application

I have a simple .Net 5 Web Api application. I added Serilog to log to a file at the location specified in the conf.yaml above.

In Program.cs I add the below. Note the JsonFormatter on line 4; it is important for structured logging in DataDog –

public static void Main(string[] args)
{
    Log.Logger = new LoggerConfiguration() // requires using Serilog;
        .WriteTo.File(new JsonFormatter(renderMessage: true), "webapi.log") // JsonFormatter comes from Serilog.Formatting.Json
        .CreateLogger();

    try
    {
        Log.Information("Starting up...");
        CreateHostBuilder(args).Build().Run();
    }
    catch (Exception ex)
    {
        Log.Fatal(ex, "Application start-up failed");
    }
    finally
    {
        Log.CloseAndFlush();
    }
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .UseSerilog()
        .ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.UseStartup<Startup>();
        });

That’s all you need to configure logging; all that is left is to add a few logging statements.

Here is a simple controller that lets you perform division, giving you the opportunity to generate and log divide by zero exceptions.

[ApiController]
[Route("[controller]")]
public class MathController : ControllerBase
{
    private readonly ILogger<MathController> _logger;

    public MathController(ILogger<MathController> logger)
    {
        _logger = logger;
    }

    [HttpGet("division")]
    public ActionResult Get(int a, int b)
    {
        _logger.LogInformation("Dividing {numerator} by {denominator}", a, b); // note how I can change the names of the attributes that DataDog logs to numerator and denominator.

        int answer;
        try
        {
            answer = a / b;
        }
        catch (DivideByZeroException ex)
        {
            _logger.LogError(ex, "SomeId: {someId}. ErrorCode: {errorCode} Division didn't work, was dividing {numerator} by {denominator}", Guid.NewGuid(), -99, a, b);

            return BadRequest("Dividing by zero is not allowed");
        }
        _logger.LogInformation("Returning {result}", answer); // again the name of the attribute logged is different from the variable name in the code.
        return Ok(answer);
    }
}

You can also log more complex objects in DataDog, like a Person with an id, first name, last name and date of birth. The full code for this is in the attached zip in the PersonController.cs file, but it is as simple as –

public ActionResult Post(Person person)
{
    _logger.LogDebug("Received new person: {@person}", person);
    //snip ..
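For reference, the Person type might look something like the below – just a sketch based on the properties mentioned above (id, first name, last name and date of birth); the real class is in PersonController.cs in the attached zip.

// Sketch only – the actual class is in the attached source.
public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTime DateOfBirth { get; set; }
}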

And here is what it looks like in DataDog.

A successful request –

And one that generates an exception –

And one that logs a complex object –

Now, there is a huge variety of things you can do within DataDog to search, alert, monitor, etc on the data but I’m not going into that here.

Full source code available here.

Blazor, Updating or Refreshing the Display During a Method Call

Full source code available here: BlazorUpdatingUIEverySecondWasm.

This is a quick post showing how to refresh or update components on a Blazor page asynchronously, for example in response to a task or tasks completing.

In this example I have a button that calls a method. Inside the method is a for loop; in each iteration of the loop I increment the counter twice, delay twice, and update the display twice using calls to StateHasChanged().

Here is the code –

@page "/"

<h1>Updating the state of the page every second</h1>

<p>Current count: @currentCount</p>

<button class="btn btn-primary" @onclick="StartCounterAsync">Start counter</button>

@code {
    private int currentCount = 0;
    private async Task StartCounterAsync()
    {
        for (int i = 1; i <= 10;)
        {
            await Task.Delay(1000);
            currentCount = i++;
            StateHasChanged();  
            await Task.Delay(1000);
            currentCount = i++;
            StateHasChanged();  
        }
    }
}

Once you know how, it’s easy.

There is a scenario where using StateHasChanged() won’t work and instead you’ll get an error like –

Exception has occurred: CLR/System.InvalidOperationException
An exception of type 'System.InvalidOperationException' occurred in Microsoft.AspNetCore.Components.dll but was not handled in user code: 'The current thread is not associated with the Dispatcher. Use InvokeAsync() to switch execution to the Dispatcher when triggering rendering or component state.'
   at Microsoft.AspNetCore.Components.Dispatcher.AssertAccess()
   at Microsoft.AspNetCore.Components.RenderTree.Renderer.AddToRenderQueue(Int32 componentId, RenderFragment renderFragment)
   at Microsoft.AspNetCore.Components.ComponentBase.StateHasChanged()

In this case use InvokeAsync(StateHasChanged); instead and it should work.
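Here is a minimal sketch of that pattern. The System.Threading.Timer driving the updates is my own assumption, not part of the attached sample; the point is that the callback arrives on a thread pool thread, so the render is marshalled back through InvokeAsync –

@code {
    private int currentCount = 0;
    private System.Threading.Timer timer;

    private void StartTimer()
    {
        // The timer callback runs on a thread pool thread, not on Blazor's
        // synchronization context, so StateHasChanged must go through InvokeAsync.
        timer = new System.Threading.Timer(async _ =>
        {
            currentCount++;
            await InvokeAsync(StateHasChanged);
        }, null, dueTime: 0, period: 1000);
    }
}

The same wrapping applies to any callback that arrives from outside Blazor's synchronization context, such as timers or events raised by background services.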

Full source code available here: BlazorUpdatingUIEverySecondWasm.

Polly and Blazor, Part 1 – Simple Wait and Retry

Full source code available here.

A few weeks ago I gave a talk on Polly at Dotnetconf (you can check it out here); at the end I got many questions about using Polly with Blazor. I had never tried it, but assumed it would work.

In this blog, part 1 of a few on Polly and Blazor, I’ll show you how to get a simple retry mechanism up and working in a Blazor Server App. I am not concerned with dependency injection, policy registries or any other fancy things in this post; that will come later.

The setup
I have a .Net 5 Blazor Server app created with

dotnet new blazorserver

I stripped out the Blazor pages I didn’t need, and changed the code of the Index.razor file to make an HTTP call to a controller that generates errors 75% of the time. I removed links to the other Blazor pages too.

Around the HTTP call I use the Polly Wait and Retry Policy to retry up to three times, with a delay between each request. If you haven’t used Polly before, I have written a LOT about it, and have a Pluralsight course on it.

I made a few updates to the UI to show what the HttpClient and Polly are doing.

The Wait and Retry policy will retry after 2, 4 and 6 seconds (retryAttempt * 2). It prints to the console and updates the UI with the errors it gets from the remote system and details about the retry.

Because this is my first Blazor app, it took a little longer and may have some less than perfect approaches, but you should be able to take the retry idea and apply it to your application.

Before I show the code for retrying, let me show how I added a familiar Web API controller.

In the Configure(..) method of Startup.cs I added endpoints.MapControllers(); to the endpoints block so it looks like this –

app.UseEndpoints(endpoints =>
{
    endpoints.MapControllers(); // added this
    endpoints.MapBlazorHub();
    endpoints.MapFallbackToPage("/_Host");
});
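The Blazor Server template does not call services.AddControllers() by default, so if controllers are not already registered elsewhere, something along these lines is likely also needed in ConfigureServices(..) – a sketch only, the full Startup.cs is in the attached source –

public void ConfigureServices(IServiceCollection services)
{
    services.AddRazorPages();
    services.AddServerSideBlazor();
    services.AddControllers(); // registers the controller services that MapControllers relies on
}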

Then I created a Controllers directory and added a new file FaultyController.cs.

Here is its code –

using Microsoft.AspNetCore.Mvc;

namespace WebApiDataDog.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class FaultyController : ControllerBase
    {
        private static int counter = 0;

        public FaultyController()
        {
        }

        [HttpGet]
        public ActionResult Get()
        {
            if (++counter % 4 == 0)
            {
                return Ok(counter);
            }
            else if(counter % 4 == 1)
            {
                return StatusCode(501);
            }
            else if(counter % 4 == 2)
            {
                return StatusCode(502);
            }
            else
            {
                return StatusCode(503);
            }
        }
    }
}

And here is my Razor page that uses the Polly Wait and Retry policy.

@page "/"
@using Polly

<h1>Polly Test</h1>
<p>This demo uses an HttpClient to call a controller that is set up to fail 75% of the time (the first three requests will fail, the fourth will succeed). The Polly Retry policy will kick in and retry up to three times.</p>

<p>Status: @status</p>
<p>Polly Summary : @pollyMessage</p>

<p>Response Code : @responseCode</p>
<p>Response Body : @number</p>

<button class="btn btn-primary" @onclick="CallRemoteServiceAsync">Call remote service</button>

@code {
   HttpClient httpClient = new HttpClient() {
        BaseAddress =  new Uri("http://localhost:5000")
    };

    private string status;
    private string pollyMessage;
    private string responseCode;
    private string number;

    private async Task CallRemoteServiceAsync()
    {
        IAsyncPolicy<HttpResponseMessage> _httpRequestPolicy = Policy.HandleResult<HttpResponseMessage>(
            r => !r.IsSuccessStatusCode)
            .WaitAndRetryAsync(3,
                retryAttempt => TimeSpan.FromSeconds(retryAttempt * 2), onRetry: (response, retryDelay, retryCount, context) => {
                    pollyMessage = $"Recieved: {response.Result.StatusCode}, retryCount: {retryCount}, delaying: {retryDelay.Seconds} seconds\n";
                    Console.WriteLine(pollyMessage);
                    InvokeAsync(StateHasChanged);
        });

        status = "Calling remote service...";
        string requestEndpoint = "faulty/";

        HttpResponseMessage httpResponse = await _httpRequestPolicy.ExecuteAsync(() => httpClient.GetAsync(requestEndpoint));
        
        responseCode = httpResponse.StatusCode.ToString();
        number = await httpResponse.Content.ReadAsStringAsync();
        status = "Finished";
        pollyMessage = "";
    }
}

As I said above, I’m going to write a few more posts on Polly and Blazor to show some other scenarios, like using the Polly Context to pass data around and dependency injection (hopefully).

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 4, Searching via Web API in .NET 5

Full source code available here.

This is part four of my four part series on indexing the works of Shakespeare in ElasticSearch.

In this one I’ll show how to use the ElasticSearch “low level client” to perform the search. In the past I wrote a blog showing how to use an HttpClient to perform the search using Json, and this works fine, but Steve Gordon suggested I try the Elastic client as it supports things like connection pooling and still lets me use Json directly with ElasticSearch.

To go along with the Elastic “low level client” there is a “high level client” called NEST. I have tried both and prefer to stick with Json, but you may find them more useful.

Because I develop in a few languages, Json is the natural choice for me. I use it when querying from Node.js, inside an HTTP client (Fiddler, REST Client, etc.) when figuring out my queries, and I want to use it in .NET.

But Json and C# don’t go together very well; you have to jump through hoops to make it work with escaping. Or, as I have done, use a creative approach to deserializing via dynamic objects (I know some people won’t like this); I find this much more convenient than converting my Json queries to the Elastic client syntaxes.

This example shows how to use a Web API application to search for a piece of text in isolation or within a specific play.

The setup
There is very little to do here.

In Startup.cs add the following to the ConfigureServices(..) method –

services.AddSingleton<ElasticLowLevelClient>(new ElasticLowLevelClient(new ConnectionConfiguration(new Uri("http://localhost:9200"))));

In the SearchController add the following to pass the ElasticSearch client in via dependency injection –

public class SearchController : ControllerBase
{
    private readonly ElasticLowLevelClient _lowLevelClient;
    public SearchController(ElasticLowLevelClient lowLevelClient)
    {
        _lowLevelClient = lowLevelClient;
    }
//snip ..

I have two action methods, one to search for a play and line, and one to search for a line across all plays (I know they could be combined into a single action method, I want to keep things simple) –

[HttpGet("Line")]
public ActionResult Line(string text)
{
    string queryWithParams = GetLineQuery(text);
    var lines = PerformQuery(queryWithParams);
    
    return Ok(lines);
}

[HttpGet("PlayAndLine")]
public ActionResult PlayAndLine(string play, string text)
{
    string queryWithParams = GetPlayAndLineQuery(play, text);
    var lines = PerformQuery(queryWithParams);

    return Ok(lines);
}

All very straightforward so far, but now comes the “creative” approach to handling the Json problems.

I put my ElasticSearch queries into their own files. The first is Line.json

{
    "query": {
        "match_phrase_prefix" :{
            "Line": ""
        }
    }
} 

And the second is PlayAndLine.json

{
    "query":{
        "bool": {
            "must": [
                { "match": { "Play": "" } }
               ,{ "match_phrase_prefix": { "Line": "" } }
            ]
        }
    }
}

These Json queries are loaded into dynamic objects and the relevant values are set in C#.
See lines 5 and 14 & 15.

private string GetLineQuery(string text)
{
    string elasticSearchQuery = System.IO.File.ReadAllText($"Queries/Line.json");
    dynamic workableElasticSearchQuery = JsonConvert.DeserializeObject(elasticSearchQuery);
    workableElasticSearchQuery.query.match_phrase_prefix.Line = text;

    return workableElasticSearchQuery.ToString();
}

private string GetPlayAndLineQuery(string play, string text)
{
    string elasticSearchQuery = System.IO.File.ReadAllText($"Queries/PlayAndLine.json");
    dynamic workableElasticSearchQuery = JsonConvert.DeserializeObject(elasticSearchQuery);
    workableElasticSearchQuery.query.@bool.must[0].match.Play = play;
    workableElasticSearchQuery.query.@bool.must[1].match_phrase_prefix.Line = text;

    return workableElasticSearchQuery.ToString();
}

The strings the above methods return are the queries that will be sent to ElasticSearch.

The below method makes the request, and deserializes the response into the ESResponse class. That class was generated by https://json2csharp.com/.

private ESResponse PerformQuery(string queryWithParams)
{
    var response = _lowLevelClient.Search<StringResponse>("shakespeare", queryWithParams);
    ESResponse esResponse = System.Text.Json.JsonSerializer.Deserialize<ESResponse>(response.Body);
    return esResponse;
}
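The ESResponse class itself was generated from a real response and is in the attached source; as a rough sketch of its shape (the names below are my assumptions, not the generated code), it wraps the standard ElasticSearch hits structure –

// Rough sketch only – see the attached source for the generated classes.
// Needs: using System.Collections.Generic; using System.Text.Json; using System.Text.Json.Serialization;
public class ESResponse
{
    [JsonPropertyName("took")]
    public int Took { get; set; }

    [JsonPropertyName("hits")]
    public HitsWrapper Hits { get; set; }
}

public class HitsWrapper
{
    [JsonPropertyName("hits")]
    public List<Hit> Hits { get; set; }
}

public class Hit
{
    [JsonPropertyName("_id")]
    public string Id { get; set; }

    [JsonPropertyName("_source")]
    public JsonElement Source { get; set; } // the indexed line from the play
}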

You might have noticed that I use both System.Text.Json and Newtonsoft Json; this is because System.Text.Json does not support dynamic deserialization, see this discussion – https://github.com/dotnet/runtime/issues/29690.

That’s it – searching and parsing of ElasticSearch results via a Web API application. It feels a bit messy, but I hope it helps.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 3, Sending the Lines to Kinesis

Full source code available here.

In this, the third part of the series, I show how to read from the Shakespeare CSV file, where each row represents a line from a play (download here), and send these lines to Kinesis. The lambda in AWS will pick up the lines from Kinesis and forward them to ElasticSearch for indexing.

You need to configure the script to point to your ElasticSearch server (line 4) and to the Kinesis stream (line 97).

The script itself is fairly simple: it checks if the ElasticSearch index for the plays already exists; if not, it creates one using the mapping document provided.

Next, it reads from the CSV file and, row by row, converts the lines from the play to Kinesis records and sends them to Kinesis.

I could have written a script that sends the lines directly to ElasticSearch, but there are a few drawbacks –

  1. If I have a small ElasticSearch server (as is the case if you are following along from part 1, where I used Pulumi to set up the infrastructure), sending tens of thousands of index requests directly to ElasticSearch could overwhelm it – I have managed to do this a few times. To alleviate this, I could send in bulk, but I wanted something with one more added piece of resilience – retries.
  2. If ElasticSearch is temporarily down or there is a networking issue, Kinesis and the lambda will retry the indexing request to ElasticSearch. This is taken care of out of the box; all I had to do was specify how many retries the lambda should perform.

The lambda and this code are coupled to an ElasticSearch index named "shakespeare", but it would be a simple thing to break this. In the code below, all you would need to do is add an index name to the kinesisRecord, and in the lambda, pass this name to the bulk indexing function (see part 2 for the code).

Unzip the attached file and run this to install the necessary modules –

npm install

This will look at the package.json file and download what is needed.

Below is the code needed to send the rows of the CSV to Kinesis (this is included in the zip).

const AWS = require('aws-sdk');
const { Client } = require('@elastic/elasticsearch');
const elasticSearchClient = new Client({
    node: "https://your elastic search server"
});

const csv=require('csvtojson');
const awsRegion = 'us-east-1';

let kinesis = new AWS.Kinesis({region: awsRegion});

function readCSVAndSendToKinesis(streamName){
    csv({delimiter:';'})
    .fromFile('./shakespeare_plays_small.csv')
    .then((json) => {
        json.forEach((row) =>
        {   
            sendEntryToKinesisStream(row, streamName);
        });
    })
}

//convert a single row to a Kinesis record and put it on the stream
function sendEntryToKinesisStream(entry, streamName){
    var kinesisRecord = {
        Data: JSON.stringify(entry),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data){
        if(err){
            console.log('ERROR ' + err);
        }
        else {
            console.log(entry.Id + ' added to Kinesis stream');
        }
    });
}

async function createIndexIfNotExists(indexName) {
    let result = await elasticSearchClient.indices.exists({
        index: indexName
    });
    if (result.body === true) {
        console.log(indexName + ' already exists');
    } else {
        console.log(indexName + ' will be created');
        await createIndex(indexName);
    }
}

async function createIndex(indexName) {
    let result = await elasticSearchClient.indices.create({
        index: indexName,
        body: {
            "mappings": {
                "properties": {
                    "Id": {
                        "type": "integer"
                    },
                    "play": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword" 
                            }
                        }
                    },
                    "characterLineNumber": {
                        "type": "integer"
                    },
                    "actSceneLine": {
                        "type": "text"
                    },
                    "character": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword"
                            }
                        }
                    },
                    "line": {
                        "type": "text"
                    },
                }
            }
        }
    });
    console.log(result.statusCode);
}

async function seed(indexName, streamName) {
    await createIndexIfNotExists(indexName);
    readCSVAndSendToKinesis(streamName);
}

seed("shakespeare", "you kinesis stream name");

That’s it, now you have infrastructure as code, a lambda to bulk index in ElasticSearch, and a small application to send data to Kinesis. All that’s left is to add an API to perform a search. I have done this before using HttpClient, as shown in this post, but in the next post I’m going to use an ElasticSearch client for .NET to perform the search.

Full source code available here.

Indexing the Works of Shakespeare in ElasticSearch – Part 2, Bulk Indexing

Full source code available here, look inside the lambda folder.

This is part two of my series on indexing the works of Shakespeare in ElasticSearch. In part one I set up the infrastructure as code, creating all the necessary AWS resources, including the lambda that is used for indexing data in bulk. But in that post I didn’t explain how the lambda works.
Indexing in bulk is a more reliable and scalable way to index, as it is easy to overwhelm small ElasticSearch instances if you are indexing a high volume of documents one at a time.

The Lambda
The code provided is based on an example from Elastic Co.

I add the AWS SDK and an ElasticSearch connector. The connector makes it possible to use IAM authentication when calling ElasticSearch.

'use strict'
const AWS = require('aws-sdk');

require('array.prototype.flatmap').shim();

const { Client } = require("@elastic/elasticsearch"); // see here for more  https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/bulk_examples.html
const elasticSearchConnector = require('aws-elasticsearch-connector');

const client = new Client({
    ...elasticSearchConnector(AWS.config),
    node: "https://" + process.env.esUrl
});

The handler is simple: it reads the incoming records from Kinesis, adds them to an array, and calls the bulk indexing method –

exports.handler = (event, context, callback) => {
    let dataToIndex = [];

    if(event.Records != null){
        event.Records.forEach(record => {
            let rawData = Buffer.from(record.kinesis.data, 'base64').toString("ascii");
            let obj = JSON.parse(rawData);
            dataToIndex.push(obj);
        });

        if(dataToIndex.length > 0) {
            indexDataInElasticSearch(dataToIndex, 'shakespeare'); // this could be passed in via the stream data too
        }
    }
    callback(null, "data indexed");
};
async function indexDataInElasticSearch(dataToIndex, indexName) { 
    console.log('Seeding...' + dataToIndex[0].Id + " - " + dataToIndex[dataToIndex.length - 1].Id);
    const body = dataToIndex.flatMap(entry => [{ index: { _index: indexName, _id: entry.Id, _type: '_doc' } }, entry]);
    const { body: bulkResponse } =   await client.bulk({ refresh: true, body });
}

That’s it, now all we need is a way of sending data to Kinesis, and that will be in the next post.

Full source code available here, look inside the lambda folder.

Indexing the Works of Shakespeare in ElasticSearch – Part 1, Infrastructure as Code

Full source code available here.

WARNING – be careful when using this, Kinesis costs money and is not on the AWS free tier. At the time of writing a couple of ElasticSearch instance types are included with the free tier, but you can only have one instance running at a time. I made a mistake and spun up two ElasticSearch instances for a few days and ran up a small bill. I got in touch with AWS support, explained what I was doing, and they refunded me, very helpful and understanding.

This is part one of a three parter where I’m going to show how to index the complete works of Shakespeare in ElasticSearch. This first part will setup the infrastructure on AWS. The second will go through the lambda that bulk loads data into ElasticSearch. The third will show how to, in Node.js, create the index on the ElasticSearch domain, read the works of Shakespeare from CSV and send to Kinesis.

Introduction
A few weeks ago I wrote a post describing how to get ElasticSearch up and running on AWS using Pulumi. It was a simple approach with the expectation that the user would send documents directly to ElasticSearch for indexing. This is fine if all you are doing is some experiments, but if you are loading a lot of data, and especially if you are sending the data one document at a time, you can easily overwhelm a small ElasticSearch instance and get socket timeouts or simply run out of sockets.

One way to reduce the likelihood of this occurring is to make bulk requests to index documents in ElasticSearch – instead of sending one document per request, send 100 or 1,000.

Another approach is to use AWS Kinesis in combination with bulk indexing.

Kinesis is a reliable service that you can send thousands (or millions) of individual documents to; these documents are picked up in batches by a lambda that in turn sends them in bulk to ElasticSearch.

If for some reason the lambda fails to process the documents, Kinesis will deliver them to the lambda again to retry indexing.

What Infrastructure is Needed
Quite a lot is needed to get this up and running.

On the IaC side –

    An AWS Role
    An AWS policy attachment
    A Kinesis stream
    An ElasticSearch instance
    An AWS Lambda
    The code the Lambda will execute
    A mapping between the Kinesis stream and the Lambda

Outside of IaC, the following is needed and will be shown in upcoming posts –

    An ElasticSearch mapping document
    A tool to send data to Kinesis for indexing

I also want to limit access to the ElasticSearch service to my IP address; this is easy to figure out with a call to an API like api.ipify.org.

The lambda needs a zip file with all my Node.js code. Normally this would be part of your CI/CD pipeline, but I want to do this all in one, so it’s included here. BEWARE – I was not able to create a valid zip for the AWS lambda with ZipFile.CreateFromDirectory, instead I used Ionic.Zip.

By default, Pulumi suffixes the names of your resources with a random string. I don’t like this, so I explicitly set names on everything. There is a little method at the top of the class to help add a prefix to resource names, like "test-01-" or whatever you want.
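That helper isn’t shown in the excerpt below; a minimal sketch of what it might look like (the prefix value is just an example) –

private const string prefix = "test-01-"; // example value, use whatever you want
private string PrefixName(string name) => $"{prefix}{name}";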

The Stack

The IaC code starts with a query to a third-party API to get the IP address my computer is using; there doesn’t seem to be an easy way to avoid calling .Result on the httpClient.GetStringAsync call.

public MyStack()
{
    HttpClient httpClient = new HttpClient()
    {
        BaseAddress = new System.Uri("https://api.ipify.org/")
    };
    string myIPAddress  = httpClient.GetStringAsync("?format=text").Result;

I then zip up the Lambda source; this is not what you would normally do when deploying a serious application, but it’s useful for this demo. As mentioned above, I’m using Ionic.Zip because I could not get a working zip file from System.IO.Compression.ZipFile.CreateFromDirectory(..).

File.Delete("index.zip");
using (ZipFile zip = new ZipFile())
{
    zip.AddDirectory("lambda");
    zip.Save("index.zip");
}

IP address figured out, zip file in place, now I can start building the infrastructure.

Here it all is – this creates the role, policy attachment, Kinesis stream, ElasticSearch domain, Lambda, and Kinesis mapping to the Lambda, and outputs the URL of the ElasticSearch domain.

var elasticsearch_indexing_role = new Aws.Iam.Role(PrefixName("elasticsearch_indexing_role"), new Aws.Iam.RoleArgs
{
    Name = PrefixName("elasticsearch_indexing_role"),
    AssumeRolePolicy = @"{
                            ""Version"": ""2012-10-17"",
                            ""Statement"": [
                                {
                                ""Action"": ""sts:AssumeRole"",
                                ""Principal"": {
                                    ""Service"": ""lambda.amazonaws.com""
                                },
                                ""Effect"": ""Allow"",
                                ""Sid"": """"
                                }
                            ]
                        }",
});

var lambdaKinesisPolicyAttachment = new Aws.Iam.PolicyAttachment(PrefixName("lambdaKinesisPolicyAttachment"), new Aws.Iam.PolicyAttachmentArgs
{
    Name = PrefixName("lambdaKinesisPolicyAttachment"),
    Roles =
    {
        elasticsearch_indexing_role.Name
    },
    PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
});

var elasticsearch_kinesis = new Aws.Kinesis.Stream(PrefixName("elasticsearch_kinesis"), new Aws.Kinesis.StreamArgs
{
    Name = PrefixName("elasticsearch_kinesis"),
    RetentionPeriod = 24,
    ShardCount = 1,
    ShardLevelMetrics =
    {
        "IncomingBytes",
        "OutgoingBytes",
    },
});

string esDomainName = PrefixName("elasticsearch");
var config = new Config();
var currentRegion = Output.Create(Aws.GetRegion.InvokeAsync());
var currentCallerIdentity = Output.Create(Aws.GetCallerIdentity.InvokeAsync());
var esDomain = new ElasticSearch.Domain(esDomainName, new ElasticSearch.DomainArgs
{
    DomainName = esDomainName,
    ClusterConfig = new ElasticSearch.Inputs.DomainClusterConfigArgs
    {
        InstanceType = "t2.small.elasticsearch",
    },
    EbsOptions = new DomainEbsOptionsArgs()
    {
        EbsEnabled = true,
        VolumeSize = 10,
        VolumeType = "gp2"
    },
    ElasticsearchVersion = "7.8",
    AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity, elasticsearch_indexing_role.Arn).Apply(values =>
    {
        var currentRegion = values.Item1;
        var currentCallerIdentity = values.Item2;
        return $@"
        {{
            ""Version"": ""2012-10-17"",
            ""Statement"": [
                {{
                    ""Effect"": ""Allow"",
                    ""Principal"": {{
                        ""AWS"": ""{values.Item3}""
                    }},
                    ""Action"": ""es:*"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*""
                }},
                {{
                    ""Action"": ""es:*"",
                    ""Principal"": {{
                        ""AWS"": ""*""
                    }},
                    ""Effect"": ""Allow"",
                    ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
                    ""Condition"": {{
                        ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
                    }}
                }}
            ]
        }}
        ";
    }),
});
this.ESDomainEndpoint = esDomain.Endpoint;


var lambdaEnvironmentVariables = new Aws.Lambda.Inputs.FunctionEnvironmentArgs();
lambdaEnvironmentVariables.Variables.Add("esUrl", esDomain.Endpoint);

var elasticsearch_indexing_function = new Aws.Lambda.Function(PrefixName("elasticsearch_indexing_function"), new Aws.Lambda.FunctionArgs
{
    Handler = "index.handler",
    MemorySize = 128,
    Name = PrefixName("elasticsearch_indexing_function"),
    Publish = false,
    ReservedConcurrentExecutions = -1,
    Role = elasticsearch_indexing_role.Arn,
    Runtime = "nodejs12.x",
    Timeout = 4,
    Code = new FileArchive("index.zip"),
    Environment = lambdaEnvironmentVariables
});

var elasticsearch_indexing_event_source = new Aws.Lambda.EventSourceMapping(PrefixName("elasticsearch_indexing_event_source"), new Aws.Lambda.EventSourceMappingArgs
{
    EventSourceArn = elasticsearch_kinesis.Arn,
    FunctionName = elasticsearch_indexing_function.Arn,
    StartingPosition = "LATEST",
});

Finally I want to print out the URL of the ElasticSearch domain.

[Output]
public Output<string> ESDomainEndpoint { get; set; }

That’s it, now all that is left is to deploy and wait…quite…a…while…for ElasticSearch to start up, sometimes as much as 20 minutes.

To deploy run –

pulumi up

And now you wait.

Part two coming soon.

Full source code available here.

VS Code Bug – Interpolation and Commented Lines, Workaround

This is a quick post with a workaround to a bug in VS Code that occurs when you use a string with @$ – the verbatim and interpolation characters.

If you have a piece of code that looks like this –

string text = @$"any text as long as there is a new line
/*";

Everything that comes after it will look commented out, like so –

This is a bit annoying because all coloring is lost below that /*.

You might not have hit this, but if you start using Pulumi for IaC in AWS you will, because permissions on resources often end with /* – or if you want to print some slashes and stars to the console.

Fortunately, there is a very easy workaround! Use $@ instead of @$.

string text = $@"any text as long as there is a new line
/*";

Much better!

Reading CSV Files into Objects with Node.js

Full source code available here.

As I am learning Node.js I am constantly surprised by how easy it is to do some things, how difficult it is to do others, and how poor the examples out there are.

Case in point: I want to read some data from a CSV file and then send it off to ElasticSearch for indexing. The ElasticSearch part I have figured out already, and in Node.js that is easy.
I expected it to be trivial to find an example of reading a file, creating objects based on the rows, processing the objects, and doing all that in chunks of, say, 1000 rows. This kind of stuff is easy in C# and there are plenty of examples. Not the case in Node.js.
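For comparison – purely as a sketch, and not part of the original samples – a chunked read like the one described below might look something like this in C#, assuming a semicolon-delimited file with a header row:

// Comparison sketch only, not from the attached source.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

class ChunkedCsvRead
{
    static void Main()
    {
        var entries = new List<string[]>();
        foreach (var line in File.ReadLines("shakespeare_plays_sample.csv").Skip(1)) // skip the header row
        {
            entries.Add(line.Split(';'));
            if (entries.Count == 100)
            {
                ProcessEntries(entries);
                entries.Clear();
            }
        }
        if (entries.Count > 0)
        {
            ProcessEntries(entries); // don't forget the final, partial chunk
        }
    }

    static void ProcessEntries(List<string[]> entries) =>
        Console.WriteLine(entries.First()[0] + "  to " + entries.Last()[0]);
}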

Here are two ways of doing it in Node.js.

You can download a CSV with all of Shakespeare’s plays, but the sample file included in the download above has just the first 650 lines from that file.

I want to read the file 100 rows at a time, put the rows into objects and then process the rows.

csv-parse
The first module I came across was csv-parse.

Run this from the console –

npm install csv-parse

I have an Entry class that represents a row from the CSV file. Its constructor takes six parameters, one for each of the columns in the CSV file.

In this example processing the data in chunks is not necessary, but when I process the full file with over 130,000 rows, chunking becomes important.

var fs = require('fs');
var parse = require('csv-parse');


function readCSV() {
    let entries = [];
    let count = 0;

    fs.createReadStream('shakespeare_plays_sample.csv')
        .pipe(parse({ delimiter: ';', from_line: 2 }))
        .on('data', function (row) {
            count++;
            entries.push(new Entry(row[0], row[1], row[2], row[3], row[4], row[5]))

            if (count % 100 == 0) {
                processEntries(entries);
                count = 0;
                entries = []; // clear the array
            }
        })
        .on('end', function () {
            processEntries(entries);
        });
}

function processEntries(entries) {
    console.log(entries[0].Id + "  to " + entries[entries.length - 1].Id);
}

class Entry {
    constructor(id, play, characterLineNumber, actSceneLine, character, line) {
        this.Id = id;
        this.Play = play;
        this.CharacterLineNumber = characterLineNumber;
        this.ActSceneLine = actSceneLine;
        this.Character = character;
        this.Line = line;
    }
}

readCSV();

Note how I have to make sure that the last chunk is also processed, on line 22.

This approach seems fine, but then I found the csvtojson module.

csvtojson
This module makes what I’m trying to do a little easier by skipping over the need to explicitly construct an object with the data from the rows in the file.

First install the module –

npm install csvtojson

This is the code –

const csv=require('csvtojson');

function readCSV(){
    let entries = [];
    let count = 0;

    csv({delimiter:';'})
    .fromFile('./shakespeare_plays_sample.csv')
    .then((json)=>{
        json.forEach((row)=>
        {   
            count++;
            entries.push(row);
            if(count % 100 == 0){
                processEntries(entries);
                count = 0;
                entries = []; // clear the array
            }
        });
        processEntries(entries);
    })
    return entries;
}

function processEntries(entries){
    console.log(entries[0].Id + "  to " + entries[entries.length - 1].Id);
}

readCSV();

Again note how I process the chunks of 100, and then the final chunk on line 20.

All of this is in aid of indexing all of Shakespeare’s works in ElasticSearch, and that I will show in another post.

Full source code available here.

Working with JSON in .NET, Infrastructure as Code with Pulumi

Full source code available here.

This is a follow up to my previous post where I used dynamic and JSON files to make querying ElasticSearch with an HttpClient much easier.

To deploy my ElasticSearch domain on AWS I used Pulumi. ElasticSearch requires a JSON policy to define the permissions. In the post linked above, I have a heavily escaped version of that policy. The policy can be complex and needs values substituted into it. In the example below I need to pass in the region, account id, domain name and allowed IP address.

Here is a very simple policy with four substitutions –

"{{
""Version"": ""2012-10-17"",
""Statement"": [
    {{
        ""Action"": ""es:*"",
        ""Principal"": {{
            ""AWS"": ""*""
        }},
        ""Effect"": ""Allow"",
        ""Resource"": ""arn:aws:es:{currentRegion.Name}:{currentCallerIdentity.AccountId}:domain/{esDomainName}/*"",
        ""Condition"": {{
            ""IpAddress"": {{""aws:SourceIp"": [""{myIPAddress}""]}}
        }}
    }}
]
}}"

Just escaping this is not easy, and very prone to error. A more realistic policy would be significantly longer and would need more substitutions.

Using a JSON file
Here is what I think is an easier way. As in the previous post, the JSON file becomes part of my source code. It is deserialized into a dynamic object and the required values are set.

Here is the AWS policy as it appears in my JSON file. The resource (made up of region, account, and domain name) and IpAddress are left blank, but the structure of the policy is the same as you would paste into the AWS console.

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "AWS": "*"
        },
        "Action": "es:*",
        "Resource": "",
        "Condition": {
          "IpAddress": {
            "aws:SourceIp": ""
          }
        }
      }
    ]
}

In my C# code I read the file, deserialize it, and set the required values.

Here is an example –

private string GetAWSElasticSearchPolicy(string region, string account, string elasticSearchDomainName, string allowedIPAddress)
{
    string blankPolicy = File.ReadAllText("AWSPolicy.json");
    dynamic awsElasticSearchPolicy = JsonConvert.DeserializeObject(blankPolicy);

    awsElasticSearchPolicy.Statement[0].Resource = $"arn:aws:es:{region}:{account}:domain/{elasticSearchDomainName}/*";
    awsElasticSearchPolicy.Statement[0].Condition.IpAddress = new JObject(new JProperty("aws:SourceIp", allowedIPAddress));

    return awsElasticSearchPolicy.ToString(); // this is correctly formatted JSON that can be used with Pulumi.
}

Line 3 reads the JSON file into a string.
Line 4 turns the string into a dynamic object.
Lines 6 & 7 set the values I want.
Line 9 returns a nice JSON string that can be used with Pulumi.
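To tie this back to the Pulumi stack, the returned string can be fed straight into the domain’s AccessPolicies. A sketch only, reusing the Output.Tuple(..).Apply(..) pattern from the infrastructure-as-code post – the surrounding variable names are assumptions based on that code –

// Sketch – mirrors the pattern used in the infrastructure-as-code post.
AccessPolicies = Output.Tuple(currentRegion, currentCallerIdentity).Apply(values =>
    GetAWSElasticSearchPolicy(
        values.Item1.Name,        // region
        values.Item2.AccountId,   // account
        esDomainName,             // ElasticSearch domain name
        myIPAddress)),            // allowed IP address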

This is much cleaner than the heavily escaped version in this post.

Full source code available here.