Configuring Streaming from Aerospike to Pulsar
Configuring streaming from Aerospike to Pulsar involves setting up your Aerospike database to send change notifications and modifying the configuration file /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml.
Setting Up Aerospike Database
In your Aerospike database, you must configure Cross-Datacenter Replication (XDR) and enable change notification. You must also configure your Pulsar cluster as a "connector" XDR datacenter, and the namespace must point to that datacenter as its XDR remote datacenter.
See the change notification configuration parameters and example config.
Modifying aerospike-pulsar-outbound.yml
You configure streaming from Aerospike to Pulsar by modifying the /etc/aerospike-pulsar-outbound/aerospike-pulsar-outbound.yml YAML configuration file.
The configuration file has the following sections:
- service: Configures the connector's listening ports, TLS, and network interface.
- client-props: Configures connection properties for the target Pulsar message broker.
- topic-wise-producer-props: Configures producer properties for a specific topic. If not specified, the default configuration provided by the Pulsar Java Client is used.
- bin-transforms: Specifies the bin transformations to apply to the Aerospike record.
- format: Specifies the message format to use for the outbound messages sent to the message broker.
- batching: Specifies how to collect a group of Change Notification Records into a single batch of outbound messages sent to the message broker.
- routing: Configures how incoming record updates and deletes from Aerospike are routed to the message broker.
- namespaces: Configures bin transforms, format, and routing at the namespace and set level.
- logging: Configures the destination and level for the connector's logs.
If the route or format is not specified for an Aerospike record, then the record is skipped and logged at the warn level.
Cascading Configuration Values
You can configure the bin-transforms, format, and routing sections to include default values, values for each particular namespace, and values for each particular set within a namespace. For each Aerospike record, the appropriate values are applied.
Here is an annotated example:
routing:
  mode: static
  destination: default # <1>
namespaces:
  users:
    routing:
      mode: static
      destination: users # <2>
    format:
      mode: flat-json
      metadata-key: metadata
    sets:
      premium:
        routing:
          mode: static
          destination: premium # <3>
- <1> Specifies that records in all namespaces are to be shipped to the default topic.
- <2> Specifies that records in the users namespace are to be shipped to the users topic.
- <3> Specifies that records in the premium set in the users namespace are to be shipped to the premium topic.
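The precedence described above (set, then namespace, then default) can be sketched in a few lines of Python. This is only an illustration of the lookup order, not the connector's implementation: the function name resolve_section and the dict layout are hypothetical, and it assumes a more specific section replaces a less specific one as a whole rather than being merged key by key.

```python
from typing import Any, Optional

# The annotated example above, written as a plain dict (illustrative only).
CONFIG = {
    "routing": {"mode": "static", "destination": "default"},
    "namespaces": {
        "users": {
            "routing": {"mode": "static", "destination": "users"},
            "format": {"mode": "flat-json", "metadata-key": "metadata"},
            "sets": {
                "premium": {
                    "routing": {"mode": "static", "destination": "premium"},
                },
            },
        },
    },
}


def resolve_section(config: dict, section: str, namespace: str,
                    set_name: Optional[str] = None) -> Optional[Any]:
    """Return the most specific value of `section`: set, then namespace, then default.

    Hypothetical helper; assumes whole-section override, not per-key merging.
    """
    ns_cfg = config.get("namespaces", {}).get(namespace, {})
    set_cfg = ns_cfg.get("sets", {}).get(set_name, {}) if set_name else {}
    # Check scopes from most specific to least specific.
    for scope in (set_cfg, ns_cfg, config):
        if section in scope:
            return scope[section]
    return None  # nothing configured at any level


if __name__ == "__main__":
    print(resolve_section(CONFIG, "routing", "users", "premium")["destination"])
    print(resolve_section(CONFIG, "routing", "users", "basic")["destination"])
    print(resolve_section(CONFIG, "routing", "orders")["destination"])
    print(resolve_section(CONFIG, "format", "orders"))
```

With this example config, a record in the orders namespace resolves a routing value but no format value, which corresponds to the case where the record is skipped and logged at the warn level.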
Notes
- Partitioned topics must be created separately by the user through the Pulsar Admin API, as required by Pulsar.
- You must specify the persistence of the Pulsar topic in the configuration along with the topic name. For example, to use a non-persistent topic my-topic, configure non-persistent://public/default/my-topic in the configuration. If you specify only my-topic, the topic persistent://public/default/my-topic is created or used. The tenant and namespace can also be specified in this topic URL.
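The naming rule in the note above can be illustrated with a small Python sketch. It covers only the two cases the note describes, a bare topic name and a fully qualified topic URL. The helper qualify_topic is hypothetical and is not part of the connector or the Pulsar client.

```python
# Prefix applied to a bare topic name, per the note above.
DEFAULT_PREFIX = "persistent://public/default/"


def qualify_topic(name: str) -> str:
    """Return the topic URL that a configured destination refers to.

    Hypothetical helper: a name that already carries a scheme
    (persistent:// or non-persistent://) is returned unchanged;
    a bare name is treated as persistent in the public/default namespace.
    """
    if "://" in name:
        return name
    return DEFAULT_PREFIX + name


if __name__ == "__main__":
    print(qualify_topic("my-topic"))
    print(qualify_topic("non-persistent://public/default/my-topic"))
```

To route to a non-persistent topic, the full non-persistent:// URL therefore has to appear in the destination value itself.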
A sample configuration is shown below:
# port that the connector runs on
service:
  port: 8080

# pulsar client configuration
client-configuration:
  serviceUrl: pulsar://192.168.254.1:6650
  authPluginClassName: null
  authParams: null
  operationTimeoutMs: 30000
  statsIntervalSeconds: 60
  numIoThreads: 1
  numListenerThreads: 1
  connectionsPerBroker: 1
  useTcpNoDelay: true
  useTls: false
  tlsTrustCertsFilePath: null
  tlsAllowInsecureConnection: false
  tlsHostnameVerificationEnable: false
  concurrentLookupRequest: 5000
  maxLookupRequest: 50000
  maxNumberOfRejectedRequestPerConnection: 50
  keepAliveIntervalSeconds: 30
  connectionTimeoutMs: 10000
  requestTimeoutMs: 60000
  initialBackoffIntervalNanos: 100000000
  maxBackoffIntervalNanos: 60000000000

# pulsar topic wise producer configuration
topic-wise-producer-props:
  'persistent://public/default/myTopic':
    topicName: 'persistent://public/default/myTopic'
    producerName: 'producer_persistent://public/default/myTopic'
    sendTimeoutMs: 2000

# log location if not stdout
logging:
  file: /var/log/aerospike-pulsar-outbound/aerospike-pulsar-outbound.log

# for the configurations below see the note above on specificity.

# one of json, flat-json, message-pack, or avro
format:
  mode: json

# a list of transformations and mappings.
bin-transforms:
  map:
    yellow: red
  transforms: # will be done in order
    - uppercase

routing:
  mode: static
  destination: default

namespaces:
  users:
    routing:
      mode: static
      destination: users
    format:
      mode: flat-json
      metadata-key: metadata
    sets:
      premium:
        routing:
          mode: static
          destination: premium
        bin-transforms:
          map:
            gold: platinum