Kafka Avro Serialization Format
Specifies that the data is serialized with KafkaAvroSerializer. The schema is expected to be maintained in a schema registry.
Option | Required | Default | Expected value | Description |
---|---|---|---|---|
mode | yes | | kafka-avro | Selects the Kafka Avro format. |
schema | no | | | The Avro schema of the data. |
schema-file | no | | | The file containing the Avro schema of the data. |
registry-topic | no | | | The topic name used to generate the schema name in the schema registry. |
metadata-key | no | | | If specified, the metadata is inserted into this field of the record; otherwise metadata is not included in writes. |
props | yes | | | Map of properties used to initialize the KafkaAvroSerializer. Valid values are as specified in KafkaAvroSerializerConfig. |
stringify-map-keys | no | true | | Whether numeric keys in CDT maps should be converted to strings. See stringify map keys. |
Writes and deletes are shipped with different schemas. Because only a single schema can be registered for a topic with TopicNameStrategy, that strategy cannot accommodate both the write and delete schemas. Hence TopicNameStrategy is disallowed as the value.subject.name.strategy value in the props.
Schema Registry
The schemas for the records are maintained in a schema registry. The names of the schemas are generated as specified by the value.subject.name.strategy value in the props. See subject name strategy.
The valid strategies are:
- io.confluent.kafka.serializers.subject.RecordNameStrategy: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
- io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.
WARNING Ensure that the same schema name is not generated for different schemas in the config.
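As a rough illustration (not connector code), the subject names that the two allowed strategies generate can be sketched as follows; the function names are illustrative, not part of the connector or Confluent APIs:

```python
# Hypothetical sketch of how the two allowed subject name strategies derive
# the schema registry subject for an Avro record.

def record_name_subject(record_fullname: str) -> str:
    # RecordNameStrategy: subject is the fully-qualified Avro record name.
    return record_fullname

def topic_record_name_subject(registry_topic: str, record_fullname: str) -> str:
    # TopicRecordNameStrategy: subject is "<registry-topic>-<recordName>".
    return f"{registry_topic}-{record_fullname}"

print(record_name_subject("com.aerospike.connect.AerospikeOutboundMetadata"))
# com.aerospike.connect.AerospikeOutboundMetadata
print(topic_record_name_subject("users", "com.aerospike.RecordWithMetadata"))
# users-com.aerospike.RecordWithMetadata
```

Note how two different record types can collide under TopicRecordNameStrategy only if they share both the registry topic and the fully-qualified record name, which is what the warning above guards against.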
Message metadata
The message metadata properties are:
Metadata | Type | Description |
---|---|---|
msg | string | Write/Delete operation. |
namespace | string | Namespace of the Aerospike record. |
set | string | Set of the Aerospike record. |
userKey | long, double, bytes or string | User key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server. |
digest | bytes | Digest of the Aerospike record. |
gen | int | Generation of the Aerospike record. |
lut | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2] |
exp | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write operations. |
durable | boolean | Whether the delete is durable. Present only in delete operations. |
All metadata fields apply to both write and delete operations, except where the description indicates otherwise.
[1] When the Aerospike server does not ship lut, the following versions of the outbound connectors ship lut as zero:
- JMS outbound connector, versions prior to 3.0.0
- Kafka outbound connector, versions prior to 4.0.0
- Pulsar outbound connector, versions prior to 2.0.0
[2] Breaking Change When the Aerospike server ships lut, the following versions of the outbound connectors ship lut as a value of data type "integer":
- JMS outbound connector, versions prior to 3.0.0
- Kafka outbound connector, versions prior to 4.0.0
- Pulsar outbound connector, versions prior to 2.0.0
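Because lut is expressed in milliseconds and exp in seconds since the Unix epoch, consumers converting them to timestamps must scale them differently. A minimal Python sketch (illustrative, not connector code; function names are assumptions):

```python
from datetime import datetime, timezone
from typing import Optional

def last_update_time(lut_ms: int) -> datetime:
    # lut is milliseconds since the Unix epoch.
    return datetime.fromtimestamp(lut_ms / 1000, tz=timezone.utc)

def expiry_time(exp_s: int) -> Optional[datetime]:
    # exp is seconds since the Unix epoch; zero means the record never expires.
    return None if exp_s == 0 else datetime.fromtimestamp(exp_s, tz=timezone.utc)

print(last_update_time(1_700_000_000_000).isoformat())  # 2023-11-14T22:13:20+00:00
print(expiry_time(0))  # None
```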
Kafka Avro Fixed Schema for Metadata
Schema for the metadata.
{
"type": "record",
"name": "AerospikeOutboundMetadata",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "durable",
"type": ["null", "boolean"],
"default": null
}, {
"name": "gen",
"type": ["null", "int"],
"default": null
}, {
"name": "exp",
"type": ["null", "int"],
"default": null
}, {
"name": "lut",
"type": ["null", "long"],
"default": null
}
]
}
LUT is available whenever the Aerospike server ships the last-update time.
Kafka Avro Fixed Schema for Delete
All deletes are written with the metadata schema.
Breaking Change Deletes are shipped with this deprecated schema in the following versions of the outbound connectors:
- JMS outbound connector, versions earlier than 3.0.0
- Kafka outbound connector, versions earlier than 4.0.0
- Pulsar outbound connector, versions earlier than 2.0.0
{
"type": "record",
"name": "AerospikeOutboundDelete",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "durable",
"type": "boolean"
}
]
}
Kafka Avro Fixed Schema for Kafka Key
The Kafka key is always written with the following schema:
{
"type": "record",
"name": "AerospikeOutboundKey",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}]
}
Example Kafka Avro Schema without metadata
format:
mode: kafka-avro
registry-topic: users
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "RecordWithoutMetadata",
"fields": [{
"name": "abc",
"type": "string"
}]
}
Example Kafka Avro Schema with metadata
format:
mode: kafka-avro
registry-topic: users
metadata-key: metadata
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "RecordWithMetadata",
"fields": [{
"name": "abc",
"type": "string"
}, {
"name": "metadata",
"type": {
"type": "record",
"name": "com.aerospike.metadata",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "gen",
"type": ["null", "int"],
"default": null
}, {
"name": "lut",
"type": ["null", "long"],
"default": null
}, {
"name": "exp",
"type": ["null", "int"],
"default": null
}, {
"name": "durable",
"type": ["null", "boolean"],
"default": null,
"doc": "durable delete"
}]
}
}]
}
Batching of records in Kafka Avro format
Groups of records can be batched into a single Kafka Avro record by enabling batching in the configuration file. Writes and deletes are formatted into separate batches of Kafka Avro records. The schemas for the batches follow.
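The grouping described above can be pictured with a small sketch (illustrative only; the field name ArrayOfRecords follows the batch write examples in this section):

```python
def batch_writes(records, array_field="ArrayOfRecords"):
    # Wrap individual write records into one batch record whose single
    # field is an array of those records.
    return {array_field: records}

batch = batch_writes([{"abc": "x"}, {"abc": "y"}])
print(batch)  # {'ArrayOfRecords': [{'abc': 'x'}, {'abc': 'y'}]}
```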
Batch Kafka Avro Fixed Schema for Delete
All deletes are batched together into a single record with the following schema.
{
"type": "record",
"name": "AerospikeOutboundBatchDeletes",
"namespace": "com.aerospike.connect",
"doc": " Aerospike schema for batch deletes",
"fields": [
{
"name": "deletes",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "AerospikeOutboundMetadata",
"doc": "Aerospike metadata key schema",
"fields": [
{
"name": "namespace",
"type": "string"
},
{
"name": "set",
"type": [
"null",
"string"
],
"doc": "set",
"default": null
},
{
"name": "userKey",
"type": [
"null",
"long",
"double",
"bytes",
"string"
],
"doc": "user key",
"default": null
},
{
"name": "digest",
"type": "bytes"
},
{
"name": "msg",
"type": "string"
},
{
"name": "durable",
"type": [
"null",
"boolean"
],
"doc": "durable delete",
"default": null
},
{
"name": "gen",
"type": [
"null",
"int"
],
"doc": "generation",
"default": null
},
{
"name": "lut",
"type": [
"null",
"long"
],
"doc": "lut",
"default": null
},
{
"name": "exp",
"type": [
"null",
"int"
],
"doc": "expiry",
"default": null
}
]
}
}
}
]
}
Batch Kafka Avro Fixed Schema for Kafka Keys
If the keys of a batch are configured to be concatenated, the Kafka key is written with the following schema:
{
"type": "record",
"name": "AerospikeOutboundBatchKeys",
"namespace": "com.aerospike.connect",
"doc": "Aerospike schema for keys in a batch",
"fields": [
{
"name": "keys",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "AerospikeOutboundKey",
"doc": "Aerospike key schema",
"fields": [
{
"name": "namespace",
"type": "string"
},
{
"name": "set",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "userKey",
"type": [
"null",
"long",
"double",
"bytes",
"string"
],
"doc": "user key",
"default": null
},
{
"name": "digest",
"type": "bytes"
}
]
}
}
}
]
}
Batch Kafka Avro Schema for Write
Batch writes can be written with or without metadata for each record. The schema must have a single field of array type, and the items schema of that array must match the shape of the individual records.
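That requirement can be checked mechanically. The sketch below, assuming the schema is available as JSON text, is illustrative only and not part of the connector:

```python
import json

def is_valid_batch_write_schema(schema_json: str) -> bool:
    # A batch write schema must be a record with exactly one field,
    # and that field's type must be an Avro array.
    schema = json.loads(schema_json)
    fields = schema.get("fields", [])
    if schema.get("type") != "record" or len(fields) != 1:
        return False
    field_type = fields[0].get("type")
    return isinstance(field_type, dict) and field_type.get("type") == "array"

example = """
{
  "type": "record",
  "name": "Batch",
  "fields": [
    {"name": "ArrayOfRecords",
     "type": {"type": "array", "items": "string"}}
  ]
}
"""
print(is_valid_batch_write_schema(example))  # True
```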
Example Batch Kafka Avro Schema with metadata
format:
mode: kafka-avro
registry-topic: users
metadata-key: metadata
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "BatchOfRecordsWithMetadata",
"namespace": "com.aerospike",
"fields": [
{
"name": "ArrayOfRecords",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "RecordWithMetadata",
"fields": [
{
"name": "abc",
"type": "string",
},
{
"name": "metadata",
"type": {
"type": "record",
"name": "AerospikeOutboundMetadata",
"namespace": "com.aerospike.connect",
"doc": "Aerospike metadata key schema",
"fields": [
{
"name": "namespace",
"type": "string"
},
{
"name": "set",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "userKey",
"type": [
"null",
"long",
"double",
"bytes",
"string"
],
"default": null
},
{
"name": "digest",
"type": "bytes"
},
{
"name": "msg",
"type": "string"
"type": "Write or Delete operation"
},
{
"name": "gen",
"type": [
"null",
"int"
],
"doc": "generation",
"default": null
},
{
"name": "lut",
"type": [
"null",
"long"
],
"doc": "lut",
"default": null
},
{
"name": "exp",
"type": [
"null",
"int"
],
"doc": "expiry",
"default": null
},
{
"name": "durable",
"type": [
"null",
"boolean"
],
"doc": "durable delete",
"default": null
}
]
}
}
]
}
}
}
]
}
Example Batch Kafka Avro Schema without metadata
format:
mode: kafka-avro
registry-topic: users
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "BatchOfRecordsWithoutMetadata",
"namespace": "com.aerospike",
"doc": "",
"fields": [
{
"name": "ArrayOfRecords",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "RecordWithoutMetadata",
"fields": [{
"name": "abc",
"type": "string"
}
]
}
}
}
]
}