Kafka Avro Serialization Format for Aerospike Kafka Source (Outbound) Connector
Kafka Avro Serialization Format specifies that the data be serialized with KafkaAvroSerializer. The schema is expected to be maintained in a schema registry
Option | Required | Default | Expected value | Description |
---|---|---|---|---|
mode | yes | kafka-avro | Selects Kafka Avro format. | |
schema | no | The schema of the data. | ||
schema-file | no | The file containing the schema of the data. | ||
registry-topic | no | The topic name used to generate the schema name in the schema registry. | ||
metadata-key | no | The metadata will be inserted into this field of the record if specified, else metadata wont be included in writes. | ||
props | yes | Map of properties to initialize the KafkaAvroSerializer with. Valid values are as specified in KafkaAvroSerializerConfig. | ||
stringify-map-keys | no | true | Whether the numeric keys in CDT maps should be converted to strings. See stringify map keys |
The writes and deletes are written with different schemas. Since only a single
schema can be registered with TopicNameStrategy
for a topic, this conflicts with
accommodating both write and delete schemas. Hence TopicNameStrategy
strategy is
disallowed for value.subject.name.strategy
value in the props.
Schema Registry
The schemas for the records are maintained in a schema registry. The names of
the schemas are generated as specified by the value.subject.name.strategy
value in the props. See subject name strategy
The valid strategies are:
io.confluent.kafka.serializers.subject.RecordNameStrategy
: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.
WARNING Ensure that the same schema name is not generated for different schemas in the config.
Message metadata
The message metadata properties are:
Metadata | Type | Description |
---|---|---|
msg | string | Write/Delete operation. |
namespace | string | Namespace of the Aerospike record. |
set | string | Set of the Aerospike record. |
userKey | long, double, bytes or string | The user key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server. |
digest | bytes | The digest of the Aerospike record. |
gen | int | The generation of the Aerospike record. |
lut | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2] |
exp | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write operation. |
durable | boolean | Whether the delete is durable. Present only in delete operation. |
All metadata is affected by both delete and write operations, except where the description indicates otherwise.
[1] When the Aerospike server does not ship lut
, then the following version of this outbound connector ship lut
as zero:
- Kafka outbound connector, versions earlier than 4.0.0
[2] Breaking Change When the Aerospike server ships lut
, then the following version of this outbound connector ship lut
as a value of the data type "integer":
- Kafka source connector, versions earlier than 4.0.0
Fixed Schema for Metadata
Schema for the metadata.
{
"type": "record",
"name": "AerospikeOutboundMetadata",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "durable",
"type": ["null", "boolean"],
"default": null
}, {
"name": "gen",
"type": ["null", "int"],
"default": null
}, {
"name": "exp",
"type": ["null", "int"],
"default": null
}, {
"name": "lut",
"type": ["null", "long"],
"default": null
}
]
}
LUT It is available whenever Aerospike server ships last-update time.
Fixed Schema for Delete
All deletes are written with the metadata schema.
Breaking Change Deletes are shipped with this deprecated schema in Kafka source connector versions earlier than 4.0.0.
- Kafka outbound connector, versions earlier than 4.0.0
{
"type": "record",
"name": "AerospikeOutboundDelete",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "durable",
"type": "boolean"
}
]
}
Fixed Schema for Kafka Key
The schema of the Kafka key is always the following schema
{
"type": "record",
"name": "AerospikeOutboundKey",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}]
}
Examples
Example Kafka Avro Schema without metadata
format:
mode: kafka-avro
registry-topic: users
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "RecordWithoutMetadata",
"fields": [{
"name": "abc",
"type": "string"
}]
}
Example Kafka Avro Schema with metadata
format:
mode: kafka-avro
registry-topic: users
metadata-key: metadata
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "RecordWithMetadata",
"fields": [{
"name": "abc",
"type": "string"
}, {
"name": "metadata",
"type": {
"type": "record",
"name": "com.aerospike.metadata",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "gen",
"type": ["null", "int"],
"default": null
}, {
"name": "lut",
"type": ["null", "long"],
"default": null
}, {
"type": ["null", "int"],
"default": null
}]
}
}]
}