Activate Your Data Streams Using RudderStack: A Use-Case with AWS Lambda and Amazon Kinesis

Team RudderStack, May 28, 2020, DEV Community

This post presents an approach for routing data to RudderStack using Amazon Kinesis and AWS Lambda functions.

Introduction

Many organizations today collect streaming event data from their applications and websites, often with tools like Amazon Kinesis. But how can these businesses turn the data streams into actionable insights? A popular approach is a process called activation: the raw data is transformed and then routed to the applications and services that act on it. For example, we can send signup events to our CRM so that the sales team can follow up on new leads and establish business opportunities.

In this post, we present an architecture that uses readily available services to achieve this. We combine Amazon Kinesis with AWS Lambda functions and RudderStack, an open-source, flexible Customer Data Infrastructure that performs the activation we are looking for. The Lambda functions read the Kinesis data streams and pass the records on to RudderStack, which performs the necessary data mapping and forwards the data to analytics platforms such as Google Analytics and Amplitude.

Note: In this post, we use the AWS stack as an example. However, you could replace Kinesis with Kafka, and AWS with another cloud provider; the setup works the same way with RudderStack.

How AWS Lambda integrates with RudderStack

As mentioned above, we use AWS Lambda functions as an intermediary that processes the data streams and routes them for analytics. Because Lambda functions can be written in Node.js, integrating them with RudderStack is straightforward: RudderStack provides a Node.js SDK that we can call from the Lambda code.

Quick Overview of the Data Flow

For the purpose of this blog, we devise a simple application flow:

  • Use the AWS Kinesis Agent to:
    • Monitor the file system for specific file patterns in a specified location
    • Upload the newly arrived files to the pre-defined Kinesis stream
  • Use an AWS Kinesis Data Streams consumer to read and process the data
  • The consumer triggers an AWS Lambda function
  • The Lambda function maps the Kinesis data to the RudderStack API arguments
  • The Lambda function invokes the RudderStack API
  • RudderStack routes the data to two destinations: Amazon S3 and Google Analytics

Setting up the AWS Kinesis Agent

We configure the AWS Kinesis Agent through its JSON configuration file (by default, /etc/aws-kinesis/agent.json):

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/tmp/*.csv",
      "kinesisStream": "lambda-integration-poc",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "CSVTOJSON",
          "customFieldNames": ["anonymousId", "orderId", "itemId", "itemName", "qty", "unitPrice"],
          "delimiter": ","
        }
      ]
    }
  ]
}

Some important points to note here:

  • We configure the AWS Kinesis Agent to monitor CSV files in the /tmp directory.
  • The Kinesis Agent sends the data to the lambda-integration-poc Kinesis data stream.
  • The data processing options specify that the CSV data is converted to JSON, along with the field names to use in the generated JSON.
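To make the CSVTOJSON step concrete, here is a minimal JavaScript sketch that approximates it for simple lines like the samples in this post. It is an assumption, not the agent's implementation: the helper `csvLineToJson` is hypothetical, and it splits on the delimiter without handling quoted fields or escaped delimiters. Note that every value stays a string, which matters when the Lambda function later multiplies `qty` by `unitPrice`.

```javascript
// Hypothetical helper that approximates the Kinesis Agent's CSVTOJSON
// processing option for simple CSV lines (no quoted fields or escaped delimiters).
function csvLineToJson(line, customFieldNames, delimiter = ",") {
  const values = line.split(delimiter);
  const record = {};
  customFieldNames.forEach((name, i) => {
    // Values stay strings, matching the sample records shown in this post
    record[name] = values[i];
  });
  return record;
}

// Field names taken from the customFieldNames entry in the agent configuration
const fieldNames = ["anonymousId", "orderId", "itemId", "itemName", "qty", "unitPrice"];
const record = csvLineToJson("testuser1,0001,0001,sample product 1,001,51.00", fieldNames);
console.log(JSON.stringify(record));
// {"anonymousId":"testuser1","orderId":"0001","itemId":"0001","itemName":"sample product 1","qty":"001","unitPrice":"51.00"}
```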

Here are two sample CSV lines and the Kinesis records the agent generates from them:

testuser1,0001,0001,sample product 1,001,51.00
testuser1,0001,0002,sample product 2,002,23.50

{ "anonymousId": "testuser1", "orderId": "0001", "itemId": "0001", "itemName": "sample product 1", "qty": "001", "unitPrice": "51.00" }
{ "anonymousId": "testuser1", "orderId": "0001", "itemId": "0002", "itemName": "sample product 2", "qty": "002", "unitPrice": "23.50" }

Note: We use this transformation as an example to demonstrate the operation of AWS Kinesis Agent. Some businesses already have their own programs that write to Kinesis in a format that suits their business requirements. There is no need to change such programs and/or formats.
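The Lambda function does not receive these JSON records directly: Kinesis delivers each record's data base64-encoded inside the `Records` array of the event, which is why the handler decodes `record.kinesis.data`. For testing the handler locally, a sketch like the following could build a minimal event from JSON records. The helpers `buildKinesisEvent` and `decodeRecords` are hypothetical, and only the field this post's handler reads is populated; real Kinesis events carry more metadata (partition key, sequence number, and so on).

```javascript
// Hypothetical test helper: wraps JSON records in a minimal Kinesis-style
// Lambda event. Only Records[i].kinesis.data is filled in, because that is
// the only field the handler in this post reads; real events include more.
function buildKinesisEvent(records) {
  return {
    Records: records.map((rec) => ({
      kinesis: {
        data: Buffer.from(JSON.stringify(rec), "utf8").toString("base64"),
      },
    })),
  };
}

// Mirrors the decoding step the handler performs on each record
function decodeRecords(event) {
  return event.Records.map((r) =>
    JSON.parse(Buffer.from(r.kinesis.data, "base64").toString("utf8"))
  );
}

const sampleEvent = buildKinesisEvent([
  { anonymousId: "testuser1", orderId: "0001", itemId: "0001", itemName: "sample product 1", qty: "001", unitPrice: "51.00" },
  { anonymousId: "testuser1", orderId: "0001", itemId: "0002", itemName: "sample product 2", qty: "002", unitPrice: "23.50" },
]);
console.log(decodeRecords(sampleEvent)[1].itemName); // sample product 2
```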

Setting up AWS Lambda

Before we proceed to the Lambda function code, let's review the overall setup. For this post, we run the Docker version of the RudderStack server on an EC2 instance. You can find more instructions on the setup here.

Note: You also need to have the AWS CLI installed in your development environment.

Integrating RudderStack with AWS Lambda

As mentioned previously, AWS Lambda integrates easily with third-party libraries such as the RudderStack Node.js SDK. The Lambda function maps each record and uses this SDK to send the resulting events to RudderStack, which routes them to the specified analytics destinations. Before looking at the code, here are the steps for packaging the RudderStack Node.js SDK with the Lambda function:

  • Install the RudderStack Node.js SDK in the directory where you keep the Lambda function artifacts in your development environment:
[ec2-user@ip-172-31-44-230 ~]$ npm install --prefix ~/lambda-apps @rudderstack/rudder-sdk-node
  • Archive the contents of the Lambda function development directory (the handler code and its node_modules folder) in a ZIP file:
[ec2-user@ip-172-31-44-230 lambda-apps]$ zip -r function.zip .
  • Update the Lambda function deployment:
[ec2-user@ip-172-31-44-230 lambda-apps]$ aws lambda update-function-code --function-name lambda-apps-dev-helloWorld --zip-file fileb://~/lambda-apps/function.zip

Using the Lambda function

The following snippet shows the Lambda code. Its HTTP-style response lets you test that the function is available at the web endpoint AWS assigned when the function was first deployed.

The code performs the following steps:

  • The function initializes some of the variables used for constructing the RudderStack canonical object
  • The lambda function iterates over every record in the Kinesis event
  • The function parses the records, which are in JSON format
  • The function then uses the attribute values of the JSON object as the values for the RudderStack object attributes
  • In some cases, RudderStack object attribute values are derived by aggregating the JSON object attribute values, as in the case of revenue
  • Each record is used to create a product object. The product objects are collected into a products array. An order object is constructed from the products array, the order_id from the records, and the revenue

Finally, the order object is passed to the RudderStack track API under the properties key (as properties.order).

'use strict';
const Analytics = require("@rudderstack/rudder-sdk-node");

// Initialize the RudderStack client once, outside the handler, so warm
// invocations reuse it. Use your own source write key and the data plane URL
// (the batch endpoint of the RudderStack server you are running).
const client = new Analytics("1ZINZh5pUNcKwgVGccCuSE4hi7K", "Data Plane URL");

// Maps a batch of Kinesis records (the line items of one order) to a single
// "Order Completed" event and sends it to RudderStack
module.exports.helloWorld = (event, context, callback) => {
    const response = {
        statusCode: 200,
        headers: {
            'Access-Control-Allow-Origin': '*', // Required for CORS support to work
        },
        body: JSON.stringify({
            message: 'Go Serverless v1.0! Your function executed successfully!',
            input: event,
        }),
    };
    const order = { products: [] };
    let revenue = 0;
    let anonymousId = "dummy";

    event.Records.forEach((record) => {
        // Kinesis data is base64-encoded, so decode it here (UTF-8 keeps
        // non-ASCII product names intact)
        const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
        console.log('Decoded payload:', payload);

        // Construct the order line item expected by GA from the Kinesis record
        const orderLine = JSON.parse(payload);
        const product = {
            product_id: orderLine.itemId,
            name: orderLine.itemName,
        };

        // The agent emits CSV values as strings, so convert them explicitly
        revenue += Number(orderLine.qty) * Number(orderLine.unitPrice);
        order.products.push(product);
        order.order_id = orderLine.orderId; // keeping it simple: all line items belong to the same order
        anonymousId = orderLine.anonymousId; // keeping it simple again, as above
    });

    order.revenue = revenue;
    console.log("Order : ", JSON.stringify(order));

    // track() hands the event to the SDK, which delivers it asynchronously, so
    // this try/catch only catches synchronous errors; log them and continue
    try {
        client.track({ event: "Order Completed", anonymousId: anonymousId, properties: { order } });
        console.log("Rudder Success");
    } catch (err) {
        console.log("Rudder Error", err);
    }
    callback(null, response);
};
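Because the mapping is the part most likely to change, it can help to pull it out of the handler into a pure function that can be tested without the RudderStack SDK or a network connection. The sketch below mirrors the handler's logic under the same simplifying assumption (all line items belong to one order); `buildOrder` is a hypothetical name. With the two sample records, the revenue is 1 × 51.00 + 2 × 23.50 = 98, matching the `revenue` value in the event delivered to Amazon S3 in this post.

```javascript
// Hypothetical pure function that mirrors the handler's mapping logic:
// decoded JSON line items in, an order object for "Order Completed" out.
function buildOrder(orderLines) {
  const order = { products: [] };
  let revenue = 0;
  for (const line of orderLines) {
    order.products.push({ product_id: line.itemId, name: line.itemName });
    // qty and unitPrice arrive as strings ("001", "51.00"), so convert explicitly
    revenue += Number(line.qty) * Number(line.unitPrice);
    order.order_id = line.orderId; // assumes all lines belong to one order
  }
  order.revenue = revenue;
  return order;
}

const order = buildOrder([
  { anonymousId: "testuser1", orderId: "0001", itemId: "0001", itemName: "sample product 1", qty: "001", unitPrice: "51.00" },
  { anonymousId: "testuser1", orderId: "0001", itemId: "0002", itemName: "sample product 2", qty: "002", unitPrice: "23.50" },
]);
console.log(order.revenue); // 98
```

Since prices are floating-point numbers here, a production version might compute totals in integer cents to avoid rounding drift.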

The write key and the RudderStack data plane URL are used to initialize the RudderStack client. In this particular case, we configure RudderStack to send the event to both Amazon S3 and Google Analytics. Learn more about configuring sources and destinations in RudderStack here:

RudderStack Connections Configuration
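Instead of hard-coding the write key and data plane URL in the handler, one option is to supply them as Lambda environment variables, which Node.js exposes through `process.env`. A minimal sketch, assuming the hypothetical variable names `RUDDER_WRITE_KEY` and `RUDDER_DATA_PLANE_URL`:

```javascript
// Hypothetical helper: reads RudderStack settings from environment variables
// (the names are illustrative) and fails fast if either one is missing.
function loadRudderConfig(env = process.env) {
  const writeKey = env.RUDDER_WRITE_KEY;
  const dataPlaneUrl = env.RUDDER_DATA_PLANE_URL;
  const missing = [];
  if (!writeKey) missing.push("RUDDER_WRITE_KEY");
  if (!dataPlaneUrl) missing.push("RUDDER_DATA_PLANE_URL");
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return { writeKey, dataPlaneUrl };
}

// Example with an explicit object standing in for process.env
const config = loadRudderConfig({
  RUDDER_WRITE_KEY: "your-write-key",
  RUDDER_DATA_PLANE_URL: "https://your-data-plane.example.com",
});
console.log(config.writeKey); // your-write-key
```

The returned values would then replace the two string literals passed to the `Analytics` constructor. Failing fast at startup makes a misconfigured deployment show up in the logs immediately, rather than as events that never arrive.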

The delivered event dumped to Amazon S3 looks like the following:

{
  "type": "track",
  "event": "Order Completed",
  "sentAt": "2020-04-15T09:59:50.246Z",
  "context": {
    "library": {"name": "analytics-node", "version": "0.0.1"}
  },
  "_metadata": {"nodeVersion": "12.16.1"},
  "messageId": "node-5306d64b863bdf7c95cce1442c70f3ac-1345b9b5-c5a9-4c1b-8338-64762ff2de8d",
  "timestamp": "2020-04-15T09:59:50.27Z",
  "properties": {
    "order": {
      "revenue": 98,
      "order_id": "0001",
      "products": [
        {"name": "sample product 1", "product_id": "0001"},
        {"name": "sample product 2", "product_id": "0002"}
      ]
    }
  },
  "receivedAt": "2020-04-15T09:59:50.271Z",
  "request_ip": "34.205.171.63:54764",
  "anonymousId": "testuser1",
  "originalTimestamp": "2020-04-15T09:59:50.245Z"
}

The screenshot below shows the delivered event in Google Analytics:

[Screenshot: The RudderStack event as seen in Google Analytics]

Summary

In this post, we saw how to combine data streams with RudderStack and AWS Lambda functions to build a flexible, real-time activation flow for your event data. Combining AWS Kinesis and AWS Lambda with RudderStack results in a lean, scalable data infrastructure that lets you act on your data as soon as it arrives.

Moreover, RudderStack is an open and flexible Customer Data Infrastructure, so it can be combined with other common data platforms. In this post, we use the AWS stack as an example, but you could replace Kinesis with Kafka, and AWS with another cloud provider.
