---
title: "Examples of custom code to use Message Transformer"
description: "Java code examples for implementing custom Message Transformers in Aerospike Connect for Kafka."
---

# Examples of custom code to use Message Transformer

## Operations on Collection Data Types

This example uses the transformer to perform [List](https://aerospike.com/docs/develop/data-types/collections/list) and [Map](https://aerospike.com/docs/develop/data-types/collections/map) commands on an incoming Kafka record.

This example modifies a rocket dealership’s inventory and sales record when a customer buys a rocket. It works on a parsed Kafka `SinkRecord` by implementing `InboundMessageTransformer` with `InboundMessage<Object, Object>`.

```java
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Singleton
public class CDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class);

    @Override
    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
        Map<String, Object> fields = input.getFields();

        // Get the Aerospike key from the Kafka message.
        String key = (String) fields.get("key");
        if (key == null) {
            logger.warn("Skipping record: missing key");
            return new AerospikeSkipRecordOperation();
        }

        // Aerospike key.
        Key aerospikeKey = new Key("used-rocket-dealership", null, key);

        /*
          Rocket Map {
               model: String;
               manufacturer: String;
               thrust: Integer;
               price: Double;
               .
               .
          }
         */
        @SuppressWarnings("unchecked")
        Map<String, ?> rocket = (Map<String, ?>) fields.get("rocket");
        if (rocket == null) {
            logger.warn("Skipping record: missing rocket");
            return new AerospikeSkipRecordOperation();
        }

        /*
          This rocket has just been sold by the dealership. We need to
          remove it from the inventory and record our profits.
         */

        // List to hold Aerospike CDT operations.
        List<Operation> operations = new ArrayList<>();

        /*
          The "inventory" bin holds the dealership's list of rockets for sale.
          Let's remove the rocket from the "inventory" bin.
         */
        operations.add(ListOperation.removeByValue("inventory",
                Value.get(rocket), ListReturnType.NONE));

        /*
          Now we need to update our sales record to show how many rockets have
          been sold and our profits. The sales record looks like this:
          sales-record {
            list-of-sold:     List<Rocket>
            num-rockets-sold: Integer
            gross-profit:     Double
          }
         */
        // "sales-record" is the bin; the CTX selects the nested "list-of-sold" list.
        operations.add(ListOperation.append("sales-record", Value.get(rocket),
                CTX.mapKey(Value.get("list-of-sold"))));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("num-rockets-sold"), Value.get(1)));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("gross-profit"), Value.get(rocket.get("profit"))));

        /*
          Lastly, we will update the top sales person.
          top-sales-person {
            first-name: String
            last-name:  String
          }
         */
        Map<Value, Value> topSalesPerson = new HashMap<>();
        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));
        operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",
                topSalesPerson));

        return new AerospikeOperateOperation(aerospikeKey, new WritePolicy(), operations, input.getIgnoreErrorCodes());
    }
}
```
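To make the effect of these operations easier to follow, the sketch below models the dealership record with plain Java collections and applies the same four steps in memory. It makes no Aerospike calls. The `SaleModel` class and its `newRecord` and `applySale` methods are hypothetical names used only for illustration, and the bin layout is assumed from the comments in the transformer above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the dealership record. No Aerospike calls.
final class SaleModel {

    // Builds a record holding the "inventory" and "sales-record" bins.
    static Map<String, Object> newRecord(List<Map<String, Object>> inventory) {
        Map<String, Object> salesRecord = new HashMap<>();
        salesRecord.put("list-of-sold", new ArrayList<Map<String, Object>>());
        salesRecord.put("num-rockets-sold", 0L);
        salesRecord.put("gross-profit", 0.0);

        Map<String, Object> record = new HashMap<>();
        record.put("inventory", new ArrayList<>(inventory));
        record.put("sales-record", salesRecord);
        return record;
    }

    // Applies the same four steps as the transformer, in memory.
    @SuppressWarnings("unchecked")
    static void applySale(Map<String, Object> record, Map<String, Object> rocket,
                          String firstName, String lastName) {
        // Step 1: remove every inventory entry equal to the sold rocket.
        List<Map<String, Object>> inventory =
                (List<Map<String, Object>>) record.get("inventory");
        inventory.removeIf(rocket::equals);

        // Step 2: append the rocket to the nested "list-of-sold" list.
        Map<String, Object> sales = (Map<String, Object>) record.get("sales-record");
        ((List<Map<String, Object>>) sales.get("list-of-sold")).add(rocket);

        // Step 3: increment the sold counter and the running profit.
        sales.put("num-rockets-sold", (Long) sales.get("num-rockets-sold") + 1);
        double profit = ((Number) rocket.get("profit")).doubleValue();
        sales.put("gross-profit", (Double) sales.get("gross-profit") + profit);

        // Step 4: set the name fields of the top sales person.
        Map<String, Object> top = (Map<String, Object>) record.computeIfAbsent(
                "top-sales-person", k -> new HashMap<String, Object>());
        top.put("first-name", firstName);
        top.put("last-name", lastName);
    }
}
```

Calling `SaleModel.applySale(record, rocket, "Ada", "Lovelace")` on a record whose inventory contains `rocket` leaves the inventory without it, with the counter and profit in `sales-record` updated accordingly.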

## Database reads during transform

Use the injected [AerospikeReader](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.4.2/src/main/java/com/aerospike/connect/inbound/AerospikeReader.java) to perform database reads during a transform.

This example shows a more complex transformer. It reads from the database to check whether a record already exists for the key, then either creates a new record or applies a list of operations to the existing one, depending on the result of that check.

```java
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.AerospikeReader;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
// Package assumed to match InboundMessage; check it against your SDK version.
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class CasCDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class);

    /**
     * Injected Aerospike reader to read records from Aerospike.
     */
    private final AerospikeReader aerospikeReader;

    /**
     * Inbound message transformer config for the topic against which this class is bound.
     */
    private final InboundMessageTransformerConfig inboundMessageTransformerConfig;

    @Inject
    public CasCDTMessageTransformer(AerospikeReader aerospikeReader,
                                    InboundMessageTransformerConfig inboundMessageTransformerConfig) {
        this.aerospikeReader = aerospikeReader;
        this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;
    }

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, Object> inboundMessage) {
        Map<String, Object> input = inboundMessage.getFields();

        // Get the Aerospike key. The "name" field was sent in the Kafka message.
        Object key = input.get("name");
        if (key == null) {
            logger.error("Invalid message, missing name field: {}", input);
            return new AerospikeSkipRecordOperation();
        }

        String newCdr = "cdr_" + System.currentTimeMillis();

        // Aerospike key.
        Key aerospikeKey = new Key("test", null, (String) key);

        Record existingRecord = null;
        // Read existing record.
        try {
            existingRecord = aerospikeReader.get(null, aerospikeKey);
        } catch (AerospikeException ae) {
            // A failed read is logged and then handled as if no record exists.
            logger.error("Error while getting the record", ae);
        }

        WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null);
        if (existingRecord == null) {
            // No existing record: create one with a fresh CDR list.
            List<Bin> bins = new ArrayList<>();
            List<String> cdrList = new ArrayList<>();
            cdrList.add(newCdr);
            bins.add(new Bin("cdrs", cdrList));
            bins.add(new Bin("topicName",
                    Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()).get("topicName")));

            // Add all config fields as bins.
            bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );

            // Add all Kafka message fields as bins.
            bins.addAll(input
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );

            // These error codes are sent in inboundMessage by Aerospike if you have configured them in
            // aerospike-kafka-inbound.yml.
            Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes();
            return new AerospikePutOperation(aerospikeKey, writePolicy, bins, ignoreErrorCodes);
        } else {
            // List of Aerospike operations.
            List<Operation> operations = new ArrayList<>();

            // Append the CDR if the list is small, else first truncate the list.
            @SuppressWarnings("unchecked")
            List<String> existingCdrs = (List<String>) existingRecord.bins.get("cdrs");

            int cdrMaxCapacity = 2;
            if (existingCdrs != null && existingCdrs.size() >= cdrMaxCapacity) {
                // Trim the oldest record, which sits at the end of the list.
                operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1, 1));
            }

            // Insert the new CDR at the top of the list.
            operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));

            return new AerospikeOperateOperation(aerospikeKey, writePolicy, operations);
        }
    }
}
```
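The trim-then-insert logic in the `else` branch keeps the `cdrs` list at a fixed size with the newest entry first. The sketch below reproduces that behavior on a plain Java list so it can be reasoned about without a database. `CappedCdrList` and `push` are hypothetical names. Unlike the transformer, which removes exactly one element, this version drops every surplus element, an assumption made so that an already oversized list is also brought back to capacity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models the capped, newest-first "cdrs" list.
final class CappedCdrList {

    // Returns a copy of cdrs with newCdr at index 0 and at most
    // maxCapacity entries; the oldest entries (at the tail) are dropped.
    static List<String> push(List<String> cdrs, String newCdr, int maxCapacity) {
        if (maxCapacity < 1) {
            throw new IllegalArgumentException("maxCapacity must be at least 1");
        }
        List<String> result = new ArrayList<>(cdrs);
        if (result.size() >= maxCapacity) {
            // Keep the first maxCapacity - 1 entries to leave room for newCdr.
            result.subList(maxCapacity - 1, result.size()).clear();
        }
        result.add(0, newCdr);
        return result;
    }
}
```

With a capacity of 2, pushing `"c"` onto `["b", "a"]` yields `["c", "b"]`: the oldest entry `"a"` is removed before the new one is inserted at the head.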

## Kafka tombstones

Use transforms to provide functionality outside the scope of the Kafka connector.

Tombstones in Kafka let you delete messages by using compaction. Use them when you need to delete a specific message without waiting for it to expire.

A tombstone in Kafka consists of a message key corresponding to the Kafka message that needs to be deleted and a null payload. The idea is to delete the corresponding record from Aerospike as well. This may be required for compliance-related use cases such as the General Data Protection Regulation (GDPR).
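The rule described above can be written as a small decision function. This is a minimal sketch in plain Java with no Kafka or Aerospike dependencies: `TombstoneRule`, `Action`, and `decide` are hypothetical names, and skipping messages that have no key is an assumption of this sketch, not documented connector behavior.

```java
// Hypothetical decision rule for Kafka tombstones; no Kafka or Aerospike calls.
final class TombstoneRule {

    enum Action { DELETE, PUT, SKIP }

    // key and value stand in for the Kafka message key and payload.
    static Action decide(Object key, Object value) {
        if (key == null) {
            // Assumption: without a key there is no record to address.
            return Action.SKIP;
        }
        // A tombstone has a non-null key and a null payload.
        return value == null ? Action.DELETE : Action.PUT;
    }
}
```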

The Aerospike Kafka Sink Connector does not offer any native support for handling such messages, so you must write a message transformer like this one ([details](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.4.2/examples/kafka/src/main/java/com/aerospike/connect/kafka/inbound/KafkaTombstoneMessageTransformer.java)):

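```java
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import org.apache.kafka.connect.sink.SinkRecord;

import static java.util.Collections.singletonList;

public class KafkaTombstoneMessageTransformer
        implements InboundMessageTransformer<InboundMessage<Object, SinkRecord>> {

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, SinkRecord> inboundMessage) {
        WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null);

        // A Kafka tombstone record has a non-null key and a null payload.
        if (inboundMessage.getMessage().value() == null) {
            return new AerospikeDeleteOperation(
                    new Key("test", null, "jumbo_jet"), writePolicy);
        }

        // Any other message is written as a regular record.
        return new AerospikePutOperation(new Key("test", null, "kevin"), writePolicy,
                singletonList(new Bin("name",
                        inboundMessage.getFields().get("name"))));
    }
}
```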

See these and other examples in the [GitHub repository](https://github.com/aerospike/aerospike-connect-inbound-sdk/tree/1.4.2/examples/kafka).