# Message transformer for XDR Proxy

::: caution
-   This config is only valid for the XDR Proxy version 2.1.0+.
-   Bin convergence and writing bin-level LUT (last update time) are not supported in the custom message transformer.
:::

The Message Transformer allows you to write custom code that reads incoming XDR Change Notification Records, performs Aerospike operations or other transformations on them, and converts them into [AerospikeRecordOperation](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.3.2/src/main/java/com/aerospike/connect/inbound/operation/AerospikeRecordOperation.java) objects. You can develop your code by using the Java SDK, then bundle it as a .jar file. You then make the .jar file available to the XDR Proxy using the classpath, along with associated parameters that are specified in the configuration file. Your custom code can be plugged into the XDR Proxy for rapid integration.

## Example use cases

-   Perform complex operations on Maps or Lists on XDR CNR (change notification record) before writing the output Aerospike records to your Aerospike databases. For example, you could add an element from incoming messages to maps or lists, or create maps of maps.
-   Filter messages for compliance use cases. For example, you can filter out records containing Personally Identifiable Information (PII), or you can mask fields with sensitive data prior to writing records to an Aerospike database.
-   Create Aerospike records with bins generated by tweaking XDR CNR (change notification record) values. You can even extend your message transformer to create Aerospike keys.

## What does it not do?

-   The message transformer is not meant for heavy-weight processing or calling external APIs.
-   It does not support transactions where several commands isolated from commands outside the transaction are executed atomically. However, it supports multiple operations on the same record, as well as reads from the database during the transformation.

## Developing a message transformer

### Add the Maven SDK dependencies

Add the following dependencies:

```xml
<dependencies>

  <!-- Aerospike Inbound SDK -->

  <dependency>

      <groupId>com.aerospike</groupId>

      <artifactId>aerospike-connect-inbound-sdk</artifactId>

      <version>1.3.2</version>

      <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->

  </dependency>

  <!-- Aerospike Outbound SDK -->

  <dependency>

      <groupId>com.aerospike</groupId>

      <artifactId>aerospike-connect-outbound-sdk</artifactId>

      <version>2.2.2</version>

      <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->

  </dependency>

  <!-- Javax inject annotations -->

  <dependency>

      <groupId>javax.inject</groupId>

      <artifactId>javax.inject</artifactId>

      <version>1</version>

      <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->

  </dependency>

  <!-- Logging -->

  <dependency>

      <groupId>org.slf4j</groupId>

      <artifactId>slf4j-api</artifactId>

      <version>1.7.33</version>

      <scope>provided</scope> <!-- Will be available in the XDR Proxy runtime -->

  </dependency>

</dependencies>
```

### Implement the [InboundMessageTransformer](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.3.2/src/main/java/com/aerospike/connect/inbound/InboundMessageTransformer.java) interface

The custom message transformer should implement `InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>>`

::: caution
The `bins` field in `ChangeNotificationRecord` is an immutable map.
:::

The WritePolicy passed in the `InboundMessage` to the custom message transformer is as follows:

| Write Policy field | Description |
| --- | --- |
| `recordExistsAction` | Set to the value shipped by XDR |
| `generationPolicy` | Set to the value shipped by XDR |
| `generation` | Set to the value shipped by XDR |
| `expiration` | Set to the value shipped by XDR |
| `socketTimeout` | Set to the value configured in the `aerospike` section of the YAML config |
| `totalTimeout` | Set to the value configured in the `aerospike` section of the YAML config |
| `xdr` | Set to `true` |

