Outbound Message Transformer
The Aerospike Connect outbound transformer is a plugin system in the Aerospike outbound connectors that lets you use custom code to transform incoming XDR change notification records before they are dispatched to the outbound destination.
The following transformers can be configured in any combination in the outbound connectors and are applied to the incoming Aerospike change notification records dispatched by XDR:
- Routing transformer: Route the record to the outbound destination.
- Record transformer: Transform the contents of the Aerospike record. Transformations include:
  - Add bin
  - Delete bin
  - Change bin value
  - Change key
  - Change metadata (generation, last update time, expiry)
- Format transformer: Transform the Aerospike record into a custom format written to the outbound destination.
- BatchFormat transformer: Similar to the Format transformer, except that it can receive multiple Aerospike records and can return one or more custom formatted records. You can use either the formatter or the batch-formatter.
As with other configurations for outbound connectors, you can plug in the transformers at any of the specific levels: global level, namespace level, or set level.
Develop a Custom Code Plugin
Add a Maven dependency on the Aerospike Connect outbound SDK for all connectors except Elasticsearch, which requires a dependency on the Aerospike Connect Elasticsearch outbound SDK. Dependencies such as the Aerospike outbound SDK, Aerospike Elasticsearch outbound SDK, Aerospike client, Lombok, Javax Inject, the slf4j logger, and Google protobuf are provided by the Aerospike Connect runtime and need not be packaged with the plugin JAR.
Compile the plugin with a Java version that is the same as, or compatible with, the one running the outbound connector.
Add the Maven dependency for the desired outbound SDK during development.
Outbound SDK:
<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-outbound-sdk</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
Elasticsearch Outbound SDK:
<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-elasticsearch-outbound-sdk</artifactId>
    <version>2.1.2</version>
    <scope>provided</scope>
</dependency>
For detailed examples, see the Maven example and the Gradle example.
GitHub Repository
Both SDKs are located in the GitHub repo aerospike-connect-outbound-sdk as a multi-module project.
Custom Transformer Class
The custom transformer class should have one of the following:
- A publicly visible no-args constructor, or
- A publicly visible constructor annotated with `@Inject` and having a single argument of type `FormatterConfig` for a format transformer, `RouterConfig` for a routing transformer, or `TransformerConfig` for a record transformer.
Each configuration scope (global, namespace, or set) runs within a separate injector scope. The custom class can be annotated to control its lifetime as follows:
- If the custom class is annotated with `@Singleton`, a single instance of the class is created during the lifetime of the outbound connector in the given scope: global, namespace, or set. The singleton instance should be thread safe, since the same instance can be accessed by multiple threads concurrently.
- If the custom class is not annotated with `@Singleton`, a new instance of the class is created for every invocation of the transformer.
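The thread-safety requirement for singleton transformers mostly concerns mutable state held in fields. The following is a minimal sketch of one way to keep such state safe, using only JDK concurrency classes. `PerSetCounter`, `recordSeen`, and `seen` are hypothetical names for illustration; the SDK interfaces and the `@Singleton` annotation are omitted so that the snippet compiles on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper that a singleton transformer could hold as a field.
// It is NOT part of the Aerospike SDK.
class PerSetCounter {
    // ConcurrentHashMap and LongAdder tolerate concurrent updates from
    // many threads without external locking.
    private final ConcurrentHashMap<String, LongAdder> counts =
            new ConcurrentHashMap<>();

    // Count one record for the given set name.
    void recordSeen(String setName) {
        counts.computeIfAbsent(setName, k -> new LongAdder()).increment();
    }

    // Number of records counted so far for the given set name.
    long seen(String setName) {
        LongAdder adder = counts.get(setName);
        return adder == null ? 0L : adder.sum();
    }
}
```

A transformer that keeps no mutable fields at all, only final configuration set in the constructor, avoids the issue entirely, which is the approach the examples on this page take.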
Deploy a Custom Code Plugin
Generate the JARs for the custom code plugin with all dependencies shaded to avoid class path conflicts with the outbound connectors. Copy all the JARs to the `/opt/aerospike-<outbound-connector>/usr-lib` directory before starting the outbound connector.
Change Notification Record
The change notification record is a representation of the record shipped by XDR to the outbound connectors. It is passed as input to the transformers.
In the change notification record passed as input to the `Formatter`, `Router`, or `Transformer`, the following types in top-level bins, maps, and lists are represented as follows:
- Java blob, C# blob, Python blob, Ruby blob, PHP blob, Erlang blob, and HLL types are stored as `BytesValue` with the corresponding `ParticleType`.
- GeoJSON types are stored as `GeoJSON` types.
- Plain byte arrays are stored as `BytesValue` with `Blob` particle type.
Routing Transformer
Routing transformer transforms the incoming XDR change notification record into an outbound destination route. The route for each outbound destination is defined in the table below.
Outbound Destination | Route description |
---|---|
ESP (Event Stream Processing) | One of the destinations configured in the ESP config YAML. |
JMS | JMS queue/topic type and name. |
Kafka | Kafka topic name. |
Pulsar | Pulsar topic name. |
Elasticsearch | Elasticsearch index name. |
If you configure a record transformer and a routing transformer for the same change notification, the record transformer is applied first. The routing transformer is then applied to the output of the record transformer, NOT to the original record.
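The ordering described above can be sketched with plain Java function composition. This is an illustration only: bins are modeled as a `Map`, and `ADD_REGION`, `ROUTE_BY_REGION`, and `dispatch` are hypothetical stand-ins rather than SDK types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of "record transformer first, then router".
class TransformThenRoute {
    // Stand-in record transformer: returns a copy with an extra bin.
    static final Function<Map<String, Object>, Map<String, Object>> ADD_REGION =
            bins -> {
                Map<String, Object> copy = new HashMap<>(bins);
                copy.put("region", "eu");
                return copy;
            };

    // Stand-in router: derives a topic name from the "region" bin.
    static final Function<Map<String, Object>, String> ROUTE_BY_REGION =
            bins -> "topic-" + bins.getOrDefault("region", "unknown");

    // The router sees the transformed bins, not the original ones.
    static String dispatch(Map<String, Object> originalBins) {
        return ADD_REGION.andThen(ROUTE_BY_REGION).apply(originalBins);
    }
}
```

In this sketch, a record that arrives without a `region` bin is still routed to `topic-eu`, because the router only ever sees the bin added by the transformer.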
Routing Transformer API
Any custom code should implement the `Router` interface to route incoming records.
Routing Transformer Skip Record
To skip a record from dispatch to the outbound destination, the custom routing transformer should return an `OutboundRoute` with `OutboundRouteType` set to `SKIP`.
Routing Transformer Configuration
Parameter | Required | Default Value | Description |
---|---|---|---|
mode | Yes | - | Use `custom`. |
class | Yes | - | Fully-qualified name of the class implementing the routing interface. |
params | No | Empty map | Params passed to the custom code constructor as an unmodifiable map. |
As with other configurations for outbound connectors, you can plug in the transformers at any of the specific levels: global level, namespace level, or set level. The custom code is invoked only for the configured XDR change notification records.
Different routing transformers can be configured for different namespaces, sets, etc.
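Because `params` arrives as an untyped, unmodifiable map, custom code usually needs to read values defensively. The following is a minimal sketch, assuming numeric values from the YAML config arrive as `Number` instances (an assumption, not something this page confirms). `ParamReader` and `intParam` are hypothetical helper names, not part of the SDK.

```java
import java.util.Map;

// Hypothetical helper for reading typed values from the params map.
class ParamReader {
    // Returns the named param as an int, or defaultValue when it is absent.
    static int intParam(Map<String, Object> params, String name,
                        int defaultValue) {
        Object value = params.get(name);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        // Fail with a clear message instead of a ClassCastException
        // at routing time.
        throw new IllegalArgumentException(
                "param '" + name + "' must be a number, got " + value);
    }
}
```

Reading and validating the value once in the constructor, rather than on every call, keeps a misconfigured `params` entry from surfacing only when the first record is processed.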
Routing Transformer Example
This ESP example routes to the destination based on the generation number.
...
destinations:
  young:
    ...
  old:
    ...
namespaces:
  namespaceA:
    routing:
      mode: custom
      class: com.aerospike.connect.outbound.transformer.examples.esp.EspGenerationRouter
      params:
        genNumber: 100
package com.aerospike.connect.outbound.transformer.examples.esp;

import com.aerospike.connect.outbound.ChangeNotificationRecord;
import com.aerospike.connect.outbound.routing.OutboundRoute;
import com.aerospike.connect.outbound.routing.Router;
import com.aerospike.connect.outbound.routing.RouterConfig;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Map;
import java.util.Optional;

@Singleton
public class EspGenerationRouter implements Router<String> {
    private final static Logger logger =
            LoggerFactory.getLogger(EspGenerationRouter.class.getName());

    /**
     * Params set in the config.
     */
    private final Map<String, Object> configParams;

    @Inject
    public EspGenerationRouter(RouterConfig routerConfig) {
        this.configParams = routerConfig.getParams();
    }

    @Override
    public OutboundRoute<String> getRoute(
            @NonNull ChangeNotificationRecord record) {
        // Record generation is not shipped by Aerospike XDR versions before
        // v5.0.0.
        Optional<Integer> generation = record.getMetadata().getGeneration();

        // Destinations young and old are to be configured in the
        // "destinations" section of the ESP config.
        //
        // "genNumber" is to be set in params option of the ESP routing config.
        if (generation.isPresent() &&
                generation.get() > (int) configParams.get("genNumber")) {
            logger.debug("Routing record {} to old", record.getMetadata().getKey());
            return OutboundRoute.newEspRoute("old");
        }

        logger.debug("Routing record {} to young", record.getMetadata().getKey());
        return OutboundRoute.newEspRoute("young");
    }
}
Record Transformer
Record transformer transforms the incoming change notification record. Transformations include:
- Add bin
- Delete bin
- Change bin value
- Change key
- Change metadata (generation, last update time, expiry)
Record Transformer API
Custom code should implement the `Transformer` interface to transform incoming records.
The bins returned by the custom transformer can be plain Java objects or Aerospike client `Value` types. If the transformed record is formatted with one of the built-in outbound formats (FlatJSON, JSON, or MessagePack), the formatting differs as described below:
- GeoJSON string:
  - If represented as a Java string, it is formatted as a plain string in the JSON, FlatJSON, and MessagePack outbound formats.
  - If represented as a `GeoJSONValue` Aerospike client value type, it is formatted as:
    - a JSON object in the JSON and FlatJSON outbound formats
    - in the MessagePack outbound format, a bin with GeoJSON type in a top-level bin, and a MessagePack ext format type (with ext type equal to GeoJSON) in maps and lists.
- Java blob, C# blob, Python blob, Ruby blob, PHP blob, Erlang blob, and HLL types:
  - If represented by a Java byte array, it is formatted as:
    - a Base64-encoded string in the JSON and FlatJSON outbound formats
    - in the MessagePack outbound format, a Blob type in a top-level bin, and a binary value in maps and lists.
  - If represented as a `BytesValue`, it is formatted as:
    - a Base64-encoded string in the JSON and FlatJSON outbound formats
    - in the MessagePack outbound format, a bin with the exact type in a top-level bin, and a MessagePack ext format type (with ext type equal to the type) in maps and lists.
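As an illustration of the Base64 case in the list above, the sketch below encodes a byte array the way a consumer of the JSON output would need to decode it. It assumes the standard Base64 alphabet with padding; the exact variant used by the connectors is not stated on this page. `BlobAsJsonString` and its methods are hypothetical names.

```java
import java.util.Base64;

// Hypothetical helper showing how a blob maps to a JSON-safe string.
class BlobAsJsonString {
    // Encode raw bytes as a Base64 string (standard alphabet, padded).
    static String toJsonString(byte[] blob) {
        return Base64.getEncoder().encodeToString(blob);
    }

    // Decode the string back to the original bytes on the consumer side.
    static byte[] fromJsonString(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }
}
```

Note that the type information (Java blob, HLL, and so on) is only carried by the MessagePack outbound format; in JSON and FlatJSON both the byte array and the `BytesValue` representations end up as a string.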
Record Transformer Skip Record
To skip a record from dispatch to the outbound destination, the custom record transformer should return a `SkipChangeNotificationRecord`.
Record Transformer Configuration
Parameter | Required | Default Value | Description |
---|---|---|---|
class | Yes | - | Fully-qualified name of the class implementing the transformer interface. |
params | No | Empty map | Params passed to the custom code constructor as an unmodifiable map. |
As with other configurations for outbound connectors, you can plug in the transformers at any of the specific levels: global level, namespace level, or set level. The custom code is invoked only for the configured XDR change notification records.
Different record transformers can be configured for different namespaces, sets, etc.
Record Transformer Example
This Pulsar example adds bins and increments the generation of the change notification record.
...
namespaces:
  namespaceA:
    custom-transformer:
      class: com.aerospike.connect.outbound.transformer.examples.pulsar.PulsarRecordTransformer
      params:
        bins:
          colour: RED
          shade: light
package com.aerospike.connect.outbound.transformer.examples.pulsar;

import com.aerospike.connect.outbound.ChangeNotificationMetadata;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import com.aerospike.connect.outbound.transformer.Transformer;
import com.aerospike.connect.outbound.transformer.TransformerConfig;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.HashMap;
import java.util.Map;

@Singleton
public class PulsarRecordTransformer implements Transformer {
    private final static Logger logger =
            LoggerFactory.getLogger(PulsarRecordTransformer.class.getName());

    /**
     * Params set in the config.
     */
    private final Map<String, Object> configParams;

    @Inject
    public PulsarRecordTransformer(TransformerConfig transformerConfig) {
        this.configParams = transformerConfig.getParams();
    }

    @Override
    public ChangeNotificationRecord transform(
            @NonNull ChangeNotificationRecord record) {
        // Increment generation in metadata.
        Integer generation = record.getMetadata().getGeneration().isPresent() ?
                record.getMetadata().getGeneration().get() + 1 : null;

        logger.debug("Updated generation of record {} to {}", record.getMetadata().getKey(),
                generation);

        ChangeNotificationMetadata metadata =
                new ChangeNotificationMetadata(record.getMetadata().getKey(),
                        record.getMetadata().getOperation(), generation,
                        record.getMetadata().getLastUpdateTimeMs().orElse(null),
                        record.getMetadata().getExpiryTime().orElse(null),
                        record.getMetadata().getRecordExistsAction(),
                        record.getMetadata().getGenerationPolicy());

        // record.getBins() is immutable, create a copy.
        Map<String, Object> bins = new HashMap<>(record.getBins());

        // Add bins passed as params in config.
        if (configParams.containsKey("bins")) {
            @SuppressWarnings("unchecked") Map<String, Object> paramBins =
                    (Map<String, Object>) configParams.get("bins");
            bins.putAll(paramBins);
        }

        return new ChangeNotificationRecord(metadata, bins);
    }
}
Format Transformer
Format transformer formats the incoming change notification record into a custom format, which is dispatched to the outbound destination.
If you configure a record transformer and a format transformer for the same change notification, the record transformer is applied first. The format transformer is then applied to the output of the record transformer, NOT to the original record.
The format transformer can also be configured to receive the change notification record formatted as one of the built-in outbound formats (Avro, MessagePack, etc.) as the payload in the `formattedRecord` argument.
Format Transformer API
Custom code should implement the `Formatter` interface to format incoming records. The formatter is passed a record with metadata as per the outbound destination: `EspOutboundMetadata`, `JmsOutboundMetadata`, `KafkaOutboundMetadata`, `PulsarOutboundMetadata`, and `ElasticsearchOutboundMetadata`.
The `BytesOutboundRecord` type is returned by all custom formatters. In the JMS outbound destination, if the return type is `TextOutboundRecord`, the record is sent as a JMS text message. All other return types are sent as JMS bytes messages.
Format Transformer Skip Record
To skip a record from dispatch to the outbound destination, the custom format transformer should return a `SkipOutboundRecord`.
Format Transformer Configuration
Parameter | Required | Default Value | Description |
---|---|---|---|
mode | Yes | - | Use `custom`. |
class | No | - | Fully-qualified name of the class implementing the `Formatter` interface. Either `class` or `batch-formatter-class` should be specified. |
batch-formatter-class | No | - | Fully-qualified name of the class implementing the `BatchFormatter` interface. Either `class` or `batch-formatter-class` should be specified. |
params | No | Empty map | Parameters passed to the custom code as an unmodifiable map. |
payload-format | No | null | The configuration item to format the payload passed to the custom code. If configured as `null`, the payload is `null`. It can be configured with any of the built-in formats (Avro, FlatJSON, etc.) as specified in supported formats. |
Format Transformer Example
An example format transformer for JMS outbound destination.
...
namespaceA:
  format:
    mode: custom
    class: com.aerospike.connect.outbound.transformer.examples.jms.JmsKeyValueFormatter
    params:
      separator: ":"
package com.aerospike.connect.outbound.transformer.examples.jms;

import com.aerospike.connect.outbound.ChangeNotificationRecord;
import com.aerospike.connect.outbound.format.DefaultTextOutboundRecord;
import com.aerospike.connect.outbound.format.Formatter;
import com.aerospike.connect.outbound.format.FormatterConfig;
import com.aerospike.connect.outbound.format.MediaType;
import com.aerospike.connect.outbound.format.OutboundRecord;
import com.aerospike.connect.outbound.jms.JmsOutboundMetadata;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Map;

@Singleton
public class JmsKeyValueFormatter implements Formatter<JmsOutboundMetadata> {
    private final static Logger logger =
            LoggerFactory.getLogger(JmsKeyValueFormatter.class.getName());

    /**
     * Params set in the config.
     */
    private final Map<String, Object> configParams;

    @Inject
    public JmsKeyValueFormatter(FormatterConfig formatterConfig) {
        configParams = formatterConfig.getParams();
    }

    @Override
    public OutboundRecord<JmsOutboundMetadata> format(
            @NonNull ChangeNotificationRecord record,
            @NonNull OutboundRecord<JmsOutboundMetadata> formattedRecord) {
        logger.debug("Formatting record {}", record.getMetadata().getKey());

        // Only write string bins.
        StringBuilder payloadBuilder = new StringBuilder();
        String separator =
                (String) configParams.getOrDefault("separator", ":");
        for (Map.Entry<String, Object> bin : record.getBins().entrySet()) {
            if (bin.getValue() instanceof String) {
                payloadBuilder.append(bin.getKey());
                payloadBuilder.append(separator);
                payloadBuilder.append(bin.getValue());
                payloadBuilder.append(System.lineSeparator());
            }
        }

        return new DefaultTextOutboundRecord<>(
                payloadBuilder.toString().getBytes(), MediaType.OCTET_STREAM,
                formattedRecord.getMetadata());
    }
}
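The payload-building step of the formatter above can be tried in isolation, without the SDK on the class path. The sketch below is a hypothetical standalone variant, not the SDK example itself: `KeyValuePayload` and `build` are illustrative names, and it deliberately uses UTF-8 and `\n` instead of the platform default charset and `System.lineSeparator()` so that the output is the same on every host.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical standalone version of the key-value payload logic.
class KeyValuePayload {
    // Writes one "name<separator>value" line per string bin and
    // ignores bins of any other type.
    static byte[] build(Map<String, Object> bins, String separator) {
        StringBuilder payload = new StringBuilder();
        for (Map.Entry<String, Object> bin : bins.entrySet()) {
            if (bin.getValue() instanceof String) {
                payload.append(bin.getKey())
                        .append(separator)
                        .append(bin.getValue())
                        .append('\n');
            }
        }
        // Explicit charset, so the bytes do not depend on host settings.
        return payload.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Whether a fixed charset and line ending are preferable depends on what the JMS consumer expects; the point of the sketch is only that the choice can be made explicit.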
An example batch format transformer for Elasticsearch outbound destination.
...
namespaceA:
  format:
    mode: custom
    batch-formatter-class: com.aerospike.connect.outbound.transformer.examples.elasticsearch.ElasticsearchCustomJsonBatchFormatter
package com.aerospike.connect.outbound.transformer.examples.elasticsearch;

import co.elastic.clients.elasticsearch.core.BulkRequest.Builder;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import com.aerospike.connect.outbound.elasticsearch.ElasticsearchOutboundMetadata;
import com.aerospike.connect.outbound.elasticsearch.format.ElasticsearchOutboundRecord;
import com.aerospike.connect.outbound.format.BatchFormatter;
import com.aerospike.connect.outbound.format.BatchItem;
import com.aerospike.connect.outbound.format.OutboundRecord;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * ElasticsearchCustomJsonBatchFormatter formats change notification records by
 * adding an extra field named 'created_at' along with the parsed bins. Also,
 * setting the index at operation level if it can be derived.
 *
 * <p>
 * A snippet of a config for this batch-formatter can be
 * <pre>
 * format:
 *   mode: custom
 *   batch-formatter-class: com.aerospike.connect.outbound.transformer.examples.elasticsearch.ElasticsearchCustomJsonBatchFormatter
 * </pre>
 */
@Singleton
public class ElasticsearchCustomJsonBatchFormatter
        implements BatchFormatter<ElasticsearchOutboundMetadata> {
    private final static Logger logger =
            LoggerFactory.getLogger(
                    ElasticsearchCustomJsonBatchFormatter.class.getName());
    private final static ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public List<OutboundRecord<ElasticsearchOutboundMetadata>> format(
            @NonNull List<BatchItem<ElasticsearchOutboundMetadata>> batchItems)
            throws Exception {
        ElasticsearchOutboundRecord elasticsearchOutboundRecord =
                getElasticsearchOutboundRecord(batchItems);
        List<OutboundRecord<ElasticsearchOutboundMetadata>>
                outboundRecords = new ArrayList<>();
        outboundRecords.add(elasticsearchOutboundRecord);
        return outboundRecords;
    }

    @NonNull
    private ElasticsearchOutboundRecord getElasticsearchOutboundRecord(
            @NonNull List<BatchItem<ElasticsearchOutboundMetadata>> batchItems) {
        long currentTimeMillis = System.currentTimeMillis();
        return () -> {
            Builder builder = new Builder();
            // This is the default index for all bulk operations if we don't
            // override at the operation level.
            builder.index("my_test_index");
            batchItems.stream().map(BatchItem::getRecord).forEach(record -> {
                        logger.debug("Formatting record {}",
                                record.getMetadata().getKey());
                        try {
                            Map<String, Object> bins = record.getBins();
                            Map<String, Object> resultMap =
                                    new HashMap<>(bins.size() + 1);
                            resultMap.putAll(bins);
                            resultMap.put("created_at", currentTimeMillis);
                            byte[] value =
                                    objectMapper.writeValueAsBytes(resultMap);
                            builder.operations(
                                    op -> op.create(CreateOperation.of(cob -> {
                                                cob.document(
                                                        new PreSerializedJson(value));
                                                Object esIndex = bins.get("es_index");
                                                if (esIndex instanceof String) {
                                                    // Set index for this Create
                                                    // operation.
                                                    cob.index((String) esIndex);
                                                }
                                                return cob;
                                            })
                                    ));
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
            );
            return builder.build();
        };
    }
}
Examples
For more examples, see the examples in the SDK GitHub repo.