Tweetaspike: A Simple Application
For an interactive Jupyter notebook experience:
Tweetaspike is a simple application that illustrates some key aspects of an Aerospike application design.
This notebook requires an Aerospike Database running locally, a Java kernel, and the Aerospike Java client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the Aerospike Notebooks Repo.
Use the %%loadFromPOM magic to load the Aerospike client and the other dependencies from a POM:
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.1</version>
</dependency>
<!-- Apache command line parser. -->
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<!-- Log4j. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- JSON simple -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
</dependency>
</dependencies>
import java.io.BufferedReader;
import java.io.Console;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Minimal console abstraction that works whether or not {@link System#console()}
 * is available (it is {@code null} in many IDEs and notebook kernels).
 *
 * When a system console exists it is used for both input and output; otherwise
 * output goes to {@code System.out} and input is read from {@code System.in}.
 */
public class EclipseConsole {
    Console systemConsole = System.console();
    boolean useSystemConsole = false;

    // One reader for System.in shared by every instance. A BufferedReader reads
    // ahead, so creating a new one per call (or per instance) can silently drop
    // input that an earlier reader already buffered.
    private static BufferedReader stdinReader;

    public EclipseConsole() {
        this.useSystemConsole = (this.systemConsole != null);
    }

    /**
     * Prints the message verbatim.
     *
     * @param message text to print; it is NOT interpreted as a format string, so
     *                a literal '%' (for example inside a tweet) is printed as-is
     */
    public void printf(String message) {
        // Pass the text as an argument to "%s" rather than as the format itself;
        // otherwise "100% done" raises UnknownFormatConversionException.
        if (useSystemConsole) {
            systemConsole.printf("%s", message);
        } else {
            System.out.printf("%s", message);
        }
    }

    /**
     * Reads one line of input.
     *
     * @return the line without its terminator, {@code null} at end of input, or
     *         an empty string if reading from {@code System.in} failed
     */
    public String readLine() {
        if (useSystemConsole) {
            return systemConsole.readLine();
        }
        String line = "";
        try {
            line = sharedStdinReader().readLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return line;
    }

    /** Lazily creates the shared reader over {@code System.in}. */
    private static synchronized BufferedReader sharedStdinReader() {
        if (stdinReader == null) {
            stdinReader = new BufferedReader(new InputStreamReader(System.in));
        }
        return stdinReader;
    }
}
/*******************************************************************************
* Copyright 2012-2014 by Aerospike.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
******************************************************************************/
import java.io.PrintWriter;
import java.io.StringWriter;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
/**
 * Miscellaneous helpers for the Tweetaspike demo: secondary-index creation,
 * stack-trace formatting, and a few unused single-record operation samples.
 */
public class UtilityService {
    private AerospikeClient client;
    private EclipseConsole console = new EclipseConsole();

    /**
     * @param c Aerospike client used for all operations in this class
     */
    public UtilityService(AerospikeClient c) {
        this.client = c;
    }

    /**
     * Creates the three secondary indexes used by the demo, one after another:
     * tweets.username (string), tweets.ts (numeric) and users.tweetcount (numeric).
     *
     * NOTE: Index creation has been included in here for convenience and to
     * demonstrate the syntax. The recommended way of creating indexes in
     * production env is via AQL.
     */
    public void createSecondaryIndexes() throws AerospikeException,
            InterruptedException {
        buildIndex("tweets", "username_index", "username", IndexType.STRING);
        buildIndex("tweets", "ts_index", "ts", IndexType.NUMERIC);
        buildIndex("users", "tweetcount_index", "tweetcount", IndexType.NUMERIC);
    }

    /**
     * Creates one index in namespace "test", reporting progress before and after,
     * and waits for the index task to finish.
     */
    private void buildIndex(String setName, String indexName, String binName,
            IndexType indexType) throws AerospikeException {
        String target = "set=" + setName + ", bin=" + binName;
        console.printf("\nCreating secondary index on: " + target + "...\n");
        IndexTask pending = client.createIndex(null, "test", setName,
                indexName, binName, indexType);
        pending.waitTillComplete(100);
        console.printf("Done creating secondary index on: " + target + "\n");
    }

    /**
     * Renders the stack trace of the given exception into a string.
     *
     * @param ex exception whose stack trace is wanted
     * @return the text {@code ex.printStackTrace} would print
     */
    public static String printStackTrace(Exception ex) {
        StringWriter buffer = new StringWriter();
        PrintWriter sink = new PrintWriter(buffer);
        ex.printStackTrace(sink);
        return buffer.toString();
    }

    /*
     * Example functions not in use
     */

    /** Sample: adds 3 to the "count" bin of a fixed user record. */
    @SuppressWarnings("unused")
    private void add() throws AerospikeException {
        Key sampleKey = new Key("test", "users", "user1234");
        client.add(null, sampleKey, new Bin("count", 3));
    }

    /** Sample: appends " world" to the "greet" bin of a fixed user record. */
    @SuppressWarnings("unused")
    private void append() throws AerospikeException {
        Key sampleKey = new Key("test", "users", "user1234");
        // Shows the value the bin is expected to start with; it is never written.
        Bin initialGreeting = new Bin("greet", "hello");
        Bin suffix = new Bin("greet", " world");
        client.append(null, sampleKey, suffix);
    }

    /** Sample: checks whether a fixed user record exists; the result is discarded. */
    @SuppressWarnings("unused")
    private void exists() throws AerospikeException {
        Key sampleKey = new Key("test", "users", "user1234");
        boolean found = client.exists(null, sampleKey);
    }

    /** Sample: touches a fixed user record. */
    @SuppressWarnings("unused")
    private void touch() throws AerospikeException {
        Key sampleKey = new Key("test", "users", "user1234");
        client.touch(null, sampleKey);
    }
}
/*******************************************************************************
* Copyright 2012-2014 by Aerospike.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
******************************************************************************/
import java.io.File;
import java.util.Random;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Language;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.Value;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.policy.Priority;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
public class TweetService {
private AerospikeClient client;
private EclipseConsole console = new EclipseConsole();
/**
 * Creates a tweet service that performs its database operations through the
 * given client.
 *
 * @param client Aerospike client stored in the {@code client} field
 */
public TweetService(AerospikeClient client) {
this.client = client;
}
/**
 * Interactively creates one tweet for an existing user.
 *
 * Prompts for a username and a tweet, writes the tweet record under the key
 * {@code <username>:<n>} where n is the user's current tweet count plus one,
 * and then updates the user's tweet count and last-tweeted timestamp.
 * Returns silently when no username is entered; prints an error when the user
 * record does not exist or no tweet could be read.
 */
public void createTweet() throws AerospikeException, InterruptedException {
    console.printf("\n********** Create Tweet **********\n");

    ///*********************///
    ///*****Data Model*****///
    //Namespace: test
    //Set: tweets
    //Key: <username>:<counter>
    //Bins:
    //tweet - string
    //ts - int (Stores epoch timestamp of the tweet)
    //username - string
    //Sample Key: dash:1
    //Sample Record:
    //{ tweet: 'Put. A. Bird. On. It.',
    //  ts: 1408574221,
    //  username: 'dash'
    //}
    ///*********************///

    // Get username
    console.printf("\nEnter username:");
    String username = console.readLine();
    if (username == null || username.length() == 0) {
        return;
    }

    // Check if username exists
    Key userKey = new Key("test", "users", username);
    Record userRecord = client.get(null, userKey);
    if (userRecord == null) {
        console.printf("ERROR: User record not found!\n");
        return;
    }

    // getInt() is used (as in UserService.batchGetUserTweets) instead of
    // getValue(...).toString(), which throws a NullPointerException when the
    // user record has no "tweetcount" bin.
    int nextTweetCount = userRecord.getInt("tweetcount") + 1;

    // Get tweet
    console.printf("Enter tweet for " + username + ":");
    String tweet = console.readLine();
    if (tweet == null) {
        // End of input: writing a null bin would create a tweet record with no
        // "tweet" bin, which later breaks the code that prints tweets.
        console.printf("ERROR: No tweet entered!\n");
        return;
    }

    // Write record
    WritePolicy wPolicy = new WritePolicy();
    wPolicy.sendKey = true;
    wPolicy.recordExistsAction = RecordExistsAction.UPDATE;

    // Create timestamp to store along with the tweet so we can
    // query, index and report on it
    long ts = getTimeStamp();

    Key tweetKey = new Key("test", "tweets", username + ":" + nextTweetCount);
    Bin bin1 = new Bin("tweet", tweet);
    Bin bin2 = new Bin("ts", ts);
    Bin bin3 = new Bin("username", username);
    client.put(wPolicy, tweetKey, bin1, bin2, bin3);
    console.printf("\nINFO: Tweet record created!\n");

    // Update tweet count and last tweet'd timestamp in the user record
    updateUser(client, userKey, wPolicy, ts, nextTweetCount);
} //createTweet
/**
 * Runs both interactive queries in sequence: first tweets by username, then
 * users by tweet-count range.
 */
public void queryTweets() throws AerospikeException {
queryTweetsByUsername();
queryUsersByTweetCount();
} //queryTweets
/**
 * Prompts for a username and prints the "tweet" bin of every record in the
 * "tweets" set whose "username" bin equals it, using the secondary index
 * "username_index" (created here if necessary).
 */
public void queryTweetsByUsername() throws AerospikeException {
    console.printf("\n********** Query Tweets By Username **********\n");
    RecordSet rs = null;
    try {
        // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.
        // NOTE: The recommended way of creating indexes in production env is via AQL.
        IndexTask task = client.createIndex(null, "test", "tweets",
                "username_index", "username", IndexType.STRING);
        task.waitTillComplete(100);

        // Get username
        console.printf("\nEnter username:");
        String username = console.readLine();
        if (username != null && username.length() > 0) {
            String[] bins = { "tweet" };
            Statement stmt = new Statement();
            stmt.setNamespace("test");
            stmt.setSetName("tweets");
            stmt.setIndexName("username_index");
            stmt.setBinNames(bins);
            stmt.setFilter(Filter.equal("username", username));

            console.printf("\nHere's " + username + "'s tweet(s):\n");
            rs = client.query(null, stmt);
            while (rs.next()) {
                Record r = rs.getRecord();
                // A matching record may lack the "tweet" bin; calling toString()
                // on the missing value would throw, so such records are skipped.
                Object tweet = r.getValue("tweet");
                if (tweet != null) {
                    console.printf(tweet + "\n");
                }
            }
        } else {
            console.printf("ERROR: User record not found!\n");
        }
    } finally {
        if (rs != null) {
            // Close record set
            rs.close();
        }
    }
} //queryTweetsByUsername
/**
 * Prompts for a minimum and maximum tweet count and lists every user in the
 * "users" set whose "tweetcount" bin falls in that range. The secondary index
 * "tweetcount_index" is created first.
 */
public void queryUsersByTweetCount() throws AerospikeException {
    console.printf("\n********** Query Users By Tweet Count Range **********\n");
    RecordSet matches = null;
    try {
        // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.
        // NOTE: The recommended way of creating indexes in production env is via AQL.
        IndexTask indexTask = client.createIndex(null, "test", "users",
                "tweetcount_index", "tweetcount", IndexType.NUMERIC);
        indexTask.waitTillComplete(100);

        // Read the range bounds from the user
        console.printf("\nEnter Min Tweet Count:");
        final int min = Integer.parseInt(console.readLine());
        console.printf("Enter Max Tweet Count:");
        final int max = Integer.parseInt(console.readLine());
        console.printf("\nList of users with " + min + "-" + max + " tweets:\n");

        // Range query on the tweetcount bin
        Statement rangeQuery = new Statement();
        rangeQuery.setNamespace("test");
        rangeQuery.setSetName("users");
        rangeQuery.setBinNames("username", "tweetcount", "gender");
        rangeQuery.setFilter(Filter.range("tweetcount", min, max));

        matches = client.query(null, rangeQuery);
        while (matches.next()) {
            Record user = matches.getRecord();
            String line = user.getValue("username") + " has "
                    + user.getValue("tweetcount") + " tweets\n";
            console.printf(line);
        }
    } finally {
        // Close record set
        if (matches != null) {
            matches.close();
        }
    }
} //queryUsersByTweetCount
/**
 * Scans the "tweets" set and prints key, username and tweet for each record
 * returned. Any AerospikeException is caught and reported on System.out.
 */
public void scanSomeTweetsForSomeUsers() {
    try {
        // Java Scan
        ScanPolicy policy = new ScanPolicy();
        policy.concurrentNodes = true;
        policy.priority = Priority.LOW;
        policy.includeBinData = true;
        policy.maxRecords = 100;
        policy.sendKey = true;
        client.scanAll(policy, "test", "tweets", new ScanCallback() {
            @Override
            public void scanCallback(Key key, Record record)
                    throws AerospikeException {
                // Emit the whole line in one call: with concurrentNodes enabled
                // the callback may run on several threads, and separate printf
                // calls could interleave output from different records.
                console.printf(key.toString() + " => "
                        + record.getValue("username") + " "
                        + record.getValue("tweet") + "\n");
            }
        // Both bins the callback prints must be requested; asking only for
        // "tweet" made the username always print as null.
        }, "username", "tweet");
    } catch (AerospikeException e) {
        System.out.println("EXCEPTION - Message: " + e.getMessage());
        System.out.println("EXCEPTION - StackTrace: "
                + UtilityService.printStackTrace(e));
    }
} //scanSomeTweetsForSomeUsers
/**
 * Writes the given tweet count and last-tweeted timestamp to the user record
 * and reports the new count.
 *
 * @param client     client used for the write
 * @param userKey    key of the user record to update
 * @param policy     write policy for the put
 * @param ts         timestamp stored in the "lasttweeted" bin
 * @param tweetCount value stored in the "tweetcount" bin
 */
private void updateUser(AerospikeClient client, Key userKey,
        WritePolicy policy, long ts, int tweetCount) throws AerospikeException, InterruptedException {
    Bin countBin = new Bin("tweetcount", tweetCount);
    Bin lastTweetedBin = new Bin("lasttweeted", ts);
    client.put(policy, userKey, countBin, lastTweetedBin);

    String summary = "\nINFO: The tweet count now is: " + tweetCount;
    console.printf(summary);
} //updateUser
/**
 * Unused alternative to updateUser: in a single operate() call it adds 1 to
 * "tweetcount", sets "lasttweeted", reads the record back, and reports the
 * resulting tweet count.
 *
 * @param client  client used for the operate call
 * @param userKey key of the user record to update
 * @param policy  write policy for the call
 * @param ts      timestamp stored in the "lasttweeted" bin
 */
@SuppressWarnings("unused")
private void updateUserUsingOperate(AerospikeClient client, Key userKey,
        WritePolicy policy, long ts) throws AerospikeException {
    Operation incrementCount = Operation.add(new Bin("tweetcount", 1));
    Operation stampLastTweet = Operation.put(new Bin("lasttweeted", ts));
    Operation readBack = Operation.get();

    Record updated = client.operate(policy, userKey, incrementCount, stampLastTweet, readBack);
    Object newCount = updated.getValue("tweetcount");
    console.printf("\nINFO: The tweet count now is: " + newCount);
} //updateUserUsingOperate
/**
 * Generates canned tweets: picks 10000 random canned usernames and, for each
 * one that exists in the "users" set, writes a random number (0 to 19) of
 * random tweets and then stores the tweet count and last timestamp on the
 * user record.
 */
public void createTweets() throws AerospikeException {
    String[] randomTweets = {
            "For just $1 you get a half price download of half of the song and listen to it just once.",
            "People tell me my body looks like a melted candle",
            "Come on movie! Make it start!", "Byaaaayy",
            "Please, please, win! Meow, meow, meow!",
            "Put. A. Bird. On. It.",
            "A weekend wasted is a weekend well spent",
            "Would you like to super spike your meal?",
            "We have a mean no-no-bring-bag up here on aisle two.",
            "SEEK: See, Every, EVERY, Kind... of spot",
            "We can order that for you. It will take a year to get there.",
            "If you are pregnant, have a soda.",
            "Hear that snap? Hear that clap?",
            "Follow me and I may follow you",
            "Which is the best cafe in Portland? Discuss...",
            "Portland Coffee is for closers!",
            "Lets get this party started!",
            "How about them portland blazers!", "You got school'd, yo",
            "I love animals", "I love my dog", "What's up Portland",
            "Which is the best cafe in Portland? Discuss...",
            "I dont always tweet, but when I do it is on Tweetaspike" };
    // One generator is enough; separate instances add nothing.
    Random rnd = new Random();

    int totalUsers = 10000;
    int maxTweets = 20;
    // UserService.createUsers writes user1..user100000, so ids are drawn from
    // 1..maxUserId. The previous nextInt(100000) produced 0..99999, which
    // looked up the never-created "user0" and could never pick "user100000".
    int maxUserId = 100000;
    long ts = 0;

    WritePolicy wPolicy = new WritePolicy();
    wPolicy.recordExistsAction = RecordExistsAction.UPDATE;

    console.printf("\nCreate up to " + maxTweets + " tweets each for "
            + totalUsers + " users.\n");
    // console.readLine();

    for (int j = 0; j < totalUsers; j++) {
        // Check if user record exists
        String username = "user" + (rnd.nextInt(maxUserId) + 1);
        Key userKey = new Key("test", "users", username);
        Record userRecord = client.get(null, userKey);
        if (userRecord == null) {
            continue;
        }

        // create up to maxTweets random tweets for this user
        int totalTweets = rnd.nextInt(maxTweets);
        for (int k = 1; k <= totalTweets; k++) {
            // Create timestamp to store along with the tweet so we can
            // query, index and report on it
            ts = getTimeStamp();
            Key tweetKey = new Key("test", "tweets", username + ":" + k);
            Bin bin1 = new Bin("tweet",
                    randomTweets[rnd.nextInt(randomTweets.length)]);
            Bin bin2 = new Bin("ts", ts);
            Bin bin3 = new Bin("username", username);
            client.put(wPolicy, tweetKey, bin1, bin2, bin3);
        }
        if (totalTweets > 0) {
            client.put(wPolicy, userKey, new Bin("tweetcount", totalTweets), new Bin("lasttweeted", ts));
        }
    }
    console.printf("\n\nDone creating up to " + maxTweets
            + " tweets each for " + totalUsers + " users!\n");
} //createTweets
/**
 * Returns the current time as reported by {@link System#currentTimeMillis()}
 * (milliseconds since the Unix epoch). Callers in this class store it in the
 * "ts" and "lasttweeted" bins.
 */
private long getTimeStamp() {
return System.currentTimeMillis();
} //getTimeStamp
}
/*******************************************************************************
* Copyright 2012-2014 by Aerospike.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
******************************************************************************/
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Language;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.policy.GenerationPolicy;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.task.RegisterTask;
public class UserService {
private AerospikeClient client;
private EclipseConsole console = new EclipseConsole();
/**
 * Creates a user service that performs its database operations through the
 * given client.
 *
 * @param client Aerospike client stored in the {@code client} field
 */
public UserService(AerospikeClient client) {
this.client = client;
}
/**
 * Interactively creates (or overwrites bins of) a user record in the "users"
 * set. Prompts for username, password, gender, region and comma-separated
 * interests; does nothing when no username is entered.
 */
public void createUser() throws AerospikeException {
    console.printf("\n********** Create User **********\n");

    ///*********************///
    ///*****Data Model*****///
    //Namespace: test
    //Set: users
    //Key: <username>
    //Bins:
    //username - String
    //password - String (For simplicity password is stored in plain-text)
    //gender - String (Valid values are 'm' or 'f')
    //region - String (Valid values are: 'n' (North), 's' (South), 'e' (East), 'w' (West) -- to keep data entry to minimal we just store the first letter)
    //lasttweeted - int (Stores epoch timestamp of the last/most recent tweet) -- Default to 0
    //tweetcount - int (Stores total number of tweets for the user) -- Default to 0
    //interests - Array of interests
    //Sample Key: dash
    //Sample Record:
    //{ username: 'dash',
    //  password: 'dash',
    //  gender: 'm',
    //  region: 'w',
    //  lasttweeted: 1408574221,
    //  tweetcount: 20,
    //  interests: ['photography', 'technology', 'dancing', 'house music']
    //}
    ///*********************///

    // Get username
    console.printf("Enter username: ");
    String username = console.readLine();
    if (username == null || username.length() == 0) {
        return;
    }

    // Get password
    console.printf("Enter password for " + username + ":");
    String password = console.readLine();

    // Get gender (only the first letter is stored)
    console.printf("Select gender (f or m) for " + username + ":");
    String gender = firstLetter(console.readLine());

    // Get region (only the first letter is stored)
    console.printf("Select region (north, south, east or west) for "
            + username + ":");
    String region = firstLetter(console.readLine());

    // Get interests: split on commas, trim each entry and drop blanks so that
    // "a, b" is stored as ["a", "b"] and an empty answer as an empty list.
    console.printf("Enter comma-separated interests for " + username + ":");
    String interestsLine = console.readLine();
    ArrayList<Object> interests = new ArrayList<Object>();
    if (interestsLine != null) {
        for (String interest : interestsLine.split(",")) {
            String trimmed = interest.trim();
            if (trimmed.length() > 0) {
                interests.add(trimmed);
            }
        }
    }

    // Write record
    WritePolicy wPolicy = new WritePolicy();
    wPolicy.recordExistsAction = RecordExistsAction.UPDATE;

    Key key = new Key("test", "users", username);
    Bin bin1 = new Bin("username", username);
    Bin bin2 = new Bin("password", password);
    Bin bin3 = new Bin("gender", gender);
    Bin bin4 = new Bin("region", region);
    Bin bin5 = new Bin("lasttweeted", 0);
    Bin bin6 = new Bin("tweetcount", 0);
    Bin bin7 = new Bin("interests", interests);

    client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7);
    console.printf("\nINFO: User record created!");
} //createUser

/**
 * Returns the first character of the input as a string, or an empty string
 * when the input is null or empty (where substring(0, 1) would throw).
 */
private static String firstLetter(String input) {
    if (input == null || input.length() == 0) {
        return "";
    }
    return input.substring(0, 1);
}
/**
 * Prompts for a username and prints the username, password, gender, region,
 * tweetcount and interests bins of the matching record in the "users" set.
 * Prints an error when no username is entered or no record is found.
 */
public void getUser() throws AerospikeException {
    // Get username
    console.printf("\nEnter username:");
    final String name = console.readLine();

    // Look the record up only when a non-empty name was entered
    Record found = null;
    if (name != null && name.length() > 0) {
        found = client.get(null, new Key("test", "users", name));
    }
    if (found == null) {
        console.printf("ERROR: User record not found!\n");
        return;
    }

    console.printf("\nINFO: User record read successfully! Here are the details:\n");
    // Each line is "<bin name>: <value>", printed in this fixed order
    String[] shownBins = { "username", "password", "gender", "region", "tweetcount", "interests" };
    for (String binName : shownBins) {
        console.printf(binName + ": " + found.getValue(binName) + "\n");
    }
} //getUser
/**
 * Prompts for a username and a new password, registers the Lua module
 * udf/updateUserPwd.lua, and invokes its "updatePassword" function on the
 * user record, printing the value the function returns.
 * Prints an error when no username is entered or no record is found.
 */
public void updatePasswordUsingUDF() throws AerospikeException
{
    // Get username
    console.printf("\nEnter username:");
    final String name = console.readLine();

    // Check if username exists
    Key targetKey = null;
    Record existing = null;
    if (name != null && name.length() > 0)
    {
        targetKey = new Key("test", "users", name);
        existing = client.get(null, targetKey);
    }
    if (existing == null)
    {
        console.printf("ERROR: User record not found!");
        return;
    }

    // Get new password
    console.printf("Enter new password for " + name + ":");
    final String newPassword = console.readLine();

    // NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL
    LuaConfig.SourceDirectory = "udf";
    File module = new File("udf/updateUserPwd.lua");
    RegisterTask registration = client.register(null, module.getPath(),
            module.getName(), Language.LUA);
    registration.waitTillComplete(100);

    Object udfResult = client.execute(null, targetKey, "updateUserPwd", "updatePassword", Value.get(newPassword));
    String updatedPassword = udfResult.toString();
    console.printf("\nINFO: The password has been set to: " + updatedPassword);
} //updatePasswordUsingUDF
/**
 * Prompts for a username and a new password and writes the password with a
 * generation check: the write policy carries the generation of the record
 * that was just read together with GenerationPolicy.EXPECT_GEN_EQUAL.
 * Prints an error when no username is entered or no record is found.
 */
public void updatePasswordUsingCAS() throws AerospikeException
{
    // Get username
    console.printf("\nEnter username:");
    final String name = console.readLine();

    // Check if username exists
    Key targetKey = null;
    Record existing = null;
    if (name != null && name.length() > 0)
    {
        targetKey = new Key("test", "users", name);
        existing = client.get(null, targetKey);
    }
    if (existing == null)
    {
        console.printf("ERROR: User record not found!");
        return;
    }

    // Get new password
    console.printf("Enter new password for " + name + ":");
    final String newPassword = console.readLine();

    // Tie the write to the generation observed by the read above
    WritePolicy casPolicy = new WritePolicy();
    casPolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
    casPolicy.generation = existing.generation;

    // password Bin
    Bin newPasswordBin = new Bin("password", Value.get(newPassword));
    client.put(casPolicy, targetKey, newPasswordBin);

    console.printf("\nINFO: The password has been set to: " + newPassword);
} //updatePasswordUsingCAS
/**
 * Prompts for a username and prints that user's tweets using one batch read.
 *
 * The tweet keys are derived from the user's "tweetcount" bin as
 * {@code <username>:1 .. <username>:<tweetcount>}. Prints an error when no
 * username is entered or the user record does not exist.
 */
public void batchGetUserTweets() throws AerospikeException {
    // Get username
    console.printf("\nEnter username:");
    String username = console.readLine();
    if (username == null || username.length() == 0) {
        console.printf("ERROR: User record not found!\n");
        return;
    }

    // Check if username exists
    Key userKey = new Key("test", "users", username);
    Record userRecord = client.get(null, userKey);
    if (userRecord == null) {
        // Previously an unknown user produced no output at all.
        console.printf("ERROR: User record not found!\n");
        return;
    }

    // Get how many tweets the user has
    int tweetCount = userRecord.getInt("tweetcount");

    // Create an array of keys so we can initiate batch read operation
    Key[] keys = new Key[tweetCount];
    for (int i = 0; i < keys.length; i++) {
        keys[i] = new Key("test", "tweets", username + ":" + (i + 1));
    }

    console.printf("\nHere's " + username + "'s tweet(s):\n");

    // Initiate batch read operation
    if (keys.length > 0) {
        Record[] records = client.get(new BatchPolicy(), keys);
        for (Record tweetRecord : records) {
            // The batch result holds null for every key that was not found,
            // and a found record may still lack the "tweet" bin; skip both
            // rather than fail with a NullPointerException.
            if (tweetRecord == null) {
                continue;
            }
            Object tweet = tweetRecord.getValue("tweet");
            if (tweet != null) {
                console.printf(tweet + "\n");
            }
        }
    }
} //batchGetUserTweets
/**
 * Prompts for a tweet-count range, registers the Lua module
 * udf/aggregationByRegion.lua, and runs its "sum" function as an aggregation
 * over users whose "tweetcount" is in that range. The first result is read as
 * a map and the values under keys "n", "s", "e" and "w" are printed.
 */
@SuppressWarnings("unchecked")
public void aggregateUsersByTweetCountByRegion() throws AerospikeException,
        InterruptedException {
    ResultSet aggregation = null;
    try {
        // Read the range bounds from the user
        console.printf("\nEnter Min Tweet Count:");
        final int min = Integer.parseInt(console.readLine());
        console.printf("Enter Max Tweet Count:");
        final int max = Integer.parseInt(console.readLine());

        console.printf("\nAggregating users with " + min + "-"
                + max + " tweets by region. Hang on...\n");

        // NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL
        LuaConfig.SourceDirectory = "udf";
        File module = new File("udf/aggregationByRegion.lua");
        RegisterTask registration = client.register(null, module.getPath(),
                module.getName(), Language.LUA);
        registration.waitTillComplete(100);

        // Range query on tweetcount that feeds the aggregation
        Statement rangeQuery = new Statement();
        rangeQuery.setNamespace("test");
        rangeQuery.setSetName("users");
        rangeQuery.setIndexName("tweetcount_index");
        rangeQuery.setBinNames("tweetcount", "region");
        rangeQuery.setFilter(Filter.range("tweetcount", min, max));

        aggregation = client.queryAggregate(null, rangeQuery, "aggregationByRegion", "sum");

        if (aggregation.next()) {
            Map<Object, Object> totals = (Map<Object, Object>) aggregation.getObject();
            console.printf("\nTotal Users in North: " + totals.get("n") + "\n");
            console.printf("Total Users in South: " + totals.get("s") + "\n");
            console.printf("Total Users in East: " + totals.get("e") + "\n");
            console.printf("Total Users in West: " + totals.get("w") + "\n");
        }
    } finally {
        // Close result set
        if (aggregation != null) {
            aggregation.close();
        }
    }
} //aggregateUsersByTweetCountByRegion
/**
 * Generates canned users "user1" .. "user100000" in the "users" set, each with
 * a password "pwd<n>", a random gender and region, zeroed tweet counters and
 * 0 to 6 randomly chosen interests.
 */
public void createUsers() throws AerospikeException {
    String[] genders = { "m", "f" };
    String[] regions = { "n", "s", "e", "w" };
    String[] randomInterests = { "Music", "Football", "Soccer", "Baseball", "Basketball", "Hockey", "Weekend Warrior", "Hiking", "Camping", "Travel", "Photography"};
    int start = 1;
    int end = 100000;
    // The loop below is inclusive on both ends, so the number of users written
    // is end - start + 1; the previous end - start under-reported by one.
    int totalUsers = end - start + 1;
    // One generator is enough; separate instances add nothing.
    Random rnd = new Random();

    WritePolicy wPolicy = new WritePolicy();
    wPolicy.recordExistsAction = RecordExistsAction.UPDATE;

    console.printf("\nCreate " + totalUsers
            + " users.\n");
    // console.readLine();

    for (int j = start; j <= end; j++) {
        // Write user record
        String username = "user" + j;
        Key key = new Key("test", "users", username);
        Bin bin1 = new Bin("username", username);
        Bin bin2 = new Bin("password", "pwd" + j);
        Bin bin3 = new Bin("gender", genders[rnd.nextInt(genders.length)]);
        Bin bin4 = new Bin("region", regions[rnd.nextInt(regions.length)]);
        Bin bin5 = new Bin("lasttweeted", 0);
        Bin bin6 = new Bin("tweetcount", 0);

        // Pick 0..6 interests at random (repeats are possible)
        int totalInterests = rnd.nextInt(7);
        ArrayList<Object> userInterests = new ArrayList<Object>();
        for (int i = 0; i < totalInterests; i++) {
            userInterests.add(randomInterests[rnd.nextInt(randomInterests.length)]);
        }
        Bin bin7 = new Bin("interests", userInterests);

        client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7);
        //console.printf("Wrote user record for " + username + "\n");
    }
    console.printf("\nDone creating " + totalUsers + "!\n");
} // createUsers
}
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Host;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.GenerationPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.WritePolicy;
/**
* @author Dash Desai
*/
public class Program {
private AerospikeClient client;
private String seedHost;
private int port;
private String namespace;
private String set;
private WritePolicy writePolicy;
private Policy policy;
private EclipseConsole console = new EclipseConsole();
private static Logger log = Logger.getLogger(Program.class);
/**
 * Stores the connection settings, prepares the read and write policies (each
 * with totalTimeout 100) and opens the client connection to the seed host
 * using a client policy with timeout 500.
 *
 * @param host      seed host name or address
 * @param port      seed host port
 * @param namespace namespace stored on this instance
 * @param set       set name stored on this instance
 * @throws AerospikeException if the client cannot be created
 */
public Program(String host, int port, String namespace, String set)
        throws AerospikeException {
    // Connection coordinates and data location
    this.seedHost = host;
    this.port = port;
    this.namespace = namespace;
    this.set = set;

    // Per-transaction policies
    WritePolicy writes = new WritePolicy();
    writes.totalTimeout = 100;
    this.writePolicy = writes;

    Policy reads = new Policy();
    reads.totalTimeout = 100;
    this.policy = reads;

    // Establish a connection to Aerospike cluster
    ClientPolicy connectionPolicy = new ClientPolicy();
    connectionPolicy.timeout = 500;
    this.client = new AerospikeClient(connectionPolicy, host, port);
}
/**
 * Closes the Aerospike client when this object is finalized.
 *
 * The superclass finalizer is invoked in a finally block so it still runs if
 * closing the client throws.
 */
@Override
protected void finalize() throws Throwable {
    try {
        if (this.client != null) {
            this.client.close();
        }
    } finally {
        super.finalize();
    }
}
/**
 * Entry point: parses the command-line options (host, port, namespace, set,
 * usage), then either prints usage or creates a Program and runs it.
 * Every exception is caught and logged as a critical error.
 *
 * @param args command-line arguments
 */
public static void engage(String[] args) throws AerospikeException {
    try {
        Options options = new Options();
        // The help text must match the default actually applied below.
        options.addOption("h", "host", true,
                "Server hostname (default: 127.0.0.1)");
        options.addOption("p", "port", true, "Server port (default: 3000)");
        options.addOption("n", "namespace", true,
                "Namespace (default: test)");
        options.addOption("s", "set", true, "Set (default: demo)");
        options.addOption("u", "usage", false, "Print usage.");

        CommandLineParser parser = new PosixParser();
        CommandLine cl = parser.parse(options, args, false);

        String host = cl.getOptionValue("h", "127.0.0.1");
        String portString = cl.getOptionValue("p", "3000");
        int port = Integer.parseInt(portString);
        String namespace = cl.getOptionValue("n", "test");
        String set = cl.getOptionValue("s", "demo");
        log.debug("Host: " + host);
        log.debug("Port: " + port);
        log.debug("Namespace: " + namespace);
        log.debug("Set: " + set);

        @SuppressWarnings("unchecked")
        List<String> cmds = cl.getArgList();
        // Usage is printed only when -u is given without positional arguments
        if (cmds.size() == 0 && cl.hasOption("u")) {
            logUsage(options);
            return;
        }

        Program as = new Program(host, port, namespace, set);
        as.work();
    } catch (Exception e) {
        log.error("Critical error", e);
    }
}
/**
 * Formats the command-line help for the given options and writes it to the
 * logger at info level.
 *
 * @param options option definitions to describe
 */
private static void logUsage(Options options) {
    String commandSyntax = Program.class.getName() + " [<options>]";

    // Render the help text into memory so it can be logged as one message
    StringWriter helpText = new StringWriter();
    PrintWriter helpOut = new PrintWriter(helpText);
    new HelpFormatter().printHelp(helpOut, 100, commandSyntax, "options:", options, 0, 2, null);

    log.info(helpText.toString());
}
/**
 * Runs the interactive console menu of the demo.
 * <p>
 * If the client is missing or not connected, an error is printed and the
 * method waits for one line of input. Otherwise the menu is shown repeatedly
 * and the chosen feature is dispatched to the user or tweet service until the
 * user selects {@code 0} or the input ends. Input that is not a number is
 * reported and the menu is shown again.
 * <p>
 * Exceptions raised by a feature are printed to the console (message and
 * stack trace) and end the loop. In every case the client is closed if it is
 * still connected, and the method waits for a final line of input.
 *
 * @throws Exception declared for callers; exceptions raised inside the menu
 *                   are caught and printed
 */
public void work() throws Exception {
    try {
        console.printf("INFO: Connecting to Aerospike cluster...");
        // Establish connection to Aerospike server
        if (client == null || !client.isConnected()) {
            console.printf("\nERROR: Connection to Aerospike cluster failed! Please check the server settings and try again!");
            console.readLine();
        } else {
            console.printf("\nINFO: Connection to Aerospike cluster succeeded!\n");
            // Built once from the shared client and reused for every menu round.
            UserService us = new UserService(client);
            TweetService ts = new TweetService(client);

            boolean running = true;
            while (running) {
                // Present options
                console.printf("\nWhat would you like to do:\n");
                console.printf("1> Create A User And A Tweet\n");
                console.printf("2> Read A User Record\n");
                console.printf("3> Batch Read Tweets For A User\n");
                console.printf("4> Scan Sample of Tweets \n");
                console.printf("5> Query Tweets By Username And Users By Tweet Count Range\n");
                console.printf("6> Create canned users\n");
                console.printf("7> Create canned tweets\n");
                console.printf("0> Exit\n");
                console.printf("\nSelect and hit enter:\n");

                String selection = console.readLine();
                if (selection == null) {
                    // End of input: nothing more can be read, so leave the menu.
                    break;
                }

                int feature;
                try {
                    feature = Integer.parseInt(selection.trim());
                } catch (NumberFormatException invalidSelection) {
                    // A typo should not terminate the session; ask again.
                    console.printf("\nERROR: Invalid selection! Please enter one of the listed numbers.\n");
                    continue;
                }

                switch (feature) {
                case 0:
                    // "0> Exit" must actually end the loop.
                    running = false;
                    break;
                case 1:
                    console.printf("\n********** Your Selection: Create User And A Tweet **********\n");
                    us.createUser();
                    ts.createTweet();
                    break;
                case 2:
                    console.printf("\n********** Your Selection: Read A User Record **********\n");
                    us.getUser();
                    break;
                case 3:
                    console.printf("\n********** Your Selection: Batch Read Tweets For A User **********\n");
                    us.batchGetUserTweets();
                    break;
                case 4:
                    console.printf("\n********** Your Selection: Sample Tweets **********\n");
                    ts.scanSomeTweetsForSomeUsers();
                    break;
                case 5:
                    console.printf("\n********** Your Selection: Query Tweets By Username And Users By Tweet Count Range **********\n");
                    ts.queryTweets();
                    break;
                case 6:
                    console.printf("\n********** Create Users **********\n");
                    us.createUsers();
                    break;
                case 7:
                    console.printf("\n********** Create Tweets **********\n");
                    ts.createTweets();
                    break;
                default:
                    // Unknown numbers are ignored and the menu is shown again.
                    break;
                }
            }
        }
    } catch (AerospikeException e) {
        // console.printf treats its argument as a format string, so any '%'
        // in the exception text is escaped before printing.
        console.printf(("AerospikeException - Message: " + e.getMessage()
                + "\n").replace("%", "%%"));
        console.printf(("AerospikeException - StackTrace: "
                + UtilityService.printStackTrace(e) + "\n").replace("%", "%%"));
    } catch (Exception e) {
        console.printf(("Exception - Message: " + e.getMessage() + "\n")
                .replace("%", "%%"));
        console.printf(("Exception - StackTrace: "
                + UtilityService.printStackTrace(e) + "\n").replace("%", "%%"));
    } finally {
        if (client != null && client.isConnected()) {
            // Close Aerospike server connection
            client.close();
        }
        console.printf("\n\nINFO: Press any key to exit...\n");
        console.readLine();
    }
}
/*
* example method calls
*/
/**
 * Example: read only selected bins of a user record.
 *
 * @param userName user key in namespace "test", set "users"
 * @return whatever the client returns for the bins username, password,
 *         gender and region of that key
 * @throws AerospikeException propagated from the client call
 */
public Record readPartial(String userName) throws AerospikeException {
    // Java read specific bins
    final Key userKey = new Key("test", "users", userName);
    return this.client.get(null, userKey,
            "username", "password", "gender", "region");
}
/**
 * Example: fetch a user record's metadata through the client's
 * {@code getHeader} call.
 *
 * @param userName user key in namespace "test", set "users"
 * @return whatever {@code getHeader} returns for that key
 * @throws AerospikeException propagated from the client call
 */
public Record readMeta(String userName) throws AerospikeException {
    // Java get meta data
    final Key userKey = new Key("test", "users", userName);
    return this.client.getHeader(null, userKey);
}
/**
 * Example: write the username and password bins of a user record using a
 * write policy whose generation policy is {@code EXPECT_GEN_EQUAL}.
 *
 * @param username user key in namespace "test", set "users"; also stored in
 *                 the "username" bin
 * @param password value stored in the "password" bin
 * @throws AerospikeException propagated from the client call
 */
public void write(String username, String password)
        throws AerospikeException {
    // Java read-modify-write
    Key userKey = new Key("test", "users", username);

    WritePolicy generationChecked = new WritePolicy();
    generationChecked.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
    // NOTE(review): no expected generation is assigned on the policy and the
    // record is not read first - confirm this is what the example intends.

    client.put(generationChecked, userKey,
            new Bin("username", username),
            new Bin("password", password));
}
/**
 * Example: delete a user record using a default write policy.
 *
 * @param username user key in namespace "test", set "users"
 * @throws AerospikeException propagated from the client call
 */
public void delete(String username) throws AerospikeException {
    // Java Delete record.
    final Key userKey = new Key("test", "users", username);
    client.delete(new WritePolicy(), userKey);
}
/**
 * Example: check whether a user record exists, using this object's read
 * policy. (The method name is spelled "exisis" in the public interface and
 * is kept as is for existing callers.)
 *
 * @param username user key in namespace "test", set "users"
 * @return the result of the client's {@code exists} call
 * @throws AerospikeException propagated from the client call
 */
public boolean exisis(String username) throws AerospikeException {
    // Java exists
    final Key userKey = new Key("test", "users", username);
    return client.exists(policy, userKey);
}
/**
 * Example: call the client's {@code add} operation with a "tweetcount" bin
 * of value 1 on a user record, using a default write policy.
 *
 * @param username user key in namespace "test", set "users"
 * @throws AerospikeException propagated from the client call
 */
public void add(String username) throws AerospikeException {
    // Java add
    final Key userKey = new Key("test", "users", username);
    final Bin increment = new Bin("tweetcount", 1);
    client.add(new WritePolicy(), userKey, increment);
}
/**
 * Example: call the client's {@code touch} operation on a user record,
 * using a default write policy.
 *
 * @param username user key in namespace "test", set "users"
 * @throws AerospikeException propagated from the client call
 */
public void touch(String username) throws AerospikeException {
    // Java touch
    final Key userKey = new Key("test", "users", username);
    client.touch(new WritePolicy(), userKey);
}
/**
 * Example: call the client's {@code append} operation with the string
 * "cats" for the "interests" bin of a user record, using a default write
 * policy.
 *
 * @param username user key in namespace "test", set "users"
 * @throws AerospikeException propagated from the client call
 */
public void append(String username) throws AerospikeException {
    // Java append
    final Key userKey = new Key("test", "users", username);
    final Bin suffix = new Bin("interests", "cats");
    client.append(new WritePolicy(), userKey, suffix);
}
/**
 * Example: send two put operations ("tweetcount" = 153 and
 * "lasttweeted" = 1406755079) for a user record in a single
 * {@code operate} call, using a default write policy.
 *
 * @param username user key in namespace "test", set "users"
 * @throws AerospikeException propagated from the client call
 */
public void operate(String username) throws AerospikeException {
    // Java operate
    final Key userKey = new Key("test", "users", username);
    final Operation setTweetCount = Operation.put(new Bin("tweetcount", 153));
    final Operation setLastTweeted = Operation.put(new Bin("lasttweeted", 1406755079L));
    client.operate(new WritePolicy(), userKey, setTweetCount, setLastTweeted);
}
/**
 * Example: batch-read 27 tweet records of one user. The keys are
 * "&lt;username&gt;:1" through "&lt;username&gt;:27" in namespace "test",
 * set "tweets". The fetched records are not used further.
 *
 * @param username prefix of the tweet keys
 * @throws AerospikeException propagated from the client call
 */
@SuppressWarnings("unused")
public void batch(String username) throws AerospikeException {
    // Java batch
    // Build the key array for the batch read; tweet numbers start at 1
    Key[] tweetKeys = new Key[27];
    for (int tweetNo = 1; tweetNo <= tweetKeys.length; tweetNo++) {
        tweetKeys[tweetNo - 1] = new Key("test", "tweets", (username + ":" + tweetNo));
    }
    // Initiate batch read operation
    Record[] fetched = client.get(null, tweetKeys);
}
/**
 * Example: connect to a cluster by supplying several seed hosts with a
 * default client policy.
 * <p>
 * The client created here is local to the example and is closed again
 * before returning, so calling this method does not leave a connection
 * open. It is independent of this object's own {@code client} field.
 *
 * @throws AerospikeException propagated from the client constructor
 */
public void multipleSeedNodes() throws AerospikeException {
    Host[] hosts = new Host[] { new Host("a.host", 3000),
            new Host("another.host", 3000),
            new Host("and.another.host", 3000) };
    AerospikeClient seededClient = new AerospikeClient(new ClientPolicy(), hosts);
    // Nothing else uses this client, so release it right away instead of leaking it.
    seededClient.close();
}
/**
 * Example: connect to a single host with a customised client policy
 * ({@code maxSocketIdle} set to 3).
 * <p>
 * The client created here is local to the example and is closed again
 * before returning, so calling this method does not leave a connection
 * open. It is independent of this object's own {@code client} field.
 *
 * @throws AerospikeException propagated from the client constructor
 */
public void connectWithClientPolicy() throws AerospikeException {
    // Java connection with Client policy
    ClientPolicy clientPolicy = new ClientPolicy();
    // clientPolicy.maxThreads = 200; //200 threads
    clientPolicy.maxSocketIdle = 3; // 3 seconds
    AerospikeClient configuredClient = new AerospikeClient(clientPolicy, "a.host", 3000);
    // Nothing else uses this client, so release it right away instead of leaking it.
    configuredClient.close();
}
/**
 * Example: drop the "shoe-size" bin of a user record by writing it as a
 * null bin, using a default write policy.
 *
 * @param username user key in namespace "test", set "users"
 * @throws AerospikeException propagated from the client call
 */
public void deleteBin(String username) throws AerospikeException {
    // Java delete a bin
    final Key userKey = new Key("test", "users", username);
    // Set bin value to null to drop bin.
    final Bin droppedBin = Bin.asNull("shoe-size");
    client.put(new WritePolicy(), userKey, droppedBin);
}
}
// Start the interactive demo from the notebook. No command-line arguments are
// supplied (null), so engage() uses its coded fallbacks for host, port,
// namespace and set.
// NOTE(review): this relies on the option parser accepting a null argument
// array - confirm against the commons-cli version in use.
Program.engage(null);