
Serverless Event Stream Processing with Aerospike

Abhilash Mandaliya
Software Engineer
October 28, 2021 | 11 min read

The serverless compute architectural pattern lets you run code without provisioning or managing servers, creating workload-aware cluster scaling logic, maintaining event integrations, or managing runtimes. AWS Lambda on AWS and Google Cloud Functions on GCP are the key services built on this pattern, and both can be extended to support event streaming pipelines. AWS Lambda is integrated with the majority of the services in the AWS big data ecosystem, and Google Cloud Functions is similarly integrated with the rest of the Google Cloud Platform. This presents an opportunity to integrate the Aerospike database with the broader cloud ecosystem in order to build efficient cloud-native streaming pipelines.

Aerospike Connect for Event Stream Processing (ESP), or the ESP connector, converts Change Notifications (CN) received over XDR into HTTP/1 or HTTP/2 requests and streams them to downstream consumers or any other system that can ingest HTTP requests. In an earlier blog, we shared several Connect for ESP-powered use cases. In this blog, we will home in on the serverless event processing pattern and show how you can leverage AWS Lambda or Google Cloud Functions in conjunction with Aerospike Connect for ESP and the Aerospike database to implement it.

With serverless functions, you can develop your own functions for supported backend services, all with zero administration. Further, you can code those functions in your favorite language (Node.js, Python, Go, Java, and more), while leveraging both the serverless and container tooling provided by the cloud providers to build, test, and deploy them. In this blog, I am assuming that you are familiar with, or better yet have experience writing, Lambda or Google Cloud Functions. If you are not, I’d encourage you to refer to the documentation offered by AWS or GCP to help write your own serverless functions.

The following simple example walks you through how I built a streaming pipeline to stream Aerospike Change Notification (CN) events, which are Change Data Capture (CDC)-like, to AWS SQS. As depicted in figure 1, the ESP connector converts the CN events from the XDR protocol into HTTP POST requests. The change notification payload can be serialized into a text format, such as JSON, or a binary format, such as Avro or MessagePack, via configuration for efficient data exchange. The HTTP request is forwarded to an API Gateway (or an Application Load Balancer), which triggers the Lambda function. The Lambda function subsequently writes the CN payload to AWS SQS. Optionally, you can persist state information or the raw CN payload in the Aerospike database via the Aerospike REST Gateway for future use. Keep in mind that although SQS was chosen as the final destination here, you can extend this pipeline to support any AWS big data service that Lambda is integrated with.


Figure 1 - Serverless processing data pipeline

Operationalizing the Streaming Pipeline

Step 1: Install and configure the ESP Connector

Follow the ESP Connector documentation to install and set up the ESP connector for receiving change notifications from the Aerospike database. Please be aware that you will need the change notification key enabled in your existing Aerospike cluster, which you can request here, or you can download the full-featured single-node Aerospike EE version, which already has the key turned on, for a seamless trial experience. If you are not familiar with the Aerospike XDR configuration, you can refer to it here. Below is a sample XDR stanza that can be used for this example:

xdr {
  # Define a change-notification DC that is handled by the ESP connector.
  dc esp_outbound {
    connector true
    # Ship change notifications for the "test" namespace.
    namespace test {
    }
    # Address and port on which the ESP connector is listening.
    node-address-port localhost 8080
  }
}

I’d highly recommend that you deploy multiple instances of the connector for fault tolerance and scale them out/in based on the load.
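For example, running two connector instances is a matter of listing both under the same DC stanza; XDR will then distribute change notifications across the listed instances. A minimal sketch, assuming two placeholder hostnames:

xdr {
  dc esp_outbound {
    connector true
    namespace test {
    }
    # One entry per ESP connector instance (hostnames are placeholders).
    node-address-port esp-connector-1.internal 8080
    node-address-port esp-connector-2.internal 8080
  }
}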

Below is the sample ESP outbound configuration for your reference:

# Change the configuration for your use case.
#
# Refer to https://docs.aerospike.com/docs/connect/streaming-from-asdb/configuring/from-asdb-to-esp-configuring.html
# for details.
# The connector's listening ports, TLS and network interface.
service:
  ports:
    - 8080
  manage:
    ports: 8081
# The logging properties.
logging:
  file: /var/log/aerospike-esp-outbound/aerospike-esp-outbound.log
# ESP HTTP destinations.
destinations:
  aws-lambda:
    urls:
      - https://myapp.execute-api.ap-south-1.amazonaws.com
    max-connections-per-endpoint: 100
    call-timeout: 10000
    connect-timeout: 2000
    health-check:
      call-timeout: 3000
      interval-seconds: 5000
# Aerospike record routing to destination(s).
routing:
  mode: static-multi-destination
  destinations:
    - aws-lambda
format: json

Notice that a destination called “aws-lambda” is defined under the destinations section, with the URL of the AWS API Gateway specified under urls. This destination is then referenced in the routing section. You also have the option to specify multiple destinations, as sketched below. See here for more information on the above configuration.
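For instance, to fan the same change notifications out to a second HTTP endpoint, you would define another destination and list both under routing. A minimal sketch, assuming a hypothetical second endpoint called gcp-function (its URL is a placeholder):

destinations:
  aws-lambda:
    urls:
      - https://myapp.execute-api.ap-south-1.amazonaws.com
  gcp-function:
    urls:
      - https://asia-south1-myproject.cloudfunctions.net/esp-handler
routing:
  mode: static-multi-destination
  destinations:
    - aws-lambda
    - gcp-function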

Step 2: Configure a Lambda Invocation

Follow the instructions detailed here to configure the API Gateway to trigger the Lambda function. Alternatively, you could configure an Application Load Balancer to trigger the Lambda function, as detailed in this blog.

The AWS REST API Gateway doesn’t convert binary payloads to base64 by default; follow this blog to set that up. In the example code below, we map the payload to the “body” param. The AWS HTTP API Gateway converts binary payloads to base64 by default, so no action is required there. REST and HTTP APIs use different payload formats. The code below follows the HTTP API format, so if you choose the REST API you need to map the incoming record’s payload to the desired format based on the “Content-Type”, as shown in figure 2 below. The left side shows an example of binary data type mapping, and the right side shows the mapping for a textual format such as JSON. If you choose the HTTP API, set the “Payload format version” to “1.0” to follow this example.


Figure 2 - REST API data type mapping for Lambda Function
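As a side note, if you configure the connector with a binary format such as Avro or MessagePack, the gateway delivers the body base64-encoded and flags it with isBase64Encoded. Below is a minimal sketch of how a handler could recover the raw bytes in that case; this helper is illustrative and not part of the example code that follows:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

final class PayloadDecoder {
    /**
     * Returns the raw payload bytes, decoding from base64 when the
     * gateway has marked the body as base64-encoded.
     */
    static byte[] rawBody(Map<String, Object> input) {
        String body = String.valueOf(input.get("body"));
        boolean isBase64 =
                Boolean.parseBoolean(String.valueOf(input.get("isBase64Encoded")));
        return isBase64
                ? Base64.getDecoder().decode(body)
                : body.getBytes(StandardCharsets.UTF_8);
    }
}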

Step 3: Trigger a Lambda Function

The simple Java code below forwards incoming ESP records to SQS. You will have to include the SQS and Lambda dependencies in your project (a sample Maven snippet follows the code). Note that I haven’t specified the AWS credentials explicitly in this example. However, I’d encourage you to follow this documentation to learn how to secure your implementation.

package com.aerospike.connect.esp;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
public class LambdaHandlerJava implements
       RequestHandler<Map<String, Object>, Map<String, Object>> {
   private static final ObjectMapper objectMapper = new ObjectMapper();
   private static final String AWS_SQS_NAME = "esp.fifo";
   @Override
   public Map<String, Object> handleRequest(Map<String, Object> input,
                                            Context context) {
       Object rawInput = input.get("body");
       String inputBody = rawInput != null ? rawInput.toString() : "";
       // We need to check for the POST request as ESP connector will perform
       // health-check at the configured interval, and we don't need to push
       // that to SQS.
       if (!input.get("httpMethod").toString().trim().equals("POST")) {
           return getResponse(inputBody);
       }
       pushToSQS(rawInput);
       return getResponse(inputBody);
   }
   private void pushToSQS(Object rawInput) {
       AmazonSQS sqs =
               AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
                       .build();
       String queueUrl = sqs.getQueueUrl(AWS_SQS_NAME).getQueueUrl();
       SendMessageRequest sendMsgRequest;
       try {
           sendMsgRequest = new SendMessageRequest()
                   .withQueueUrl(queueUrl)
                   .withMessageGroupId(LambdaHandlerJava.AWS_SQS_NAME)
                   .withMessageBody(getMessageBody(rawInput));
       } catch (JsonProcessingException e) {
           throw new RuntimeException(e);
       }
       sqs.sendMessage(sendMsgRequest);
   }
   private String getMessageBody(Object rawInput)
           throws JsonProcessingException {
       // REST API sends the data in object format whereas, an HTTP API
       // sends the String.
       return rawInput instanceof String ? (String) rawInput :
               objectMapper.writeValueAsString(rawInput);
   }
   private Map<String, Object> getResponse(String inputBody) {
       Map<String, Object> response = new HashMap<>();
       response.put("statusCode", 200);
       response.put("body", inputBody);
       response.put("isBase64Encoded", false);
       response.put("headers", emptyMap());
       return response;
   }
}
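For reference, the dependencies boil down to the Lambda Java core library, the AWS SDK SQS client, and Jackson. Here is a sketch of the relevant Maven entries; the version numbers are illustrative, so use the latest releases available to you:

<dependencies>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-lambda-java-core</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-sqs</artifactId>
    <version>1.12.99</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.0</version>
  </dependency>
</dependencies>

The maven-shade-plugin (or an equivalent) can then bundle these dependencies into the Uber jar used in the next step.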

Once your code is ready, create an Uber jar of your project so that it bundles all the dependencies, and proceed to the AWS console for the remaining setup:

  1. On the Lambda homepage, follow the below steps:

    1. Hit the Create function button

    2. Select the Author from scratch option

    3. In the Basic information section:

      1. Enter the function name of your choice

      2. In the Runtime dropdown, select Java 11 or the latest Java version offered

      3. Leave the other options at their defaults and hit the Create function button

  2. The previous step will redirect you to your newly created function page. In the Code tab:

    1. In the Code Source section, hit the Upload from dropdown, select the .zip or .jar file option, and upload your Uber jar.

    2. In the Runtime settings section, click Edit and enter the fully qualified Java class name followed by ‘::handleRequest’ in the Handler input box. It should be com.aerospike.connect.esp.LambdaHandlerJava::handleRequest as per the example code above. Leave the other fields as is and save your changes.

Once saved, your Lambda function’s homepage should look like Figure 3. Note that “esp-outbound” is the name of the Lambda function that I have used.


Figure 3 - AWS Lambda console

I’d encourage you to refer to this blog to understand the performance limitations of Lambda vis-à-vis concurrency limits and cold-start time, as well as the tricks for writing performant functions.

Step 4: Set up SQS

I have used a FIFO queue in SQS. Although SQS offers many fine-grained configuration options, none are required for this example; you can follow the basic steps to set up your queue from the AWS console. Once done, the homepage of your FIFO queue should look like figure 4.


Figure 4 - AWS SQS console
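If you prefer to create the queue programmatically rather than through the console, here is a minimal sketch using the same AWS SDK. The queue name matches the one hard-coded in the Lambda handler; content-based deduplication is enabled because the handler does not set an explicit deduplication ID:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;

public class CreateEspQueue {
    public static void main(String[] args) {
        AmazonSQS sqs = AmazonSQSClientBuilder.standard()
                .withRegion(Regions.AP_SOUTH_1)
                .build();
        // FIFO queue names must end in ".fifo".
        CreateQueueRequest request = new CreateQueueRequest("esp.fifo")
                .addAttributesEntry("FifoQueue", "true")
                // Let SQS derive the deduplication ID from the message body.
                .addAttributesEntry("ContentBasedDeduplication", "true");
        System.out.println("Queue URL: " + sqs.createQueue(request).getQueueUrl());
    }
}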

Step 5: Test the pipeline

Now, this is where the rubber meets the road. In an Aerospike AQL session, insert a record to trigger an INSERT change notification, as shown in figure 5.


Figure 5 - Inserting data into Aerospike to trigger an INSERT change notification
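If you want to reproduce this yourself, an AQL insert along these lines is enough; the set name demo and the key '1' are arbitrary choices for illustration, and the namespace must be one covered by your XDR stanza (test in this example):

aql> INSERT INTO test.demo (PK, name) VALUES ('1', 'Joe')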

Voilà! The message shows up in the SQS console. Look at Messages available under Receive messages in figure 6.


Figure 6 - Change Notification message is received

Seeing is believing! As seen in figure 7, you can verify that the message type is “write” and the “name” bin contains the value “Joe” that you had inserted earlier via the AQL session.


Figure 7 - Received message validation

Wear your Operations hat and monitor the CloudWatch metrics for AWS Lambda to observe the throughput of Lambda function invocations, as shown in figure 8.


Figure 8 - Monitoring in action for AWS Lambda
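If you would rather pull those numbers programmatically than through the console, here is a minimal sketch using the AWS SDK’s CloudWatch client. The function name esp-outbound matches figure 3, and you would also need the aws-java-sdk-cloudwatch dependency:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
import java.util.Date;

public class LambdaInvocationStats {
    public static void main(String[] args) {
        AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.standard()
                .withRegion(Regions.AP_SOUTH_1)
                .build();
        // Sum of Lambda invocations over the last hour, in one-minute buckets.
        GetMetricStatisticsRequest request = new GetMetricStatisticsRequest()
                .withNamespace("AWS/Lambda")
                .withMetricName("Invocations")
                .withDimensions(new Dimension()
                        .withName("FunctionName")
                        .withValue("esp-outbound"))
                .withStartTime(new Date(System.currentTimeMillis() - 3600 * 1000L))
                .withEndTime(new Date())
                .withPeriod(60)
                .withStatistics("Sum");
        for (Datapoint point : cloudWatch.getMetricStatistics(request).getDatapoints()) {
            System.out.println(point.getTimestamp() + " -> " + point.getSum());
        }
    }
}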

Additional Capabilities

A few additional capabilities that you can leverage to make your streaming pipeline robust:

  • The LUT (last-update-time) of each record is shipped as metadata along with the message, so consider using it for ordering messages in your downstream application.

  • XDR’s at-least-once delivery guarantee is extended by the ESP connector to limit message loss. So even if the connector instance goes down or the link between the connector and XDR is down, your pipeline will still be able to recover.

  • ESP Connector can be used in conjunction with XDR filter expressions in the server to filter out records before transmission. For example, you could filter out sensitive records before streaming them downstream.

  • HTTPS is supported, hence you can choose to secure the ingress and egress traffic from the connector.

  • Multiple destinations can be configured, so you can stream to several endpoints simultaneously using the same ESP connector cluster.

  • Text serialization formats, such as JSON or FlatJSON, and binary formats, such as Avro or MessagePack, are available for efficient data exchange.

  • The option to add custom headers to each HTTP request is also available.

  • Monitoring the health of the ESP connector via Prometheus/Grafana as depicted in figure 9 is supported. See Prometheus integration for more details.


Figure 9 - Monitoring in action for the ESP Connector

Potential Use Cases

Here are a few potential use cases that can help spark your imagination:

  • Back up Aerospike data to S3 or Google Cloud Storage for archival purposes.

  • Stream Aerospike data to AWS SageMaker or Google AI to help you build cloud-native AI/ML pipelines with best-of-breed AI/ML services.

  • Encrypt Aerospike data for compliance using AWS KMS, before ingesting it into your data lake.

  • Write Aerospike data to Kinesis Data Streams for real-time analytics use cases such as real-time dashboards, anomaly detection, dynamic pricing, and many more.

The Aerospike database will help you realize the above streaming pipelines with low latency and high throughput. If you have any other interesting use case in mind that Aerospike and serverless functions can bring to life in AWS or GCP, then wait no more. Please download Connect for ESP and the full-featured single-node Aerospike Database Enterprise Edition, and refer to our documentation for a seamless trial experience.

Happy streaming, Serverless style!

I would like to thank Kiran Matty for co-authoring the blog, and Joe Martin and Ashish Shinde for reviewing it.