
Outbound Message Transformer

The Aerospike Connect outbound transformer is a plugin system for the Aerospike outbound connectors. It lets you use custom code to transform incoming XDR change notification records before they are dispatched to the outbound destination.

The following transformers can be configured in any combination in the outbound connectors. They apply to the incoming Aerospike change notification records dispatched by XDR:

  • Routing transformer: Route the record to the outbound destination.
  • Record transformer: Transform the contents of the Aerospike record. Transformations include:
    • Add bin
    • Delete bin
    • Change bin value
    • Change key
    • Change metadata (generation, last update time, expiry)
  • Format transformer: Transform the Aerospike record into a custom format written to the outbound destination.
  • BatchFormat transformer: Similar to the Format transformer, except that it can receive multiple Aerospike records and return one or more custom-formatted records. You can configure either a formatter or a batch formatter.

As with other configurations for outbound connectors, you can plug in the transformers at any of these levels: global, namespace, or set.

Develop a Custom Code Plugin

Add a Maven dependency on the Aerospike Connect outbound SDK for all connectors except Elasticsearch, which requires a dependency on the Aerospike Connect Elasticsearch outbound SDK. Certain dependencies, such as the Aerospike outbound SDK, the Aerospike Elasticsearch outbound SDK, the Aerospike client, Lombok, Javax Inject, the SLF4J logger, and Google Protobuf, are provided by the Aerospike Connect runtime and need not be packaged with the plugin JAR.

note

Compile the plugin with the same Java version that runs the outbound connector, or with a compatible one.

Add the Maven dependency on the desired outbound SDK during development:

<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-outbound-sdk</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

For detailed examples, see the Maven example and the Gradle example.

GitHub Repository

Both SDKs are located in the GitHub repo aerospike-connect-outbound-sdk as a multi-module project.

Custom Transformer Class

The custom transformer class should have one of the following:

  • A publicly visible no-args constructor.
  • A publicly visible constructor annotated with @Inject that takes a single argument of type FormatterConfig for a format transformer, RouterConfig for a routing transformer, or TransformerConfig for a record transformer.

Each configuration scope (global, namespace, or set) runs within a separate injector scope. The custom class can be annotated to control its lifetime as follows:

  • If the custom class is annotated with @Singleton, a single instance of the class is created for the lifetime of the outbound connector in the given scope (global, namespace, or set). The singleton instance should be thread safe, since the same instance can be accessed by multiple threads concurrently.
  • If the custom class is not annotated with @Singleton, a new instance of the class is created for every invocation of the transformer.
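The thread-safety requirement for @Singleton classes can be illustrated with a minimal sketch in plain Java. It does not use the SDK; RouteCounter is a hypothetical helper that a singleton transformer might hold as a field, shown here only to suggest one way of keeping shared mutable state safe under concurrent access.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper, not part of the Aerospike SDK: per-destination counters
// that a @Singleton transformer could keep as a field and update from many threads.
class RouteCounter {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Safe to call concurrently: computeIfAbsent is atomic on ConcurrentHashMap
    // and LongAdder supports concurrent increments.
    void increment(String destination) {
        counts.computeIfAbsent(destination, d -> new LongAdder()).increment();
    }

    // Returns the current count, or 0 if the destination was never seen.
    long get(String destination) {
        LongAdder adder = counts.get(destination);
        return adder == null ? 0L : adder.sum();
    }
}
```

A class without @Singleton gets a fresh instance per invocation, so state like this would not accumulate there.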

Deploy a Custom Code Plugin

Generate the JARs for the custom code plugin with all dependencies shaded to avoid class path conflicts with the connectors. Copy all the JARs to the /opt/aerospike-<outbound-connector>/usr-lib directory before starting the outbound connector.

note

Shade all the custom plugin dependencies to avoid class path conflicts with the outbound connectors.

Change Notification Record

The change notification record is a representation of the record shipped by XDR to the outbound connectors. It is passed as input to the transformers.

In the change notification record passed as input to the Formatter, Router, or Transformer, the following types in top-level bins, maps, and lists are represented as follows:

  • Java blob, C# blob, Python blob, Ruby blob, PHP blob, Erlang blob, and HLL types are stored as BytesValue with the corresponding ParticleType.
  • GeoJSON types are stored as GeoJSON types.
  • Plain byte arrays are stored as BytesValue with the Blob particle type.

Routing Transformer

The routing transformer converts the incoming XDR change notification record into an outbound destination route. The route for each outbound destination is described in the table below.

| Outbound Destination | Route description |
|---|---|
| ESP (Event Stream Processing) | One of the destinations configured in the ESP config YAML. |
| JMS | JMS queue/topic type and name. |
| Kafka | Kafka topic name. |
| Pulsar | Pulsar topic name. |
| Elasticsearch | Elasticsearch index name. |

note

If you configure a record transformer and a routing transformer for the same change notification, the record transformer is applied first. The routing transformer is then applied to the output of the record transformer, NOT to the original record.

Routing Transformer API

Any custom code should implement the Router interface to route incoming records.

Routing Transformer Skip Record

To skip dispatching a record to the outbound destination, the custom routing transformer should return an OutboundRoute with OutboundRouteType set to SKIP.

Routing Transformer Configuration

| Parameter | Required | Default Value | Description |
|---|---|---|---|
| mode | Yes | - | Should be custom. |
| class | Yes | - | Fully-qualified name of the class implementing the routing interface. |
| params | No | Empty map | Params passed to the custom code constructor as an unmodifiable map. |

As with other configurations for outbound connectors, you can plug in the transformers at any of these levels: global, namespace, or set. The custom code is invoked only for the configured XDR change notification records.

Different routing transformers can be configured for different namespaces, sets, etc.

Routing Transformer Example

This ESP example routes to the destination based on the generation number.

...

destinations:
  young:
    ...
  old:
    ...

namespaces:
  namespaceA:
    routing:
      mode: custom
      class: com.aerospike.connect.outbound.transformer.examples.esp.EspGenerationRouter
      params:
        genNumber: 100

package com.aerospike.connect.outbound.transformer.examples.esp;


import com.aerospike.connect.outbound.ChangeNotificationRecord;
import com.aerospike.connect.outbound.routing.OutboundRoute;
import com.aerospike.connect.outbound.routing.Router;
import com.aerospike.connect.outbound.routing.RouterConfig;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Map;
import java.util.Optional;

@Singleton
public class EspGenerationRouter implements Router<String> {
    private static final Logger logger =
            LoggerFactory.getLogger(EspGenerationRouter.class.getName());

    /**
     * Params set in the config.
     */
    private final Map<String, Object> configParams;

    @Inject
    public EspGenerationRouter(RouterConfig routerConfig) {
        this.configParams = routerConfig.getParams();
    }

    @Override
    public OutboundRoute<String> getRoute(
            @NonNull ChangeNotificationRecord record) {
        // Record generation is not shipped by Aerospike XDR versions before
        // v5.0.0.
        Optional<Integer> generation = record.getMetadata().getGeneration();

        // Destinations young and old are to be configured in the
        // "destinations" section of the ESP config.
        //
        // "genNumber" is to be set in params option of the ESP routing config.

        if (generation.isPresent() &&
                generation.get() > (int) configParams.get("genNumber")) {
            logger.debug("Routing record {} to old", record.getMetadata().getKey());
            return OutboundRoute.newEspRoute("old");
        }

        logger.debug("Routing record {} to young", record.getMetadata().getKey());
        return OutboundRoute.newEspRoute("young");
    }
}
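
The routing decision in the example above can be isolated from the SDK and exercised on its own. The following is a minimal sketch in plain Java; GenerationRouting, intParam, and destinationFor are hypothetical names introduced for illustration. It also reads the threshold through Number rather than the example's (int) cast, which would throw a ClassCastException if the value in params were ever supplied as a type other than Integer (whether that can happen depends on how the connector parses the config, which is an assumption here).

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of the Aerospike SDK.
class GenerationRouting {
    // Reads an integer parameter, accepting any Number (Integer, Long, ...)
    // and falling back to a default when the key is absent or not numeric.
    static int intParam(Map<String, Object> params, String key, int defaultValue) {
        Object value = params.get(key);
        return (value instanceof Number) ? ((Number) value).intValue() : defaultValue;
    }

    // Mirrors the example's rule: a generation above the threshold goes to "old";
    // a missing generation, or one at or below the threshold, goes to "young".
    static String destinationFor(Optional<Integer> generation, int threshold) {
        return generation.filter(g -> g > threshold).isPresent() ? "old" : "young";
    }
}
```

Keeping the decision in a small static method like this makes it easy to unit test without constructing a ChangeNotificationRecord.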

Record Transformer

Record transformer transforms the incoming change notification record. Transformations include:

  • Add bin
  • Delete bin
  • Change bin value
  • Change key
  • Change metadata (generation, last update time, expiry)

Record Transformer API

Custom code should implement the Transformer interface to transform incoming records.

The bins returned by the custom transformer can be plain Java objects or Aerospike client Value types. If the transformed record is formatted with one of the built-in outbound formats FlatJSON, JSON, or MessagePack, the two representations are formatted differently, as described below:

  • GeoJSON string:
    • If represented as a Java string, it is formatted as a plain string in the JSON, FlatJSON, and MessagePack outbound formats.
    • If represented as a GeoJSONValue Aerospike client value type, it is formatted as:
      • a JSON object in the JSON and FlatJSON outbound formats
      • a bin with GeoJSON type in a top-level bin, and a MessagePack ext format type (with ext type equal to GeoJSON) in maps and lists, in the MessagePack outbound format.
  • Java blob, C# blob, Python blob, Ruby blob, PHP blob, Erlang blob, and HLL types:
    • If represented as a Java byte array, it is formatted as:
      • a Base64-encoded string in the JSON and FlatJSON outbound formats
      • a Blob type in a top-level bin, and a binary value in maps and lists, in the MessagePack outbound format.
    • If represented as a BytesValue, it is formatted as:
      • a Base64-encoded string in the JSON and FlatJSON outbound formats
      • a bin with the exact type in a top-level bin, and a MessagePack ext format type (with ext type equal to the type) in maps and lists, in the MessagePack outbound format.
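
To give a feel for what a byte-array bin looks like once it is Base64 encoded for the JSON and FlatJSON formats, here is a minimal sketch in plain Java using java.util.Base64. It is not connector code; BlobJsonPreview is a hypothetical name, and the use of the standard (not URL-safe) Base64 alphabet is an assumption.

```java
import java.util.Base64;

// Plain-Java illustration, not connector code.
class BlobJsonPreview {
    // Encodes a blob as Base64 text (standard alphabet assumed).
    static String toBase64(byte[] blob) {
        return Base64.getEncoder().encodeToString(blob);
    }

    // Decodes Base64 text back to the original bytes, as a consumer
    // of the JSON message would need to do.
    static byte[] fromBase64(String text) {
        return Base64.getDecoder().decode(text);
    }
}
```

Under that assumption, a downstream consumer of the JSON output would decode the string back to bytes before interpreting the blob.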

Record Transformer Skip Record

To skip dispatching a record to the outbound destination, the custom record transformer should return a SkipChangeNotificationRecord.

Record Transformer Configuration

| Parameter | Required | Default Value | Description |
|---|---|---|---|
| class | Yes | - | Fully-qualified name of the class implementing the transformer interface. |
| params | No | Empty map | Params passed to the custom code constructor as an unmodifiable map. |

As with other configurations for outbound connectors, you can plug in the transformers at any of these levels: global, namespace, or set. The custom code is invoked only for the configured XDR change notification records.

Different record transformers can be configured for different namespaces, sets, etc.

Record Transformer Example

This Pulsar example adds the bins given in params and increments the generation of the change notification record.

...

namespaces:
  namespaceA:
    custom-transformer:
      class: com.aerospike.connect.outbound.transformer.examples.pulsar.PulsarRecordTransformer
      params:
        bins:
          colour: RED
          shade: light

package com.aerospike.connect.outbound.transformer.examples.pulsar;

import com.aerospike.connect.outbound.ChangeNotificationMetadata;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import com.aerospike.connect.outbound.transformer.Transformer;
import com.aerospike.connect.outbound.transformer.TransformerConfig;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.HashMap;
import java.util.Map;

@Singleton
public class PulsarRecordTransformer implements Transformer {
    private static final Logger logger =
            LoggerFactory.getLogger(PulsarRecordTransformer.class.getName());

    /**
     * Params set in the config.
     */
    private final Map<String, Object> configParams;

    @Inject
    public PulsarRecordTransformer(TransformerConfig transformerConfig) {
        this.configParams = transformerConfig.getParams();
    }

    @Override
    public ChangeNotificationRecord transform(
            @NonNull ChangeNotificationRecord record) {
        // Increment generation in metadata.
        Integer generation = record.getMetadata().getGeneration().isPresent() ?
                record.getMetadata().getGeneration().get() + 1 : null;

        logger.debug("Updated generation of record {} to {}",
                record.getMetadata().getKey(), generation);

        ChangeNotificationMetadata metadata =
                new ChangeNotificationMetadata(record.getMetadata().getKey(),
                        record.getMetadata().getOperation(), generation,
                        record.getMetadata().getLastUpdateTimeMs().orElse(null),
                        record.getMetadata().getExpiryTime().orElse(null),
                        record.getMetadata().getRecordExistsAction(),
                        record.getMetadata().getGenerationPolicy());

        // record.getBins() is immutable, create a copy.
        Map<String, Object> bins = new HashMap<>(record.getBins());

        // Add bins passed as params in config.
        if (configParams.containsKey("bins")) {
            @SuppressWarnings("unchecked") Map<String, Object> paramBins =
                    (Map<String, Object>) configParams.get("bins");
            bins.putAll(paramBins);
        }

        return new ChangeNotificationRecord(metadata, bins);
    }
}
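
The copy-then-merge step in the example above matters because the incoming bins map is immutable. A minimal sketch in plain Java of the same pattern, with the hypothetical helper BinMerge standing in for the relevant lines of the transformer, could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Aerospike SDK.
class BinMerge {
    // Returns a new mutable map holding the original bins plus the extra bins.
    // Extra bins overwrite original bins with the same name.
    // Neither input map is modified, so read-only inputs are safe to pass.
    static Map<String, Object> merge(Map<String, Object> bins, Map<String, Object> extra) {
        Map<String, Object> merged = new HashMap<>(bins);
        merged.putAll(extra);
        return merged;
    }
}
```

Note that with putAll, a bin configured in params replaces an existing bin of the same name; a transformer that should preserve existing values would need putIfAbsent instead.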

Format Transformer

The format transformer formats the incoming change notification record into a custom format, which is dispatched to the outbound destination.

note

If you configure a record transformer and a format transformer for the same change notification, the record transformer is applied first. The format transformer is then applied to the output of the record transformer, NOT to the original record.

The format transformer can also be configured to receive the change notification record already formatted in one of the built-in outbound formats (Avro, MessagePack, etc.) as the payload of the formattedRecord argument.

Format Transformer API

Custom code should implement the Formatter interface to format incoming records. The formatter is passed a record with metadata specific to the outbound destination: EspOutboundMetadata, JmsOutboundMetadata, KafkaOutboundMetadata, PulsarOutboundMetadata, or ElasticsearchOutboundMetadata.

Custom formatters for any outbound destination can return the BytesOutboundRecord type. For the JMS outbound destination, if the return type is TextOutboundRecord, the record is sent as a JMS text message. All other return types are sent as JMS bytes messages.

Format Transformer Skip Record

To skip dispatching a record to the outbound destination, the custom format transformer should return a SkipOutboundRecord.

Format Transformer Configuration

| Parameter | Required | Default Value | Description |
|---|---|---|---|
| mode | Yes | - | Should be custom. |
| class | No | - | Fully-qualified name of the class implementing the Formatter interface. Either class or batch-formatter-class should be specified. |
| batch-formatter-class | No | - | Fully-qualified name of the class implementing the BatchFormatter interface. Either class or batch-formatter-class should be specified. |
| params | No | Empty map | Parameters passed to the custom code as an unmodifiable map. |
| payload-format | No | null | The format of the payload passed to the custom code. If configured as null, the payload is null. It can be configured with any of the built-in formats (Avro, FlatJSON, etc.) as specified in supported formats. |

Format Transformer Example

An example format transformer for the JMS outbound destination.

...

namespaceA:
  format:
    mode: custom
    class: com.aerospike.connect.outbound.transformer.examples.jms.JmsKeyValueFormatter
    params:
      separator: ":"

package com.aerospike.connect.outbound.transformer.examples.jms;

import com.aerospike.connect.outbound.ChangeNotificationRecord;
import com.aerospike.connect.outbound.format.DefaultTextOutboundRecord;
import com.aerospike.connect.outbound.format.Formatter;
import com.aerospike.connect.outbound.format.FormatterConfig;
import com.aerospike.connect.outbound.format.MediaType;
import com.aerospike.connect.outbound.format.OutboundRecord;
import com.aerospike.connect.outbound.jms.JmsOutboundMetadata;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Map;

@Singleton
public class JmsKeyValueFormatter implements Formatter<JmsOutboundMetadata> {
    private static final Logger logger =
            LoggerFactory.getLogger(JmsKeyValueFormatter.class.getName());

    /**
     * Params set in the config.
     */
    private final Map<String, Object> configParams;

    @Inject
    public JmsKeyValueFormatter(FormatterConfig formatterConfig) {
        configParams = formatterConfig.getParams();
    }

    @Override
    public OutboundRecord<JmsOutboundMetadata> format(
            @NonNull ChangeNotificationRecord record,
            @NonNull OutboundRecord<JmsOutboundMetadata> formattedRecord) {
        logger.debug("Formatting record {}", record.getMetadata().getKey());

        // Only write string bins.
        StringBuilder payloadBuilder = new StringBuilder();
        String separator =
                (String) configParams.getOrDefault("separator", ":");
        for (Map.Entry<String, Object> bin : record.getBins().entrySet()) {
            if (bin.getValue() instanceof String) {
                payloadBuilder.append(bin.getKey());
                payloadBuilder.append(separator);
                payloadBuilder.append(bin.getValue());
                payloadBuilder.append(System.lineSeparator());
            }
        }

        return new DefaultTextOutboundRecord<>(
                payloadBuilder.toString().getBytes(), MediaType.OCTET_STREAM,
                formattedRecord.getMetadata());
    }
}
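
The payload-building loop in the example above can be sketched on its own in plain Java. KeyValuePayload is a hypothetical helper, not SDK code. As a deliberate variation on the example, it fixes the charset to UTF-8 and the line terminator to "\n" instead of relying on getBytes() with the platform default charset and System.lineSeparator(), so the produced bytes should not depend on the host the connector runs on.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper, not part of the Aerospike SDK.
class KeyValuePayload {
    // Builds one "name<separator>value" line per string-valued bin and
    // returns the text as UTF-8 bytes. Non-string bins are skipped.
    static byte[] build(Map<String, Object> bins, String separator) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> bin : bins.entrySet()) {
            if (bin.getValue() instanceof String) {
                sb.append(bin.getKey())
                  .append(separator)
                  .append(bin.getValue())
                  .append('\n');
            }
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

The order of the lines follows the iteration order of the map that is passed in, so callers that need a stable order should pass an ordered map.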

Examples

For more examples, see the examples in the SDK GitHub repo.