Indexing the Works of Shakespeare in Elasticsearch – Part 3, Sending the Lines to Kinesis

Full source code available here.

In this, the third part of the series, I show how to read the Shakespeare CSV file, where each row represents a line from a play (download here), and send those lines to Kinesis. The lambda in AWS will pick up the lines from Kinesis and forward them to Elasticsearch for indexing.

You need to configure the script to point to your Elasticsearch server (the node address near the top of the script) and to your Kinesis stream (the seed call at the bottom).

The script itself is fairly simple: it checks whether the Elasticsearch index for the plays already exists and, if not, creates one using the mapping document provided.

Next, it reads the CSV file and, row by row, converts each line from the play into a Kinesis record and sends it to the stream.

I could have written a script that sends the lines directly to Elasticsearch, but that approach has a few drawbacks -

  1. If I have a small Elasticsearch server (as is the case if you are following along from [part 1](/2020/11/indexing-the-works-of-shakespeare-in-elasticsearch-part-1-infrastructure-as-code/) where I used Pulumi to set up the infrastructure), sending tens of thousands of index requests directly to Elasticsearch could overwhelm it; I have managed to do this a few times. To alleviate this I could send the documents in bulk, but I also wanted one more piece of resilience - retries.
  2. If Elasticsearch is temporarily down or there is a networking issue, Kinesis and the lambda will retry the indexing request to Elasticsearch. This is taken care of out of the box; all I had to do was specify how many retries the lambda should perform (see the sketch after this list).
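
The retry count lives on the event source mapping that connects the Kinesis stream to the lambda. Below is a minimal sketch, written as a JavaScript Pulumi program, of what setting it could look like; the resource names and the lambda name are illustrative placeholders, not the ones from part 1, so adjust them to match your stack -

const aws = require("@pulumi/aws");

// Placeholder stream - in this series the stream was created in part 1.
const stream = new aws.kinesis.Stream("shakespeare-stream", { shardCount: 1 });

// Wire the stream to the lambda and cap how often a failed batch is retried.
const mapping = new aws.lambda.EventSourceMapping("shakespeare-mapping", {
    eventSourceArn: stream.arn,
    functionName: "your-lambda-name",   // the indexing lambda from part 2
    startingPosition: "LATEST",
    batchSize: 100,                     // records handed to each invocation
    maximumRetryAttempts: 5,            // retries before giving up on a batch
});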

The lambda and this code are coupled to an Elasticsearch index named “shakespeare”, but it would be simple to break this coupling. In the code below, all you would need to do is add an index name to the kinesisRecord, and in the lambda, pass that name to the bulk indexing function (see part 2 for the code).
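
Here is a hedged sketch of what that could look like; the indexName field and the bulkIndex helper are hypothetical names used for illustration, not code from part 2 -

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

// Hypothetical: carry the target index inside the record's payload.
function sendEntryToKinesisStream(entry, streamName, indexName) {
    var kinesisRecord = {
        Data: JSON.stringify({ ...entry, indexName: indexName }),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data) {
        if (err) console.log('ERROR ' + err);
    });
}

// In the lambda, each record would then name its own index, e.g.
// const { indexName, ...doc } = JSON.parse(payload);
// await bulkIndex(indexName, [doc]); // bulkIndex = the part 2 bulk indexing function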

Unzip the attached file and run this to install the necessary modules -

npm install

This reads the package.json file and downloads what is needed.

Below is the code needed to send the rows of the CSV to Kinesis (this is included in the zip).

const AWS = require('aws-sdk');
const { Client } = require('@elastic/elasticsearch');
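// point the client at your Elasticsearch server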
const elasticSearchClient = new Client({
    node: "https://your elastic search server"
});

const csv = require('csvtojson');
const awsRegion = 'us-east-1';

let kinesis = new AWS.Kinesis({region: awsRegion});

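// read the CSV file and send each row to the Kinesis stream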
function readCSVAndSendToKinesis(streamName){
    csv({delimiter:';'})
    .fromFile('./shakespeare_plays_small.csv')
    .then((json) => {
        json.forEach((row) =>
        {   
            sendEntryToKinesisStream(row, streamName);
        });
    })
}

// wrap a single row in a Kinesis record and put it on the stream
function sendEntryToKinesisStream(entry, streamName){
    var kinesisRecord = {
        Data: JSON.stringify(entry),
        PartitionKey: entry.Id.toString(),
        StreamName: streamName
    };
    kinesis.putRecord(kinesisRecord, function (err, data){
        if(err){
            console.log('ERROR ' + err);
        }
        else {
            console.log(entry.Id + ' added to Kinesis stream');
        }
    });
}

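// check whether the index already exists, and create it if not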
async function createIndexIfNotExists(indexName) {
    let result = await elasticSearchClient.indices.exists({
        index: indexName
    });
    if (result.body === true) {
        console.log(indexName + ' already exists');
    } else {
        console.log(indexName + ' will be created');
        await createIndex(indexName);
    }
}

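// create the index with an explicit mapping for the play fields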
async function createIndex(indexName) {
    let result = await elasticSearchClient.indices.create({
        index: indexName,
        body: {
            "mappings": {
                "properties": {
                    "Id": {
                        "type": "integer"
                    },
                    "play": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword" 
                            }
                        }
                    },
                    "characterLineNumber": {
                        "type": "integer"
                    },
                    "actSceneLine": {
                        "type": "text"
                    },
                    "character": {
                        "type": "text",
                        "fields": {
                            "raw": {
                                "type": "keyword"
                            }
                        }
                    },
                    "line": {
                        "type": "text"
                    },
                }
            }
        }
    });
    console.log(result.statusCode);
}

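// make sure the index exists, then stream the CSV rows into Kinesis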
async function seed(indexName, streamName) {
    await createIndexIfNotExists(indexName);
    readCSVAndSendToKinesis(streamName);
}

seed("shakespeare", "you kinesis stream name");

That’s it: now you have infrastructure as code, a lambda to bulk index documents in Elasticsearch, and a small application to send data to Kinesis. All that’s left is to add an API to perform a search. I have done this before using HttpClient, as shown in this post, but in the next post I’m going to use an Elasticsearch client for .NET to perform the search.

Full source code available here.
