Configuring Streaming from Aerospike to Pulsar

Configuring streaming from Aerospike to Pulsar involves setting up your Aerospike database to send change notifications and modifying the configuration file /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml.

Setting Up Aerospike Database

In your Aerospike database, you must configure Cross-Datacenter Replication (XDR) and enable change notification. You must also define an XDR datacenter of type "connector" for your Pulsar cluster, and each namespace you want to stream must point to that datacenter as its XDR remote datacenter.

See the change notification configuration parameters and example config.

Modifying aerospike-pulsar-outbound.yml

You configure streaming from Aerospike to Pulsar by modifying the /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml YAML configuration file.

The configuration file has the following sections:

  • service: Configures the connector's listening ports, TLS, and network interface.
  • client-configuration: Configures connection properties for the target Pulsar message broker.
  • topic-wise-producer-props: Configures producer properties for a specific topic. If not specified, the default configuration provided by the Pulsar Java Client is used.
  • bin-transforms: Specifies the bin transformations to apply to the Aerospike record.
  • format: Specifies the message format to use for the outbound messages sent to the message broker.
  • batching: Specifies how to collect a group of change notification records into a single batch of outbound messages sent to the message broker.
  • routing: Configures how incoming record updates and deletes from Aerospike are routed to the message broker.
  • namespaces: Configures bin transforms, format, and routing at the namespace and set level.
  • logging: Configures the destination and level for the connector's logs.
Caution

If the route or format is not specified for an Aerospike record, the record is skipped and the skip is logged at the warn level.

Cascading Configuration Values

You can configure the bin-transforms, format, and routing sections to include default values, values for each particular namespace, and values for each particular set within a namespace. For each Aerospike record, the appropriate values are applied.

Here is an annotated example:

routing:
  mode: static
  destination: default # <1>

namespaces:
  users:
    routing:
      mode: static
      destination: users # <2>
    format:
      mode: flat-json
      metadata-key: metadata
    sets:
      premium:
        routing:
          mode: static
          destination: premium # <3>
  1. Specifies that records in all namespaces are to be shipped to the default topic.
  2. Specifies that records in the users namespace are to be shipped to the users topic.
  3. Specifies that records in the premium set in the users namespace are to be shipped to the premium topic.
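
The lookup order described above can be sketched in a few lines of Python. This is only an illustration, not the connector's actual implementation: the `resolve` function and the `CONFIG` dictionary are hypothetical names, and the sketch assumes that the most specific level defining a section (set, then namespace, then top level) supplies that whole section.

```python
from typing import Any, Optional

# The annotated example above, expressed as a plain dictionary.
CONFIG = {
    "routing": {"mode": "static", "destination": "default"},
    "namespaces": {
        "users": {
            "routing": {"mode": "static", "destination": "users"},
            "format": {"mode": "flat-json", "metadata-key": "metadata"},
            "sets": {
                "premium": {
                    "routing": {"mode": "static", "destination": "premium"},
                },
            },
        },
    },
}


def resolve(config: dict, section: str, namespace: str,
            set_name: Optional[str] = None) -> Optional[Any]:
    """Return the most specific `section` for a record, or None if undefined.

    Hypothetical helper: checks the set level first, then the namespace
    level, then the top-level default.
    """
    ns_cfg = config.get("namespaces", {}).get(namespace, {})
    set_cfg = ns_cfg.get("sets", {}).get(set_name, {}) if set_name else {}
    for level in (set_cfg, ns_cfg, config):
        if section in level:
            return level[section]
    return None


if __name__ == "__main__":
    print(resolve(CONFIG, "routing", "users", "premium"))  # set-level value
    print(resolve(CONFIG, "routing", "users", "basic"))    # namespace-level value
    print(resolve(CONFIG, "routing", "orders"))            # top-level default
    print(resolve(CONFIG, "format", "orders"))             # nothing defined
```

In this sketch, a lookup that returns `None` corresponds to the case in the caution above, where a record without a route or format is skipped.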

Notes

  1. Partitioned topics are not created by the connector. As required by Pulsar, you must create them separately with the Pulsar Admin API.
  2. You must specify whether the Pulsar topic is persistent as part of the topic name in the configuration. For example, to use a non-persistent topic named my-topic, configure non-persistent://public/default/my-topic. If you specify only my-topic, the connector creates or uses persistent://public/default/my-topic. The tenant and namespace can also be specified in this topic URL.
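
The topic-name rule in note 2 can be illustrated with a short Python sketch. The `full_topic_name` function is a hypothetical helper written for this example, not part of the connector or the Pulsar client. It covers only the two cases described in the note: a bare topic name and a full topic URL.

```python
def full_topic_name(name: str, tenant: str = "public",
                    namespace: str = "default") -> str:
    """Expand a bare topic name to a full Pulsar topic URL.

    Hypothetical helper: a name that already carries a scheme
    (persistent:// or non-persistent://) is returned unchanged; a bare
    name is assumed to be persistent in the given tenant and namespace.
    """
    if "://" in name:
        return name
    return f"persistent://{tenant}/{namespace}/{name}"


if __name__ == "__main__":
    print(full_topic_name("my-topic"))
    print(full_topic_name("non-persistent://public/default/my-topic"))
```

The defaults `public` and `default` match the tenant and namespace shown in the note.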

A sample configuration is shown below:

# port that the connector runs on
service:
  port: 8080

# pulsar client configuration
client-configuration:
  serviceUrl: pulsar://192.168.254.1:6650
  authPluginClassName: null
  authParams: null
  operationTimeoutMs: 30000
  statsIntervalSeconds: 60
  numIoThreads: 1
  numListenerThreads: 1
  connectionsPerBroker: 1
  useTcpNoDelay: true
  useTls: false
  tlsTrustCertsFilePath: null
  tlsAllowInsecureConnection: false
  tlsHostnameVerificationEnable: false
  concurrentLookupRequest: 5000
  maxLookupRequest: 50000
  maxNumberOfRejectedRequestPerConnection: 50
  keepAliveIntervalSeconds: 30
  connectionTimeoutMs: 10000
  requestTimeoutMs: 60000
  initialBackoffIntervalNanos: 100000000
  maxBackoffIntervalNanos: 60000000000

# pulsar topic wise producer configuration
topic-wise-producer-props:
  'persistent://public/default/myTopic':
    topicName: 'persistent://public/default/myTopic'
    producerName: 'producer_persistent://public/default/myTopic'
    sendTimeoutMs: 2000

# log location if not stdout
logging:
  file: /var/log/aerospike-pulsar-outbound/aerospike-pulsar-outbound.log

# for the sections below, see "Cascading Configuration Values" above.

# one of json, flat-json, message-pack, or avro
format:
  mode: json

# a list of transformations and mappings.
bin-transforms:
  map:
    yellow: red
  transforms: # applied in order
    - uppercase

routing:
  mode: static
  destination: default

namespaces:
  users:
    routing:
      mode: static
      destination: users
    format:
      mode: flat-json
      metadata-key: metadata
    sets:
      premium:
        routing:
          mode: static
          destination: premium
        bin-transforms:
          map:
            gold: platinum