
Message Transformer for Aerospike Connect for Kafka

A message transformer for the Aerospike Kafka Inbound Connector

March 8, 2021 | 9 min read
Abhilash Mandaliya
Software Engineer

Aerospike Database, together with Aerospike Connect for Kafka, enables you to build low-latency, high-throughput streaming pipelines based on Kafka. Streaming from Apache Kafka to Aerospike Database Enterprise Edition relies on Aerospike’s Kafka inbound connector, which is implemented as a “sink” connector in the open-source Kafka Connect framework. The inbound connector reads messages from Kafka, translates each message into an Aerospike record, and performs the corresponding insert, update, or delete for that record in Aerospike.

We recently added a feature called Message Transformer to the Kafka connector. This allows you to read an incoming Kafka message and convert it into an Aerospike record after performing Aerospike Operations or other transformations on it. The transformer is a custom piece of code that you develop using the Java SDK and bundle as a JAR file. It is made available to the Aerospike Kafka Inbound Connector via the classpath, along with associated parameters that are specified in the configuration file. Your custom code is easily pluggable into the connector for rapid integration.


What can you use it for?

  • Performing complex operations on maps or lists in Kafka records before writing them as Aerospike records to your Aerospike databases. For example, you could add an element from the incoming message to an existing map or list, or build a nested structure such as a map of maps.

  • Developing custom code for functionality that is not natively supported in the Aerospike Kafka Inbound Connector. For example, your message transformer can process a Kafka tombstone to delete the record in an Aerospike database whose key can be formed from the tombstone.

  • Filtering messages for compliance use cases. For example, you can filter out records containing PII or mask fields with sensitive data prior to writing them to an Aerospike database.

  • Creating an Aerospike record with bins generated by tweaking Kafka message key/value. You can extend the message transformer to create an Aerospike key as well.
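
The filtering and masking use case can be sketched without any connector classes. The snippet below is a minimal illustration in plain Java, assuming the message fields arrive as a Map<String, Object> (which is what getFields() returns in the examples later in this post). The class name PiiMaskingSketch, the field names ssn, creditCard, and doNotStore, and the masking rule are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the Aerospike SDK.
final class PiiMaskingSketch {
    // Assumed names of sensitive fields; adjust to your own schema.
    private static final Set<String> SENSITIVE_FIELDS = Set.of("ssn", "creditCard");

    private PiiMaskingSketch() {
    }

    /** Returns true if the message carries the (assumed) opt-out flag. */
    static boolean shouldSkip(Map<String, Object> fields) {
        return Boolean.TRUE.equals(fields.get("doNotStore"));
    }

    /** Returns a copy of the fields with sensitive values masked. */
    static Map<String, Object> mask(Map<String, Object> fields) {
        Map<String, Object> masked = new HashMap<>(fields);
        for (String name : SENSITIVE_FIELDS) {
            Object value = masked.get(name);
            if (value != null) {
                masked.put(name, maskValue(value.toString()));
            }
        }
        return masked;
    }

    // Keeps the last four characters of longer values, hides everything else.
    private static String maskValue(String value) {
        int visible = value.length() > 4 ? 4 : 0;
        int hidden = value.length() - visible;
        return "*".repeat(hidden) + value.substring(hidden);
    }
}
```

Inside a transformer, you would typically return an AerospikeSkipRecordOperation when shouldSkip(...) is true, and otherwise build the bins from the map returned by mask(...).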

What does it not do?

  • It does not use Kafka Single Message Transforms (SMTs), which, per Confluent’s usage guidelines, should be avoided for external database calls. The message transformer was developed instead to allow reading from and writing to the Aerospike database on a per-Kafka-message basis.

  • It’s not meant for heavyweight processing or for calling APIs external to the Aerospike Kafka Inbound Connector. Consider using Apache Spark or Kafka Streams for these use cases.

  • It does not support multi-record transactions. However, it supports multiple operations on the same record, as well as reads from the database during the transformation.

  • It does not transform messages outbound from Aerospike to Kafka. Consider using XDR filtering for outbound messages.

Why should you care?

  • It is thread-safe: it can transform incoming Kafka records in parallel. The API itself is thread-safe, but the custom code running in your message transformer must ensure its own thread safety when accessing non-local variables.

  • It is easy to use: in addition to the SinkRecord that Kafka provides to the SinkTask, it provides the InboundMessage<K,M> class, which is the SinkRecord parsed into an easily readable form to accelerate further processing.

  • It is flexible: it allows objects such as InboundMessageTransformerConfig and/or the AerospikeReader to be injected using Java dependency injection.
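
To make the thread-safety point concrete, here is a minimal sketch in plain Java of state that a transformer might share across parallel transform calls. The class TransformerStateSketch and its methods are hypothetical and not part of the SDK; the point is only that any field shared between threads should use a concurrent type (or other synchronization) rather than a plain HashMap or long.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for state kept as a field of a message transformer.
final class TransformerStateSketch {
    // Shared by all threads calling the transformer, so it uses concurrent types.
    private final ConcurrentMap<String, LongAdder> skippedByReason =
            new ConcurrentHashMap<>();

    /** Records one skipped message; safe to call from many threads at once. */
    void recordSkip(String reason) {
        skippedByReason.computeIfAbsent(reason, r -> new LongAdder()).increment();
    }

    /** Returns how many messages were skipped for the given reason. */
    long skipped(String reason) {
        LongAdder adder = skippedByReason.get(reason);
        return adder == null ? 0L : adder.sum();
    }
}
```

Variables declared inside the transform method are local to each call and need no such protection.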

How to write a message transformer using the Java SDK

The SDK is published on the Maven Central Repository. Add it to your project’s dependencies using your preferred build tool. Here is an example for Maven:

<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-inbound-sdk</artifactId>
    <version>1.0.0</version>
</dependency>

The configuration for the transformer is a part of the Aerospike Kafka Inbound Connector configuration and is specified in the “topics” section of the aerospike-kafka-inbound.yml file. Below is a sample configuration:

topics: 
  users: 
    # .....
    message-transformer: 
      class: com.aerospike.connect.inbound.CasCDTMessageTransformer
      params: 
        cdtListLimit: 10
        fieldNamePrefix: cdt_impl

Here the message-transformer section is added under the users topic. It can have two properties. The mandatory class property specifies the fully qualified class name of your message transformer. The optional params property is a map of extra parameters that are made available to your message transformer.
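
As a rough illustration of consuming those parameters, the sketch below parses a params map into typed values. It assumes the params reach your code as a Map<String, Object>; the class TransformerParamsSketch, the default of 10 for cdtListLimit, and the empty default prefix are assumptions made for this example, not behavior of the connector.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical holder for the "params" shown in the sample configuration.
final class TransformerParamsSketch {
    final int cdtListLimit;
    final String fieldNamePrefix;

    private TransformerParamsSketch(int cdtListLimit, String fieldNamePrefix) {
        this.cdtListLimit = cdtListLimit;
        this.fieldNamePrefix = fieldNamePrefix;
    }

    /** Builds typed parameters from the raw map, applying assumed defaults. */
    static TransformerParamsSketch from(Map<String, Object> params) {
        Map<String, Object> safe =
                params == null ? Collections.<String, Object>emptyMap() : params;
        int limit = toInt(safe.get("cdtListLimit"), 10);
        if (limit <= 0) {
            throw new IllegalArgumentException("cdtListLimit must be positive: " + limit);
        }
        Object prefix = safe.get("fieldNamePrefix");
        return new TransformerParamsSketch(limit, prefix == null ? "" : prefix.toString());
    }

    // YAML numbers usually arrive as Number, but a quoted value arrives as String.
    private static int toInt(Object raw, int defaultValue) {
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        return Integer.parseInt(raw.toString().trim());
    }
}
```

Parsing the parameters once, for example in the transformer's constructor, keeps validation errors close to startup instead of surfacing on the first message.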

Here are a few examples of the message transformer:

The first example (plain inventory of rocket dealership)

Here is an example message transformer (details) that modifies a rocket dealership’s inventory and sales record whenever a rocket dealership sells a rocket.

public class CDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class.getName());
    @Override
    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
        Map<String, Object> fields = input.getFields();
        // Get the Aerospike key.
        String key = (String) fields.get("key");
        if (key == null) {
            logger.warn("Missing key in message, skipping record");
            return new AerospikeSkipRecordOperation();
        }
        // Aerospike key.
        Key aerospikeKey = new Key("used-rocket-dealership", null, key);
        /*
          Rocket Map {
               model: String;
               manufacturer: String
               thrust: Integer;
               price: Double;
               .
               .
          }
         */
        @SuppressWarnings("unchecked")
        Map<String, ?> rocket = (Map<String, ?>) fields.get("rocket");
        /*
          This rocket has just been sold by the dealership, we need to
          remove it from the inventory and record our profits.
         */
        // List to hold Aerospike CDT operations.
        List<Operation> operations = new ArrayList<>();
        /*
          The "inventory" bin holds the dealership's list of rockets for sale.
          Let's remove the rocket from the "inventory" bin.
         */
        operations.add(ListOperation.removeByValue("inventory",
                Value.get(rocket), ListReturnType.NONE));
        /*
          Now we need to update our sales record to show how many rockets have
          been sold, our profits. The sales record looks like this:
          sales-record {
            list-of-sold:     List<Rocket>
            num-rockets-sold: Integer
            gross-profit:     Double
          }
         */
        // "sales-record" is the bin; "list-of-sold" is a list stored under
        // that key inside the map, so it is addressed through the CTX.
        operations.add(ListOperation.append("sales-record", Value.get(rocket),
                CTX.mapKey(Value.get("list-of-sold"))));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("num-rockets-sold"), Value.get(1)));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("gross-profit"), Value.get(rocket.get("profit"))));
        /*
          Lastly, we will update the top sales person.
          top-sales-person {
            first-name: String
            last-name:  String
          }
         */
        Map<Value, Value> topSalesPerson = new HashMap<>();
        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));
        operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",
                topSalesPerson));
        return new AerospikeOperateOperation(aerospikeKey, new WritePolicy(), operations);
    }
}
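
To visualize what those operations are meant to do to the record, here is a simplified in-memory simulation in plain Java. It does not use the Aerospike client: bins are modeled as a Map<String, Object>, and the class RocketSaleSketch is hypothetical. It also creates missing lists and maps on the fly, which is a simplification and not necessarily how the server behaves.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the record updated by the transformer.
final class RocketSaleSketch {
    private RocketSaleSketch() {
    }

    @SuppressWarnings("unchecked")
    static void applySale(Map<String, Object> bins, Map<String, Object> rocket) {
        // Remove the sold rocket from the "inventory" list.
        List<Object> inventory =
                (List<Object>) bins.computeIfAbsent("inventory", k -> new ArrayList<>());
        inventory.removeIf(rocket::equals);

        // "sales-record" is a map holding a list and two counters.
        Map<String, Object> sales =
                (Map<String, Object>) bins.computeIfAbsent("sales-record", k -> new HashMap<>());
        List<Object> sold =
                (List<Object>) sales.computeIfAbsent("list-of-sold", k -> new ArrayList<>());
        sold.add(rocket);
        sales.merge("num-rockets-sold", 1L, (a, b) -> (Long) a + (Long) b);
        double profit = ((Number) rocket.get("profit")).doubleValue();
        sales.merge("gross-profit", profit, (a, b) -> (Double) a + (Double) b);
    }
}
```

In the real transformer, all of these changes are sent as a single multi-operation request against one record, so there is no read-modify-write cycle in your own code.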

The second example (reads before write)

Here is an example message transformer (details) that appends an incoming CDR to an existing Aerospike record if one exists, and trims the list if it has grown too large. This more complex transformer reads from the Aerospike database and decides how to process an incoming Kafka record based on what it finds: it creates a new Aerospike record if none exists, or updates the existing one after some conditional checks. Since the transformer itself only performs reads, it is still transactionally safe.

public class CasCDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class.getName());
    /**
     * Injected aerospike reader to read records from Aerospike.
     */
    private final AerospikeReader aerospikeReader;
    /**
     * Inbound message transformer config for the topic against which this class is bound.
     */
    private final InboundMessageTransformerConfig inboundMessageTransformerConfig;
    @Inject
    public CasCDTMessageTransformer(AerospikeReader aerospikeReader,
                                    InboundMessageTransformerConfig inboundMessageTransformerConfig) {
        this.aerospikeReader = aerospikeReader;
        this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;
    }
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, Object> inboundMessage) {
        Map<String, Object> input = inboundMessage.getFields();
        // Get the Aerospike key. Name field was sent in the kafka message
        Object key = input.get("name");
        if (key == null) {
            logger.error("invalid message " + input);
            return new AerospikeSkipRecordOperation();
        }
        String newCdr = "cdr_" + System.currentTimeMillis();
        // Aerospike key.
        Key aerospikeKey = new Key("test", null, (String) key);
        Record existingRecord = null;
        // Read existing record.
        try {
            existingRecord = aerospikeReader.getRecord(aerospikeKey);
        } catch (AerospikeException ae) {
            // Java client throws an exception if record is not found for
            // the key in Aerospike
            logger.error("Error while getting the record", ae);
        }
        if (existingRecord == null) {
            List<Bin> bins = new ArrayList<>();
            List<String> cdrList = new ArrayList<>();
            cdrList.add(newCdr);
            bins.add(new Bin("cdrs", cdrList));
            bins.add(new Bin("topicName",
                    Objects.requireNonNull(inboundMessageTransformerConfig.getTransformConfig()).get("topicName")));
            // Add all config fields as a Bin
            bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformConfig())
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            // Add all kafka message fields as a Bin
            bins.addAll(input
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            return new AerospikePutOperation(aerospikeKey, null, bins);
        } else {
            // List of Aerospike operations.
            List<Operation> operations = new ArrayList<>();
            // Append the CDR if the list is small, else first truncate the
            // list.
            @SuppressWarnings("unchecked")
            List<String> existingCdrs = (List<String>) existingRecord.bins.get("cdrs");
            int cdrMaxCapacity = 2;
            if (existingCdrs.size() >= cdrMaxCapacity) {
                // Trim the oldest records.
                operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1, 1));
            }
            // Insert new CDR to the top of the list.
            operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));
            return new AerospikeOperateOperation(aerospikeKey, null, operations);
        }
    }
}
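
The trim-then-insert logic is easier to follow in isolation. The sketch below reproduces it on an ordinary Java list; BoundedCdrListSketch is a hypothetical class, and unlike the transformer above (which removes exactly one item), it drops every item from position maxCapacity - 1 onward so that an oversized list is also brought back within the limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory version of the "cdrs" list handling.
final class BoundedCdrListSketch {
    private BoundedCdrListSketch() {
    }

    /** Returns a new list with newCdr at the head and at most maxCapacity items. */
    static List<String> prepend(List<String> existing, String newCdr, int maxCapacity) {
        if (maxCapacity < 1) {
            throw new IllegalArgumentException("maxCapacity must be at least 1");
        }
        List<String> result = new ArrayList<>(existing);
        if (result.size() >= maxCapacity) {
            // Drop the oldest entries, which sit at the tail of the list.
            result.subList(maxCapacity - 1, result.size()).clear();
        }
        // The newest CDR always goes to the top of the list.
        result.add(0, newCdr);
        return result;
    }
}
```

For example, prepending "cdr_3" to ["cdr_2", "cdr_1"] with a capacity of 2 yields ["cdr_3", "cdr_2"].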

The third example (Kafka tombstones)

Tombstones in Kafka let you delete messages through log compaction, so you do not have to wait for a message to be removed when it expires. A tombstone consists of the message key of the Kafka message to be deleted and a null payload. The basic idea here is to delete the corresponding record from Aerospike as well, which may be required for compliance-related use cases such as GDPR. The Aerospike Kafka Inbound Connector does not offer native support for handling such messages, so you need to write a message transformer (details), like this one:

public final class KafkaTombstoneMessageTransformer
        implements InboundMessageTransformer<InboundMessage<Object, SinkRecord>> {
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, SinkRecord> inboundMessage) {
        // A Kafka tombstone record has a non-null key and a null payload.
        // The Aerospike keys below are hard-coded to keep the example short.
        if (inboundMessage.getMessage().value() == null) {
            return new AerospikeDeleteOperation(
                    new Key("test", null, "jumbo_jet"), null);
        }
        return new AerospikePutOperation(new Key("test", null, "kevin"), null,
                singletonList(new Bin("name",
                        inboundMessage.getFields().get("name"))));
    }
}
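
The example above uses fixed Aerospike keys for brevity. In practice you would probably derive the action and the record key from the Kafka message itself. The following plain-Java sketch shows one way to structure that decision; TombstoneDecisionSketch, its Action enum, and the choice to skip messages without a key are assumptions for illustration, not connector behavior.

```java
// Hypothetical decision helper, independent of the Kafka and Aerospike APIs.
final class TombstoneDecisionSketch {
    enum Action { PUT, DELETE, SKIP }

    private TombstoneDecisionSketch() {
    }

    /** Chooses what to do with a message based on its key and payload. */
    static Action decide(Object kafkaKey, Object kafkaValue) {
        if (kafkaKey == null) {
            // Without a key there is no way to identify the target record.
            return Action.SKIP;
        }
        // A non-null key with a null payload is a tombstone.
        return kafkaValue == null ? Action.DELETE : Action.PUT;
    }

    /** Converts the Kafka key into the user key of the Aerospike record. */
    static String userKey(Object kafkaKey) {
        return String.valueOf(kafkaKey);
    }
}
```

A transformer could then map DELETE to an AerospikeDeleteOperation, PUT to an AerospikePutOperation, and SKIP to an AerospikeSkipRecordOperation, each built with a Key derived from userKey(...).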

How to deploy the message transformer

Deploy your .jar file, along with any dependencies it might have, by copying it to <Aerospike Kafka Inbound Connector directory>/lib.

What’s next?

Message Transformer is an easy way to extend the Aerospike Kafka Inbound Connector when its default handling of incoming Kafka messages doesn’t meet your data integration requirements. You can develop a transformer using the Java SDK to enable interesting use cases.

If you’d like to learn more, visit Aerospike Connect for Kafka. We are also available in the Confluent Hub.

This is my first blog ever and I would like to thank Kiran Matty, Joe Martin, Robert Heath and Ashish Shinde for helping me write this blog.