Use message transformer for Aerospike Connect to Java Message Service
Example one: Operations on Collection Data Types
Use the transformer to perform List and Map operations on an incoming JMS record.
This example modifies a rocket dealership's inventory and sales record when a customer buys a rocket. The transformer receives the already-parsed JMS message by implementing InboundMessageTransformer with InboundMessage as its type parameter.
import com.aerospike.client.Key;import com.aerospike.client.Operation;import com.aerospike.client.Value;import com.aerospike.client.cdt.CTX;import com.aerospike.client.cdt.ListOperation;import com.aerospike.client.cdt.ListReturnType;import com.aerospike.client.cdt.MapOperation;import com.aerospike.client.cdt.MapPolicy;import com.aerospike.client.policy.WritePolicy;import com.aerospike.connect.inbound.InboundMessageTransformer;import com.aerospike.connect.inbound.model.InboundMessage;import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
import javax.inject.Singleton;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;
/**
 * Message transformer that turns a "rocket sold" JMS message into a single
 * Aerospike operate request made of List and Map (CDT) operations.
 *
 * <p>The incoming message is expected to carry two fields:
 * <ul>
 *   <li>{@code key} - the Aerospike user key of the dealership record (String)</li>
 *   <li>{@code rocket} - a map describing the rocket that was sold</li>
 * </ul>
 * Messages that lack either field, or whose rocket has no numeric
 * {@code profit} entry, are skipped instead of failing mid-transform.
 */
