Kafka Avro Serialization Format for Aerospike Kafka Source (Outbound) Connector
The Kafka Avro serialization format specifies that the data be serialized with KafkaAvroSerializer. The schema is expected to be maintained in a schema registry.
Option | Required | Default | Expected value | Description |
---|---|---|---|---|
mode | yes | | kafka-avro | Selects Kafka Avro format. |
schema | no | | | The schema of the data. |
schema-file | no | | | The file containing the schema of the data. |
registry-topic | no | | | The topic name used to generate the schema name in the schema registry. |
metadata-key | no | | | If specified, the metadata is inserted into this field of the record; otherwise metadata is not included in writes. |
props | yes | | | Map of properties to initialize the KafkaAvroSerializer with. Valid values are as specified in KafkaAvroSerializerConfig. |
stringify-map-keys | no | true | | Whether the numeric keys in CDT maps should be converted to strings. See stringify map keys. |
Writes and deletes are written with different schemas. Because TopicNameStrategy allows only a single schema to be registered for a topic, it cannot accommodate both the write and the delete schema. Hence TopicNameStrategy is disallowed as the value.subject.name.strategy value in the props.
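The required options and the strategy restriction above can be checked before the connector is started. The following is a minimal sketch, assuming the format section has already been parsed into a Python dict; check_format_config is a hypothetical helper and not part of the connector, and it only inspects a strategy that is set explicitly in the props.

```python
# Hypothetical pre-flight check for the `format` section of the connector config.
# Assumes the YAML has already been parsed into a plain dict.

TOPIC_NAME_STRATEGY = "io.confluent.kafka.serializers.subject.TopicNameStrategy"


def check_format_config(fmt):
    """Return a list of problems found; an empty list means none were detected."""
    problems = []
    if fmt.get("mode") != "kafka-avro":
        problems.append("mode must be kafka-avro")
    props = fmt.get("props")
    if not isinstance(props, dict):
        problems.append("props is required and must be a map")
        return problems
    # Writes and deletes use different schemas, so one subject per topic is not enough.
    if props.get("value.subject.name.strategy") == TOPIC_NAME_STRATEGY:
        problems.append("TopicNameStrategy is disallowed for value.subject.name.strategy")
    return problems
```

An empty result only means that this sketch found nothing wrong; the connector performs its own validation.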
Schema Registry
The schemas for the records are maintained in a schema registry. The schema names are generated as specified by the value.subject.name.strategy value in the props. See subject name strategy.
The valid strategies are:
- io.confluent.kafka.serializers.subject.RecordNameStrategy: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
- io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.
WARNING: Ensure that the same schema name is not generated for different schemas in the config.
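The naming rules of the two valid strategies, and the collision that the warning describes, can be sketched as follows. This is an illustration only: subject_name and find_collisions are hypothetical helpers, and comparing schemas by their raw text is a simplifying assumption (two textually different schemas could still be equivalent).

```python
# Sketch of how subject names are derived for value schemas.
# `full_name` is the fully-qualified Avro record name.

PREFIX = "io.confluent.kafka.serializers.subject."


def subject_name(strategy, registry_topic, full_name):
    if strategy == PREFIX + "RecordNameStrategy":
        return full_name
    if strategy == PREFIX + "TopicRecordNameStrategy":
        return f"{registry_topic}-{full_name}"
    raise ValueError(f"unsupported strategy: {strategy}")


def find_collisions(entries):
    """entries: iterable of (strategy, registry_topic, full_name, schema_text).

    Returns the sorted subjects that would be generated for more than one
    distinct schema text.
    """
    seen = {}
    collisions = set()
    for strategy, topic, full_name, schema_text in entries:
        subject = subject_name(strategy, topic, full_name)
        # setdefault stores the first schema seen for a subject and returns it.
        if seen.setdefault(subject, schema_text) != schema_text:
            collisions.add(subject)
    return sorted(collisions)
```

With RecordNameStrategy the topic plays no part in the name, so two different schemas that share a fully-qualified record name collide even when they are configured for different topics.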
Message metadata
The message metadata properties are:
Metadata | Type | Description |
---|---|---|
msg | string | Write/Delete operation. |
namespace | string | Namespace of the Aerospike record. |
set | string | Set of the Aerospike record. |
userKey | long, double, bytes or string | User key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server. |
digest | bytes | Digest of the Aerospike record. |
gen | int | Generation of the Aerospike record. |
lut | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2] |
exp | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write operations. |
durable | boolean | Whether the delete is durable. Present only in delete operations. |
All metadata properties apply to both delete and write operations, except where the description indicates otherwise.
[1] When the Aerospike server does not ship lut, the following versions of this outbound connector ship lut as zero:
- Kafka outbound connector, versions earlier than Database 4.0.
[2] Breaking Change: When the Aerospike server ships lut, the following versions of this outbound connector ship lut as a value of the data type "integer":
- Kafka source connector, versions earlier than Database 4.0.
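On the consumer side, the units of lut and exp and the zero cases described above can be handled with a small conversion step. The sketch below is an assumption-laden illustration: the function names are hypothetical, and treating a lut of zero as "not shipped" follows note [1] while assuming that a genuine last-update time of zero does not occur.

```python
from datetime import datetime, timezone


def last_update_time(lut):
    """lut is in milliseconds since the Unix epoch.

    None (field absent) and 0 (older connector versions) are both treated
    as "last-update time not shipped".
    """
    if not lut:
        return None
    return datetime.fromtimestamp(lut / 1000, tz=timezone.utc)


def expiry_time(exp):
    """exp is in seconds since the Unix epoch.

    Returns None when exp is 0 (the record does not expire) or absent
    (for example, in a delete operation).
    """
    if not exp:
        return None
    return datetime.fromtimestamp(exp, tz=timezone.utc)
```

Callers that need to tell "never expires" apart from "exp not present" should test for 0 and None separately before calling expiry_time.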
Fixed Schema for Metadata
Schema for the metadata.
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [{
    "name": "namespace",
    "type": "string"
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }, {
    "name": "msg",
    "type": "string"
  }, {
    "name": "durable",
    "type": ["null", "boolean"],
    "default": null
  }, {
    "name": "gen",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "exp",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "lut",
    "type": ["null", "long"],
    "default": null
  }
  ]
}
LUT: The lut field is available whenever the Aerospike server ships the last-update time.
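A consumer can read directly from the schema which metadata fields are always present and which may be null. The following sketch uses only the Python standard library; split_fields is a hypothetical helper, and METADATA_SCHEMA is a compact copy of the metadata schema from this section.

```python
import json

# Compact copy of the AerospikeOutboundMetadata schema from this section.
METADATA_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [
    {"name": "namespace", "type": "string"},
    {"name": "set", "type": ["null", "string"], "default": null},
    {"name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null},
    {"name": "digest", "type": "bytes"},
    {"name": "msg", "type": "string"},
    {"name": "durable", "type": ["null", "boolean"], "default": null},
    {"name": "gen", "type": ["null", "int"], "default": null},
    {"name": "exp", "type": ["null", "int"], "default": null},
    {"name": "lut", "type": ["null", "long"], "default": null}
  ]
}
""")


def split_fields(schema):
    """Return (always_present, nullable) field names of an Avro record schema."""
    always, nullable = [], []
    for field in schema["fields"]:
        field_type = field["type"]
        # A union that includes "null" marks a field that may be absent.
        if isinstance(field_type, list) and "null" in field_type:
            nullable.append(field["name"])
        else:
            always.append(field["name"])
    return always, nullable
```

For the metadata schema, only namespace, digest, and msg fall into the always-present group; every other field has to be checked for null before use.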
Fixed Schema for Delete
All deletes are written with the metadata schema.
Breaking Change: Deletes are shipped with the following deprecated schema in these versions of this outbound connector:
- Kafka source connector, versions earlier than 4.0.0.
{
  "type": "record",
  "name": "AerospikeOutboundDelete",
  "namespace": "com.aerospike.connect",
  "fields": [{
    "name": "namespace",
    "type": "string"
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }, {
    "name": "msg",
    "type": "string"
  }, {
    "name": "durable",
    "type": "boolean"
  }
  ]
}
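A consumer that reads topics written by both older and newer connector versions may want to handle deletes in one shape. The sketch below maps a record decoded with the deprecated AerospikeOutboundDelete schema onto the field set of AerospikeOutboundMetadata. It is a hypothetical helper that assumes the record has already been decoded into a Python dict.

```python
def upgrade_delete_record(old):
    """Map an AerospikeOutboundDelete record onto the metadata schema's fields."""
    return {
        "namespace": old["namespace"],
        "set": old.get("set"),
        "userKey": old.get("userKey"),
        "digest": old["digest"],
        "msg": old["msg"],
        "durable": old["durable"],
        # Not carried by the deprecated delete schema; null is the default
        # declared for these fields in the metadata schema.
        "gen": None,
        "exp": None,
        "lut": None,
    }
```

The fields gen, exp, and lut are filled with null because the deprecated schema does not carry them.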
Fixed Schema for Kafka Key
The Kafka key always uses the following schema:
{
  "type": "record",
  "name": "AerospikeOutboundKey",
  "namespace": "com.aerospike.connect",
  "fields": [{
    "name": "namespace",
    "type": "string"
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }]
}
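The userKey field is a union, so a value takes one of five branches depending on its type. The following sketch shows that mapping for Python values and builds a dict with the same fields as the key schema, for example to construct expected keys in a test. Both functions are hypothetical helpers, and the mapping of Python types to Avro branches is an assumption of this sketch.

```python
def user_key_branch(user_key):
    """Name the branch of the userKey union that a Python value falls into."""
    if user_key is None:
        return "null"
    # bool is a subclass of int in Python, so it must be rejected explicitly.
    if isinstance(user_key, bool):
        raise TypeError("boolean is not a valid userKey type")
    if isinstance(user_key, int):
        return "long"
    if isinstance(user_key, float):
        return "double"
    if isinstance(user_key, (bytes, bytearray)):
        return "bytes"
    if isinstance(user_key, str):
        return "string"
    raise TypeError(f"unsupported userKey type: {type(user_key).__name__}")


def build_key(namespace, digest, set_name=None, user_key=None):
    """Build a dict with the fields of the AerospikeOutboundKey schema."""
    user_key_branch(user_key)  # raises TypeError for unsupported types
    return {
        "namespace": namespace,
        "userKey": user_key,
        "set": set_name,
        "digest": digest,
    }
```

Only namespace and digest are mandatory in the key schema; set and userKey default to null.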
Examples
Example Kafka Avro Schema without metadata
format:
  mode: kafka-avro
  registry-topic: users
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithoutMetadata",
      "fields": [{
        "name": "abc",
        "type": "string"
      }]
    }
Example Kafka Avro Schema with metadata
format:
  mode: kafka-avro
  registry-topic: users
  metadata-key: metadata
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithMetadata",
      "fields": [{
        "name": "abc",
        "type": "string"
      }, {
        "name": "metadata",
        "type": {
          "type": "record",
          "name": "com.aerospike.metadata",
          "fields": [{
            "name": "namespace",
            "type": "string"
          }, {
            "name": "userKey",
            "type": ["null", "long", "double", "bytes", "string"],
            "default": null
          }, {
            "name": "set",
            "type": ["null", "string"],
            "default": null
          }, {
            "name": "digest",
            "type": "bytes"
          }, {
            "name": "msg",
            "type": "string"
          }, {
            "name": "gen",
            "type": ["null", "int"],
            "default": null
          }, {
            "name": "lut",
            "type": ["null", "long"],
            "default": null
          }, {
            "name": "exp",
            "type": ["null", "int"],
            "default": null
          }]
        }
      }]
    }
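In the example with metadata, the metadata-key value names a field that is declared in the value schema. A quick consistency check for that pairing might look like the sketch below. It assumes the format section is available as a Python dict with an inline schema (not schema-file); metadata_field is a hypothetical helper, and whether the connector itself enforces this is not stated here.

```python
import json


def metadata_field(fmt):
    """Return the schema field named by metadata-key, or None if it is unset."""
    key = fmt.get("metadata-key")
    if key is None:
        return None
    schema = json.loads(fmt["schema"])
    for field in schema["fields"]:
        if field["name"] == key:
            return field
    raise ValueError(f"schema has no field named {key!r}")
```

Running the check against the second example would return the nested record field named metadata; against the first example it would return None because no metadata-key is set.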