Skip to content
Visit booth 3171 at Google Cloud Next to see how to unlock real-time decisions at scaleMore info

Configure streaming from Aerospike to Pulsar

Configuring streaming from Aerospike to Pulsar involves setting up your Aerospike database to send change notifications and modifying the configuration file /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml.

Setting Up Aerospike Database

In your Aerospike database, you must configure Cross-Datacenter Replication (XDR) and enable change notification. You must also configure your Pulsar cluster to be a “connector” XDR datacenter and the namespace must point to this as xdr remote datacenter. [//]: # (Questions:) [//]: # (1. Namespace for what?) [//]: # (2. Is “xdr remote datacenter” the actual name?) [//]: # (3. How does a namespace “point” to anything?)

See the change notification configuration parameters and example config. [//]: # (12/09/2020: I asked a question in docs-internal about this section.)

Modifying aerospike-pulsar-outbound.yml

You configure streaming from Aerospike to Pulsar by modifying the /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml YAML configuration file. [//]: # (What filesystem is this file located in?)

The configuration file has the following sections:

  • service Configures the connector’s listening ports, TLS, and network interface.
  • client-props Configures connection properties for the target Pulsar message broker.
  • topic-wise-producer-props Configures producer properties for the specific topic. If not specified, it defaults to the Pulsar Java Client configuration.
  • bin-transforms Specifies the bin transformations that are applied on the Aerospike record.
  • format Specifies the message format to use for the outbound messages sent to the message broker.
  • batching Specifies how to collect a group of Change Notification Records into a single batch of outbound messages sent to the message broker.
  • routing Configures how incoming record updates/deletes from Aerospike are routed to the message broker.
  • namespaces Configures bin transforms, format, routing at namespace and set level.
  • logging Configures the destination and level for the connector’s logs.

Cascading Configuration Values

You can configure the bin-transforms, format, and routing sections to include default values, values for each particular namespace, and values for each particular set within a namespace. For each Aerospike record, the appropriate values are applied.

Here is an annotated example:

routing:
mode: static
destination: default <1>
‎‎
namespaces:
users:
routing:
mode: static
destination: users <2>
format:
mode: flat-json
metadata-key: metadata
sets:
premium:
routing:
mode: static
destination: premium <3>
  1. Specifies that records in all namespaces are to be shipped to the default topic.
  2. Specifies that records in the users namespace are to be shipped to the users topic.
  3. Specifies that records in the premium set in the users namespace are to be shipped to the premium topic.

Notes

  1. Partitioned topic needs to be created using the Pulsar Admin API separately by the user as required by Pulsar
  2. You have to specify the persistence nature of the pulsar topic in configuration along with the topic name. For example, if you want to use a non-persistent topic my-topic, then you have to configure non-persistent://public/default/my-topic in the configuration. If you just specify my-topic, it will create/use persistent://public/default/my-topic. Namespace and tenant can also be specified in this topic URL

A sample configuration is shown below:

# port that the connector runs on
service:
port: 8080
# pulsar client configuration
client-configuration:
serviceUrl : pulsar://192.168.254.1:6650
authPluginClassName : null
authParams : null
operationTimeoutMs : 30000
statsIntervalSeconds : 60
numIoThreads : 1
numListenerThreads : 1
connectionsPerBroker : 1
useTcpNoDelay : true
useTls : false
tlsTrustCertsFilePath : null
tlsAllowInsecureConnection : false
tlsHostnameVerificationEnable : false
concurrentLookupRequest : 5000
maxLookupRequest : 50000
maxNumberOfRejectedRequestPerConnection : 50
keepAliveIntervalSeconds : 30
connectionTimeoutMs : 10000
requestTimeoutMs : 60000
initialBackoffIntervalNanos : 100000000
maxBackoffIntervalNanos : 60000000000
# pulsar topic wise producer configuration
topic-wise-producer-props:
'persistent://public/default/myTopic':
topicName: 'persistent://public/default/myTopic'
producerName: 'producer_persistent://public/default/myTopic'
sendTimeoutMs: 2000
# log location if not stdout
logging:
file: /var/log/aerospike-pulsar-outbound/aerospike-pulsar-outbound.log
# for the configurations below see the note above on specificity.
# one of json, flat-json, message-pack, or avro
format:
mode: json
# a list of transformations and mappings.
bin-transforms:
map:
yellow: red
transforms: //will be done in order
- uppercase
routing:
mode: static
destination: default
namespaces:
users:
routing:
mode: static
destination: users
format:
mode: flat-json
metadata-key: metadata
sets:
premium:
routing:
mode: static
destination: premium
bin-transforms:
map:
gold: platinum
Feedback

Was this page helpful?

What type of feedback are you giving?

What would you like us to know?

+Capture screenshot

Can we reach out to you?