Message transformer for XDR Proxy
The message transformer lets you write custom code that reads incoming XDR change notification records (CNRs), performs Aerospike operations or other transformations on them, and converts them into AerospikeRecordOperation objects. You develop the code with the Java SDK and bundle it as a .jar file. You then make the .jar file available to the XDR Proxy on its classpath and specify any associated parameters in the configuration file. This lets you plug custom code into the XDR Proxy for rapid integration.
Example use cases
- Perform complex operations on maps or lists in an XDR change notification record (CNR) before writing the resulting Aerospike records to your Aerospike databases. For example, you could add an element from an incoming message to a map or list, or create maps of maps.
- Filter messages for compliance use cases. For example, you can filter out records containing Personally Identifiable Information (PII), or you can mask fields with sensitive data prior to writing records to an Aerospike database.
- Create Aerospike records with bins generated by modifying XDR CNR (change notification record) values. You can even extend your message transformer to create Aerospike keys.
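The compliance use case above can be sketched with plain Java collections. The helper below is a hypothetical illustration, not part of the SDK: the class name PiiMasker, the mask string, and the idea of passing a set of sensitive bin names are all assumptions. It only shows the kind of bin-level masking a transform method might apply to a copy of the shipped bins before building its write operation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not part of the Aerospike SDK): masks sensitive bins
 * before they are written to the destination database.
 */
final class PiiMasker {
    // Assumed placeholder value; choose whatever your compliance rules need.
    private static final String MASK = "****";

    private PiiMasker() {
    }

    /**
     * Returns a copy of the bins in which every bin named in sensitiveBins
     * is replaced by a fixed mask. The input map is not modified.
     */
    static Map<String, Object> mask(Map<String, Object> bins,
                                    Set<String> sensitiveBins) {
        Map<String, Object> masked = new HashMap<>(bins);
        for (String name : sensitiveBins) {
            // Only mask bins that are actually present in the record.
            if (masked.containsKey(name)) {
                masked.put(name, MASK);
            }
        }
        return masked;
    }
}
```

A transformer could call PiiMasker.mask on a copy of the incoming bins and then convert the result to Bin objects, in the same way the examples at the end of this page build their bin lists.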
What does it not do?
- The message transformer is not meant for heavy-weight processing or calling external APIs.
- It does not support transactions, in which several commands execute atomically and in isolation from commands outside the transaction. However, it does support multiple operations on the same record, as well as reads from the database during the transformation.
Developing a message transformer
Add the Maven SDK dependencies
Add the following dependencies:
<dependencies>
    <!-- Aerospike Inbound SDK -->
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-connect-inbound-sdk</artifactId>
        <version>1.2.0</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>

    <!-- Aerospike Outbound SDK -->
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-connect-outbound-sdk</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>

    <!-- Javax inject annotations -->
    <dependency>
        <groupId>javax.inject</groupId>
        <artifactId>javax.inject</artifactId>
        <version>1</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.33</version>
        <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->
    </dependency>
</dependencies>
Implement the InboundMessageTransformer interface
The custom message transformer must implement InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>>.
The WritePolicy passed in the InboundMessage to the custom message transformer is populated as follows:
Write Policy field | Description |
---|---|
recordExistsAction | Set to the value shipped by XDR |
generationPolicy | Set to the value shipped by XDR |
generation | Set to the value shipped by XDR |
expiration | Set to the value shipped by XDR |
socketTimeout | Set to the value configured in the aerospike section of the YAML config |
totalTimeout | Set to the value configured in the aerospike section of the YAML config |
xdr | Set to true |
You can optionally inject the following objects into your message-transformer class using Java dependency injection.
Class | Usage |
---|---|
AerospikeReader | An object to read a record from the Aerospike Database. |
InboundMessageTransformerConfig | The custom parameters provided in the configuration file as params. |
Thread safety
- If you annotate your implementation with @Singleton, it has to be thread safe because one instance can be used by multiple threads.
- If you do not annotate your implementation with @Singleton, a new instance of your message transformer is created for every incoming message.
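To illustrate the first point, the sketch below shows one way to keep shared state safe when a single instance is used by multiple threads. It is a hypothetical helper built only on java.util.concurrent: the class name PerSetCounters and the idea of counting messages per set are assumptions for illustration. The @Singleton annotation itself is omitted so that the block compiles with the JDK alone.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical shared state for a transformer annotated with @Singleton.
 * All mutable state lives in concurrent data structures, so concurrent
 * calls from several threads do not lose updates.
 */
final class PerSetCounters {
    private final ConcurrentMap<String, LongAdder> counts =
            new ConcurrentHashMap<>();

    /** Records one message for the given set name. Safe to call concurrently. */
    void increment(String setName) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so only one
        // LongAdder is ever created per set name.
        counts.computeIfAbsent(setName, k -> new LongAdder()).increment();
    }

    /** Returns the number of messages recorded for the set, or 0 if none. */
    long get(String setName) {
        LongAdder adder = counts.get(setName);
        return adder == null ? 0L : adder.sum();
    }
}
```

A transformer without @Singleton gets a fresh instance per message, so instance fields like this would not accumulate across messages in that case.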
Configure the XDR Proxy to use your message transformer
Include the message-transformer stanza at the global, namespace, or set scope.
Example
# Message transformer at global scope.
message-transformer:
  class: com.aerospike.GlobalScopeTransformer
  params:
    color: RED

namespaces:
  milkyWay:
    # Message transformer at namespace "milkyWay" scope.
    message-transformer:
      class: com.aerospike.TestNamespaceTransformer
      params:
        color: BLUE
    sets:
      solarSystem:
        # Message transformer at set "solarSystem" scope.
        message-transformer:
          class: com.aerospike.TestNamespaceTransformer
          params:
            color: YELLOW
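The examples later on this page read params through config.getTransformerConfig() and treat the result as a Map<String, Object>. How the XDR Proxy types individual YAML values is not stated here, so the sketch below assumes a value may arrive as a Number or as a String and reads it defensively. The class name TransformerParams and its methods are hypothetical helpers, not SDK APIs.

```java
import java.util.Map;

/**
 * Hypothetical helper for reading values from the params map.
 * Assumes values may be Numbers or Strings, depending on how the YAML
 * was written and parsed.
 */
final class TransformerParams {
    private TransformerParams() {
    }

    /** Returns the param as an int, or defaultValue if missing or not numeric. */
    static int getInt(Map<String, Object> params, String key, int defaultValue) {
        Object value = params.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            // Covers Integer, Long, and other numeric types.
            return ((Number) value).intValue();
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** Returns the param as a String, or defaultValue if it is missing. */
    static String getString(Map<String, Object> params, String key,
                            String defaultValue) {
        Object value = params.get(key);
        return value == null ? defaultValue : value.toString();
    }
}
```

With the configuration above, TransformerParams.getString(params, "color", "NONE") would return the configured color for whichever scope's params map is passed in.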
Deploy your message transformer
Deploy your .jar file, along with any dependencies it might have, by copying it to the /opt/aerospike-xdr-proxy/usr-lib/ folder of the directory where the Aerospike XDR Proxy is installed.
Example 1: Add new bins to records
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Add bins specified in the YAML config to the incoming records.
 */
class XdrProxyAddBinsCustomTransformer implements
        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {
    private final static Logger logger = LoggerFactory.getLogger(
            XdrProxyAddBinsCustomTransformer.class.getName());

    private final InboundMessageTransformerConfig config;

    @Inject
    public XdrProxyAddBinsCustomTransformer(
            InboundMessageTransformerConfig config) {
        this.config = config;
    }

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        // Do not transform a delete.
        if (input.getMessage().getMetadata().getOperation().isDelete()) {
            logger.debug("key={} DELETE operation", input.getKey().get());
            return new AerospikeDeleteOperation(input.getKey().get(),
                    input.getWritePolicy().get());
        }

        logger.debug("key={} WRITE operation", input.getKey().get());

        // Get the bins shipped by XDR. The original bins map is unmodifiable,
        // so copy it.
        Map<String, Object> bins =
                new HashMap<String, Object>(input.getMessage().getBins());

        // Add values specified in YAML config to the bins.
        bins.putAll(config.getTransformerConfig());

        // Convert to list of bins.
        List<Bin> aerospikeBins = bins.entrySet().stream().map(
                bin -> new Bin(bin.getKey(), bin.getValue())
        ).collect(Collectors.toList());

        // Write the record with the transformed bins.
        return new AerospikePutOperation(input.getKey().get(),
                input.getWritePolicy().get(), aerospikeBins);
    }
}
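One detail of Example 1 is worth spelling out: Map.putAll overwrites existing entries, so a param whose name matches a shipped bin replaces that bin's value. The sketch below contrasts that behavior with a variant that keeps the shipped value on a name collision. It uses only java.util; the class BinMerger and its method names are hypothetical and not part of the SDK.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper contrasting two ways to merge params into bins. */
final class BinMerger {
    private BinMerger() {
    }

    /** Params win on name collisions (the putAll behavior in Example 1). */
    static Map<String, Object> mergeOverriding(Map<String, Object> shipped,
                                               Map<String, Object> params) {
        Map<String, Object> merged = new HashMap<>(shipped);
        merged.putAll(params);
        return merged;
    }

    /**
     * Shipped bins win on name collisions. A param is added only when no
     * bin with that name is present (or the existing value is null).
     */
    static Map<String, Object> mergePreserving(Map<String, Object> shipped,
                                               Map<String, Object> params) {
        Map<String, Object> merged = new HashMap<>(shipped);
        for (Map.Entry<String, Object> param : params.entrySet()) {
            merged.putIfAbsent(param.getKey(), param.getValue());
        }
        return merged;
    }
}
```

Which policy is appropriate depends on whether the configured values are meant as defaults or as overrides.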
Example 2: Selectively delete records
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.connect.inbound.AerospikeReader;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import com.aerospike.connect.outbound.AerospikeOperation;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Selectively delete records based on generation.
 */
class XdrProxyDeleteCustomTransformer implements
        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {
    private final static Logger logger = LoggerFactory.getLogger(
            XdrProxyDeleteCustomTransformer.class.getName());

    private final InboundMessageTransformerConfig config;
    private final AerospikeReader aerospikeReader;

    /**
     * The generation key to pass the value in the YAML config.
     */
    private static final String GEN_KEY = "gen";

    /**
     * Default generation cut-off value of an old record.
     */
    private static final int DEFAULT_GEN_OLD = 100;

    @Inject
    public XdrProxyDeleteCustomTransformer(
            AerospikeReader aerospikeReader,
            InboundMessageTransformerConfig config) {
        this.aerospikeReader = aerospikeReader;
        this.config = config;
    }

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        // Do not transform a write.
        if (input.getMessage().getMetadata().getOperation() ==
                AerospikeOperation.WRITE) {
            logger.debug("key={} WRITE operation", input.getKey().get());

            Map<String, Object> bins = input.getMessage().getBins();
            List<Bin> aerospikeBins = new ArrayList<>(bins.size());
            for (Map.Entry<String, Object> bin : bins.entrySet()) {
                aerospikeBins.add(new Bin(bin.getKey(), bin.getValue()));
            }
            return new AerospikePutOperation(input.getKey().get(),
                    input.getWritePolicy().get(), aerospikeBins);
        }

        logger.debug("key={} DELETE operation", input.getKey().get());

        Record record = aerospikeReader.get(null, input.getKey().get());
        int oldGen = (int) config.getTransformerConfig()
                .getOrDefault(GEN_KEY, DEFAULT_GEN_OLD);

        // Do not delete young records, skip shipping them.
        if (record == null || record.generation < oldGen) {
            return new AerospikeSkipRecordOperation();
        }

        // Only delete older records.
        return new AerospikeDeleteOperation(input.getKey().get(),
                input.getWritePolicy().get());
    }
}
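The skip-or-delete decision in Example 2 can be isolated as a pure function, which makes the boundary behavior easy to unit test without a running database. The sketch below mirrors the condition used above (no existing record, or a generation below the cut-off, means skip). The names DeleteDecision and GenerationCutoff are hypothetical, and a null Integer stands in for the case where the reader finds no record.

```java
/** Hypothetical outcome of the generation check. */
enum DeleteDecision { SKIP, DELETE }

/** Hypothetical helper mirroring the condition in Example 2. */
final class GenerationCutoff {
    private GenerationCutoff() {
    }

    /**
     * @param existingGeneration generation of the record currently stored,
     *                           or null when no record was found
     * @param cutoff             generation at or above which a record counts
     *                           as old enough to delete
     */
    static DeleteDecision decide(Integer existingGeneration, int cutoff) {
        // Young or missing records are skipped rather than deleted.
        if (existingGeneration == null || existingGeneration < cutoff) {
            return DeleteDecision.SKIP;
        }
        return DeleteDecision.DELETE;
    }
}
```

Note that a record whose generation equals the cut-off is deleted, because the comparison is strictly less-than.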