Kafka Avro serialization format for Aerospike Kafka source (outbound) connector
The Kafka Avro serialization format specifies that the data is serialized with KafkaAvroSerializer. The schema is expected to be maintained in a schema registry.
Option | Required | Default | Expected value | Description |
---|---|---|---|---|
mode | yes | | kafka-avro | Selects Kafka Avro format. |
schema | no | | | The schema of the data. |
schema-file | no | | | The file containing the schema of the data. |
registry-topic | no | | | The topic name used to generate the schema name in the schema registry. |
metadata-key | no | | | If specified, the metadata is inserted into this field of the record; otherwise metadata is not included in writes. |
props | yes | | | Map of properties to initialize the KafkaAvroSerializer with. Valid values are as specified in KafkaAvroSerializerConfig. |
stringify-map-keys | no | true | | Whether the numeric keys in CDT maps should be converted to strings. See stringify map keys. |
Writes and deletes are written with different schemas. Since only a single schema can be registered with TopicNameStrategy for a topic, that strategy cannot accommodate both the write and the delete schemas. Hence TopicNameStrategy is disallowed as the value.subject.name.strategy value in the props.
Schema Registry
The schemas for the records are maintained in a schema registry. The names of
the schemas are generated as specified by the value.subject.name.strategy
value in the props. See subject name strategy.
The valid strategies are:
io.confluent.kafka.serializers.subject.RecordNameStrategy
: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.
WARNING: Ensure that the same schema name is not generated for different schemas in the config.
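The naming rules above can be restated as a small sketch. The functions below are illustrative helpers, not the serializer's own code. They follow the subject formats described here, plus the `<topic>-value` format that Confluent documents for TopicNameStrategy. The record names are taken from the schemas and examples on this page, and the topic `users` stands in for `registry-topic`.

```python
# Illustrative restatement of the subject-name rules; these helper names are
# hypothetical and are not part of the connector or of Confluent's libraries.

def topic_name_subject(topic: str, is_key: bool = False) -> str:
    """TopicNameStrategy: one subject per topic and per key/value side."""
    return f"{topic}-{'key' if is_key else 'value'}"

def record_name_subject(record_full_name: str) -> str:
    """RecordNameStrategy: the fully-qualified Avro record name."""
    return record_full_name

def topic_record_name_subject(topic: str, record_full_name: str) -> str:
    """TopicRecordNameStrategy: <registry-topic>-<recordName>."""
    return f"{topic}-{record_full_name}"

TOPIC = "users"
WRITE_RECORD = "RecordWithMetadata"  # write schema from the examples
DELETE_RECORD = "com.aerospike.connect.AerospikeOutboundMetadata"  # delete schema
records = (WRITE_RECORD, DELETE_RECORD)

# TopicNameStrategy ignores the record name, so both schemas map to one subject.
topic_subjects = {topic_name_subject(TOPIC) for _ in records}

# The two allowed strategies keep the write and delete schemas apart.
record_subjects = {record_name_subject(r) for r in records}
topic_record_subjects = {topic_record_name_subject(TOPIC, r) for r in records}

print(topic_subjects)
print(sorted(record_subjects))
print(sorted(topic_record_subjects))
```

With TopicNameStrategy the write and delete schemas collapse onto a single subject, which is why it is disallowed. The same set-size check can be used to spot the situation the warning describes: two different schemas in the config that would generate the same schema name.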
Message metadata
The message metadata properties are:
Metadata | Type | Description |
---|---|---|
msg | string | Write/Delete command. |
namespace | string | Namespace of the Aerospike record. |
set | string | Set of the Aerospike record. |
userKey | long, double, bytes or string | User key of the Aerospike record. Present only if it is a write command and the user key is stored on the Aerospike server. |
digest | bytes | Digest of the Aerospike record. |
gen | int | Generation of the Aerospike record. |
lut | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2] |
exp | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write commands. |
durable | boolean | Whether the delete is durable. Present only in delete commands. |
[1] When the Aerospike server does not ship lut, the following versions of this outbound connector ship lut as zero:
- Kafka outbound connector, versions earlier than Database 4.0.
[2] Breaking change: When the Aerospike server ships lut, the following versions of this outbound connector ship lut as a value of the data type “integer”:
- Kafka source connector, versions earlier than Database 4.0.
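On the consumer side, the lut and exp fields use different units, which is easy to get wrong. The following is a minimal sketch of how a consumer might interpret them, assuming the metadata has already been decoded into a Python dict. The helper names and the sample values are hypothetical. Treating a lut of zero as "not available" is an assumption based on footnote [1].

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def lut_to_datetime(lut_ms: Optional[int]) -> Optional[datetime]:
    """lut is in milliseconds since the Unix epoch.

    None or 0 is treated as "last-update time not available" (assumption:
    older connector versions ship 0 when the server does not ship lut).
    """
    if not lut_ms:
        return None
    return EPOCH + timedelta(milliseconds=lut_ms)

def exp_to_datetime(exp_s: Optional[int]) -> Optional[datetime]:
    """exp is in seconds since the Unix epoch; 0 means the record never expires."""
    if not exp_s:
        return None
    return EPOCH + timedelta(seconds=exp_s)

# Hypothetical decoded metadata for illustration only.
metadata = {"namespace": "test", "set": "users", "gen": 3,
            "lut": 1700000000123, "exp": 0}

print(lut_to_datetime(metadata["lut"]))  # timezone-aware UTC datetime
print(exp_to_datetime(metadata["exp"]))  # None: the record does not expire
```

Using integer arithmetic with timedelta avoids floating-point rounding when converting milliseconds.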
Fixed Schema for Metadata
Schema for the metadata.
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [
    { "name": "namespace", "type": "string" },
    { "name": "set", "type": ["null", "string"], "default": null },
    { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
    { "name": "digest", "type": "bytes" },
    { "name": "msg", "type": "string" },
    { "name": "durable", "type": ["null", "boolean"], "default": null },
    { "name": "gen", "type": ["null", "int"], "default": null },
    { "name": "exp", "type": ["null", "int"], "default": null },
    { "name": "lut", "type": ["null", "long"], "default": null }
  ]
}
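To see at a glance which metadata fields a consumer can rely on and which may be null, the schema can be inspected with a few lines of code. This is a rough sketch that uses only the standard json module and embeds a copy of the metadata schema. The helper name split_fields is hypothetical, and "nullable" here simply means the field type is a union that includes "null".

```python
import json

# Copy of the fixed metadata schema from this page.
METADATA_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [
    { "name": "namespace", "type": "string" },
    { "name": "set", "type": ["null", "string"], "default": null },
    { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
    { "name": "digest", "type": "bytes" },
    { "name": "msg", "type": "string" },
    { "name": "durable", "type": ["null", "boolean"], "default": null },
    { "name": "gen", "type": ["null", "int"], "default": null },
    { "name": "exp", "type": ["null", "int"], "default": null },
    { "name": "lut", "type": ["null", "long"], "default": null }
  ]
}
""")

def split_fields(schema: dict) -> tuple:
    """Return (always_present, nullable) field names, in schema order."""
    always_present, nullable = [], []
    for field in schema["fields"]:
        field_type = field["type"]
        if isinstance(field_type, list) and "null" in field_type:
            nullable.append(field["name"])
        else:
            always_present.append(field["name"])
    return always_present, nullable

always_present, nullable = split_fields(METADATA_SCHEMA)
print("always present:", always_present)
print("nullable:", nullable)
```

Only namespace, digest, and msg are non-nullable, which matches the table above: the remaining fields depend on the command type or on what the Aerospike server ships.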
Fixed Schema for Delete
All deletes are written with the metadata schema.
Fixed Schema for Kafka Key
The schema of the Kafka key is always the following:
{
  "type": "record",
  "name": "AerospikeOutboundKey",
  "namespace": "com.aerospike.connect",
  "fields": [
    { "name": "namespace", "type": "string" },
    { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
    { "name": "set", "type": ["null", "string"], "default": null },
    { "name": "digest", "type": "bytes" }
  ]
}
Examples
Example Kafka Avro Schema without metadata
format:
  mode: kafka-avro
  registry-topic: users
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithoutMetadata",
      "fields": [{
        "name": "abc",
        "type": "string"
      }]
    }
Example Kafka Avro Schema with metadata
format:
  mode: kafka-avro
  registry-topic: users
  metadata-key: metadata
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithMetadata",
      "fields": [{
        "name": "abc",
        "type": "string"
      }, {
        "name": "metadata",
        "type": {
          "type": "record",
          "name": "com.aerospike.metadata",
          "fields": [
            { "name": "namespace", "type": "string" },
            { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
            { "name": "set", "type": ["null", "string"], "default": null },
            { "name": "digest", "type": "bytes" },
            { "name": "msg", "type": "string" },
            { "name": "gen", "type": ["null", "int"], "default": null },
            { "name": "lut", "type": ["null", "long"], "default": null },
            { "name": "exp", "type": ["null", "int"], "default": null }
          ]
        }
      }]
    }