Indexing the Works of Shakespeare in Elasticsearch – Part 3, Sending the Lines to Kinesis

Want to learn more about AWS Lambda and .NET? Check out my A Cloud Guru course on ASP.NET Web API and Lambda.

Full source code available here.

In this, the third part of the series, I show how to read from the Shakespeare CSV file where each row represents a line from a play, (download here), and send these lines to Kinesis. The lambda in AWS will pick up the lines from Kinesis and forward them to Elasticsearch for indexing.

You need to configure the script to point to your Elasticsearch server (line 4) and to the Kinesis stream (line 97);

The script itself is fairly simple, it checks if the Elasticsearch index for the plays already exists, if not, it creates one using the mapping document provided.

Next, it reads from the CSV file and, row by row converts the lines line from the play to Kinesis records and sends them to Kinesis.

I could have written a script that sends the lines directly to Elasticsearch, but there are a few drawbacks -

  1. If I have a small Elasticsearch server (as is the case if you are following along from part 1 where I used Pulumi to set up the infrastructure), sending tens of thousands of index requests directly to Elasticsearch could overwhelm it, I have managed to do this a few times. To alleviate this, I could send in bulk, but I wanted to do something with one more added piece of resilience - retries.

  2. If Elasticsearch is temporarily down or there is a networking issue, Kinesis and the lambda will retry the indexing request to Elasticsearch. This is taken care of out of the box, all I had to do was specify how many retries should be performed by the lambda.

The lambda and this code are coupled to an Elasticsearch index named “Shakespeare”, but it would be a simple thing to break this. In the code below, all you would need to do is add an index name to the kinesisRecord, and in the lambda, pass this name to the bulk indexing function (see part 2 for the code).

Unzip the attached file and run this to install the necessary modules -

1npm install

This will look at the packages.json file and download what is needed.

Below is the code needed to send the rows of the CSV to Kinesis (this is included in the zip).

 1const AWS = require('aws-sdk');
 2const { Client } = require('@elastic/elasticsearch');
 3const elasticSearchClient = new Client({
 4    node: "https://your elastic search server"
 5});
 6
 7const csv=require('csvtojson');
 8const awsRegion = 'us-east-1';
 9
10let kinesis = new AWS.Kinesis({region: awsRegion});
11
12function readCSVAndSendToKinesis(streamName){
13    csv({delimiter:';'})
14    .fromFile('./shakespeare_plays_small.csv')
15    .then((json) => {
16        json.forEach((row) =>
17        {   
18            sendEntryToKinesisStream(row, streamName);
19        });
20    })
21}
22
23//read the file, foreach over and pass to sendEntryToKinesisStream
24function sendEntryToKinesisStream(entry, streamName){
25    var kinesisRecord = {
26        Data: JSON.stringify(entry),
27        PartitionKey: entry.Id.toString(),
28        StreamName: streamName
29    };
30    kinesis.putRecord(kinesisRecord, function (err, data){
31        if(err){
32            console.log('ERROR ' + err);
33        }
34        else {
35            console.log(entry.Id + ' added to Kinesis stream');
36        }
37    });
38}
39
40async function createIndexIfNotExists(indexName) {
41    let result = await elasticSearchClient.indices.exists({
42        index: indexName
43    });
44    if (result.body === true) {
45        console.log(indexName + ' already exists');
46    } else {
47        console.log(indexName + ' will be created');
48        await createIndex(indexName);
49    }
50}
51
52async function createIndex(indexName) {
53    let result = await elasticSearchClient.indices.create({
54        index: indexName,
55        body: {
56            "mappings": {
57                "properties": {
58                    "Id": {
59                        "type": "integer"
60                    },
61                    "play": {
62                        "type": "text",
63                        "fields": {
64                            "raw": {
65                                "type": "keyword" 
66                            }
67                        }
68                    },
69                    "characterLineNumber": {
70                        "type": "integer"
71                    },
72                    "actSceneLine": {
73                        "type": "text"
74                    },
75                    "character": {
76                        "type": "text",
77                        "fields": {
78                            "raw": {
79                                "type": "keyword"
80                            }
81                        }
82                    },
83                    "line": {
84                        "type": "text"
85                    },
86                }
87            }
88        }
89    });
90    console.log(result.statusCode);
91}
92
93async function seed(indexName, streamName) {
94    await createIndexIfNotExists(indexName);
95    readCSVAndSendToKinesis(streamName);
96}
97
98seed("shakespeare", "you kinesis stream name");

That’s it, now you have infrastructure as code, a lambda to bulk index in Elasticsearch, and a small application to send data to Kinesis. All that’s left is to add an API to perform a search, I have done this before using HttpClient as shown in this post, but in the next post I’m going to use and Elasticsearch client to for .NET to perform the search.

Full source code available here.

comments powered by Disqus

Related