# Use message transformer for Aerospike Connect to Java Message Service

## Example one: Operations on Collection Data Types

Use the transformer to perform [List](https://aerospike.com/docs/develop/data-types/collections/list) and [Map](https://aerospike.com/docs/develop/data-types/collections/map) operations on an incoming JMS record.

This example modifies a rocket dealership’s inventory and sales record when a customer buys a rocket. It uses a parsed JMS Message by implementing InboundMessageTransformer with InboundMessage.

```java
import com.aerospike.client.Key;

import com.aerospike.client.Operation;

import com.aerospike.client.Value;

import com.aerospike.client.cdt.CTX;

import com.aerospike.client.cdt.ListOperation;

import com.aerospike.client.cdt.ListReturnType;

import com.aerospike.client.cdt.MapOperation;

import com.aerospike.client.cdt.MapPolicy;

import com.aerospike.client.policy.WritePolicy;

import com.aerospike.connect.inbound.InboundMessageTransformer;

import com.aerospike.connect.inbound.model.InboundMessage;

import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;

import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;

import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import javax.inject.Singleton;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

@Singleton

public class CDTMessageTransformer implements

        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class.getName());

    @Override

    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {

        Map<String, Object> fields = input.getFields();

        // Get the Aerospike key.

        String key = (String) fields.get("key");

        if (key == null) {

            logger.warn("Invalid missing key");

            return new AerospikeSkipRecordOperation();

        }

        // Aerospike key.

        Key aerospikeKey = new Key("used-rocket-dealership", null, key);

        /*

          Rocket Map {

               model: String;

               manufacturer: String

               thrust: Integer;

               price: Double;

               .

               .

          }

         */

        @SuppressWarnings("unchecked")

        Map<String, ?> rocket = (Map<String, ?>) fields.get("rocket");

        /*

          This rocket has just been sold by the dealership. We need to

          remove it from the inventory and record our profits.

         */

        // List to hold Aerospike CDT operations.

        List<Operation> operations = new ArrayList<>();

        /*

          The "inventory" bin holds the dealerships list of rockets for sale.

          Lets remove the rocket from the "inventory" bin.

         */

        operations.add(ListOperation.removeByValue("inventory",

                Value.get(rocket), ListReturnType.NONE));

        /*

          Now we need to update our sales record to show how many rockets have

          been sold, our profits. The sales record looks like this:

          sales-record {

            list-of-sold:     List<Rocket>

            num-rockets-sold: Integer

            gross-profit:     Double

          }

         */

        operations.add(ListOperation.append("list-of-sold", Value.get(rocket),

                CTX.mapKey(Value.get("sales-record"))));

        operations.add(MapOperation.increment(new MapPolicy(), "sales-record"

                , Value.get("num-rockets-sold"), Value.get(1)));

        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",

                Value.get("gross_profit"), Value.get(rocket.get("profit"))));

        /*

          Lastly, we will update the top sales person.

          top-sales-person {

            first-name: String

            last-name:  String

          }

         */

        Map<Value, Value> topSalesPerson = new HashMap<>();

        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));

        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));

        operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",

                topSalesPerson));

        return new AerospikeOperateOperation(aerospikeKey, input.getWritePolicy().orElse(null), operations,

                input.getIgnoreErrorCodes());

    }

}
```

## Example two: Database reads during transform

Use injected [AerospikeReader](https://github.com/aerospike/aerospike-connect-inbound-sdk/blob/1.3.2/src/main/java/com/aerospike/connect/inbound/AerospikeReader.java) to perform Database reads during transform.

This example shows a more complex transformer. Here we check for an existing record by performing a read on the database, and perform conditional logic. Depending on the result of the checks, we will either create an entry or perform some commands.

```java
public class CasCDTMessageTransformer implements

        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class.getName());

    /**

     * Injected aerospike reader to read records from Aerospike.

     */

    private final AerospikeReader aerospikeReader;

    /**

     * Inbound message transformer config for the topic against which this class is bound.

     */

    private final InboundMessageTransformerConfig inboundMessageTransformerConfig;

    @Inject

    public CasCDTMessageTransformer(AerospikeReader aerospikeReader,

                                    InboundMessageTransformerConfig inboundMessageTransformerConfig) {

        this.aerospikeReader = aerospikeReader;

        this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;

    }

    @Override

    public AerospikeRecordOperation transform(

            InboundMessage<Object, Object> inboundMessage) {

        Map<String, Object> input = inboundMessage.getFields();

        // Get the Aerospike key. Name field was sent in the JMS message

        Object key = input.get("name");

        if (key == null) {

            logger.error("invalid message " + input);

            return new AerospikeSkipRecordOperation();

        }

        String newCdr = "cdr_" + System.currentTimeMillis();

        // Aerospike key.

        Key aerospikeKey = new Key("test", null, (String) key);

        Record existingRecord = null;

        // Read existing record.

        try {

            existingRecord = aerospikeReader.get(null, aerospikeKey);

        } catch (AerospikeException ae) {

            // Java client throws an exception if record is not found for

            // the key in Aerospike

            logger.error("Error while getting the record", ae);

        }

        WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null);

        if (existingRecord == null) {

            List<Bin> bins = new ArrayList<>();

            List<String> cdrList = new ArrayList<>();

            cdrList.add(newCdr);

            bins.add(new Bin("cdrs", cdrList));

            bins.add(new Bin("topicName",

                    Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()).get("topicName")));

            // Add all config fields as a Bin

            bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())

                    .entrySet()

                    .stream()

                    .map(e -> new Bin(e.getKey(), e.getValue()))

                    .collect(Collectors.toList())

            );

            // Add all JMS message fields as a Bin

            bins.addAll(input

                    .entrySet()

                    .stream()

                    .map(e -> new Bin(e.getKey(), e.getValue()))

                    .collect(Collectors.toList())

            );

            // These error codes are sent in inboundMessage by Aerospike if you have configured them in

            // aerospike-jms-inbound.yml.

            Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes();

            return new AerospikePutOperation(aerospikeKey, writePolicy, bins, ignoreErrorCodes);

        } else {

            // List of Aerospike operations.

            List<Operation> operations = new ArrayList<>();

            // Append the CDR if the list is small, else first truncate the list.

            @SuppressWarnings("unchecked")

            List<String> existingCdrs = (List<String>) existingRecord.bins.get("cdrs");

            int cdrMaxCapacity = 2;

            if (existingCdrs.size() >= cdrMaxCapacity) {

                // Trim the oldest records.

                operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1, 1));

            }

            // Insert new CDR to the top of the list.

            operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));

            return new AerospikeOperateOperation(aerospikeKey, writePolicy, operations);

        }

    }

}
```

See these and other examples [here](https://github.com/aerospike/aerospike-connect-inbound-sdk/tree/1.3.2/examples/jms).