Avro Serialization Format for Aerospike Kafka Source (Outbound) Connector
Specifies that the data is serialized in Avro format. The Avro schema can be a map or a record. For a map, specifying the type of the map values is sufficient. For a record, the exact field names and types must be specified.
Option | Required | Default | Expected value | Description |
---|---|---|---|---|
mode | yes | | avro | Selects Avro format. |
schema | no | | | The schema of the data. |
schema-file | no | | | The file containing the schema of the data. |
stringify-map-keys | no | true | | Whether the numeric keys in CDT maps should be converted to strings. See Stringify Map Keys in Avro. |
One of schema or schema-file has to be specified.
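The rule above can be checked before a configuration is deployed. The following is a minimal sketch, not part of the connector: `check_avro_format_options` is a hypothetical helper that takes the options from the table as an already-parsed dictionary. The text does not say whether setting both schema and schema-file is accepted, so the sketch only rejects the case where neither is set.

```python
def check_avro_format_options(options):
    """Return a list of problems found in parsed Avro format options.

    Hypothetical helper for illustration only; the connector performs its
    own validation when it loads its configuration.
    """
    problems = []
    if options.get("mode") != "avro":
        problems.append("mode must be 'avro'")
    # At least one source for the schema must be present.
    if not options.get("schema") and not options.get("schema-file"):
        problems.append("one of schema or schema-file has to be specified")
    # The documented default for stringify-map-keys is true.
    flag = options.get("stringify-map-keys", True)
    if not isinstance(flag, bool):
        problems.append("stringify-map-keys must be a boolean")
    return problems


if __name__ == "__main__":
    print(check_avro_format_options({"mode": "avro"}))
```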
Message metadata
The message metadata properties are:
Metadata | Type | Description |
---|---|---|
msg | string | Write/Delete operation. |
namespace | string | Namespace of the Aerospike record. |
set | string | Set of the Aerospike record. |
userKey | long, double, bytes or string | User key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server. |
digest | bytes | Digest of the Aerospike record. |
gen | int | Generation of the Aerospike record. |
lut | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2] |
exp | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write operation. |
durable | boolean | Whether the delete is durable. Present only in delete operation. |
All metadata is present in both delete and write operations, except where the description indicates otherwise.
[1] When the Aerospike server does not ship lut, Aerospike Kafka source (outbound) connector versions earlier than 4.0.0 ship lut as zero.
[2] Breaking Change: When the Aerospike server ships lut, Aerospike Kafka source (outbound) connector versions earlier than 4.0.0 ship lut as a value of the data type "integer".
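The lut and exp fields use different units (milliseconds and seconds), which is easy to mix up on the consumer side. The following sketch shows one way to turn both into UTC timestamps. The function names are hypothetical, and treating a zero lut as "unknown" is an assumption based on footnote [1], not documented connector behavior.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def last_update_time(lut):
    """Convert lut (milliseconds since the Unix epoch) to a UTC datetime.

    Returns None for a missing or zero value. Assumption: zero means the
    server did not ship lut (connector versions earlier than 4.0.0).
    """
    if not lut:
        return None
    return EPOCH + timedelta(milliseconds=lut)


def expiry_time(exp):
    """Convert exp (seconds since the Unix epoch) to a UTC datetime.

    Returns None when exp is absent (delete operation) or zero (the
    record does not expire).
    """
    if not exp:
        return None
    return EPOCH + timedelta(seconds=exp)


if __name__ == "__main__":
    print(last_update_time(1617167159548))
    print(expiry_time(0))
```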
Stringify Map Keys in Avro
Aerospike allows many data types as keys in collection data type (CDT) maps, but Avro allows only strings as map keys. Currently, the source connector supports converting only numeric keys to strings when it converts Aerospike CDT maps.
According to the Avro specification, the name portion of a fullname, record field names, and enum symbols must:
- start with [A-Za-z_]
- subsequently contain only [A-Za-z0-9_]
Because names cannot start with a digit, a numeric CDT map key is prefixed with an underscore when it is converted to a string. For example, a CDT map key 1234 is converted to "_1234" in the connector. Hence, in your Avro schema or schema file, prefix the numeric keys with an underscore for maps that have numeric keys in Aerospike.
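The naming rule quoted from the Avro specification can be expressed as a single regular expression. This small sketch (the helper name is an assumption made for illustration) shows why a bare numeric key is not a valid Avro name while the underscore-prefixed form is.

```python
import re

# First character: letter or underscore; remaining characters: letters,
# digits, or underscores. This mirrors the two rules listed above.
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name):
    """Return True if `name` satisfies the Avro naming rule."""
    return AVRO_NAME.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ("1234", "_1234", "color"):
        print(candidate, is_valid_avro_name(candidate))
```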
An Aerospike CDT map with a key of any other data type will fail to be parsed in the connector. An Aerospike CDT map with a numeric key will also fail to be parsed in the connector if stringify-map-keys is set to false.
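The behavior described in this section can be emulated in a few lines. This is a sketch of the documented rules only, not the connector's implementation: `stringify_map_keys` is a hypothetical function, Python dictionaries stand in for CDT maps, and how the connector renders non-integer numeric keys is not stated in the text, so the sketch simply uses Python's default string form.

```python
def stringify_map_keys(cdt_map, stringify=True):
    """Return a copy of `cdt_map` in which every key is a string.

    String keys are kept, numeric keys get an underscore prefix, and any
    other key type is rejected, as is a numeric key when `stringify` is
    False. Nested maps are converted recursively.
    """
    converted = {}
    for key, value in cdt_map.items():
        if isinstance(key, str):
            new_key = key
        elif isinstance(key, (int, float)) and not isinstance(key, bool):
            if not stringify:
                raise ValueError(f"numeric key {key!r} needs stringify-map-keys")
            new_key = f"_{key}"
        else:
            raise ValueError(f"unsupported map key type: {type(key).__name__}")
        if isinstance(value, dict):
            value = stringify_map_keys(value, stringify)
        converted[new_key] = value
    return converted


if __name__ == "__main__":
    print(stringify_map_keys({1: "Monday", "name": "week"}))
```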
Avro Map Schema
All the metadata are at the top level of the map. The bins of the Aerospike record are put into the key named bins. This map structure accommodates both write and delete operations.
So the schema specified should be a map whose values are a union of the metadata types and the Aerospike record bin types. There can be only one map type in this union schema, and this map corresponds to the bins key, into which all the Aerospike record bins are written.
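As an illustration of the structure described above, the following sketch assembles such a map schema programmatically. The helper name and the `METADATA_TYPES` list are assumptions made for this example; the list is derived from the types in the metadata table, and a real schema may need additional primitive types.

```python
import json

# Primitive types that appear in the message metadata table.
METADATA_TYPES = ["int", "long", "double", "bytes", "string", "boolean"]


def build_map_schema(bin_value_types):
    """Build an Avro map schema whose value union has exactly one map.

    The single nested map corresponds to the `bins` key and lists the
    types that the Aerospike bins may hold.
    """
    bins_map = {"type": "map", "values": list(bin_value_types)}
    return {"type": "map", "values": METADATA_TYPES + [bins_map]}


if __name__ == "__main__":
    # Bins that hold either strings or integers.
    print(json.dumps(build_map_schema(["string", "long"]), indent=2))
```

Keeping a single map branch in the union is consistent with the Avro rule that a union may not contain more than one schema of the same unnamed type.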
Avro Map Fixed Schema for the Key
The Aerospike record key is always written with the following fixed schema. Use this schema to deserialize the key data.
{
  "type": "map",
  "values": ["long", "double", "bytes", "string"]
}
Example Avro Map output of a Key
{
  "namespace": "users",
  "set": "premium",
  "userKey": "id123",
  "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ="
}
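In the example above the digest is shown as text, although its schema type is bytes. Assuming the text form is Base64 (an assumption about how the example is rendered, not about the Avro data itself), it can be turned back into the raw 20-byte Aerospike digest as sketched below. The helper name is hypothetical.

```python
import base64


def digest_from_text(text):
    """Decode a Base64-rendered digest and check its length.

    Aerospike record digests are 20 bytes long, so any other length
    indicates that the input is not a complete digest.
    """
    raw = base64.b64decode(text)
    if len(raw) != 20:
        raise ValueError(f"expected 20 bytes, got {len(raw)}")
    return raw


if __name__ == "__main__":
    print(digest_from_text("i2Ejrq8uPFTLpwAn2TI2YcaybfQ=").hex())
```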
Example Avro Map schema for the Value
This schema is an example of the Avro Map schema format:
mode: avro
stringify-map-keys: true
# The outer union lists the types of the Aerospike key and metadata.
# The inner map specifies that the Aerospike bins are all string types.
schema: |
  {
    "type": "map",
    "values": ["int", "long", "float", "double", "bytes", "string", "boolean", {
      "type": "map",
      "values": ["string"]
    }]
  }
Example Avro Map output of Value for Aerospike Write
{
  "msg": "write",
  "namespace": "users",
  "set": "premium",
  "userKey": "id123",
  "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ=",
  "gen": 4,
  "lut": 1617167159548,
  "exp": 1682797792,
  "bins": {
    "color": "red",
    "size": 123,
    "dayMap": {
      "_1": "Monday. I had numeric key in Aerospike record"
    }
  }
}
Example Avro Map output of Value for Aerospike Delete
{
  "msg": "delete",
  "namespace": "users",
  "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ=",
  "durable": false,
  "gen": 4,
  "lut": 1617167159548
}
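Because write and delete messages share one map structure, a consumer typically branches on msg. The sketch below works on a value that has already been deserialized into a Python dictionary; `to_action` and the returned tuples are hypothetical names chosen for illustration, and identifying the record by namespace plus digest is an assumption based on those two fields being present in both examples.

```python
def to_action(message):
    """Map a deserialized value to a simple action tuple.

    Writes become ("upsert", record_id, bins); deletes become
    ("remove", record_id, durable). Fields that are present only in one
    operation are read with .get() so the other operation does not fail.
    """
    record_id = (message["namespace"], message["digest"])
    operation = message["msg"]
    if operation == "write":
        return ("upsert", record_id, message.get("bins") or {})
    if operation == "delete":
        return ("remove", record_id, bool(message.get("durable", False)))
    raise ValueError(f"unexpected msg value: {operation!r}")


if __name__ == "__main__":
    delete = {"msg": "delete", "namespace": "users",
              "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ=", "durable": False}
    print(to_action(delete))
```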
Avro Record Schema
The Avro record schema should accommodate both write and delete operations. All the metadata are put at the top level of the record.
The bins of the Aerospike record are put into the bins field of the record. In the case of a delete, the bins field is null, so the bins field should be a union of null and the expected bin types.
Any schema you specify should conform to these requirements.
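The requirement on the bins field can be checked mechanically before a schema is used. The following sketch inspects a record schema that has been loaded into a Python dictionary (for example with `json.loads`); the function name is an assumption, and the check covers only the bins rule described above, not full Avro validity.

```python
def bins_field_allows_null(schema):
    """Return True if the record has a `bins` field whose type is a
    union that includes "null", as needed for delete operations."""
    for field in schema.get("fields", []):
        if field.get("name") == "bins":
            field_type = field.get("type")
            return isinstance(field_type, list) and "null" in field_type
    return False


if __name__ == "__main__":
    schema = {
        "type": "record",
        "name": "Example",
        "fields": [{"name": "bins", "type": ["null", {"type": "map", "values": "string"}]}],
    }
    print(bins_field_allows_null(schema))
```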
Example Avro Record Schema for the Value
{
  "type": "record",
  "name": "com.aerospike",
  "fields": [{
    "name": "bins",
    "type": ["null", {
      "type": "record",
      "name": "AerospikeRecordBins",
      "fields": [{
        "name": "color",
        "type": ["string"]
      }]
    }]
  }, {
    "name": "namespace",
    "type": "string"
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }, {
    "name": "msg",
    "type": "string"
  }, {
    "name": "gen",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "lut",
    "type": ["null", "long"],
    "default": null
  }, {
    "name": "exp",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "durable",
    "type": ["null", "boolean"],
    "default": null
  }]
}
Avro Record Fixed Schema for Key
The schema of the key is always fixed, as follows:
{
  "type": "record",
  "name": "AerospikeOutboundKey",
  "namespace": "com.aerospike.connect",
  "fields": [{
    "name": "namespace",
    "type": "string"
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }]
}
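To make the fixed key schema concrete, the sketch below decodes a key payload by hand using the Avro binary encoding rules (zigzag variable-length longs, length-prefixed strings and bytes, and an index before each union value). It assumes the payload is plain Avro binary with no extra framing such as a schema-registry header, which this document does not confirm. All function names are hypothetical, and in practice an Avro library would do this work.

```python
import io
import struct


def read_long(buf):
    """Read one zigzag-encoded, variable-length Avro long."""
    shift = 0
    raw = 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise EOFError("truncated Avro long")
        raw |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)


def read_bytes(buf):
    """Read a length-prefixed byte sequence."""
    length = read_long(buf)
    data = buf.read(length)
    if len(data) != length:
        raise EOFError("truncated Avro bytes")
    return data


def read_string(buf):
    return read_bytes(buf).decode("utf-8")


def read_double(buf):
    return struct.unpack("<d", buf.read(8))[0]


def read_null(buf):
    return None


def read_union(buf, branches):
    """Read the branch index, then the value with that branch's reader."""
    return branches[read_long(buf)](buf)


# Branch order follows the unions in the key schema above.
USER_KEY_BRANCHES = [read_null, read_long, read_double, read_bytes, read_string]
SET_BRANCHES = [read_null, read_string]


def decode_key(payload):
    """Decode a key record; fields are read in schema order."""
    buf = io.BytesIO(payload)
    return {
        "namespace": read_string(buf),
        "userKey": read_union(buf, USER_KEY_BRANCHES),
        "set": read_union(buf, SET_BRANCHES),
        "digest": read_bytes(buf),
    }
```

Reading the fields in exactly the order the schema declares them is essential, because Avro binary data carries no field names.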