# Kafka Avro serialization format

Specifies that the data be serialized with [KafkaAvroSerializer](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java). The schema is expected to be maintained in a schema registry.

| Option | Required | Default | Expected value | Description |
| --- | --- | --- | --- | --- |
| mode | yes |  | kafka-avro | Selects Kafka Avro format. |
| schema | no |  |  | Schema of the data. |
| schema-file | no |  |  | File containing the schema of the data. |
| registry-topic | no |  |  | Topic name used to generate the schema name in the schema registry. See [Schema Registry](#schema-registry). |
| metadata-key | no |  |  | If specified, record metadata is inserted into this field of the record; otherwise metadata is not included in writes. |
| props | yes |  |  | Map of properties used to initialize the KafkaAvroSerializer. Valid values are as specified in [KafkaAvroSerializerConfig](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializerConfig.java). |
| stringify-map-keys | no | true |  | Indicates whether the numeric keys in CDT maps should be converted to strings. See [Avro Serialization Format](https://aerospike.com/docs/connectors/streaming/common/formats/avro-serialization-format#stringify-map-keys-in-avro). |

Writes and deletes are shipped with different schemas. Because `TopicNameStrategy` allows only a single schema to be registered per topic, it cannot accommodate both the write and the delete schema. Hence `TopicNameStrategy` is disallowed as the `value.subject.name.strategy` value in the props.

## Schema Registry

The schemas for the records are maintained in a schema registry. The names of the schemas are generated as specified by the `value.subject.name.strategy` value in the props.

The valid strategies are:

-   `io.confluent.kafka.serializers.subject.RecordNameStrategy`: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
-   `io.confluent.kafka.serializers.subject.TopicRecordNameStrategy`: For any Avro record type published with topic `<registry-topic>`, registers the schema in the registry under the subject name `<registry-topic>-<recordName>`, where `<recordName>` is the fully-qualified Avro record name.
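For example, with `registry-topic: users` and a record named `com.aerospike.connect.AerospikeOutboundMetadata`, the two strategies derive different subject names. The following is a sketch of the naming rules only, not Confluent's implementation:

```python
def record_name_subject(record_fullname: str) -> str:
    """RecordNameStrategy: subject is the fully-qualified record name."""
    return record_fullname

def topic_record_name_subject(topic: str, record_fullname: str) -> str:
    """TopicRecordNameStrategy: subject is <topic>-<fully-qualified name>."""
    return f"{topic}-{record_fullname}"

print(record_name_subject("com.aerospike.connect.AerospikeOutboundMetadata"))
# com.aerospike.connect.AerospikeOutboundMetadata
print(topic_record_name_subject("users", "com.aerospike.connect.AerospikeOutboundMetadata"))
# users-com.aerospike.connect.AerospikeOutboundMetadata
```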

**WARNING** Ensure that the same schema name is not generated for different schemas in the config.

## Message metadata

The message metadata properties are:

| Metadata | Type | Description |
| --- | --- | --- |
| `msg` | string | Write/Delete command. |
| `namespace` | string | Namespace of the Aerospike record. |
| `set` | string | Set of the Aerospike record. |
| `userKey` | long, double, bytes or string | User key of the Aerospike record. Present only if it is a write command and the user key is stored on the Aerospike server. |
| `digest` | bytes | Digest of the Aerospike record. |
| `gen` | int | Generation of the Aerospike record. |
| `lut` | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. \[1\]\[2\] |
| `exp` | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write commands. |
| `durable` | boolean | Whether the delete is durable. Present only in delete commands. |

All metadata fields are shipped for both write and delete commands, except where the description indicates otherwise.
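The presence rules in the table can be summarized in a short sketch. This is illustrative only, not connector code, and the literal `msg` values `"write"` and `"delete"` are assumptions:

```python
# Build a metadata record, including fields per the command type.
# Presence rules follow the metadata table above.
def build_metadata(msg, namespace, digest, set_name=None, user_key=None,
                   gen=None, lut=None, exp=None, durable=None):
    meta = {"msg": msg, "namespace": namespace, "set": set_name,
            "digest": digest, "gen": gen, "lut": lut}
    if msg == "write":
        meta["exp"] = exp            # expiry is shipped only for writes
        if user_key is not None:     # only when stored on the server
            meta["userKey"] = user_key
    elif msg == "delete":
        meta["durable"] = durable    # shipped only for deletes
    return meta
```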

\[1\] When the Aerospike server does not ship `lut`, then the following versions of these outbound connectors ship `lut` as zero:

-   JMS outbound connector, versions prior to 3.0.0
-   Kafka outbound connector, versions prior to 4.0.0
-   Pulsar outbound connector, versions prior to 2.0.0

\[2\] **Breaking Change** When the Aerospike server ships `lut`, the following versions of these outbound connectors ship `lut` as a value of data type `int`:

-   JMS outbound connector, versions prior to 3.0.0
-   Kafka outbound connector, versions prior to 4.0.0
-   Pulsar outbound connector, versions prior to 2.0.0

## Kafka Avro Fixed Schema for Metadata

Schema for the metadata.

```json
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [{
      "name": "namespace",
      "type": "string"
    }, {
      "name": "set",
      "type": ["null", "string"],
      "default": null
    }, {
      "name": "userKey",
      "type": ["null", "long", "double", "bytes", "string"],
      "default": null
    }, {
      "name": "digest",
      "type": "bytes"
    }, {
      "name": "msg",
      "type": "string"
    }, {
      "name": "durable",
      "type": ["null", "boolean"],
      "default": null
    }, {
      "name": "gen",
      "type": ["null", "int"],
      "default": null
    }, {
      "name": "exp",
      "type": ["null", "int"],
      "default": null
    }, {
      "name": "lut",
      "type": ["null", "long"],
      "default": null
    }
  ]
}
```

::: note
`lut` is available only when the Aerospike server ships the last-update time.
:::
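The union types in the fixed metadata schema encode the presence rules from the metadata table: every field that can be absent is a `["null", …]` union with a `null` default. A small stdlib-only sketch that derives the always-present fields from the schema:

```python
import json

# The fixed metadata schema above, embedded so this sketch is
# self-contained. Fields whose type union includes "null" are optional:
# they may be absent depending on the command type or server version.
metadata_schema = json.loads("""
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [
    {"name": "namespace", "type": "string"},
    {"name": "set", "type": ["null", "string"], "default": null},
    {"name": "userKey",
     "type": ["null", "long", "double", "bytes", "string"], "default": null},
    {"name": "digest", "type": "bytes"},
    {"name": "msg", "type": "string"},
    {"name": "durable", "type": ["null", "boolean"], "default": null},
    {"name": "gen", "type": ["null", "int"], "default": null},
    {"name": "exp", "type": ["null", "int"], "default": null},
    {"name": "lut", "type": ["null", "long"], "default": null}
  ]
}
""")

optional = [f["name"] for f in metadata_schema["fields"]
            if isinstance(f["type"], list) and "null" in f["type"]]
required = [f["name"] for f in metadata_schema["fields"]
            if f["name"] not in optional]
print(required)  # ['namespace', 'digest', 'msg']
```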

## Kafka Avro Fixed Schema for Delete

All deletes are written with the [metadata schema](https://aerospike.com/docs/connectors/streaming/common/formats/kafka-avro-serialization-format/#kafka-avro-fixed-schema-for-metadata).

::: note
**Breaking Change** Deletes are shipped with the following deprecated schema in these versions of the outbound connectors:

-   JMS outbound connector, versions earlier than 3.0.0
-   Kafka outbound connector, versions earlier than 4.0.0
-   Pulsar outbound connector, versions earlier than 2.0.0

```json
{
  "type": "record",
  "name": "AerospikeOutboundDelete",
  "namespace": "com.aerospike.connect",
  "fields": [{
      "name": "namespace",
      "type": "string"
    }, {
      "name": "set",
      "type": ["null", "string"],
      "default": null
    }, {
      "name": "userKey",
      "type": ["null", "long", "double", "bytes", "string"],
      "default": null
    }, {
      "name": "digest",
      "type": "bytes"
    }, {
      "name": "msg",
      "type": "string"
    }, {
      "name": "durable",
      "type": "boolean"
    }
  ]
}
```
:::

## Kafka Avro Fixed Schema for Kafka Key

The Kafka key is always written with the following schema:

```json
{
  "type": "record",
  "name": "AerospikeOutboundKey",
  "namespace": "com.aerospike.connect",
  "fields": [{
      "name": "namespace",
      "type": "string"
    }, {
      "name": "userKey",
      "type": ["null", "long", "double", "bytes", "string"],
      "default": null
    }, {
      "name": "set",
      "type": ["null", "string"],
      "default": null
    }, {
      "name": "digest",
      "type": "bytes"
  }]
}
```

## Example Kafka Avro Schema without metadata

```yaml
format:
  mode: kafka-avro
  registry-topic: users
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithoutMetadata",
      "fields": [{
         "name": "abc",
         "type": "string"
       }]
    }
```

## Example Kafka Avro Schema with metadata

```yaml
format:
  mode: kafka-avro
  registry-topic: users
  metadata-key: metadata
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithMetadata",
      "fields": [{
          "name": "abc",
          "type": "string"
        }, {
          "name": "metadata",
          "type": {
            "type": "record",
            "name": "com.aerospike.metadata",
            "fields": [{
                "name": "namespace",
                "type": "string"
              }, {
                "name": "userKey",
                "type": ["null", "long", "double", "bytes", "string"],
                "default": null
              }, {
                "name": "set",
                "type": ["null", "string"],
                "default": null
              }, {
                "name": "digest",
                "type": "bytes"
              }, {
                "name": "msg",
                "type": "string"
              }, {
                "name": "gen",
                "type": ["null", "int"],
                "default": null
              }, {
                "name": "lut",
                "type": ["null", "long"],
                "default": null
              }, {
                "name": "exp",
                "type": ["null", "int"],
                "default": null
              }, {
                "name": "durable",
                "type": ["null", "boolean"],
                "default": null,
                "doc": "durable delete"
              }]
          }
        }]
    }
```

## Batching of records in Kafka Avro format

Groups of records can be batched into a single Kafka Avro record by enabling batching in the configuration file. Writes and deletes are formatted into separate batches of Kafka Avro records. The schemas for the batches follow.
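The batching rule above means writes and deletes are never mixed in one batch. A minimal sketch of that split (field names follow the schemas in this section; the literal `msg` values `"write"` and `"delete"` are assumptions):

```python
def split_into_batches(records):
    """Split change records into one write batch and one delete batch."""
    writes = [r for r in records if r["msg"] == "write"]
    deletes = [r for r in records if r["msg"] == "delete"]
    batches = []
    if writes:
        # Shape of the example batch write schema below
        batches.append({"ArrayOfRecords": writes})
    if deletes:
        # Shape of AerospikeOutboundBatchDeletes
        batches.append({"deletes": deletes})
    return batches

events = [{"msg": "write", "abc": "v1"}, {"msg": "delete"},
          {"msg": "write", "abc": "v2"}]
batches = split_into_batches(events)
```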

### Batch Kafka Avro Fixed Schema for Delete

All deletes are batched together into a single record with the following schema.

```json
{
  "type": "record",
  "name": "AerospikeOutboundBatchDeletes",
  "namespace": "com.aerospike.connect",
  "doc": "Aerospike schema for batch deletes",
  "fields": [
    {
      "name": "deletes",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "AerospikeOutboundMetadata",
          "doc": "Aerospike metadata key schema",
          "fields": [
            {
              "name": "namespace",
              "type": "string"
            },
            {
              "name": "set",
              "type": ["null", "string"],
              "doc": "set",
              "default": null
            },
            {
              "name": "userKey",
              "type": ["null", "long", "double", "bytes", "string"],
              "doc": "user key",
              "default": null
            },
            {
              "name": "digest",
              "type": "bytes"
            },
            {
              "name": "msg",
              "type": "string"
            },
            {
              "name": "durable",
              "type": ["null", "boolean"],
              "doc": "durable delete",
              "default": null
            },
            {
              "name": "gen",
              "type": ["null", "int"],
              "doc": "generation",
              "default": null
            },
            {
              "name": "lut",
              "type": ["null", "long"],
              "doc": "lut",
              "default": null
            },
            {
              "name": "exp",
              "type": ["null", "int"],
              "doc": "expiry",
              "default": null
            }
          ]
        }
      }
    }
  ]
}
```

### Batch Kafka Avro Fixed Schema for Kafka Keys

If the keys of a batch are configured to be concatenated, then the Kafka key is written with the following schema:

```json
{
  "type": "record",
  "name": "AerospikeOutboundBatchKeys",
  "namespace": "com.aerospike.connect",
  "doc": "Aerospike schema for keys in a batch",
  "fields": [
    {
      "name": "keys",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "AerospikeOutboundKey",
          "doc": "Aerospike key schema",
          "fields": [
            {
              "name": "namespace",
              "type": "string"
            },
            {
              "name": "set",
              "type": ["null", "string"],
              "default": null
            },
            {
              "name": "userKey",
              "type": ["null", "long", "double", "bytes", "string"],
              "doc": "user key",
              "default": null
            },
            {
              "name": "digest",
              "type": "bytes"
            }
          ]
        }
      }
    }
  ]
}
```

### Batch Kafka Avro Schema for Write

Batch writes can be shipped with or without metadata for each record. The schema must contain a single field of Avro array type, and the schema of the array items must match the shape of the individual records.
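This shape requirement can be checked mechanically before registering a schema. A stdlib-only sketch (hypothetical helper, not part of the connector):

```python
import json

def is_valid_batch_write_schema(schema_json: str) -> bool:
    """Check that a schema has exactly one field, of Avro array type."""
    schema = json.loads(schema_json)
    fields = schema.get("fields", [])
    if len(fields) != 1:
        return False
    field_type = fields[0].get("type")
    return isinstance(field_type, dict) and field_type.get("type") == "array"

batch = '''{"type": "record", "name": "Batch", "fields": [
  {"name": "ArrayOfRecords",
   "type": {"type": "array", "items": "string"}}]}'''
flat = '{"type": "record", "name": "Flat", "fields": [{"name": "abc", "type": "string"}]}'

print(is_valid_batch_write_schema(batch))  # True
print(is_valid_batch_write_schema(flat))   # False
```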

#### Example Batch Kafka Avro Schema with metadata

```yaml
format:
  mode: kafka-avro
  registry-topic: users
  metadata-key: metadata
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "BatchOfRecordsWithMetadata",
      "namespace": "com.aerospike",
      "fields": [
        {
          "name": "ArrayOfRecords",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "RecordWithMetadata",
              "fields": [
                {
                  "name": "abc",
                  "type": "string"
                },
                {
                  "name": "metadata",
                  "type": {
                    "type": "record",
                    "name": "AerospikeOutboundMetadata",
                    "namespace": "com.aerospike.connect",
                    "doc": "Aerospike metadata key schema",
                    "fields": [
                      {
                        "name": "namespace",
                        "type": "string"
                      },
                      {
                        "name": "set",
                        "type": ["null", "string"],
                        "default": null
                      },
                      {
                        "name": "userKey",
                        "type": ["null", "long", "double", "bytes", "string"],
                        "default": null
                      },
                      {
                        "name": "digest",
                        "type": "bytes"
                      },
                      {
                        "name": "msg",
                        "type": "string",
                        "doc": "Write or Delete operation"
                      },
                      {
                        "name": "gen",
                        "type": ["null", "int"],
                        "doc": "generation",
                        "default": null
                      },
                      {
                        "name": "lut",
                        "type": ["null", "long"],
                        "doc": "lut",
                        "default": null
                      },
                      {
                        "name": "exp",
                        "type": ["null", "int"],
                        "doc": "expiry",
                        "default": null
                      },
                      {
                        "name": "durable",
                        "type": ["null", "boolean"],
                        "doc": "durable delete",
                        "default": null
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      ]
    }
```

#### Example Batch Kafka Avro Schema without metadata

```yaml
format:
  mode: kafka-avro
  registry-topic: users
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "BatchOfRecordsWithoutMetadata",
      "namespace": "com.aerospike",
      "fields": [
        {
          "name": "ArrayOfRecords",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "RecordWithoutMetadata",
              "fields": [{
                  "name": "abc",
                  "type": "string"
                }
              ]
            }
          }
        }
      ]
    }
```