Message transformer for XDR Proxy

The message transformer lets you write custom code that reads incoming XDR change notification records (CNRs), performs Aerospike operations or other transformations on them, and converts them into AerospikeRecordOperation objects. Develop your code with the Java SDK and bundle it as a .jar file. Then make the .jar file available on the XDR Proxy classpath, and specify the transformer class and its parameters in the configuration file. Your custom code plugs into the XDR Proxy for rapid integration.

Example use cases

  • Perform complex operations on maps or lists in an XDR change notification record (CNR) before writing the output Aerospike records to your Aerospike databases. For example, you could add an element from incoming messages to maps or lists, or create maps of maps.
  • Filter messages for compliance use cases. For example, you can filter out records containing Personally Identifiable Information (PII), or you can mask fields with sensitive data prior to writing records to an Aerospike database.
  • Create Aerospike records whose bins are derived from XDR CNR values. You can even extend your message transformer to create Aerospike keys.
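
The masking use case can be sketched without any SDK types. The following is a minimal, hypothetical helper (the class name BinMasker and its methods are illustrative and not part of the Aerospike SDK) that works on a plain bin map of the kind returned by input.getMessage().getBins() in the examples later on this page.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: masks sensitive bins before a record is written. */
final class BinMasker {
    private BinMasker() {}

    /**
     * Returns a copy of bins in which every bin named in sensitiveBins
     * has its value replaced by the mask string. Other bins are unchanged.
     */
    static Map<String, Object> mask(Map<String, Object> bins,
                                    Set<String> sensitiveBins,
                                    String mask) {
        // Copy first: the map handed to a transformer may be unmodifiable.
        Map<String, Object> out = new HashMap<>(bins);
        for (String name : sensitiveBins) {
            if (out.containsKey(name)) {
                out.put(name, mask);
            }
        }
        return out;
    }

    /** Returns true if the record carries at least one sensitive bin. */
    static boolean containsSensitive(Map<String, Object> bins,
                                     Set<String> sensitiveBins) {
        for (String name : sensitiveBins) {
            if (bins.containsKey(name)) {
                return true;
            }
        }
        return false;
    }
}
```

Inside transform(), the masked map could be converted to Bin objects the way Example 1 does. To filter a record out entirely, Example 2 shows returning an AerospikeSkipRecordOperation.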

What does it not do?

  • The message transformer is not meant for heavy-weight processing or calling external APIs.
  • It does not support transactions, in which several commands execute atomically and in isolation from commands outside the transaction. However, it supports multiple operations on the same record, as well as reads from the database during the transformation.

Developing a message transformer

Add the Maven SDK dependencies

Add the following dependencies to your pom.xml:

<dependencies>
    <!-- Aerospike Inbound SDK -->
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-connect-inbound-sdk</artifactId>
        <version>1.2.0</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>
    <!-- Aerospike Outbound SDK -->
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-connect-outbound-sdk</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>
    <!-- Javax inject annotations -->
    <dependency>
        <groupId>javax.inject</groupId>
        <artifactId>javax.inject</artifactId>
        <version>1</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.33</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>
</dependencies>

Implement the InboundMessageTransformer interface

The custom message transformer should implement InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>>.

The WritePolicy passed in the InboundMessage to the custom message transformer is as follows:

Write Policy field    Description
recordExistsAction    Set to the value shipped by XDR
generationPolicy      Set to the value shipped by XDR
generation            Set to the value shipped by XDR
expiration            Set to the value shipped by XDR
socketTimeout         Set to the value configured in the aerospike section of the YAML config
totalTimeout          Set to the value configured in the aerospike section of the YAML config
xdr                   Set to true

You can inject the following objects into your message-transformer class using Java dependency injection (javax.inject).

Class                              Usage
AerospikeReader                    An object to read a record from the Aerospike Database.
InboundMessageTransformerConfig    The custom parameters provided in the configuration file as params.

Thread safety

  • If you annotate your implementation with @Singleton, it has to be thread safe because one instance can be used by multiple threads.
  • If you do not annotate your implementation with @Singleton, a new instance of your message transformer is created for every incoming message.
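
To illustrate what thread safety means for a @Singleton transformer, here is a minimal sketch of shared state that many threads can update at once. The class MessageCounters is hypothetical and not part of the SDK. It uses only java.util.concurrent types, so it compiles on its own. A singleton transformer could hold one such object in a final field and call record() from transform().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical state holder for a @Singleton transformer: counts messages
 * per namespace. All methods are safe to call from many threads at once.
 */
final class MessageCounters {
    // ConcurrentHashMap and LongAdder handle concurrent updates without
    // explicit locking.
    private final ConcurrentMap<String, LongAdder> counts =
            new ConcurrentHashMap<>();

    /** Increments the counter for the given namespace. */
    void record(String namespace) {
        counts.computeIfAbsent(namespace, ns -> new LongAdder()).increment();
    }

    /** Returns the number of messages recorded for the namespace so far. */
    long count(String namespace) {
        LongAdder adder = counts.get(namespace);
        return adder == null ? 0L : adder.sum();
    }
}
```

A plain HashMap or a long field incremented with ++ in the same position would lose updates under concurrent calls. If the transformer needs no shared state, omitting @Singleton avoids the question, at the cost of one new instance per message.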

Configure the XDR Proxy to use your message transformer

Include the message-transformer stanza at the global, namespace, or set scope.

Example

# Message transformer at global scope.
message-transformer:
  class: com.aerospike.GlobalScopeTransformer
  params:
    color: RED
namespaces:
  milkyWay:
    # Message transformer at namespace "milkyWay" scope.
    message-transformer:
      class: com.aerospike.TestNamespaceTransformer
      params:
        color: BLUE
    sets:
      solarSystem:
        # Message transformer at set "solarSystem" scope.
        message-transformer:
          class: com.aerospike.TestNamespaceTransformer
          params:
            color: YELLOW
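
The params under a message-transformer stanza reach your class as a map, which the examples on this page read through config.getTransformerConfig(). This page does not specify the runtime type of each value, so the sketch below assumes a value may arrive as a number or as a string and reads it defensively. TransformerParams is a hypothetical helper, not an SDK class.

```java
import java.util.Map;

/** Hypothetical helper for reading values from a transformer's params map. */
final class TransformerParams {
    private TransformerParams() {}

    /**
     * Reads an integer parameter. Accepts any Number (Integer, Long, ...)
     * or a string containing digits. Returns defaultValue if the key is absent.
     */
    static int getInt(Map<String, Object> params, String key, int defaultValue) {
        Object value = params.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        // Assumption: non-numeric values are strings such as "100".
        return Integer.parseInt(value.toString().trim());
    }

    /** Reads a string parameter, or returns defaultValue if the key is absent. */
    static String getString(Map<String, Object> params, String key,
                            String defaultValue) {
        Object value = params.get(key);
        return value == null ? defaultValue : value.toString();
    }
}
```

With the configuration above, TransformerParams.getString(config.getTransformerConfig(), "color", "NONE") would yield the color configured for the stanza that supplied the params.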

Deploy your message transformer

Deploy your .jar file, along with any dependencies it might have, by copying it to the /opt/aerospike-xdr-proxy/usr-lib/ directory on the host where the Aerospike XDR Proxy is installed.

Example 1: Add new bins to records

import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Add bins specified in the YAML config to the incoming records.
 */
class XdrProxyAddBinsCustomTransformer implements
        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {

    private final static Logger logger =
            LoggerFactory.getLogger(XdrProxyAddBinsCustomTransformer.class.getName());

    private final InboundMessageTransformerConfig config;

    @Inject
    public XdrProxyAddBinsCustomTransformer(
            InboundMessageTransformerConfig config) {
        this.config = config;
    }

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        // Do not transform a delete.
        if (input.getMessage().getMetadata().getOperation().isDelete()) {
            logger.debug("key={} DELETE operation", input.getKey().get());
            return new AerospikeDeleteOperation(input.getKey().get(),
                    input.getWritePolicy().get());
        }

        logger.debug("key={} WRITE operation", input.getKey().get());

        // Copy the bins shipped by XDR. The original bins map is unmodifiable.
        Map<String, Object> bins = new HashMap<>(input.getMessage().getBins());

        // Add values specified in YAML config to the bins.
        bins.putAll(config.getTransformerConfig());

        // Convert to list of bins.
        List<Bin> aerospikeBins = bins.entrySet().stream().map(
                bin -> new Bin(bin.getKey(), bin.getValue())
        ).collect(Collectors.toList());

        // Write the record with the transformed bins.
        return new AerospikePutOperation(input.getKey().get(),
                input.getWritePolicy().get(), aerospikeBins);
    }
}

Example 2: Selectively delete records

import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.connect.inbound.AerospikeReader;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import com.aerospike.connect.outbound.AerospikeOperation;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Selectively delete records based on generation.
 */
class XdrProxyDeleteCustomTransformer implements
        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {

    private final static Logger logger =
            LoggerFactory.getLogger(XdrProxyDeleteCustomTransformer.class.getName());

    /**
     * The params key that carries the generation cut-off in the YAML config.
     */
    private static final String GEN_KEY = "gen";

    /**
     * Default generation cut-off value of an old record.
     */
    private static final int DEFAULT_GEN_OLD = 100;

    private final InboundMessageTransformerConfig config;
    private final AerospikeReader aerospikeReader;

    @Inject
    public XdrProxyDeleteCustomTransformer(
            AerospikeReader aerospikeReader,
            InboundMessageTransformerConfig config) {
        this.aerospikeReader = aerospikeReader;
        this.config = config;
    }

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        // Do not transform a write.
        if (input.getMessage().getMetadata().getOperation() ==
                AerospikeOperation.WRITE) {
            logger.debug("key={} WRITE operation", input.getKey().get());
            Map<String, Object> bins = input.getMessage().getBins();
            List<Bin> aerospikeBins = new ArrayList<>(bins.size());
            for (Map.Entry<String, Object> bin : bins.entrySet()) {
                aerospikeBins.add(new Bin(bin.getKey(), bin.getValue()));
            }
            return new AerospikePutOperation(input.getKey().get(),
                    input.getWritePolicy().get(), aerospikeBins);
        }

        logger.debug("key={} DELETE operation", input.getKey().get());

        // Read the existing record to check its generation.
        Record record =
                aerospikeReader.get(null, input.getKey().get());
        int oldGen = (int) config.getTransformerConfig()
                .getOrDefault(GEN_KEY, DEFAULT_GEN_OLD);

        // Do not delete young records, skip shipping them.
        if (record == null || record.generation < oldGen) {
            return new AerospikeSkipRecordOperation();
        }

        // Only delete older records.
        return new AerospikeDeleteOperation(input.getKey().get(),
                input.getWritePolicy().get());
    }
}
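
The skip-or-delete rule in Example 2 can also be expressed as a small pure function, which makes it easy to unit test without the XDR Proxy runtime. The sketch below is an SDK-free restatement of that rule. DeleteDecision and its Action enum are hypothetical names introduced for illustration.

```java
/** Hypothetical, SDK-free restatement of the skip/delete rule in Example 2. */
final class DeleteDecision {
    enum Action { SKIP, DELETE }

    private DeleteDecision() {}

    /**
     * @param existingGeneration generation of the record read through
     *                           AerospikeReader, or null if no record exists
     * @param cutoff             generation at or above which a record is old
     * @return SKIP for missing or young records, DELETE for old ones
     */
    static Action decide(Integer existingGeneration, int cutoff) {
        if (existingGeneration == null || existingGeneration < cutoff) {
            return Action.SKIP;
        }
        return Action.DELETE;
    }
}
```

In a transformer, SKIP would map to returning an AerospikeSkipRecordOperation and DELETE to returning an AerospikeDeleteOperation, as Example 2 does.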