Specify Kafka topics
Configure the topics
section of the aerospike-kafka-inbound.yml
file with the Kafka topics
the connector will consume messages from, and how the data in the messages corresponds to the
data in your Aerospike database.
Example
Here is an example of a topics
section that specifies two topics to consume from. The names of the topics are users
and products
.
topics: users: invalid-record: ignore mapping: namespace: mode: static value: users set: mode: dynamic source: value-field field-name: city key-field: source: key ttl: mode: dynamic source: value-field field-name: ttl bins: type: multi-bins map: name: source: value-field field-name: firstName products: invalid-record: kill-task mapping: namespace: mode: dynamic source: value-field field-name: category set: mode: dynamic source: value-field field-name: brand digest: source: value-field field-name: digest ttl: mode: dynamic source: value-field field-name: ttl bins: type: multi-bins map: name: source: value-field field-name: prod_name price: source: value-field field-name: prod_price
Configure topics
For each topic that you name in the topics
section, you can set properties in these sections:
Option | Required | Description |
---|---|---|
aerospike-operation | no | Specifies how to apply writes and deletes to your Aerospike database. See Configure Aerospike Operations for details. |
invalid-record | yes | The action to take when a Kafka record cannot be parsed or cannot be converted to a valid Aerospike record. Valid values are ignore , kill-task . |
mapping | in some cases | The mapping of the Kafka record to an Aerospike record. It is required if you do not implement message-transformer for the topic or implement a message-transformer with InboundMessage<K, M>. See Map Kafka messages to Aerospike records for details. |
message-transformer | no | The configuration parameters for a message transformer. See Aerospike Kafka Sink Message Transformer for details. |
Configure Aerospike operations
Use the aerospike-operation
option to specify how to apply writes and deletes to your Aerospike database.
Option | Required | Default | Description |
---|---|---|---|
type | no | write | Specifies the type of command to perform in Aerospike. Valid values are: write , delete . |
max-retries | no | 2 | Specifies the maximum number of times to retry a transaction. |
total-timeout | no | 0 | Specifies the number of milliseconds to wait before a transaction time. A value of 0 indicates that there is no time limit. |
record-exists-action | no | update | Specifies the action to take if a record already exists and type is set to write . Valid values are update , update-only , replace , replace-only , create-only . |
send-key | no | false | Specifies to send a user-defined key, in addition to a hash digest. Applies only when the value for the property type is write . |
durable-delete | no | false | Specifies to leave a tombstone for the record. Applies only when the value for the property type is delete . |
ignore-error-codes | no | empty set | Consider a record as successfully processed if it throws AerospikeException with the given ResultCode. |
Examples
# An aerospike write operation with all relevant options.aerospike-operation: type: write max-retries: 3 total-timeout: 0 record-exists-action: update-only send-key: true ignore-error-codes: - 22 - 13
# An aerospike delete operation with all relevant options.aerospike-operation: type: delete max-retries: 3 total-timeout: 0 durable-delete: true ignore-error-codes: - 22 - 13
Map Kafka messages to Aerospike records
Use the mapping
option to specify how to map Kafka messages in a topic to Aerospike records.
Option | Required | Description |
---|---|---|
namespace | yes | Specifies whether the name of the namespace is set in the configuration file or in messages. See Specify namespace , set , and ttl for details. |
set | no | Specifies whether the name of the set is set in the configuration file or in messages. See Specify namespace , set , and ttl for details. |
ttl | no | Specifies whether the time to live is set in the configuration file or in messages. See Specify namespace , set , and ttl for details. |
bins | yes | Mapping of incoming message to Aerospike bins. See Mapping fields in messages to bins in Aerospike records for details. |
key-field | no* | The Field selection configuration value for the Aerospike record key. See Key-field and Digest config for details. |
digest | no* | The Field selection configuration value for the Aerospike record digest. The field has to be a valid Aerospike RIPEMD-160 digest. The digest field should be plain bytes or a Base64 encoded string. See Key-field and Digest config for details. |
Key-field and Digest config
Use these two options to generate key-field
or digest
.
Option | Required | Description |
---|---|---|
source | yes | The source of the value. |
field-name | no | Name of the field to extract the value from. Only applies when source is value-field . |
Possible values for the source
option
Value | Description |
---|---|
key | Use the whole of the Pulsar record key as the value. |
value | Use the whole of the Pulsar record value as the value. |
value-field | Extract a value from the Pulsar record value. |
Specify namespace
, set
, and ttl
Configuration to obtain values for namespace
, set
, and ttl
requires definitions for the following properties:
Option | Required | Description |
---|---|---|
mode | yes | How the value of a given field should be obtained. The possible options are static and dynamic . |
Static: Specify the values in the configuration file
To set a static or unchanging name for the namespace, name for the set, or value for the time to live for an Aerospike record, you can use this option:
Option | Required | Description |
---|---|---|
value | yes | The static name or value. |
This example sets the name of a namespace to east
for all records that an inbound connector creates from the messages that it receives:
mapping: ... namespace: mode: static value: east ...
This example sets the time to live for all records that an inbound connector creates from the messages that it receives:
mapping: ... ttl: mode: static value: -1 ...
Choose from among the following values when setting the time to live for Aerospike records:
Value | Description |
---|---|
-2 | Specifies not to reset the time to live when a record is updated. |
-1 | Specifies that the time to live never expires. |
0 | Specifies to set the time to live to the default that is configured for the namespace in the Aerospike database. |
Integer greater than 0 | Specifies by default the time to live in seconds. However, you can use one of the following suffixes to specify a different unit of time: M for minutes, H for hours, D for days. |
Dynamic: Specify message fields from which to obtain values
Select a value dynamically from the Kafka record.
Option | Required | Description |
---|---|---|
source | yes | The source of the value. |
field-name | no | Name of the field to extract the value from. Only applies when source is value-field . |
Possible values for the source
option
Value | Description |
---|---|
key | Use the whole of the Pulsar record key as the value. |
value | Use the whole of the Pulsar record value as the value. |
value-field | Extract a value from the Pulsar record value. |
Example
mapping: ... namespace: mode: dynamic source: key-field field-name: category set: mode: dynamic source: value-field field-name: brand ttl: mode: dynamic source: value-field field-name: validForSec ...
Mapping fields in messages to bins in Aerospike records
For topics that you list in the topics
section, you must map fields from messages to bins in Aerospike records.
Option | Required | Description |
---|---|---|
type | yes | The number of bins that exist in Aerospike records that correspond to messages in the topic. Valid values are single-bin , multi-bins . |
Single bin
Use these two options to specify the source of the value to write to single-bin Aerospike records.
Option | Required | Description |
---|---|---|
source | yes | The source of the value. |
field-name | no | Name of the field to extract the value from. Only applies when source is value-field . |
Possible values for the source
option
Value | Description |
---|---|
key | Use the whole of the Pulsar record key as the value. |
value | Use the whole of the Pulsar record value as the value. |
value-field | Extract a value from the Pulsar record value. |
Examples
# A single bin Aerospike record with the bin value extracted from the Kafka# record field "name".bins: type: single-bin source: value-field field-name: name
# A single bin Aerospike record with the bin value equal to the whole of the# Kafka record value.bins: type: single-bin source: value
Multiple bins
Use these options to specify the source of the values to write to Aerospike records that are comprised of multiple bins.
Option | Required | Default | Description |
---|---|---|---|
all-value-fields | no | false | Indicates if all fields from the Kafka record value should be converted to Aerospike record bins. |
all-key-fields | no | false | Indicates if all fields from the Kafka record key should be converted to Aerospike record bins. |
map | no | Map of the Aerospike record bin name to the source of the bin value. Use this option only when both all-value-fields and all-key-fields are false. | |
source | yes | Source of the value. See the table below for the possible values. Used with the map option | |
field-name | no | Name of the field to extract the value from. Only applies when source is key-field or value-field . Used with the map option. |
Possible values for the source
option
Value | Description |
---|---|
key | Use the whole of the Pulsar record key as the value. |
value | Use the whole of the Pulsar record value as the value. |
value-field | Extract a value from the Pulsar record value. |
Examples
# Write all the Kafka record value fields as Aerospike record bins.bins: type: multi-bins all-value-fields: true
# Extract the values of Aerospike bins "name" and "city" from the Kafka fields `firstName` and `residence`.bins: type: multi-bins map: name: source: value-field field-name: firstName city: source: value-field field-name: residence
# Use the whole of the Kafka record value as the Aerospike bin "metadata".bins: type: multi-bins map: metadata: source: value