Loading
Examples of Custom Code to use Message Transformer
Example one: Operations on Collection Data Types
Use the transformer to perform List and Map operations on an incoming JMS record.
This example modifies a rocket dealership's inventory and sales record when a customer buys a rocket. It uses a parsed JMS Message by implementing InboundMessageTransformer with InboundMessage.
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Singleton
public class CDTMessageTransformer implements
InboundMessageTransformer<InboundMessage<Object, Object>> {
private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class.getName());
@Override
public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
Map<String, Object> fields = input.getFields();
// Get the Aerospike key.
String key = (String) fields.get("key");
if (key == null) {
logger.warn("Invalid missing key");
return new AerospikeSkipRecordOperation();
}
// Aerospike key.
Key aerospikeKey = new Key("used-rocket-dealership", null, key);
/*
Rocket Map {
model: String;
manufacturer: String
thrust: Integer;
price: Double;
.
.
}
*/
@SuppressWarnings("unchecked")
Map<String, ?> rocket = (Map<String, ?>) fields.get("rocket");
/*
This rocket has just been sold by the dealership. We need to
remove it from the inventory and record our profits.
*/
// List to hold Aerospike CDT operations.
List<Operation> operations = new ArrayList<>();
/*
The "inventory" bin holds the dealerships list of rockets for sale.
Lets remove the rocket from the "inventory" bin.
*/
operations.add(ListOperation.removeByValue("inventory",
Value.get(rocket), ListReturnType.NONE));
/*
Now we need to update our sales record to show how many rockets have
been sold, our profits. The sales record looks like this:
sales-record {
list-of-sold: List<Rocket>
num-rockets-sold: Integer
gross-profit: Double
}
*/
operations.add(ListOperation.append("list-of-sold", Value.get(rocket),
CTX.mapKey(Value.get("sales-record"))));
operations.add(MapOperation.increment(new MapPolicy(), "sales-record"
, Value.get("num-rockets-sold"), Value.get(1)));
operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
Value.get("gross_profit"), Value.get(rocket.get("profit"))));
/*
Lastly, we will update the top sales person.
top-sales-person {
first-name: String
last-name: String
}
*/
Map<Value, Value> topSalesPerson = new HashMap<>();
topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));
operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",
topSalesPerson));
return new AerospikeOperateOperation(aerospikeKey, input.getWritePolicy().orElse(null), operations,
input.getIgnoreErrorCodes());
}
}
Example two: Database reads during transform
Use injected AerospikeReader to perform Database reads during transform.
This example shows a more complex transformer. Here we check for an existing record by performing a read on the database, and perform conditional logic. Depending on the result of the checks, we will either create an entry or perform some operations.
public class CasCDTMessageTransformer implements
InboundMessageTransformer<InboundMessage<Object, Object>> {
private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class.getName());
/**
* Injected aerospike reader to read records from Aerospike.
*/
private final AerospikeReader aerospikeReader;
/**
* Inbound message transformer config for the topic against which this class is bound.
*/
private final InboundMessageTransformerConfig inboundMessageTransformerConfig;
@Inject
public CasCDTMessageTransformer(AerospikeReader aerospikeReader,
InboundMessageTransformerConfig inboundMessageTransformerConfig) {
this.aerospikeReader = aerospikeReader;
this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;
}
@Override
public AerospikeRecordOperation transform(
InboundMessage<Object, Object> inboundMessage) {
Map<String, Object> input = inboundMessage.getFields();
// Get the Aerospike key. Name field was sent in the JMS message
Object key = input.get("name");
if (key == null) {
logger.error("invalid message " + input);
return new AerospikeSkipRecordOperation();
}
String newCdr = "cdr_" + System.currentTimeMillis();
// Aerospike key.
Key aerospikeKey = new Key("test", null, (String) key);
Record existingRecord = null;
// Read existing record.
try {
existingRecord = aerospikeReader.get(null, aerospikeKey);
} catch (AerospikeException ae) {
// Java client throws an exception if record is not found for
// the key in Aerospike
logger.error("Error while getting the record", ae);
}
WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null);
if (existingRecord == null) {
List<Bin> bins = new ArrayList<>();
List<String> cdrList = new ArrayList<>();
cdrList.add(newCdr);
bins.add(new Bin("cdrs", cdrList));
bins.add(new Bin("topicName",
Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()).get("topicName")));
// Add all config fields as a Bin
bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())
.entrySet()
.stream()
.map(e -> new Bin(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
// Add all JMS message fields as a Bin
bins.addAll(input
.entrySet()
.stream()
.map(e -> new Bin(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
// These error codes are sent in inboundMessage by Aerospike if you have configured them in
// aerospike-jms-inbound.yml.
Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes();
return new AerospikePutOperation(aerospikeKey, writePolicy, bins, ignoreErrorCodes);
} else {
// List of Aerospike operations.
List<Operation> operations = new ArrayList<>();
// Append the CDR if the list is small, else first truncate the list.
@SuppressWarnings("unchecked")
List<String> existingCdrs = (List<String>) existingRecord.bins.get("cdrs");
int cdrMaxCapacity = 2;
if (existingCdrs.size() >= cdrMaxCapacity) {
// Trim the oldest records.
operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1, 1));
}
// Insert new CDR to the top of the list.
operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));
return new AerospikeOperateOperation(aerospikeKey, writePolicy, operations);
}
}
}
See these and other examples here.