Configure streaming from Aerospike to Pulsar
Configuring streaming from Aerospike to Pulsar involves setting up your Aerospike database to send change notifications and modifying the configuration file /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml.
Setting Up Aerospike Database
In your Aerospike database, you must configure Cross-Datacenter Replication (XDR) and enable change notification. You must also define your Pulsar cluster as a "connector" XDR datacenter, and each Aerospike namespace whose changes you want to stream must name that datacenter as its XDR remote datacenter.
See the change notification configuration parameters and example config.
Modifying aerospike-pulsar-outbound.yml
You configure streaming from Aerospike to Pulsar by modifying the YAML configuration file /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml.
The configuration file has the following sections:
- service: Configures the connector's listening ports, TLS, and network interface.
- client-props: Configures connection properties for the target Pulsar message broker.
- topic-wise-producer-props: Configures producer properties for a specific topic. If not specified, the Pulsar Java Client default configuration is used.
- bin-transforms: Specifies the bin transformations that are applied to the Aerospike record.
- format: Specifies the message format to use for the outbound messages sent to the message broker.
- batching: Specifies how to collect a group of Change Notification Records into a single batch of outbound messages sent to the message broker.
- routing: Configures how incoming record updates and deletes from Aerospike are routed to the message broker.
- namespaces: Configures bin transforms, format, and routing at the namespace and set level.
- logging: Configures the destination and level for the connector's logs.
Cascading Configuration Values
You can configure the bin-transforms, format, and routing sections to include default values, values for each particular namespace, and values for each particular set within a namespace. For each Aerospike record, the most specific matching values are applied.
Here is an annotated example:
    routing:
      mode: static
      destination: default  # <1>
    namespaces:
      users:
        routing:
          mode: static
          destination: users  # <2>
        format:
          mode: flat-json
          metadata-key: metadata
        sets:
          premium:
            routing:
              mode: static
              destination: premium  # <3>

- <1> Specifies that records in all namespaces are to be shipped to the default topic.
- <2> Specifies that records in the users namespace are to be shipped to the users topic.
- <3> Specifies that records in the premium set in the users namespace are to be shipped to the premium topic.
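The cascade described above can be sketched in Python as a lookup that tries the set level first, then the namespace level, then the top-level default. This is only an illustration of the precedence order, not the connector's implementation: the `CONFIG` dictionary and the `resolve` helper are hypothetical, and the sketch assumes a more specific section replaces the less specific one as a whole rather than being merged key by key.

```python
from typing import Optional

# Hypothetical in-memory mirror of the annotated routing example.
CONFIG = {
    "routing": {"mode": "static", "destination": "default"},
    "namespaces": {
        "users": {
            "routing": {"mode": "static", "destination": "users"},
            "format": {"mode": "flat-json", "metadata-key": "metadata"},
            "sets": {
                "premium": {
                    "routing": {"mode": "static", "destination": "premium"},
                },
            },
        },
    },
}


def resolve(config: dict, section: str, namespace: str,
            set_name: Optional[str] = None) -> Optional[dict]:
    """Return the most specific `section`: set, then namespace, then default.

    Assumption: the first level that defines the section wins outright.
    """
    ns_cfg = config.get("namespaces", {}).get(namespace, {})
    set_cfg = ns_cfg.get("sets", {}).get(set_name, {}) if set_name else {}
    for scope in (set_cfg, ns_cfg, config):
        if section in scope:
            return scope[section]
    return None
```

With this sketch, `resolve(CONFIG, "routing", "users", "premium")` yields the `premium` destination, a record in any other set of `users` falls back to the `users` destination, and a record in any other namespace falls back to `default`.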
Notes
- Partitioned topics must be created separately by the user through the Pulsar Admin API, as Pulsar requires.
- You must specify the persistence type of the Pulsar topic in the configuration along with the topic name. For example, to use a non-persistent topic named my-topic, configure non-persistent://public/default/my-topic. If you specify only my-topic, the topic persistent://public/default/my-topic is created or used. The tenant and namespace can also be specified in this topic URL.
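The two notes above can be illustrated with a short Python sketch. `full_topic_name` expands a bare topic name to the persistent://public/default/ form described in the note, and `partitioned_topic_request` builds (without sending) an HTTP request for creating a partitioned topic. Both helper names are hypothetical, as is the admin URL in the usage note. The request path assumes the Pulsar admin REST endpoint `/admin/v2/{domain}/{tenant}/{namespace}/{topic}/partitions`; check it against the admin API documentation for your Pulsar version before relying on it.

```python
import json
import urllib.request

# Defaults taken from the note above: a bare name maps to
# persistent://public/default/<name>.
DEFAULT_PREFIX = "persistent://public/default/"


def full_topic_name(topic: str) -> str:
    """Expand a bare topic name; leave fully qualified topic URLs untouched."""
    if "://" in topic:
        return topic
    if "/" in topic:
        raise ValueError(f"expected a bare name or a full topic URL: {topic!r}")
    return DEFAULT_PREFIX + topic


def partitioned_topic_request(admin_url: str, topic: str,
                              partitions: int) -> urllib.request.Request:
    """Build a PUT request that asks Pulsar to create a partitioned topic.

    Assumption: the admin REST path below matches your Pulsar version.
    """
    domain, rest = full_topic_name(topic).split("://", 1)
    url = f"{admin_url.rstrip('/')}/admin/v2/{domain}/{rest}/partitions"
    return urllib.request.Request(
        url,
        data=json.dumps(partitions).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
```

A request built with `partitioned_topic_request("http://pulsar.example:8080", "my-topic", 4)` could then be sent with `urllib.request.urlopen`. The `pulsar-admin` command-line tool is an alternative if you prefer not to call the REST API directly.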
A sample configuration is shown below:
    # port that the connector runs on
    service:
      port: 8080

    # pulsar client configuration
    client-configuration:
      serviceUrl: pulsar://192.168.254.1:6650
      authPluginClassName: null
      authParams: null
      operationTimeoutMs: 30000
      statsIntervalSeconds: 60
      numIoThreads: 1
      numListenerThreads: 1
      connectionsPerBroker: 1
      useTcpNoDelay: true
      useTls: false
      tlsTrustCertsFilePath: null
      tlsAllowInsecureConnection: false
      tlsHostnameVerificationEnable: false
      concurrentLookupRequest: 5000
      maxLookupRequest: 50000
      maxNumberOfRejectedRequestPerConnection: 50
      keepAliveIntervalSeconds: 30
      connectionTimeoutMs: 10000
      requestTimeoutMs: 60000
      initialBackoffIntervalNanos: 100000000
      maxBackoffIntervalNanos: 60000000000

    # pulsar topic wise producer configuration
    topic-wise-producer-props:
      'persistent://public/default/myTopic':
        topicName: 'persistent://public/default/myTopic'
        producerName: 'producer_persistent://public/default/myTopic'
        sendTimeoutMs: 2000

    # log location if not stdout
    logging:
      file: /var/log/aerospike-pulsar-outbound/aerospike-pulsar-outbound.log

    # for the configurations below see the note above on specificity.

    # one of json, flat-json, message-pack, or avro
    format:
      mode: json

    # a list of transformations and mappings.
    bin-transforms:
      map:
        yellow: red
      transforms: # will be done in order
        - uppercase

    routing:
      mode: static
      destination: default

    namespaces:
      users:
        routing:
          mode: static
          destination: users
        format:
          mode: flat-json
          metadata-key: metadata
        sets:
          premium:
            routing:
              mode: static
              destination: premium
            bin-transforms:
              map:
                gold: platinum