Outbound message transformer
The Aerospike Connect outbound transformer is a plugin system in Aerospike outbound connectors that uses custom code to transform incoming XDR change notification records before they are dispatched to the outbound destination.
The following transformers can be configured in any combination in the outbound connectors and are applied to the incoming Aerospike change notification records dispatched by XDR:
- Routing transformer: Route the record to the outbound destination.
- Record transformer: Transform the contents of the Aerospike record. Transformations include:
  - Add bin
  - Delete bin
  - Change bin value
  - Change key
  - Change metadata (generation, last update time, expiry)
- Format transformer: Transform the Aerospike record into a custom format written to the outbound destination.
- BatchFormat transformer: Similar to the Format transformer, except it can receive multiple Aerospike records and can return one or more custom formatted records. You can use either a formatter or a batch formatter.
As with other configurations for outbound connectors, you can plug in the transformers at any of these levels: global, namespace, or set.
Develop a Custom Code Plugin
During development, add a Maven dependency on the Aerospike Connect outbound SDK for all connectors except Elasticsearch, which requires a dependency on the Aerospike Connect Elasticsearch outbound SDK. Certain dependencies, such as the Aerospike outbound SDK, the Aerospike Elasticsearch outbound SDK, the Aerospike client, Lombok, Javax Inject, the slf4j logger, and Google protobuf, are provided by the Aerospike Connect runtime and need not be packaged with the plugin JAR.
For detailed examples, see the Maven example and the Gradle example.
GitHub Repository
Both SDKs are located in the GitHub repository aerospike-connect-outbound-sdk as a multi-module project.
Custom Transformer Class
The custom transformer class should have one of the following:
- A publicly visible no-args constructor, or
- A publicly visible constructor annotated with @Inject and taking a single argument of type FormatterConfig for a format transformer, RouterConfig for a routing transformer, or TransformerConfig for a record transformer.
Each configuration scope (global, namespace, or set) runs within a separate injector scope. The custom class can be annotated to control its lifetime within that scope, as follows:
- If the custom class is annotated with @Singleton, a single instance of the class is created during the lifetime of the outbound connector in the given scope: global, namespace, or set. The singleton instance should be thread safe, since the same singleton instance can be accessed across multiple threads concurrently.
- If the custom class is not annotated with @Singleton, a new instance of the class is created for every invocation of the transformer.
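Because a @Singleton transformer is shared across threads, any mutable state it keeps has to be safe for concurrent access. The following is a minimal sketch in plain Java, assuming a hypothetical per-set counter. The class name RoutedRecordCounter and its methods are illustrative and not part of the SDK, and the @Singleton annotation and SDK interface are omitted so the snippet compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical helper holding shared state for a singleton transformer.
 * In a real plugin the owning class would carry @Singleton and implement
 * one of the SDK interfaces; both are left out here.
 */
final class RoutedRecordCounter {
    // ConcurrentHashMap and LongAdder are both safe for concurrent updates.
    private final Map<String, LongAdder> countsBySet = new ConcurrentHashMap<>();

    /** Counts one processed record for the given set name. */
    void increment(String setName) {
        countsBySet.computeIfAbsent(setName, k -> new LongAdder()).increment();
    }

    /** Returns the number of records counted so far for the set. */
    long count(String setName) {
        LongAdder adder = countsBySet.get(setName);
        return adder == null ? 0L : adder.sum();
    }
}
```

A plain HashMap with long values would lose updates under the same load, which is why the sketch relies on the java.util.concurrent types instead of external locking.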
Deploy a Custom Code Plugin
Generate the JARs for the custom code plugin with all dependencies shaded to avoid class path conflicts with the connectors. Copy all the JARs to the /opt/aerospike-<outbound-connector>/usr-lib directory before starting the outbound connector.
Change Notification Record
The change notification record is a representation of the record shipped by XDR to the outbound connectors. It is passed as input to the transformers.
In the change notification record passed as input to the Formatter, Router, or Transformer, the following types in top-level bins, maps, and lists are represented as follows:
- Java blob, C# blob, Python blob, Ruby blob, PHP blob, Erlang blob, and HLL types are stored as BytesValue with the corresponding ParticleType.
- GeoJSON types are stored as GeoJSON types.
- Plain byte arrays are stored as BytesValue with Blob particle type.
Routing Transformer
Routing transformer transforms the incoming XDR change notification record into an outbound destination route. The route for each outbound destination is defined in the table below.
Outbound Destination | Route description |
---|---|
ESP (Event Stream Processing) | One of the destinations configured in the ESP config YAML. |
JMS | JMS queue/topic type and name. |
Kafka | Kafka topic name. |
Pulsar | Pulsar topic name. |
Elasticsearch | Elasticsearch index name. |
Routing Transformer API
Any custom code should implement the Router interface to route incoming records.
Routing Transformer Skip Record
To skip dispatching a record to the outbound destination, the custom routing transformer should return an OutboundRoute with OutboundRouteType set to SKIP.
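The skip decision can be sketched with stand-in types. The RouteType and Route classes below are hypothetical substitutes for the SDK's OutboundRouteType and OutboundRoute, written only so the snippet is self-contained. The factory methods and the rule used here (skip records from a set named "audit") are assumptions, not SDK behaviour.

```java
import java.util.Objects;

/** Stand-in for the SDK's OutboundRouteType; only two values are modelled. */
enum RouteType { TOPIC, SKIP }

/** Stand-in for the SDK's OutboundRoute. */
final class Route {
    final RouteType type;
    final String name; // null when the record is skipped

    private Route(RouteType type, String name) {
        this.type = type;
        this.name = name;
    }

    static Route topic(String name) {
        return new Route(RouteType.TOPIC, Objects.requireNonNull(name));
    }

    static Route skip() {
        return new Route(RouteType.SKIP, null);
    }
}

final class AuditSkippingRouter {
    /**
     * Skips records from the "audit" set; every other record is routed to a
     * topic named after its set, or to "default" when the set is null.
     */
    static Route route(String setName) {
        if ("audit".equals(setName)) {
            return Route.skip();
        }
        return Route.topic(setName == null ? "default" : setName);
    }
}
```

Returning an explicit skip value, rather than null or an exception, keeps the "do not dispatch" outcome distinguishable from a routing failure.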
Routing Transformer Configuration
Parameter | Required | Default Value | Description |
---|---|---|---|
mode | Yes | - | Use custom. |
class | Yes | - | Fully-qualified name of the class implementing the routing interface. |
params | No | Empty map | Params passed to the custom code constructor as an unmodifiable map. |
As with other configurations for outbound connectors, you can plug in the transformers at any of these levels: global, namespace, or set. The custom code is invoked only for XDR change notification records that fall within the level where it is configured.
Different routing transformers can be configured for different namespaces, sets, etc.
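Since params reach the custom code as an unmodifiable Map<String, Object>, values usually need converting before use. The sketch below shows one defensive way to read an integer such as genNumber. The RouterParams class and its intParam helper are hypothetical, not SDK API, and the sketch assumes the configuration parser may hand back any Number subtype or a numeric string.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class RouterParams {
    /** Reads an integer param, falling back to a default when the key is absent. */
    static int intParam(Map<String, Object> params, String key, int defaultValue) {
        Object value = params.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        // Accept numeric strings too; anything else is a configuration error.
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "param '" + key + "' is not an integer: " + value, e);
        }
    }

    /** Builds an unmodifiable map similar to the one passed to the constructor. */
    static Map<String, Object> exampleParams() {
        Map<String, Object> params = new HashMap<>();
        params.put("genNumber", 100);
        return Collections.unmodifiableMap(params);
    }
}
```

Doing the conversion once in the constructor, instead of casting on every record, also surfaces a misconfigured value when the connector starts rather than on the first routed record.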
Routing Transformer Example
This ESP example routes to the destination based on the generation number.
    ...
    destinations:
      young:
        ...
      old:
        ...
    namespaces:
      namespaceA:
        routing:
          mode: custom
          class: com.aerospike.connect.outbound.transformer.examples.esp.EspGenerationRouter
          params:
            genNumber: 100

    package com.aerospike.connect.outbound.transformer.examples.esp;

    import com.aerospike.connect.outbound.ChangeNotificationRecord;
    import com.aerospike.connect.outbound.routing.OutboundRoute;
    import com.aerospike.connect.outbound.routing.Router;
    import com.aerospike.connect.outbound.routing.RouterConfig;
    import lombok.NonNull;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.inject.Inject;
    import javax.inject.Singleton;
    import java.util.Map;
    import java.util.Optional;

    @Singleton
    public class EspGenerationRouter implements Router<String> {
        private final static Logger logger =
                LoggerFactory.getLogger(EspGenerationRouter.class.getName());

        /**
         * Params set in the config.
         */
        private final Map<String, Object> configParams;

        @Inject
        public EspGenerationRouter(RouterConfig routerConfig) {
            this.configParams = routerConfig.getParams();
        }

        @Override
        public OutboundRoute<String> getRoute(
                @NonNull ChangeNotificationRecord record) {
            // Record generation is not shipped by Aerospike XDR versions before
            // v5.0.0.
            Optional<Integer> generation = record.getMetadata().getGeneration();

            // Destinations young and old are to be configured in the
            // "destinations" section of the ESP config.
            //
            // "genNumber" is to be set in params option of the ESP routing config.
            if (generation.isPresent() &&
                    generation.get() > (int) configParams.get("genNumber")) {
                logger.debug("Routing record {} to old",
                        record.getMetadata().getKey());
                return OutboundRoute.newEspRoute("old");
            }

            logger.debug("Routing record {} to young",
                    record.getMetadata().getKey());
            return OutboundRoute.newEspRoute("young");
        }
    }
Record Transformer
Record transformer transforms the incoming change notification record. Transformations include:
- Add bin
- Delete bin
- Change bin value
- Change key
- Change metadata (generation, last update time, expiry)
Record Transformer API
Custom code should implement the Transformer interface to transform incoming records.
The bins returned by the custom transformer can be plain Java objects or Aerospike client Value types. If the transformed record is formatted with one of the built-in outbound formats FlatJSON, JSON, or MessagePack, the formatting differs as described below:
- GeoJSON string:
  - If represented as a Java string, it is formatted as a plain string in the JSON, FlatJSON, and MessagePack outbound formats.
  - If represented as a GeoJSONValue Aerospike client value type, it is formatted as:
    - a JSON object in the JSON and FlatJSON outbound formats
    - a bin with GeoJSON type in a top-level bin, and a MessagePack ext format type (with ext type equal to GeoJSON) in maps and lists, in the MessagePack outbound format
- Java blob, C# blob, Python blob, Ruby blob, PHP blob, Erlang blob, and HLL types:
  - If represented by a Java byte array, it is formatted as:
    - a Base64-encoded string in the JSON and FlatJSON outbound formats
    - a Blob type in a top-level bin, and a binary value in maps and lists, in the MessagePack outbound format
  - If represented as a BytesValue, it is formatted as:
    - a Base64-encoded string in the JSON and FlatJSON outbound formats
    - a bin with the exact type in a top-level bin, and a MessagePack ext format type (with ext type equal to the type) in maps and lists, in the MessagePack outbound format
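To illustrate the byte-array case, the snippet below uses the JDK's java.util.Base64 to produce the kind of Base64-encoded string a blob appears as in JSON output. It is a sketch only: BlobJsonPreview is a hypothetical name, and the Base64 variant the connector actually uses (standard or URL-safe alphabet, padding) is an assumption here.

```java
import java.util.Base64;

final class BlobJsonPreview {
    /** Encodes raw bytes with the standard, padded Base64 alphabet. */
    static String encode(byte[] blob) {
        return Base64.getEncoder().encodeToString(blob);
    }

    /** Decodes on the consumer side to recover the original bytes. */
    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    /**
     * Renders a one-bin JSON object with the blob as a Base64 string.
     * Assumes the bin name contains no characters that need JSON escaping.
     */
    static String toJsonBin(String binName, byte[] blob) {
        return "{\"" + binName + "\":\"" + encode(blob) + "\"}";
    }
}
```

For example, BlobJsonPreview.encode(new byte[] {1, 2, 3}) returns "AQID", and decoding that string on the consumer side yields the original three bytes.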
Record Transformer Skip Record
To skip dispatching a record to the outbound destination, the custom record transformer should return a SkipChangeNotificationRecord.
Record Transformer Configuration
Parameter | Required | Default Value | Description |
---|---|---|---|
class | Yes | - | Fully-qualified name of the class implementing the transformer interface. |
params | No | Empty map | Params passed to the custom code constructor as an unmodifiable map. |
As with other configurations for outbound connectors, you can plug in the transformers at any of these levels: global, namespace, or set. The custom code is invoked only for XDR change notification records that fall within the level where it is configured.
Different record transformers can be configured for different namespaces, sets, etc.
Record Transformer Example
This Pulsar example adds a bin and updates generation of the change notification record.
    ...
    namespaces:
      namespaceA:
        custom-transformer:
          class: com.aerospike.connect.outbound.transformer.examples.pulsar.PulsarRecordTransformer
          params:
            bins:
              colour: RED
              shade: light

    package com.aerospike.connect.outbound.transformer.examples.pulsar;

    import com.aerospike.connect.outbound.ChangeNotificationMetadata;
    import com.aerospike.connect.outbound.ChangeNotificationRecord;
    import com.aerospike.connect.outbound.transformer.Transformer;
    import com.aerospike.connect.outbound.transformer.TransformerConfig;
    import lombok.NonNull;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.inject.Inject;
    import javax.inject.Singleton;
    import java.util.HashMap;
    import java.util.Map;

    @Singleton
    public class PulsarRecordTransformer implements Transformer {
        private final static Logger logger =
                LoggerFactory.getLogger(PulsarRecordTransformer.class.getName());

        /**
         * Params set in the config.
         */
        private final Map<String, Object> configParams;

        @Inject
        public PulsarRecordTransformer(TransformerConfig transformerConfig) {
            this.configParams = transformerConfig.getParams();
        }

        @Override
        public ChangeNotificationRecord transform(
                @NonNull ChangeNotificationRecord record) {
            // Increment generation in metadata.
            Integer generation = record.getMetadata().getGeneration().isPresent()
                    ? record.getMetadata().getGeneration().get() + 1 : null;

            logger.debug("Updated generation of record {} to {}",
                    record.getMetadata().getKey(), generation);

            ChangeNotificationMetadata metadata =
                    new ChangeNotificationMetadata(
                            record.getMetadata().getKey(),
                            record.getMetadata().getOperation(),
                            generation,
                            record.getMetadata().getLastUpdateTimeMs().orElse(null),
                            record.getMetadata().getExpiryTime().orElse(null),
                            record.getMetadata().getRecordExistsAction(),
                            record.getMetadata().getGenerationPolicy());

            // record.getBins() is immutable, create a copy.
            Map<String, Object> bins = new HashMap<>(record.getBins());

            // Add bins passed as params in config.
            if (configParams.containsKey("bins")) {
                @SuppressWarnings("unchecked")
                Map<String, Object> paramBins =
                        (Map<String, Object>) configParams.get("bins");
                bins.putAll(paramBins);
            }

            return new ChangeNotificationRecord(metadata, bins);
        }
    }
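The bin-level transformations (add, delete, change value) all follow the same copy-then-modify pattern, because the bins map on the incoming record is immutable. A minimal sketch using only JDK collections follows. The BinEdits class and the bin names are hypothetical, and sampleBins() merely stands in for record.getBins().

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class BinEdits {
    /** Returns a modified copy of the bins; the input map is never mutated. */
    static Map<String, Object> apply(Map<String, Object> bins) {
        Map<String, Object> copy = new HashMap<>(bins);
        // Add a bin.
        copy.put("colour", "RED");
        // Delete a bin; a no-op when the bin is absent.
        copy.remove("internalNotes");
        // Change a bin value, only when the bin exists.
        copy.computeIfPresent("name",
                (k, v) -> v.toString().toUpperCase(Locale.ROOT));
        return copy;
    }

    /** Builds an unmodifiable input map as a stand-in for record.getBins(). */
    static Map<String, Object> sampleBins() {
        Map<String, Object> bins = new HashMap<>();
        bins.put("name", "alice");
        bins.put("internalNotes", "do not ship");
        return Collections.unmodifiableMap(bins);
    }
}
```

Calling put or remove directly on the unmodifiable input would throw UnsupportedOperationException, which is why the copy is made first.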
Format Transformer
Format transformer formats the incoming change notification record into a custom format, which is dispatched to the outbound destination.
The format transformer can also be configured to receive the change notification record formatted as one of the built-in outbound formats (Avro, MessagePack, etc.) as payload in the formattedRecord argument.
Format Transformer API
Custom code should implement the Formatter interface to format incoming records. The formatter is passed a record with metadata as per the outbound destination: EspOutboundMetadata, JmsOutboundMetadata, KafkaOutboundMetadata, PulsarOutboundMetadata, or ElasticsearchOutboundMetadata.
Custom formatters for all destinations return the BytesOutboundRecord type. For the JMS outbound destination, if the return type is TextOutboundRecord, the record is sent as a JMS text message. All other return types are sent as JMS bytes messages.
Format Transformer Skip Record
To skip dispatching a record to the outbound destination, the custom format transformer should return a SkipOutboundRecord.
Format Transformer Configuration
Parameter | Required | Default Value | Description |
---|---|---|---|
mode | Yes | - | Use custom. |
class | No | - | Fully-qualified name of the class implementing the Formatter interface. Either class or batch-formatter-class should be specified. |
batch-formatter-class | No | - | Fully-qualified name of the class implementing the BatchFormatter interface. Either class or batch-formatter-class should be specified. |
params | No | Empty map | Parameters passed to the custom code as an unmodifiable map. |
payload-format | No | null | Format of the payload passed to the custom code. If configured as null, the payload is null. It can be configured with any of the built-in formats (Avro, FlatJSON, etc.) as specified in supported formats. |
Format Transformer Example
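A custom format can be as simple as serializing the key and bins into a delimited line of UTF-8 bytes. The sketch below uses stand-in types only: LineFormatter is hypothetical, takes a plain key string and bin map instead of the SDK's ChangeNotificationRecord, and returns a raw byte array where a real Formatter would wrap the bytes in a BytesOutboundRecord. The "key|bin=value" layout is an arbitrary choice for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

final class LineFormatter {
    /** Formats a record as "key|bin1=value1|bin2=value2" with bins sorted by name. */
    static byte[] format(String key, Map<String, Object> bins) {
        StringBuilder line = new StringBuilder(key);
        // TreeMap gives a deterministic bin order regardless of the input map type.
        for (Map.Entry<String, Object> bin : new TreeMap<>(bins).entrySet()) {
            line.append('|').append(bin.getKey()).append('=').append(bin.getValue());
        }
        return line.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Sorting the bins makes the output stable across runs, which helps when downstream consumers compare or deduplicate messages.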
Examples
For more examples, see the examples in the SDK GitHub repo.