You have the option to inject the following objects in your `message-transformer` class using [Java Dependency Injection](https://docs.oracle.com/javaee/7/api/javax/inject/package-summary.html).

| Class | Usage |
| --- | --- |
| [AerospikeReader](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.3.2/src/main/java/com/aerospike/connect/inbound/AerospikeReader.java) | An object to read a record from the Aerospike Database. |
| [InboundMessageTransformerConfig](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.3.2/src/main/java/com/aerospike/connect/inbound/model/InboundMessageTransformerConfig.java) | The custom parameters provided in the configuration file as `params`. |

### Thread safety

-   If you annotate your implementation with [@Singleton](https://docs.oracle.com/javaee/7/api/javax/inject/Singleton.html), it has to be thread safe because one instance can be used by multiple threads.
-   If you do not annotate your implementation with [@Singleton](https://docs.oracle.com/javaee/7/api/javax/inject/Singleton.html), a new instance of your message transformer is created for every incoming message.

### Configure the XDR Proxy to use your message transformer

Include the `message-transformer` stanza at the global, namespace, or set scope.

#### Example

```yaml
# Message transformer at global scope.

message-transformer:

  class: com.aerospike.GlobalScopeTransformer

  params:

    color: RED

namespaces:

  milkyWay:

    # Message transformer at namespace "milkyWay" scope.

    message-transformer:

      class: com.aerospike.TestNamespaceTransformer

      params:

        color: BLUE

    sets:

      solarSystem:

        # Message transformer at set "solarSystem" scope.

        message-transformer:

          class: com.aerospike.TestNamespaceTransformer

          params:

            color: YELLOW
```

### Deploy your message transformer

Deploy your `.jar` file, along with any dependencies it might have, by copying it to the `/opt/aerospike-xdr-proxy/usr-lib/` folder of the directory where the Aerospike XDR Proxy is installed.

## Example 1: Add new bins to records

```java
import com.aerospike.client.Bin;

import com.aerospike.client.Key;

import com.aerospike.connect.inbound.InboundMessageTransformer;

import com.aerospike.connect.inbound.model.InboundMessage;

import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;

import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;

import com.aerospike.connect.inbound.operation.AerospikePutOperation;

import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;

import com.aerospike.connect.outbound.ChangeNotificationRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import javax.inject.Inject;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.stream.Collectors;

/**

 * Add bins specified in the YAML config to the incoming records.

 */

class XdrProxyAddBinsCustomTransformer implements

        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {

    private final static Logger logger =

            LoggerFactory.getLogger(XdrProxyAddBinsCustomTransformer.class.getName());

    private final InboundMessageTransformerConfig config;

    @Inject

    public XdrProxyAddBinsCustomTransformer(

            InboundMessageTransformerConfig config) {

        this.config = config;

    }

    @Override

    public AerospikeRecordOperation transform(

            InboundMessage<Key, ChangeNotificationRecord> input) {

        // Do not transform a delete.

        if (input.getMessage().getMetadata().getOperation().isDelete()) {

            logger.debug("key={} DELETE operation", input.getKey().get());

            return new AerospikeDeleteOperation(input.getKey().get(),

                    input.getWritePolicy().get());

        }

        logger.debug("key={} WRITE operation", input.getKey().get());

        // Get the bins shipped by XDR. Original bins is unmodifiable.

        Map<String, Object> bins = new HashMap<String, Object>(input.getMessage().getBins());

        // Add values specified in YAML config to the bins.

        bins.putAll(config.getTransformerConfig());

        // Convert to list of bins.

        List<Bin> aerospikeBins = bins.entrySet().stream().map(

                bin -> new Bin(bin.getKey(), bin.getValue())

        ).collect(Collectors.toList());

        // Write the record with the transformed bins.

        return new AerospikePutOperation(input.getKey().get(),

                input.getWritePolicy().get(), aerospikeBins);

    }

}
```

## Example 2: Selectively delete records

```java
import com.aerospike.client.Bin;

import com.aerospike.client.Key;

import com.aerospike.client.Record;

import com.aerospike.connect.inbound.AerospikeReader;

import com.aerospike.connect.inbound.InboundMessageTransformer;

import com.aerospike.connect.inbound.model.InboundMessage;

import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;

import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;

import com.aerospike.connect.inbound.operation.AerospikePutOperation;

import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;

import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;

import com.aerospike.connect.outbound.AerospikeOperation;

import com.aerospike.connect.outbound.ChangeNotificationRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import javax.inject.Inject;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

/**

 * Selectively delete records based on generation.

 */

class XdrProxyDeleteCustomTransformer implements

        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {

    private final static Logger logger =

            LoggerFactory.getLogger(XdrProxyDeleteCustomTransformer.class.getName());

    private final InboundMessageTransformerConfig config;

    private final AerospikeReader aerospikeReader;

    /**

     * The generation key to pass the value in the YAML config.

     */

    private final String GEN_KEY = "gen";

    /**

     * Default generation cut-off value of an old record.

     */

    private final int DEFAULT_GEN_OLD = 100;

    @Inject

    public XdrProxyDeleteCustomTransformer(

            AerospikeReader aerospikeReader,

            InboundMessageTransformerConfig config) {

        this.aerospikeReader = aerospikeReader;

        this.config = config;

    }

    @Override

    public AerospikeRecordOperation transform(

            InboundMessage<Key, ChangeNotificationRecord> input) {

        // Do not transform a write.

        if (input.getMessage().getMetadata().getOperation() ==

                AerospikeOperation.WRITE) {

            logger.debug("key={} WRITE operation", input.getKey().get());

            Map<String, Object> bins = input.getMessage().getBins();

            List<Bin> aerospikeBins = new ArrayList<>(bins.size());

            for (Map.Entry<String, Object> bin : bins.entrySet()) {

                aerospikeBins.add(new Bin(bin.getKey(), bin.getValue()));

            }

            return new AerospikePutOperation(input.getKey().get(),

                    input.getWritePolicy().get(), aerospikeBins);

        }

        logger.debug("key={} DELETE operation", input.getKey().get());

        Record record =

                aerospikeReader.get(null, input.getKey().get());

        int oldGen = (int) config.getTransformerConfig()

                .getOrDefault(GEN_KEY, DEFAULT_GEN_OLD);

        // Do not delete young records, skip shipping them.

        if (record == null || record.generation < oldGen) {

            return new AerospikeSkipRecordOperation();

        }

        // Only delete older records.

        return new AerospikeDeleteOperation(input.getKey().get(),

                input.getWritePolicy().get());

    }

}
```