Configuring Aerospike Connect for Spark
You can use Spark's option() function to set the configuration parameters for reads and writes. For example, if you are using the Spark connector version 2.7.0 or later, here's how you specify the Aerospike set that you want to read from:
as_data=spark \
.read \
.format("aerospike") \
.option("aerospike.set", "natality").load()
If you are using the Spark connector version 2.6.0 or earlier, set the format to com.aerospike.spark.sql, like this:
as_data=spark \
.read \
.format("com.aerospike.spark.sql") \
.option("aerospike.set", "natality").load()
You can use spark.conf.set() to set other properties, as in these examples:
spark.conf.set("aerospike.namespace", AS_NAMESPACE)
spark.conf.set("aerospike.seedhost", AS_CONNECTION_STRING)
spark.conf.set("aerospike.keyPath", AS_FEATURE_KEY_PATH)
Feature-key
A feature-key file with the feature key spark connector or the feature key raf-realtime-analysis-framework turned on is required for using the Spark connector. If you try to use the Spark connector without a feature key, the connector logs an error message in Spark's log file and does not run.
- For Spark connector versions 2.8 (for use with Spark 2.4.x) and 3.1 (for use with Spark 3.x), you can include the feature key in the feature-key file for your Aerospike cluster. If that is not feasible, then use either the aerospike.keyPath or aerospike.featureKey property in your Spark applications.
- For earlier versions of the Spark connector, you must use either the aerospike.keyPath or aerospike.featureKey property in your Spark applications.
Spark Connector Version | How to give the Spark connector access to a feature key? |
---|---|
3.1 and later | Include the feature-key file features.conf in the Aerospike cluster. |
2.8 | Include the feature-key file features.conf in the Aerospike cluster. |
Earlier than 3.1 and 2.8 | Specify the feature key in the Spark application using aerospike.keyPath or aerospike.featureKey property. |
aerospike.featureKey
Description: Allows you to set the content of a feature-key file as a string in a Spark application.
- This flag was discontinued in version 4.0.0.
- Use this configuration property for testing only.
- This property will be deprecated in a future release.
Here is an example of this configuration property being used:
val df=spark
.read
.format("aerospike")
.option("aerospike.seedhost", "cluster1:3000")
.option("aerospike.set", "cluster1")
.option("aerospike.featureKey", "my-feature-file-content")
.load()
Your code can read the value in many ways, such as from an environment variable.
Possible values: N/A
Default value: N/A
Required?: Yes, if you are not using aerospike.keyPath and the feature-key file is not located in the path that is the default value for aerospike.keyPath. The connector must have access to either the feature key spark connector or the feature key raf-realtime-analysis-framework.
aerospike.keyPath
Description: Path to an Aerospike feature-key file. A copy of this file must be present on all nodes of your Spark cluster, including the master node, at this location.
- This flag was discontinued in version 4.0.0.
- Use this property for production systems.
- This property will be deprecated in a future release.
- If you plan to store the feature-key file in HDFS, set the property aerospike.hdfsCoreSiteXmlPath to the path of the core-site.xml file.
Possible values: N/A
Default value: /etc/aerospike/features.conf
The connector uses this default value when neither aerospike.keyPath
nor aerospike.featureKey
is specified.
Required?: Yes, if you are not using aerospike.featureKey
and the feature-key file is not at the default path.
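For example, a minimal sketch of a read that points the connector at a feature-key file in a non-default location (the path and set name are illustrative, and spark is an existing SparkSession):
// Sketch: the same feature-key file must exist at this path on every Spark node.
val df = spark.read
  .format("aerospike")
  .option("aerospike.seedhost", "cluster1:3000")
  .option("aerospike.set", "natality")
  .option("aerospike.keyPath", "/opt/aerospike/features.conf")
  .load()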
aerospike.hdfsCoreSiteXmlPath
- This flag was discontinued in version 4.0.0.
Description: Absolute path to an HDFS core-site.xml
file.
Possible values: N/A
Default value: N/A
Required?: Yes, if you are using aerospike.keyPath
and are storing the feature-key file in HDFS.
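For example, a minimal sketch of reading the feature-key file from HDFS (both paths are illustrative):
// Sketch: feature-key file stored in HDFS; core-site.xml tells the connector how to reach HDFS.
val df = spark.read
  .format("aerospike")
  .option("aerospike.set", "natality")
  .option("aerospike.keyPath", "hdfs:///config/features.conf")
  .option("aerospike.hdfsCoreSiteXmlPath", "/etc/hadoop/conf/core-site.xml")
  .load()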
Initial configuration and tuning
aerospike.digestType
Description: The data type of digests. You use this property together with the aerospike.schema.scan
property.
Possible values: string, binary
Default value: binary
Required?: No
aerospike.schema.scan
Description: The number of records for the connector to scan to infer the schema of the data in a DataFrame. The connector infers the schema for every query. Aerospike recommends using schema inference when you are using complex types, such as maps and lists.
- If the records in your Aerospike set use the same schema, you can set a lower value.
- If more than one schema is used by the records, you can set a higher value.
The more variety there is in the schemas, the higher you should set the value. If the value is too low in this case, the number of records that the connector scans cannot adequately account for variations in the schema. If the value is too high, the master node of the Spark cluster can run out of memory because schema inference uses resources on that node.
Possible values: N/A
Default value: 100
Required?: Yes, if you do not provide the schema to use.
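For example, a minimal sketch that scans more records before inferring the schema of a set whose records vary in shape (the set name is illustrative):
// Sketch: scan 1000 records for schema inference instead of the default 100.
val df = spark.read
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.schema.scan", "1000")
  .load()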
aerospike.seedhost
Description: The nodes from which the client application should obtain a list of the locations and ports of all of the nodes in the Aerospike cluster.
Possible values: Comma-separated host:port pairs, like these: host1:port1,host2:port2
You can also operate across multiple clusters by loading from one Aerospike cluster and writing the DataFrame into another. See the aerospike.seedhost example later on this page.
Default value: localhost:3000
Required?: Yes, if the Aerospike cluster is not running locally.
aerospike.set
Description: The Aerospike set from which records are to be read into a Spark DataFrame.
Possible values: N/A
Default value: N/A
Required?: Yes, for read operations
aerospike.useServicesAlternate
Description: This option specifies how the info request is made during cluster tending. See the Java client documentation for a detailed description. This flag is available in Spark connector 2.9.0 in the v2.x series and in Spark connector 3.5.0 and later in the v3.x series.
Possible values: true, false
Default value: false
Required?: No
aerospike.updateByDigest
Description: This option specifies that updates are done by digest with the value in the column specified in option("aerospike.updateByDigest", "Digest")
Possible values: N/A
Default value: N/A
Required?: Yes, if no value is given for aerospike.updateByKey
.
aerospike.updateByKey
Description: This option specifies that updates are done by primary key with the value in the column specified in option("aerospike.updateByKey", "[column-name]").
This configuration parameter supports nested keys. Such key types must be accepted by a client application. Consider this simple StructField: {A:{B:1}}. A client app can use option("aerospike.updateByKey", "A.B") to update records with the value 1. Use a period (.) to separate paths in a structure. Ensure that names in a StructField do not include periods.
Possible values: N/A
Default value: N/A
Required?: Yes, if no value is given for aerospike.updateByDigest
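As a minimal sketch (the DataFrame df, the set name, and the nested structure are illustrative), an update keyed on the nested field A.B might look like:
// Sketch: write df back to Aerospike, keying updates on the nested field A.B.
df.write
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.updateByKey", "A.B")
  .save()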
aerospike.write.batchsize
Description: The connector supports both batched and non-batched record writes. Batch writes require Database 6.0 and later releases.
When the internal write buffer reaches the specified value, the connector blocks subsequent requests until all previously submitted write requests are resolved.
aerospike.write.batchsize affects the write throughput (writes per second) and the latency of the write workload. It applies to each Spark partition individually.
aerospike.write.batchsize begins operating when one of the following conditions is met:
- aerospike.write.buffered is set to false
- Aerospike servers are older than 6.0
If you set the value to 20,000 records, the connector checks the status of each write operation after writing each batch of 20,000 records to your Aerospike cluster. A large value ensures that write throughput (writes per second) is high. However, the larger the value, the longer the delay before the connector checks for and handles errors. If the value is set to 1, then error checking and handling occurs much more frequently, and therefore the latency of the write workload is much higher.
Possible values: N/A
Default value: 10000
Required?: No
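A minimal sketch of lowering the batch size so that write errors are surfaced more frequently (df and the set name are illustrative):
// Sketch: check write status after every 1000 records instead of the default 10000.
df.write
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.write.batchsize", "1000")
  .save()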
aerospike.booleanbin
Description: Specifies whether the records that the connector reads from your Aerospike cluster use the Boolean data type for values in Boolean bins, rather than using integer values of 0 and 1. Ensure that your Aerospike cluster is Aerospike Database Enterprise Edition version 5.6.0.3 or later before you set the value of this configuration parameter to true.
Setting this value to true can affect client applications that query the data after it is loaded into Spark. Those applications might expect integer values from a Boolean column, rather than Boolean values. Also, if the application that uses the connector to load data into Spark currently filters on a Boolean column and the filter is defined based on the integer values 0 and 1, and the data in the column is a mix of integer values and Boolean values, the filter needs to be revised. If filtering takes place on the Aerospike cluster, the application should follow this logic:
- Query the Aerospike cluster by filtering on integer values in the Boolean column.
- Query the Aerospike cluster by filtering on Boolean values in the Boolean column.
- Create a union of the result sets in the dataframe in Spark.
Possible values: true, false
Default value: false
Required?: No
aerospike.commitLevel
Description: The consistency level for Aerospike to apply when committing transactions from a client application.
Possible values:
- CommitLevel.COMMIT_ALL: The database waits to commit a transaction until after committing on the master node and all of its replicas.
- CommitLevel.COMMIT_MASTER: The database waits to commit a transaction until after committing on the master node only, and does not wait until after committing on the node's replicas.
Default value: CommitLevel.COMMIT_ALL
Required?: No
aerospike.digestColumn
Description: Name of the column in the DataFrame that contains digests for corresponding Aerospike records.
Possible values: N/A
Default value: __digest
Required?: No
aerospike.expiryColumn
Description: Name of the column in the DataFrame that contains expiration times for corresponding Aerospike records, expressed in seconds since the Citrusleaf epoch (00:00:00 UTC on 1 Jan 2010) and calculated as the sum of the time that the record was written and the TTL value.
Possible values: N/A
Default value: __expiry
Required?: No
aerospike.generationColumn
Description: Name of the column in the DataFrame for the record-generation counter in corresponding Aerospike records.
Possible values: N/A
Default value: __generation
Required?: No
aerospike.generationPolicy
Description: How to handle record writes based on the record-generation counter. Generation is the number of times an Aerospike record has been modified. When a record is created, its counter is set to 1. For more information, see the "Write Generation Policy" section in "Policies".
Possible values:
- EXPECT_GEN_EQUAL: Update/delete the record if the expected generation is equal to the server generation.
- EXPECT_GEN_GT: Update/delete the record if the expected generation is greater than the server generation.
- NONE: Do not use record generation to restrict writes.
Default value: NONE
Required?: No
aerospike.keyColumn
Description: Name of the column in the DataFrame that contains keys for corresponding Aerospike records.
Possible values: N/A
Default value: __key
Required?: No
aerospike.keyType
Description: Data type of the primary key in a DataFrame. You can use this parameter if the connector is inferring the schema of the data in the DataFrame.
Possible values: int, short, long, double, float, date, timestamp, string, binary
Default value: string
Required?: No
aerospike.log.level
Description: Aerospike connector logging level.
Possible values: info, debug, error, all, warn, trace, off, fatal
Default value: warn
Required?: No
aerospike.namespace
Description: Name of the Aerospike namespace to read from and write to.
Possible values: N/A
Default value: test
Required?: No
aerospike.schema.flexible
Description: Spark SQL assumes that the records in a Spark table (that maps to an Aerospike set) use a single schema. However, as a NoSQL database, Aerospike does not require records in a set to use a single schema. An Aerospike bin (which maps to a column in a Spark DataFrame) within an Aerospike set (which maps to a Spark table) could contain values that are of multiple data types. Schemas in Spark can be inferred or user-provided. The configuration parameter aerospike.schema.flexible
, when set to true
, reconciles this incompatibility. For more information, see "Flexible schemas" below.
Possible values: true, false
Default value: true
Required?: No
aerospike.sendKey
Description: Specifies whether the connector should include the primary key when writing a record to an Aerospike database. If this value is true
, the value of the primary key is stored in the bin named '__key' in the database.
Possible values: N/A
Default value: false
Required?: No
aerospike.sockettimeout
Description: Server-side socket timeout (in milliseconds) for query/scan operations. (0 = no timeout)
Possible values: N/A
Default value: 86400000
Required?: No
aerospike.timeout
Description: Timeout (in milliseconds) for database operations invoked by the connector.
Possible values: N/A
Default value: 86400000
Required?: No
aerospike.ttlColumn
Description: Name of the column in the DataFrame for the TTL value in corresponding Aerospike records.
Possible values: N/A
Default value: __ttl
Required?: No
aerospike.update.partial
Description: Specifies whether to allow partial updates to existing Aerospike records. A partial update affects only a subset of the bins in a record, with the rest of the bins in the update set to null. A null value indicates that the bin is not being updated and that the current value in the record should remain as it is. By design, however, if a record update includes null bins, Aerospike deletes the corresponding bins from the record. The aerospike.update.partial configuration parameter, when set to true, prevents the corresponding bins from being deleted.
For example, suppose a record contains the bins prod_ID (which contains the primary key), prod_name, and prod_price. A partial update for a particular prod_ID sets prod_name to null and updates the value of prod_price. If aerospike.update.partial is set to the default, false, Aerospike deletes the bin prod_name from the record and updates the value in the bin prod_price. If aerospike.update.partial is set to true, then Aerospike leaves the bin prod_name as it is and updates the value in the bin prod_price.
You can set the value to true only when the value of aerospike.write.mode is set to UPDATE or UPDATE_ONLY. This feature allows for the deletion of records in the Aerospike database only if all bins are null and when using the UPDATE or UPDATE_ONLY policy.
Possible values: true, false
Default value: false
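A minimal sketch of the prod_price example above, assuming a DataFrame named priceUpdates (hypothetical) that contains only the prod_ID and prod_price columns:
// Sketch: update only prod_price; with aerospike.update.partial=true the prod_name bin is left as is.
priceUpdates.write
  .format("aerospike")
  .option("aerospike.set", "products")            // illustrative set name
  .option("aerospike.updateByKey", "prod_ID")
  .option("aerospike.write.mode", "UPDATE")
  .option("aerospike.update.partial", "true")
  .save()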
aerospike.write.mode
Description: The write policy to use when saving records in an Aerospike set. This flag was introduced in Aerospike Connect for Spark 3.0.0 (for Apache Spark 3.x) and 2.8.0 (for Apache Spark 2.4.x).
This flag overrides the Spark SaveMode setting. For instance, if a user specifies Spark write mode SaveMode.Overwrite
and sets aerospike.write.mode
to CREATE_ONLY
, then records are written using Java client policy RecordExistsAction.CREATE_ONLY
.
Possible values:
- UPDATE: Create or update record. Merge write command bins with existing bins.
- UPDATE_ONLY: Update record only. Fail if record does not exist. Merge write command bins with existing bins.
- REPLACE: Create or replace record. Delete existing bins not referenced by write command bins.
- REPLACE_ONLY: Replace record only. Fail if record does not exist. Delete existing bins not referenced by write command bins.
- CREATE_ONLY: Create only. Fail if record exists.
Default value: UPDATE
Required?: No
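A minimal sketch of the override described above (df and the set name are illustrative): the Spark-level SaveMode is set, but the connector writes with the CREATE_ONLY policy.
import org.apache.spark.sql.SaveMode

// Sketch: aerospike.write.mode takes precedence over the Spark SaveMode.
df.write
  .mode(SaveMode.Overwrite)
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.write.mode", "CREATE_ONLY")  // a record that already exists causes a failure
  .save()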
aerospike.writeset
Description: The Aerospike set into which records from a Spark Dataframe are to be written.
Possible values: N/A
Default value: The set specified by aerospike.set
Required?: No
Example of setting aerospike.seedhost for loading from one Aerospike cluster and writing the DataFrame into another
dfCluster1=spark.read.format("aerospike")
.option("aerospike.seedhost", "cluster1:3000").option("aerospike.set", "cluster1").load()
dfCluster1.write.format("aerospike")
.option("aerospike.seedhost", "cluster2:3000").option("aerospike.set", "cluster2").save()
Example of setting aerospike.pushdown.expressions
How to retrieve records of a set table with bin name col, where the content of the bin col is divisible by 5:
import com.aerospike.spark.utility.AerospikePushdownExpressions
// col % 5 == 0
//Equivalent Exp Exp.eq(Exp.mod(Exp.intBin("col"), Exp.`val`(5)), Exp.`val`(0))
val expIntBin=AerospikePushdownExpressions.intBin("col") // col is the name of the column
val expMODIntBinEqualToZero=AerospikePushdownExpressions.eq(
AerospikePushdownExpressions.mod(expIntBin,
AerospikePushdownExpressions.`val`(5)),
AerospikePushdownExpressions.`val`(0))
val expMODIntBinToBase64= AerospikePushdownExpressions.build(expMODIntBinEqualToZero).getBase64
val pushDownDF =spark.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "table")
.option("aerospike.pushdown.expressions", expMODIntBinToBase64)
.load()
Flexible schemas
The configuration parameter aerospike.schema.flexible
determines the response of the connector when the data type of a bin in an Aerospike record differs from the data type of the corresponding column in the schema. The schema can be inferred or provided. Spark SQL assumes that the records in a data store (an Aerospike set, in this case) use a single schema. However, as a NoSQL database, Aerospike does not require records in a set to use a single schema. An Aerospike bin (which maps to a column in a Spark DataFrame) within an Aerospike set (which maps to a Spark DataFrame) could contain values that are of multiple data types. The configuration parameter aerospike.schema.flexible
, when set to true
, reconciles this incompatibility. For more information, see the section "Flexible schema inference" in this notebook.
Possible values
The configuration parameter aerospike.schema.flexible accepts the values true and false.
true
If this is the value, and one or more of the data types for columns in a schema do not match the types of the corresponding bins in an Aerospike record, the connector returns NULL values for those columns. Note: NULL values due to mismatches are indistinguishable from missing values. Therefore, a client application must treat all NULLs as missing values. To filter out NULLs, use Spark's filter() function, as in this example: df.filter("gender is not NULL").show(false)
false
If this is the value, and one or more of the data types for columns in a schema do not match the types of the corresponding bins in an Aerospike record, queries fail. You can use this value if you have modeled your data in Aerospike to adhere to a strict schema (i.e., each record within the set has the same schema).
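For example, with a strictly modeled set you might supply the schema yourself and disable the flexible behavior. The following is a minimal sketch in which the schema fields and set name are illustrative:
import org.apache.spark.sql.types._

// Sketch: user-provided schema; with aerospike.schema.flexible=false, type mismatches fail the query.
val schema = StructType(Seq(
  StructField("__key", StringType, nullable = false),
  StructField("gender", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val df = spark.read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.set", "people")
  .option("aerospike.schema.flexible", "false")
  .load()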
Performance
The Spark connector provides the following configuration parameters to help you tune it for optimal performance. These configuration parameters apply to the connector only, not to Aerospike Database or Spark.
aerospike.batchMax
Description: The aerolookup
function uses Aerospike batch-read requests to read data from an Aerospike database.
This property sets the maximum batch size to improve throughput. You must set the value of aerospike.batchMax
to be less than or equal to the value of batch-max-requests
. For more information about the aerolookup
function, see this tutorial.
Possible values: N/A
Default value: 5000
Required?: No
aerospike.compression
Description: Enables the compression of data sent between the connector and an Aerospike database.
Possible values: true, false
Default value: false
Required?: No
aerospike.partition.factor
Description: Specifies the number of Spark partitions to map to the 4096 partitions of the selected Aerospike namespace. (A Spark partition is the fundamental unit of parallelism in Spark. Aerospike evenly distributes a namespace's data across 4096 Aerospike partitions.)
Examples:
- Setting the value to 8 maps 4096 Aerospike partitions to 2^8 or 256 Spark partitions.
- Setting the value to 15 maps 4096 Aerospike partitions to 2^15 or 32K Spark partitions.
- Setting the value to 0 maps 4096 Aerospike partitions to 2^0 or 1 Spark partition. In this case, the job is not parallelized.
You should be able to achieve massive parallelization by sizing your Spark cluster to support reads from a large number of partitions. In versions of Aerospike Connect for Spark before 2.2.0, the number of Spark partitions is computed as 4096 >> VALUE.
Possible values: N/A
Default value: 8. In versions of Aerospike Connect for Spark before 2.2.0, the default value is 0.
Required?: Yes, if you are also using aerospike.sample.size
.
aerospike.recordspersecond
Description: The maximum number of records per second that one compute unit (Spark partition) can request. For example, if a Spark batch read job uses 8 compute units (supposing that the value of aerospike.partition.factor
is 3) and the value of aerospike.recordspersecond
is 100, then the job can request no more than 800 records per second.
aerospike.recordspersecond
throttles the throughput for scans. The related property aerospike.transaction.rate
throttles the throughput for batch reads. We recommend you configure both properties.
Possible values: N/A
Default value: 0
Required?: No
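A minimal sketch matching the arithmetic above (the set name is illustrative): a partition factor of 3 yields 2^3 = 8 Spark partitions, each capped at 100 records per second, for roughly 800 records per second overall.
// Sketch: throttle scan throughput per Spark partition.
val df = spark.read
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.partition.factor", "3")
  .option("aerospike.recordspersecond", "100")
  .load()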
aerospike.sample.size
Description: Specifies the number of records to sample from your Aerospike database and write into a Spark DataFrame. This property allows you to avoid loading vast amounts of data from Aerospike into a Spark DataFrame before you can perform an analysis of only a sample of that data. For example, you could sample a small percentage of the data across globally distributed Aerospike clusters and then aggregate the results. You must use this configuration property together with the configuration property aerospike.partition.factor, which determines the parallelism in the connector and impacts query latency. Tune it carefully based on available resources, otherwise you may notice performance degradation. For example, if you have 12 vCPUs in the system, a partition factor of 10 or 11 may give the best performance (query latency) for your queries. For higher values, performance may degrade. Based on our testing, for values of aerospike.partition.factor higher than 10, the number of records read from an Aerospike database is greater than the sample size that you set with aerospike.sample.size. Consider also using either Spark's sample() function or its limit() function to ensure that the number of records in your Spark DataFrame equals your desired sample size.
Sampling is not random. Each query invocation returns the same set of records. However, the order of records in the result set is randomized. Therefore, if you need random sampling, sample more data than you need and perhaps use the Spark sample() function to randomize the sample.
Here is an example snippet of code that shows how to use aerospike.sample.size
:
val df3=spark.read.format("aerospike")
.option("aerospike.partition.factor","2")
.option("aerospike.set",setname)
.option("aerospike.sample.size","101")
.load()
Refer to the notebook tutorials for further details about setting the values of configuration parameters.
Possible values: N/A
Default value: 0
Required?: No
aerospike.transaction.rate
Description: This configuration property is available in Aerospike Connect for Apache Spark 3.2.0 and later.
It throttles the transactions (reads and/or writes) per second between each Spark partition and the Aerospike clusters.
It is not mandatory to use this property in conjunction with Aerospike rate quotas, which ensure that you do not exceed specified read and write rates.
When writing to the Aerospike database, the value of this property is applied to each Spark partition that writes data to Aerospike. For example, if you set the value of aerospike.transaction.rate to 4000 and your Spark application uses 10 partitions for writing, the maximum achievable wps (writes per second) is 4000 * 10 = 40,000, with a maximum of 4000 per partition.
This property applies to writes and to batch reads, which are used in the aerolookup function and in Spark queries that use batch-read requests.
If the write/batch-read rate exceeds the calculated limit, the connector blocks until the requested transaction satisfies the set limit.
aerospike.transaction.rate
throttles the throughput for batch reads. The related property aerospike.recordspersecond
throttles the throughput for scans. We recommend you configure both properties.
Possible values: positive float values
Default value: No rate limits on the transactions.
Required?: No
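A minimal sketch of the write example above (df and the set name are illustrative): 10 writing partitions, each capped at 4000 transactions per second, for at most about 40,000 writes per second overall.
// Sketch: cap each writing Spark partition at 4000 transactions per second.
df.repartition(10)
  .write
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.transaction.rate", "4000")
  .save()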
aerospike.write.buffered
Description: If set to false, non-batched writes are used. If set to true, batch writes (which require Database 6.0 and later releases) are used for record insertion.
This flag applies to both Apache Spark batch and stream writes, which means you can mix and match buffered writes in streaming and batch applications. This feature has been available since Spark connector 3.5.0.
Possible values: true, false
Default value: true
Required?: No
aerospike.pushdown.expressions
Description: The Spark filter class allows only a limited set of operators in a predicate to be pushed down to the database, so the Spark connector is limited in the number of Aerospike expressions that it can generate and push down. This property allows you to push down Base64-encoded Aerospike expressions directly to the database, thereby significantly reducing the data movement between the Aerospike and Spark clusters and consequently improving performance significantly. For the list of supported Exp objects and how to compose them, see Expression generator in the API documentation for Aerospike's Java client library. Here are a few key points to be aware of when using this property:
- All expressions-related functionality is defined in the com.aerospike.spark.utility.AerospikePushdownExpressions package.
- Do NOT use this property in conjunction with a Spark predicate, which is typically located in the WHERE clause or a filter().
- You can use this property in conjunction with the aerolookup API. To do so, you must specify it in the SparkConf object as a key-value pair.
- See the Jupyter Notebook for more information on how to push Aerospike expressions down to the database.
Possible values: N/A
Default value: ""
Required?: No
aerospike.client.maxconnspernode
Description: This option was introduced in the connector 4.2.0 release. It sets the underlying Java client maxConnsPerNode. The Java client uses this flag in sync read/write operations. The Spark connector uses the following synchronous Java client APIs:
- get in the aerolookup API (in all connector versions) and in read queries involving lookups from the Aerospike database.
- put while writing data into Aerospike database versions prior to 6.0.0. If aerospike.write.buffered is set to false with database 6.0+, then the connector uses the sync API.
Possible values: N/A
Default value: 500
Required?: No
aerospike.client.minconnspernode
Description: This option was introduced in the connector 4.2.0 release. It sets the underlying Java client minConnsPerNode. The Java client uses this flag in sync read/write operations when the connector uses the Java client synchronous API (see aerospike.client.maxconnspernode
section).
Possible values: N/A
Default value: 0
Required?: No
aerospike.client.asyncmaxconnspernode
Description: This option was introduced in the connector 4.2.0 release. It sets the underlying Java client asyncMaxConnsPerNode. The Java client uses this flag in asynchronous read/write operations when the connector uses the Java client asynchronous APIs. The Spark connector uses the following asynchronous Java client APIs:
- scanPartitions and queryPartitions in scan read queries.
- operate to write data into Aerospike database versions 6.0+ (introduced in the connector 3.5.0 release). However, if aerospike.write.buffered is set to false with database 6.0+, then the connector uses the synchronous API.
Possible values: N/A
Default value: 500
Required?: No
aerospike.client.asyncminconnspernode
Description: This option was introduced in the connector 4.2.0 release. It sets the underlying Java client asyncMinConnsPerNode. The Java client uses this flag in asynchronous read/write operations when the connector uses the Java client asynchronous APIs (see aerospike.client.maxconnspernode
section).
Possible values: N/A
Default value: 0
Required?: No
aerospike.client.durabledelete
Description: This option was introduced in the connector 4.3.0 release. It sets the underlying Java client BatchWritePolicy durableDelete or Java Client WritePolicy durableDelete.
BatchWritePolicy sets durableDelete
when aerospike.write.buffered
is enabled in Aerospike Database 6.0 and later.
Possible values: true, false
Default value: false
Required?: No
Authentication
aerospike.password
Description: Password for authentication. Leave null for clusters running without restricted access.
Default value: null
Required?: No
aerospike.tls.allowedpeerNames
Description: The valid peer names that are allowed in the TLS client certificate, if mutual authentication is required.
Default value: N/A
Required?: No
aerospike.tls.authmode
Description: Supported authentication modes.
Possible values:
Value | Description |
---|---|
0 | Use internal authentication only. The hashed password is stored on the server. Do not send the clear password. This is the default. |
1 | Use external authentication (such as LDAP). Specific external authentication is configured on the server. If TLS is defined, send the clear password on node login using TLS. Throw an exception if TLS is not defined. |
2 | Use external authentication (such as LDAP). Specific external authentication is configured on the server. Send the clear password on node login whether or not TLS is defined. Use only for testing because it does not provide secure authentication. |
3 | Authentication and authorization based on a certificate (PKI). No user name or password needs to be configured. Requires TLS and a client certificate. Requires Database 5.7.0 or later. Supported since the release of connector 4.1.0. |
Default value: 0
Required?: No
aerospike.tls.ciphers
Description: Comma-separated list of TLS cipher names. A value of null allows the default ciphers that are defined by the JVM.
Default value: null
Required?: No
aerospike.tls.enabletls
Description: Boolean flag for whether to use TLS client-server communication. If the value is true
, TLS is enabled for communication between Aerospike and the connector.
Default value: false
Required?: No
aerospike.tls.forloginonly
Description: A value of true specifies that TLS connections are used only for login authentication; all other communication with the server uses non-TLS connections. A value of false means that TLS connections are used for all communication with the server.
Default value: false
Required?: No
aerospike.tls.keystore-store-file
Description: Path of the keystore store file
Default value: N/A
Required?: Yes, if you are using a keystore.
aerospike.tls.keystore-key-password-file
Description: Path of the keystore key password file
Default value: N/A
Required?: Yes, if you are using a keystore.
aerospike.tls.keystore-store-password-file
Description: Path of the keystore store password file
Default value: N/A
Required?: Yes, if you are using a keystore.
aerospike.tls.keystore-store-type
Description: The connector supports both the proprietary Java Keystore format ("JKS") as well as the "PKCS12" format, based on the RSA PKCS12 Personal Information Exchange Syntax Standard. The default keystore format is JKS. One of the differences between the two formats is that JKS protects each private key with its individual password, while also protecting the integrity of the entire keystore with a (possibly different) password. A PKCS12 keystore, on the other hand, only uses a single password for the entire keystore. For more information, see "Setting Up TLS Keystores for Aerospike Connect".
Default value: JKS
Required?: Yes, if you are using a keystore.
aerospike.tls.protocols
Description: Comma-separated list of TLS protocols. All possible values are TLSv1, TLSv1.1, and TLSv1.2.
Default value: TLSv1.2
Required?: Yes, if you are using TLS.
aerospike.tls.revokecertificates
Description: Comma-separated list of revoked certificates in long (arabic numeral) format.
Default value: N/A
Required?: No
aerospike.tls.tlsname
Description: The tls-name is used by the connector to authenticate each TLS socket connection against a server node, based on the certificate presented by the Aerospike Database node during the initial connection handshake. The tls-name for a node is typically the node's hostname.
Default value: N/A
Required?: No
aerospike.tls.truststore-store-file
Description: Path of the truststore store file
Default value: N/A
Required?: Yes, if you are using a truststore.
aerospike.tls.truststore-key-password-file
Description: Path of the truststore key password file
Default value: N/A
Required?: Yes, if you are using a truststore.
aerospike.tls.truststore-store-password-file
Description: Path of the truststore store password file
Default value: N/A
Required?: Yes, if you are using a truststore.
aerospike.tls.truststore-store-type
Description: The truststore store type.
Default value: JKS
Required?: Yes, if you are using a truststore.
aerospike.user
Description: User for authentication. Leave null for clusters running without restricted access.
Default value: null
Required?: No
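A minimal sketch that combines authentication and TLS options from this section (host name, paths, and environment variable names are illustrative; in practice, read credentials from a secret store):
// Sketch: authenticated, TLS-enabled read. AS_USER and AS_PASSWORD are illustrative environment variables.
val df = spark.read
  .format("aerospike")
  .option("aerospike.seedhost", "db1.example.com:4333")
  .option("aerospike.set", "natality")
  .option("aerospike.user", sys.env("AS_USER"))
  .option("aerospike.password", sys.env("AS_PASSWORD"))
  .option("aerospike.tls.enabletls", "true")
  .option("aerospike.tls.tlsname", "db1.example.com")
  .option("aerospike.tls.truststore-store-file", "/etc/aerospike/truststore.jks")
  .load()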
Secondary index
aerospike.sindex.enable
Description: Boolean/String flag to enable secondary index queries.
Possible values: true, false
Default value: true
Required?: No
aerospike.sindex
Description: Secondary index to be used for query evaluation.
If not specified, the connector selects an appropriate index to match the order of Spark columns in the where clause, provided aerospike.sindex.enable is set.
You may invoke the sindexList(namespace) API to list all of the secondary indices defined in a namespace.
This API assumes that the SparkSession has the parameters required to create a connection to the database set on it, including aerospike.seedhost, aerospike.namespace, and other configurations.
See the notebooks for a complete example.
//Scala
import com.aerospike.spark._
PythonUtil.sindexList("test") //namespace = "test"
#python
scala_py_util = sc._jvm.com.aerospike.spark.PythonUtil #sc is sparkContext
print(scala_py_util.sindexList("test")) #namespace = "test"
Possible values: Any secondary index
Default value: ""
Required?: No
aerospike.sindex.filter
Description: JSON representation of the filter. If set, the constructed filter is used to query the secondary index.
aerospike.sindex must also be set when the aerospike.sindex.filter flag is used. See Secondary Index Support for information on setting different types of filters.
Possible values: contains, range, and equal filters in JSON format
Default value: ""
Required?: No
Exponential backoff retry
Connector 3.3.0 and later
When certain error conditions occur as the connector interacts with your Aerospike cluster, the connector can "back off" from the server. "Backing off" means not only retrying the actions that led to the error, but retrying them at exponentially increasing intervals. The duration of the interval before the first retry is specified by the configuration property aerospike.retry.initialmillis. If the database cannot service the request because it is busy, the connector attempts the same action after exponentially longer intervals. To compute the length of each successive interval, the connector multiplies the duration of the current interval by the value of the configuration parameter aerospike.retry.multiplier.
For example, if the initial wait time is 1s (1000 milliseconds) and the multiplier is 2, the retries are attempted after 1s, 2s, 4s, 8s, 16s, 32s, and so on. The connector retries the action until the database can service the request, or until the connector reaches the maximum number of retries allowed, which you can specify with aerospike.retry.maxattempts.
Violation of a rate quota is the most important error condition that prompts the connector to back off from the server. The other error conditions are internal error conditions.
You can specify which error codes trigger a retry using aerospike.retry.errorcodes.
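A minimal sketch of a read configured with these retry properties (the set name is illustrative): up to 5 retries, starting at 1 second and doubling each time (1s, 2s, 4s, 8s, 16s).
// Sketch: exponential backoff retry configuration for a read.
val df = spark.read
  .format("aerospike")
  .option("aerospike.set", "events")
  .option("aerospike.retry.initialmillis", "1000")
  .option("aerospike.retry.multiplier", "2")
  .option("aerospike.retry.maxattempts", "5")
  .load()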
aerospike.retry.initialmillis
Description: The time to wait (in milliseconds) before retrying for the first time the action that led to the error condition. If the error condition persists after the initial retry, subsequent retries are attempted after intervals that become exponentially longer.
Default value: 1000
Required?: No
aerospike.retry.maxattempts
Description: The maximum number of times to retry an action that led to an error condition. A value of 0 prevents the connector from retrying.
Default value: 0
Required?: No
aerospike.retry.multiplier
Description: The integer by which to multiply the duration of the current wait interval to determine the duration of the next wait interval.
Default value: 2
Required?: No
aerospike.retry.errorcodes
Description: Comma-separated string of error codes that trigger exponential backoff retry.
Default value: 83,75,-7,-9,-12,-8
Required?: No