# Kafka Avro serialization format for Aerospike Kafka source (outbound) connector

Kafka Avro Serialization Format specifies that the data be serialized with [KafkaAvroSerializer](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java). The schema is expected to be maintained in a [schema registry](https://docs.confluent.io/current/schema-registry/index.html)

| Option | Required | Default | Expected value | Description |
| --- | --- | --- | --- | --- |
| mode | yes |  | kafka-avro | Selects Kafka Avro format. |
| schema | no |  |  | The schema of the data. |
| schema-file | no |  |  | The file containing the schema of the data. |
| registry-topic | no |  |  | The topic name used to generate the schema name in the [schema registry](#schema-registry). |
| metadata-key | no |  |  | The metadata will be inserted into this field of the record if specified, else metadata won’t be included in writes. |
| props | yes |  |  | Map of properties to initialize the KafkaAvroSerializer with. Valid values are as specified in [KafkaAvroSerializerConfig](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java). |
| stringify-map-keys | no | true |  | Whether the numeric keys in CDT maps should be converted to strings. See [stringify map keys](https://aerospike.com/docs/connectors/streaming/kafka/outbound/formats/avro-serialization-format#stringify-map-keys-in-avro) |

The writes and deletes are written with different schemas. Since only a single schema can be registered with `TopicNameStrategy` for a topic, this conflicts with accommodating both write and delete schemas. Hence `TopicNameStrategy` strategy is disallowed for `value.subject.name.strategy` value in the props.

## Schema Registry

The schemas for the records are maintained in a schema registry. The names of the schemas are generated as specified by the `value.subject.name.strategy` value in the props. See [subject name strategy](https://docs.confluent.io/4.1.0/schema-registry/docs/serializer-formatter.html#subject-name-strategy)

The valid strategies are:

-   `io.confluent.kafka.serializers.subject.RecordNameStrategy`: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
-   `io.confluent.kafka.serializers.subject.TopicRecordNameStrategy`: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.

**WARNING** Ensure that the same schema name is not generated for different schemas in the config.

## Message metadata

The message metadata properties are:

| Metadata | Type | Description |
| --- | --- | --- |
| `msg` | string | Write/Delete command. |
| `namespace` | string | Namespace of the Aerospike record. |
| `set` | string | Set of the Aerospike record. |
| `userKey` | long, double, bytes or string | User key of the Aerospike record. Present only if it is a write command and the user key is stored on the Aerospike server. |
| `digest` | bytes | Digest of the Aerospike record. |
| `gen` | int | Generation of the Aerospike record. |
| `lut` | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. \[1\]\[2\] |
| `exp` | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write commands. |
| `durable` | boolean | Whether the delete is durable. Present only in delete commands. |

::: note
All metadata is affected by both delete and write commands, except where the description indicates otherwise.
:::

\[1\] When the Aerospike server does not ship `lut`, then the following version of this outbound connector ships `lut` as zero:

-   Kafka outbound connector, versions earlier than Database 4.0.

\[2\] **Breaking Change** When the Aerospike server ships `lut`, then the following version of this outbound connector ship `lut` as a value of the data type “integer”:

-   Kafka source connector, versions earlier than Database 4.0.

## Fixed Schema for Metadata

Schema for the metadata.

```json
{

  "type": "record",

  "name": "AerospikeOutboundMetadata",

  "namespace": "com.aerospike.connect",

  "fields": [{

      "name": "namespace",

      "type": "string"

    }, {

      "name": "set",

      "type": ["null", "string"],

      "default": null

    }, {

      "name": "userKey",

      "type": ["null", "long", "double", "bytes", "string"],

      "default": null

    }, {

      "name": "digest",

      "type": "bytes"

    }, {

      "name": "msg",

      "type": "string"

    }, {

      "name": "durable",

      "type": ["null", "boolean"],

      "default": null

    }, {

      "name": "gen",

      "type": ["null", "int"],

      "default": null

    }, {

      "name": "exp",

      "type": ["null", "int"],

      "default": null

    }, {

      "name": "lut",

      "type": ["null", "long"],

      "default": null

    }

  ]

}
```

::: note
LUT It is available whenever Aerospike server ships last-update time.
:::

## Fixed Schema for Delete

All deletes are written with the [metadata schema](https://aerospike.com/docs/connectors/streaming/kafka/outbound/formats/kafka-avro-serialization-format/#fixed-schema-for-metadata).

::: note
**Breaking Change** Deletes are shipped with this deprecated schema in Kafka source connector versions earlier than 4.0.0.

-   Kafka outbound connector, versions earlier than Database 4.0.

```json
{

  "type": "record",

  "name": "AerospikeOutboundDelete",

  "namespace": "com.aerospike.connect",

  "fields": [{

      "name": "namespace",

      "type": "string"

    }, {

      "name": "set",

      "type": ["null", "string"],

      "default": null

    }, {

      "name": "userKey",

      "type": ["null", "long", "double", "bytes", "string"],

      "default": null

    }, {

      "name": "digest",

      "type": "bytes"

    }, {

      "name": "msg",

      "type": "string"

    }, {

      "name": "durable",

      "type": "boolean"

    }

  ]

}
```
:::

## Fixed Schema for Kafka Key

The schema of the Kafka key is always the following schema

```json
{

  "type": "record",

  "name": "AerospikeOutboundKey",

  "namespace": "com.aerospike.connect",

  "fields": [{

      "name": "namespace",

      "type": "string"

    }, {

      "name": "userKey",

      "type": ["null", "long", "double", "bytes", "string"],

      "default": null

    }, {

      "name": "set",

      "type": ["null", "string"],

      "default": null

    }, {

      "name": "digest",

      "type": "bytes"

  }]

}
```

## Examples

#### Example Kafka Avro Schema without metadata

```yaml
format:

  mode: kafka-avro

  registry-topic: users

  props:

    schema.registry.url: http://192.168.50.5:8081

    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"

  schema: |

    {

      "type": "record",

      "name": "RecordWithoutMetadata",

      "fields": [{

         "name": "abc",

         "type": "string"

       }]

    }
```

#### Example Kafka Avro Schema with metadata

```yaml
format:

  mode: kafka-avro

  registry-topic: users

  metadata-key: metadata

  props:

    schema.registry.url: http://192.168.50.5:8081

    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"

  schema: |

    {

      "type": "record",

      "name": "RecordWithMetadata",

      "fields": [{

         "name": "abc",

         "type": "string"

         }, {

         "name": "metadata",

         "type": {

           "type": "record",

           "name": "com.aerospike.metadata",

           "fields": [{

             "name": "namespace",

             "type": "string"

            }, {

             "name": "userKey",

             "type": ["null", "long", "double", "bytes", "string"],

             "default": null

            }, {

             "name": "set",

             "type": ["null", "string"],

             "default": null

            }, {

             "name": "digest",

             "type": "bytes"

            }, {

             "name": "msg",

             "type": "string"

            }, {

             "name": "gen",

             "type": ["null", "int"],

             "default": null

            }, {

             "name": "lut",

             "type": ["null", "long"],

             "default": null

            }, {

             "type": ["null", "int"],

             "default": null

            }]

        }

     }]

    }
```