Message Transformer for Aerospike Connect for Pulsar
The Message Transformer allows you to write custom code that reads incoming Pulsar messages, performs Aerospike operations or other transformations on them, and converts them into AerospikeRecordOperation objects. You develop your code with the Java SDK and bundle it as a .jar file. You then make the .jar file available to the Aerospike Pulsar Inbound Connector on its classpath, and supply any associated parameters in the connector's configuration file. Your custom code is easily pluggable into the connector for rapid integration.
Example use cases
- Performing complex operations on maps or lists in Pulsar messages before writing the resulting Aerospike records to your Aerospike database. For example, you could add an element from incoming messages to maps or lists, or create maps of maps.
- Filtering messages for compliance use cases. For example, you can filter out records containing Personally Identifiable Information (PII), or mask fields with sensitive data before writing records to an Aerospike database.
- Creating Aerospike records with bins derived from the key-value pairs of Pulsar messages. You can also extend your message transformer to create Aerospike keys.
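As a rough illustration of the compliance use case, the following self-contained Java sketch masks sensitive values in a message's field map and filters out messages that carry a sensitive field entirely. It deliberately does not use the SDK: `PiiMasker`, the `email`/`phone`/`ssn` field names, and the masking rule are all hypothetical, and a real transformer would return an AerospikeRecordOperation rather than a plain map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper showing PII filtering and masking on a message's field
// map. It is not part of the Aerospike SDK.
final class PiiMasker {
    // Hypothetical: fields whose values are masked before writing.
    private static final Set<String> MASKED_FIELDS = Set.of("email", "phone");
    // Hypothetical: a field whose presence filters out the whole message.
    private static final String DROP_IF_PRESENT = "ssn";

    // Returns empty if the message should be filtered out, otherwise a copy of
    // the fields with sensitive values masked.
    static Optional<Map<String, Object>> apply(Map<String, Object> fields) {
        if (fields.containsKey(DROP_IF_PRESENT)) {
            return Optional.empty();
        }
        Map<String, Object> bins = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            Object value = e.getValue();
            if (value != null && MASKED_FIELDS.contains(e.getKey())) {
                value = mask(value.toString());
            }
            bins.put(e.getKey(), value);
        }
        return Optional.of(bins);
    }

    // Keeps the last two characters and replaces the rest with '*'.
    static String mask(String s) {
        int keep = Math.min(2, s.length());
        return "*".repeat(s.length() - keep) + s.substring(s.length() - keep);
    }
}
```

Returning `Optional.empty()` here only stands in for "skip this message"; how a real transformer signals that depends on the operation types the SDK provides.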
What does it not do?
- It's not meant for heavyweight processing or for calling external APIs (outside the Aerospike Pulsar Inbound Connector). Consider using Apache Spark for these use cases.
- It does not support multi-record transactions. However, it supports multiple operations on the same record, as well as reads from the database during the transformation.
- It does not transform messages outbound from Aerospike to Pulsar. Consider using XDR filtering for such messages.
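The distinction between multi-record transactions (unsupported) and multiple operations on one record plus reads during transformation (supported) can be sketched in Java as follows. The transformer reads the current record once, then emits several operations that all target the same key. `RecordReader` is a hypothetical stand-in for AerospikeReader (whose actual methods are not described here), and `BinOp` is an invented placeholder for whatever operation objects your transformer returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a reader such as AerospikeReader: returns the
// current bins of a record, or null if the record does not exist.
interface RecordReader {
    Map<String, Object> read(String key);
}

// Hypothetical description of one operation on a bin of the target record.
final class BinOp {
    final String type; // "put" or "add"
    final String bin;
    final Object value;

    BinOp(String type, String bin, Object value) {
        this.type = type;
        this.bin = bin;
        this.value = value;
    }

    @Override
    public String toString() {
        return type + "(" + bin + "=" + value + ")";
    }
}

// Sketch: one read, then several operations, all on the same record key.
// Nothing here touches a second record.
final class LoginCounterSketch {
    private final RecordReader reader;

    LoginCounterSketch(RecordReader reader) {
        this.reader = reader;
    }

    List<BinOp> transform(String userId, long eventTime) {
        Map<String, Object> current = reader.read(userId);
        List<BinOp> ops = new ArrayList<>();
        ops.add(new BinOp("add", "logins", 1L));
        // Set first_seen only if the record (or that bin) does not exist yet.
        if (current == null || !current.containsKey("first_seen")) {
            ops.add(new BinOp("put", "first_seen", eventTime));
        }
        ops.add(new BinOp("put", "last_seen", eventTime));
        return ops;
    }
}
```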
Developing a message transformer
Add the Maven SDK dependency
The dependency looks like this:
```xml
<dependency>
  <groupId>com.aerospike</groupId>
  <artifactId>aerospike-connect-inbound-sdk</artifactId>
  <version>1.2.0</version>
</dependency>
```
The Maven repository is here.
An example pom.xml file that includes this dependency is here.
Implement the InboundMessageTransformer interface
There are two different ways to implement the interface:
Using InboundMessage<K, M>
The generic type K is the class of your Pulsar record key, which can be null. M is the Pulsar Record<GenericRecord>. The fields of the Pulsar message are passed to your transformer as a map, while the key and the Record<GenericRecord> are passed separately.
You can optionally inject the following objects into your message-transformer class by using Java dependency injection:
| Class | Usage |
|---|---|
| AerospikeReader | An object to read a record from the Aerospike database. |
| InboundMessageTransformerConfig | The custom parameters provided under params in the configuration file. |
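The following self-contained Java sketch shows the general shape of an InboundMessage-style transformer that receives its configuration through constructor injection and handles the nullable key. To compile without the SDK, `TransformerParams` and `Message` are hypothetical stand-ins for InboundMessageTransformerConfig and InboundMessage<K, M>, the `user_id` fallback field is an invented example, and the JSR-330 `@Inject` annotation that a real transformer would likely place on its constructor is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for InboundMessageTransformerConfig: exposes the
// `params` map from the configuration file.
interface TransformerParams {
    Map<String, Object> params();
}

// Hypothetical stand-in for InboundMessage<K, M>: a nullable key plus the
// message fields as a map. The Record<GenericRecord> part is omitted.
final class Message<K> {
    final K key; // may be null
    final Map<String, Object> fields;

    Message(K key, Map<String, Object> fields) {
        this.key = key;
        this.fields = fields;
    }
}

final class UserTransformerSketch {
    private final String prefix;

    // A real transformer would receive the config via dependency injection
    // (for example an @Inject constructor); here the caller passes it directly.
    UserTransformerSketch(TransformerParams config) {
        Object p = config.params().get("fieldNamePrefix");
        this.prefix = p == null ? "" : p.toString();
    }

    // Uses the Pulsar key if present, else the hypothetical "user_id" field.
    String recordKey(Message<String> msg) {
        if (msg.key != null) {
            return msg.key;
        }
        Object id = msg.fields.get("user_id");
        if (id == null) {
            throw new IllegalArgumentException("message has neither a key nor a user_id field");
        }
        return id.toString();
    }

    // Copies every field into a bin named prefix_field (or just field if no prefix).
    Map<String, Object> bins(Message<String> msg) {
        Map<String, Object> out = new LinkedHashMap<>();
        msg.fields.forEach((name, value) ->
                out.put(prefix.isEmpty() ? name : prefix + "_" + name, value));
        return out;
    }
}
```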
Using Pulsar Record<GenericRecord>
With this approach, your transformer class receives the Record<GenericRecord> directly, and you access its fields by calling the getField method of GenericRecord.
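A minimal sketch of the getField style, in which the transformer pulls only the fields it needs instead of receiving them all as a map. `FieldSource` is a hypothetical interface with the same `getField(String)` shape as Pulsar's GenericRecord, used so the code compiles without Pulsar on the classpath; treating a null return as "field absent" is an assumption you should verify for your schema type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in with the same shape as GenericRecord.getField(String),
// so this sketch compiles without the Pulsar client on the classpath.
interface FieldSource {
    Object getField(String fieldName);
}

final class GetFieldSketch {
    // Reads only the named fields; a null value is treated as "absent" and
    // skipped (an assumption, not documented Pulsar behavior).
    static Map<String, Object> pick(FieldSource record, String... names) {
        Map<String, Object> bins = new HashMap<>();
        for (String name : names) {
            Object value = record.getField(name);
            if (value != null) {
                bins.put(name, value);
            }
        }
        return bins;
    }
}
```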
Thread safety
- If you annotate your implementation with @Singleton, it must be thread-safe, because a single instance can be used by multiple threads concurrently.
- If you do not annotate your implementation with @Singleton, a new instance of your message transformer is created for every incoming message.
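To illustrate the @Singleton rule, this Java sketch keeps all of its shared mutable state in thread-safe types so that one instance can serve many threads at once. `SharedCountingTransformer` and its counters are hypothetical; a real transformer would also carry the @Singleton annotation, which is left out so the example compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a transformer intended to be shared by many threads (as a
// @Singleton would be). Every mutable field is a thread-safe type.
final class SharedCountingTransformer {
    private final AtomicLong processed = new AtomicLong();
    private final Map<String, Long> perTopic = new ConcurrentHashMap<>();

    String transform(String topic, String payload) {
        processed.incrementAndGet();
        perTopic.merge(topic, 1L, Long::sum); // atomic per-key update
        return payload.toUpperCase();          // stateless work needs no locking
    }

    long processed() {
        return processed.get();
    }

    long countFor(String topic) {
        return perTopic.getOrDefault(topic, 0L);
    }
}
```

If a transformer has no shared state to protect, keeping it stateless is simpler; leaving off @Singleton also avoids sharing, at the cost of constructing one instance per message.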
Configure the connector to use your message transformer
Include the message-transformer stanza under the relevant topic in the topics stanza of the aerospike-pulsar-inbound.yml file.
Here is an example:
```yaml
topics:
  users:
    .....
    message-transformer:
      class: com.aerospike.connect.inbound.CasCDTCustomTransformer
      params:
        cdtListLimit: 10
        fieldNamePrefix: 'cdt_impl'
```
The stanza takes these parameters:
| Parameter | Required | Description |
|---|---|---|
| class | Yes | Fully qualified name of the class that implements InboundMessageTransformer. |
| params | No | Additional parameters that you want to use in your implementation. To read them, inject InboundMessageTransformerConfig into your message-transformer class. |
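The example params above could be consumed along these lines. In this sketch the params arrive as a plain map (how InboundMessageTransformerConfig exposes them is not covered here), the default of 10 is hypothetical and simply mirrors the example value, and reading cdtListLimit as a cap on a list bin's length is an illustrative interpretation, not the documented meaning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of consuming the example `params`. The map is a plain stand-in for
// whatever InboundMessageTransformerConfig exposes; treating cdtListLimit as a
// cap on a list bin's length is a hypothetical interpretation.
final class CdtParamsSketch {
    final int cdtListLimit;
    final String fieldNamePrefix;

    CdtParamsSketch(Map<String, Object> params) {
        // Depending on the config parser, numbers may arrive as Integer, Long,
        // or String, so parse via the string form. Default of 10 is hypothetical.
        int limit = Integer.parseInt(String.valueOf(params.getOrDefault("cdtListLimit", 10)));
        if (limit < 1) {
            throw new IllegalArgumentException("cdtListLimit must be at least 1, got " + limit);
        }
        this.cdtListLimit = limit;
        this.fieldNamePrefix = String.valueOf(params.getOrDefault("fieldNamePrefix", ""));
    }

    // Appends a value and keeps only the newest cdtListLimit elements.
    List<Object> append(List<Object> current, Object value) {
        List<Object> out = new ArrayList<>(current);
        out.add(value);
        while (out.size() > cdtListLimit) {
            out.remove(0); // drop the oldest element
        }
        return out;
    }
}
```

Validating parameters once, when the transformer is constructed, means a bad value in the configuration file fails fast instead of on the first message.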
Deploy your message transformer
Deploy your .jar file, along with any dependencies it might have, by copying them to the /bundled-dependencies folder of the directory where the Aerospike Pulsar Inbound Connector is installed. The path should look like narExtractionDirectory/pulsar-nar/pulsar-io-aerospike.nar-unpacked/META-INF/bundled-dependencies.
Look through example transformers
See here for a few examples of Message Transformers for Aerospike.