@Singleton
public class CDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger =
            LoggerFactory.getLogger(CDTMessageTransformer.class);

    /**
     * Builds the operations that remove the sold rocket from the inventory,
     * update the sales record and set the top sales person.
     *
     * @param input the parsed inbound JMS message
     * @return an {@link AerospikeOperateOperation} carrying the CDT
     *         operations, or an {@link AerospikeSkipRecordOperation} when the
     *         message cannot be processed
     */
    @Override
    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
        Map<String, Object> fields = input.getFields();

        // Get the Aerospike key.
        Object rawKey = fields.get("key");
        if (rawKey == null) {
            logger.warn("Invalid missing key");
            return new AerospikeSkipRecordOperation();
        }
        if (!(rawKey instanceof String)) {
            // A blind cast would throw ClassCastException and abort the transform.
            logger.warn("Invalid key type {}", rawKey.getClass().getName());
            return new AerospikeSkipRecordOperation();
        }
        String key = (String) rawKey;

        // Aerospike key.
        Key aerospikeKey = new Key("used-rocket-dealership", null, key);

        /*
            Rocket Map {
                model: String;
                manufacturer: String
                thrust: Integer;
                price: Double;
                .
                .
            }
         */
        Object rawRocket = fields.get("rocket");
        if (!(rawRocket instanceof Map)) {
            // Without the rocket there is nothing to remove or to account for.
            logger.warn("Missing or malformed rocket field for key {}", key);
            return new AerospikeSkipRecordOperation();
        }
        @SuppressWarnings("unchecked") // message fields are keyed by String
        Map<String, ?> rocket = (Map<String, ?>) rawRocket;

        // The profit is added to a numeric map entry, so it must be a number.
        Object profit = rocket.get("profit");
        if (!(profit instanceof Number)) {
            logger.warn("Rocket for key {} has no numeric profit", key);
            return new AerospikeSkipRecordOperation();
        }

        /*
          This rocket has just been sold by the dealership. We need to
          remove it from the inventory and record our profits.
         */

        // List to hold Aerospike CDT operations.
        List<Operation> operations = new ArrayList<>();

        /*
          The "inventory" bin holds the dealership's list of rockets for sale.
          Let's remove the rocket from the "inventory" bin.
         */
        operations.add(ListOperation.removeByValue("inventory",
                Value.get(rocket), ListReturnType.NONE));

        /*
          Now we need to update our sales record to show how many rockets
          have been sold and our profits. The sales record looks like this:

          sales-record {
              list-of-sold: List<Rocket>
              num-rockets-sold: Integer
              gross-profit: Double
          }
         */
        MapPolicy mapPolicy = new MapPolicy();

        // The list of sold rockets is nested inside the "sales-record" map
        // bin, so the bin name is "sales-record" and the context selects the
        // "list-of-sold" entry within it.
        operations.add(ListOperation.append("sales-record", Value.get(rocket),
                CTX.mapKey(Value.get("list-of-sold"))));
        operations.add(MapOperation.increment(mapPolicy, "sales-record",
                Value.get("num-rockets-sold"), Value.get(1)));
        // Key spelled exactly as in the sales-record layout above.
        operations.add(MapOperation.increment(mapPolicy, "sales-record",
                Value.get("gross-profit"), Value.get(profit)));

        /*
          Lastly, we will update the top sales person.

          top-sales-person {
              first-name: String
              last-name: String
          }
         */
        Map<Value, Value> topSalesPerson = new HashMap<>();
        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));

        operations.add(MapOperation.putItems(mapPolicy, "top-sales-person",
                topSalesPerson));

        return new AerospikeOperateOperation(aerospikeKey,
                input.getWritePolicy().orElse(null), operations,
                input.getIgnoreErrorCodes());
    }
}
Example two: Database reads during transform
Use an injected AerospikeReader to perform database reads during the transform.
This example shows a more complex transformer. It first reads the database to check whether a record already exists for the key, then branches on the result: if no record is found, it writes a new record with a put operation; otherwise it inserts the new CDR at the head of the record's existing "cdrs" list, trimming the list first when it is full.
public class CasCDTMessageTransformer implements InboundMessageTransformer<InboundMessage<Object, Object>> {
private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class.getName());
/** * Injected aerospike reader to read records from Aerospike. */ private final AerospikeReader aerospikeReader; /** * Inbound message transformer config for the topic against which this class is bound. */ private final InboundMessageTransformerConfig inboundMessageTransformerConfig;
@Inject public CasCDTMessageTransformer(AerospikeReader aerospikeReader, InboundMessageTransformerConfig inboundMessageTransformerConfig) { this.aerospikeReader = aerospikeReader; this.inboundMessageTransformerConfig = inboundMessageTransformerConfig; }
@Override public AerospikeRecordOperation transform( InboundMessage<Object, Object> inboundMessage) {
Map<String, Object> input = inboundMessage.getFields();
// Get the Aerospike key. Name field was sent in the JMS message Object key = input.get("name");
if (key == null) { logger.error("invalid message " + input); return new AerospikeSkipRecordOperation(); }
String newCdr = "cdr_" + System.currentTimeMillis();
// Aerospike key. Key aerospikeKey = new Key("test", null, (String) key);
Record existingRecord = null; // Read existing record. try { existingRecord = aerospikeReader.get(null, aerospikeKey); } catch (AerospikeException ae) { // Java client throws an exception if record is not found for // the key in Aerospike logger.error("Error while getting the record", ae); }
WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null); if (existingRecord == null) { List<Bin> bins = new ArrayList<>();
List<String> cdrList = new ArrayList<>(); cdrList.add(newCdr); bins.add(new Bin("cdrs", cdrList));
bins.add(new Bin("topicName", Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()).get("topicName"))); // Add all config fields as a Bin bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()) .entrySet() .stream() .map(e -> new Bin(e.getKey(), e.getValue())) .collect(Collectors.toList()) ); // Add all JMS message fields as a Bin bins.addAll(input .entrySet() .stream() .map(e -> new Bin(e.getKey(), e.getValue())) .collect(Collectors.toList()) ); // These error codes are sent in inboundMessage by Aerospike if you have configured them in // aerospike-jms-inbound.yml. Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes(); return new AerospikePutOperation(aerospikeKey, writePolicy, bins, ignoreErrorCodes); } else { // List of Aerospike operations. List<Operation> operations = new ArrayList<>();
// Append the CDR if the list is small, else first truncate the list. @SuppressWarnings("unchecked") List<String> existingCdrs = (List<String>) existingRecord.bins.get("cdrs");
int cdrMaxCapacity = 2; if (existingCdrs.size() >= cdrMaxCapacity) { // Trim the oldest records. operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1, 1)); } // Insert new CDR to the top of the list. operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));
return new AerospikeOperateOperation(aerospikeKey, writePolicy, operations); } }}
See these and other examples here.