Tweetaspike: A Simple Application
For an interactive Jupyter notebook experience
Tweetaspike is a simple application that illustrates some key aspects of an Aerospike application design.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Use magics to load aerospike client from pom
%%loadFromPOM<dependencies> <dependency> <groupId>com.aerospike</groupId> <artifactId>aerospike-client</artifactId> <version>5.0.1</version> </dependency> <!-- Apache command line parser. --> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> <!-- Log4j. --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <!-- JSON simple --> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.4</version> </dependency></dependencies>
import java.io.BufferedReader;import java.io.Console;import java.io.IOException;import java.io.InputStreamReader;
public class EclipseConsole { Console systemConsole = System.console(); boolean useSystemConsole = false;
public EclipseConsole(){ this.useSystemConsole = (this.systemConsole != null); }
public void printf(String message){ if (useSystemConsole) systemConsole.printf(message); else { System.out.printf(message); } }
public String readLine(){ if (useSystemConsole) return systemConsole.readLine(); else { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); String line = ""; try { line = bufferedReader.readLine(); } catch (IOException e) { e.printStackTrace(); } return line; } }
}
/******************************************************************************* * Copyright 2012-2014 by Aerospike. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. ******************************************************************************/
import java.io.PrintWriter;import java.io.StringWriter;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.query.IndexType;import com.aerospike.client.task.IndexTask;
public class UtilityService { private AerospikeClient client; private EclipseConsole console = new EclipseConsole();
public UtilityService(AerospikeClient c) { this.client = c; }
public void createSecondaryIndexes() throws AerospikeException, InterruptedException {
// NOTE: Index creation has been included in here for convenience and to demonstrate the syntax. The recommended way of creating indexes in production env is via AQL
console.printf("\nCreating secondary index on: set=tweets, bin=username...\n"); IndexTask task1 = client.createIndex(null, "test", "tweets", "username_index", "username", IndexType.STRING); task1.waitTillComplete(100); console.printf("Done creating secondary index on: set=tweets, bin=username\n");
console.printf("\nCreating secondary index on: set=tweets, bin=ts...\n"); IndexTask task2 = client.createIndex(null, "test", "tweets", "ts_index", "ts", IndexType.NUMERIC); task2.waitTillComplete(100); console.printf("Done creating secondary index on: set=tweets, bin=ts\n");
console.printf("\nCreating secondary index on: set=users, bin=tweetcount...\n"); IndexTask task3 = client.createIndex(null, "test", "users", "tweetcount_index", "tweetcount", IndexType.NUMERIC); task3.waitTillComplete(100); console.printf("Done creating secondary index on: set=users, bin=tweetcount\n"); }
public static String printStackTrace(Exception ex) { StringWriter errors = new StringWriter(); ex.printStackTrace(new PrintWriter(errors)); return errors.toString(); }
/* * Example functions not in use */ @SuppressWarnings("unused") private void add() throws AerospikeException { // Java Add Key userKey = new Key("test", "users", "user1234"); Bin bin2 = new Bin("count", 3); client.add(null, userKey, bin2); }
@SuppressWarnings("unused") private void append() throws AerospikeException { // Java Append Key userKey = new Key("test", "users", "user1234"); Bin bin1 = new Bin("greet", "hello"); Bin bin2 = new Bin("greet", " world"); client.append(null, userKey, bin2); }
@SuppressWarnings("unused") private void exists() throws AerospikeException { // Java Exists Key userKey = new Key("test", "users", "user1234"); boolean recordKeyExists = client.exists(null, userKey); }
@SuppressWarnings("unused") private void touch() throws AerospikeException { // Java Touch Key userKey = new Key("test", "users", "user1234"); client.touch(null, userKey); }
}
/******************************************************************************* * Copyright 2012-2014 by Aerospike. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. ******************************************************************************/
import java.io.File;import java.util.Random;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.Language;import com.aerospike.client.Operation;import com.aerospike.client.Record;import com.aerospike.client.ScanCallback;import com.aerospike.client.Value;import com.aerospike.client.lua.LuaConfig;import com.aerospike.client.policy.Priority;import com.aerospike.client.policy.RecordExistsAction;import com.aerospike.client.policy.ScanPolicy;import com.aerospike.client.policy.WritePolicy;import com.aerospike.client.query.Filter;import com.aerospike.client.query.RecordSet;import com.aerospike.client.query.Statement;import com.aerospike.client.task.RegisterTask;import com.aerospike.client.query.IndexType;import com.aerospike.client.task.IndexTask;
public class TweetService { private AerospikeClient client; private EclipseConsole console = new EclipseConsole();
public TweetService(AerospikeClient client) { this.client = client; }
public void createTweet() throws AerospikeException, InterruptedException {
console.printf("\n********** Create Tweet **********\n");
///*********************/// ///*****Data Model*****/// //Namespace: test //Set: tweets //Key: <username:<counter>> //Bins: //tweet - string //ts - int (Stores epoch timestamp of the tweet) //username - string
//Sample Key: dash:1 //Sample Record: //{ tweet: 'Put. A. Bird. On. It.', // ts: 1408574221, // username: 'dash' //} ///*********************///
Record userRecord = null; Key userKey = null; Key tweetKey = null;
// Get username String username; console.printf("\nEnter username:"); username = console.readLine();
if (username != null && username.length() > 0) { // Check if username exists userKey = new Key("test", "users", username); userRecord = client.get(null, userKey); if (userRecord != null) { int nextTweetCount = Integer.parseInt(userRecord.getValue( "tweetcount").toString()) + 1;
// Get tweet String tweet; console.printf("Enter tweet for " + username + ":"); tweet = console.readLine();
// Write record WritePolicy wPolicy = new WritePolicy(); wPolicy.sendKey = true; wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
// Create timestamp to store along with the tweet so we can // query, index and report on it long ts = getTimeStamp();
tweetKey = new Key("test", "tweets", username + ":" + nextTweetCount); Bin bin1 = new Bin("tweet", tweet); Bin bin2 = new Bin("ts", ts); Bin bin3 = new Bin("username", username);
client.put(wPolicy, tweetKey, bin1, bin2, bin3); console.printf("\nINFO: Tweet record created!\n");
// Update tweet count and last tweet'd timestamp in the user // record updateUser(client, userKey, wPolicy, ts, nextTweetCount); } else { console.printf("ERROR: User record not found!\n"); } } } //createTweet
public void queryTweets() throws AerospikeException { queryTweetsByUsername(); queryUsersByTweetCount(); } //queryTweets
public void queryTweetsByUsername() throws AerospikeException {
console.printf("\n********** Query Tweets By Username **********\n");
RecordSet rs = null; try {
// NOTE: Index creation has been included in here for convenience and to demonstrate the syntax. // NOTE: The recommended way of creating indexes in production env is via AQL. IndexTask task = client.createIndex(null, "test", "tweets", "username_index", "username", IndexType.STRING); task.waitTillComplete(100);
// Get username String username; console.printf("\nEnter username:"); username = console.readLine();
if (username != null && username.length() > 0) { String[] bins = { "tweet" }; Statement stmt = new Statement(); stmt.setNamespace("test"); stmt.setSetName("tweets"); stmt.setIndexName("username_index"); stmt.setBinNames(bins); stmt.setFilter(Filter.equal("username", username));
console.printf("\nHere's " + username + "'s tweet(s):\n");
rs = client.query(null, stmt); while (rs.next()) { Record r = rs.getRecord(); console.printf(r.getValue("tweet").toString() + "\n"); } } else { console.printf("ERROR: User record not found!\n"); } } finally { if (rs != null) { // Close record set rs.close(); } } } //queryTweetsByUsername
public void queryUsersByTweetCount() throws AerospikeException {
console.printf("\n********** Query Users By Tweet Count Range **********\n");
RecordSet rs = null; try {
// NOTE: Index creation has been included in here for convenience and to demonstrate the syntax. // NOTE: The recommended way of creating indexes in production env is via AQL. IndexTask task = client.createIndex(null, "test", "users", "tweetcount_index", "tweetcount", IndexType.NUMERIC); task.waitTillComplete(100);
// Get min and max tweet counts int min; int max; console.printf("\nEnter Min Tweet Count:"); min = Integer.parseInt(console.readLine()); console.printf("Enter Max Tweet Count:"); max = Integer.parseInt(console.readLine());
console.printf("\nList of users with " + min + "-" + max + " tweets:\n");
String[] bins = { "username", "tweetcount", "gender" }; Statement stmt = new Statement(); stmt.setNamespace("test"); stmt.setSetName("users"); stmt.setBinNames(bins); stmt.setFilter(Filter.range("tweetcount", min, max));
rs = client.query(null, stmt); while (rs.next()) { Record r = rs.getRecord(); console.printf(r.getValue("username") + " has " + r.getValue("tweetcount") + " tweets\n"); } } finally { if (rs != null) { // Close record set rs.close(); } } } //queryUsersByTweetCount
public void scanSomeTweetsForSomeUsers() { try { // Java Scan ScanPolicy policy = new ScanPolicy(); policy.concurrentNodes = true; policy.priority = Priority.LOW; policy.includeBinData = true; policy.maxRecords = 100; policy.sendKey = true; client.scanAll(policy, "test", "tweets", new ScanCallback() {
@Override public void scanCallback(Key key, Record record) throws AerospikeException { console.printf(key.toString() + " => ");
console.printf(record.getValue("username") + " "); console.printf(record.getValue("tweet") + "\n");
} }, "tweet"); } catch (AerospikeException e) { System.out.println("EXCEPTION - Message: " + e.getMessage()); System.out.println("EXCEPTION - StackTrace: " + UtilityService.printStackTrace(e)); } } //scanSomeTweetsForSomeUsers
private void updateUser(AerospikeClient client, Key userKey, WritePolicy policy, long ts, int tweetCount) throws AerospikeException, InterruptedException {
client.put(policy, userKey, new Bin("tweetcount", tweetCount), new Bin("lasttweeted", ts)); console.printf("\nINFO: The tweet count now is: " + tweetCount); } //updateUser
@SuppressWarnings("unused") private void updateUserUsingOperate(AerospikeClient client, Key userKey, WritePolicy policy, long ts) throws AerospikeException {
Record record = client.operate(policy, userKey, Operation.add(new Bin("tweetcount", 1)), Operation.put(new Bin("lasttweeted", ts)), Operation.get());
console.printf("\nINFO: The tweet count now is: " + record.getValue("tweetcount")); } //updateUserUsingOperate
public void createTweets() throws AerospikeException { String[] randomTweets = { "For just $1 you get a half price download of half of the song and listen to it just once.", "People tell me my body looks like a melted candle", "Come on movie! Make it start!", "Byaaaayy", "Please, please, win! Meow, meow, meow!", "Put. A. Bird. On. It.", "A weekend wasted is a weekend well spent", "Would you like to super spike your meal?", "We have a mean no-no-bring-bag up here on aisle two.", "SEEK: See, Every, EVERY, Kind... of spot", "We can order that for you. It will take a year to get there.", "If you are pregnant, have a soda.", "Hear that snap? Hear that clap?", "Follow me and I may follow you", "Which is the best cafe in Portland? Discuss...", "Portland Coffee is for closers!", "Lets get this party started!", "How about them portland blazers!", "You got school'd, yo", "I love animals", "I love my dog", "What's up Portland", "Which is the best cafe in Portland? Discuss...", "I dont always tweet, but when I do it is on Tweetaspike" }; Random rnd1 = new Random(); Random rnd2 = new Random(); Random rnd3 = new Random(); Key userKey; Record userRecord; int totalUsers = 10000; int maxTweets = 20; String username; long ts = 0;
WritePolicy wPolicy = new WritePolicy(); wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
console.printf("\nCreate up to " + maxTweets + " tweets each for " + totalUsers + " users.\n"); // console.readLine();
for (int j = 0; j < totalUsers; j++) { // Check if user record exists username = "user" + rnd3.nextInt(100000); userKey = new Key("test", "users", username); userRecord = client.get(null, userKey); if (userRecord != null) { // create up to maxTweets random tweets for this user int totalTweets = rnd1.nextInt(maxTweets); for (int k = 1; k <= totalTweets; k++) { // Create timestamp to store along with the tweet so we can // query, index and report on it ts = getTimeStamp(); Key tweetKey = new Key("test", "tweets", username + ":" + k); Bin bin1 = new Bin("tweet", randomTweets[rnd2.nextInt(randomTweets.length)]); Bin bin2 = new Bin("ts", ts); Bin bin3 = new Bin("username", username);
client.put(wPolicy, tweetKey, bin1, bin2, bin3); } if (totalTweets > 0) { client.put(wPolicy, userKey, new Bin("tweetcount", totalTweets), new Bin("lasttweeted", ts)); } } } console.printf("\n\nDone creating up to " + maxTweets + " tweets each for " + totalUsers + " users!\n"); } //createTweets
private long getTimeStamp() { return System.currentTimeMillis(); } //getTimeStamp
}
/******************************************************************************* * Copyright 2012-2014 by Aerospike. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. ******************************************************************************/
import java.io.File;import java.util.ArrayList;import java.util.Arrays;import java.util.Map;import java.util.Random;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Key;import com.aerospike.client.Language;import com.aerospike.client.Record;import com.aerospike.client.Value;import com.aerospike.client.lua.LuaConfig;import com.aerospike.client.policy.GenerationPolicy;import com.aerospike.client.policy.BatchPolicy;import com.aerospike.client.policy.RecordExistsAction;import com.aerospike.client.policy.WritePolicy;import com.aerospike.client.query.Filter;import com.aerospike.client.query.ResultSet;import com.aerospike.client.query.Statement;import com.aerospike.client.task.RegisterTask;
public class UserService { private AerospikeClient client; private EclipseConsole console = new EclipseConsole();
public UserService(AerospikeClient client) { this.client = client; }
public void createUser() throws AerospikeException { console.printf("\n********** Create User **********\n");
///*********************/// ///*****Data Model*****/// //Namespace: test //Set: users //Key: <username> //Bins: //username - String //password - String (For simplicity password is stored in plain-text) //gender - String (Valid values are 'm' or 'f') //region - String (Valid values are: 'n' (North), 's' (South), 'e' (East), 'w' (West) -- to keep data entry to minimal we just store the first letter) //lasttweeted - int (Stores epoch timestamp of the last/most recent tweet) -- Default to 0 //tweetcount - int (Stores total number of tweets for the user) -- Default to 0 //interests - Array of interests
//Sample Key: dash //Sample Record: //{ username: 'dash', // password: 'dash', // gender: 'm', // region: 'w', // lasttweeted: 1408574221, // tweetcount: 20, // interests: ['photography', 'technology', 'dancing', 'house music] //} ///*********************///
String username; String password; String gender; String region; String interests;
// Get username console.printf("Enter username: "); username = console.readLine();
if (username != null && username.length() > 0) { // Get password console.printf("Enter password for " + username + ":"); password = console.readLine();
// Get gender console.printf("Select gender (f or m) for " + username + ":"); gender = console.readLine().substring(0, 1);
// Get region console.printf("Select region (north, south, east or west) for " + username + ":"); region = console.readLine().substring(0, 1);
// Get interests console.printf("Enter comma-separated interests for " + username + ":"); interests = console.readLine();
// Write record WritePolicy wPolicy = new WritePolicy(); wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
Key key = new Key("test", "users", username); Bin bin1 = new Bin("username", username); Bin bin2 = new Bin("password", password); Bin bin3 = new Bin("gender", gender); Bin bin4 = new Bin("region", region); Bin bin5 = new Bin("lasttweeted", 0); Bin bin6 = new Bin("tweetcount", 0); Bin bin7 = new Bin("interests", Arrays.asList(interests.split(",")));
client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7);
console.printf("\nINFO: User record created!"); } } //createUser
public void getUser() throws AerospikeException { Record userRecord = null; Key userKey = null;
// Get username String username; console.printf("\nEnter username:"); username = console.readLine();
if (username != null && username.length() > 0) { // Check if username exists userKey = new Key("test", "users", username); userRecord = client.get(null, userKey); if (userRecord != null) { console.printf("\nINFO: User record read successfully! Here are the details:\n"); console.printf("username: " + userRecord.getValue("username") + "\n"); console.printf("password: " + userRecord.getValue("password") + "\n"); console.printf("gender: " + userRecord.getValue("gender") + "\n"); console.printf("region: " + userRecord.getValue("region") + "\n"); console.printf("tweetcount: " + userRecord.getValue("tweetcount") + "\n"); console.printf("interests: " + userRecord.getValue("interests") + "\n"); } else { console.printf("ERROR: User record not found!\n"); } } else { console.printf("ERROR: User record not found!\n"); } } //getUser
public void updatePasswordUsingUDF() throws AerospikeException { Record userRecord = null; Key userKey = null;
// Get username String username; console.printf("\nEnter username:"); username = console.readLine();
if (username != null && username.length() > 0) { // Check if username exists userKey = new Key("test", "users", username); userRecord = client.get(null, userKey); if (userRecord != null) { // Get new password String password; console.printf("Enter new password for " + username + ":"); password = console.readLine();
// NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL LuaConfig.SourceDirectory = "udf"; File udfFile = new File("udf/updateUserPwd.lua");
RegisterTask rt = client.register(null, udfFile.getPath(), udfFile.getName(), Language.LUA); rt.waitTillComplete(100);
String updatedPassword = client.execute(null, userKey, "updateUserPwd", "updatePassword", Value.get(password)).toString(); console.printf("\nINFO: The password has been set to: " + updatedPassword); } else { console.printf("ERROR: User record not found!"); } } else { console.printf("ERROR: User record not found!"); } } //updatePasswordUsingUDF
public void updatePasswordUsingCAS() throws AerospikeException { Record userRecord = null; Key userKey = null; Bin passwordBin = null;
// Get username String username; console.printf("\nEnter username:"); username = console.readLine();
if (username != null && username.length() > 0) { // Check if username exists userKey = new Key("test", "users", username); userRecord = client.get(null, userKey); if (userRecord != null) { // Get new password String password; console.printf("Enter new password for " + username + ":"); password = console.readLine();
WritePolicy writePolicy = new WritePolicy(); // record generation writePolicy.generation = userRecord.generation; writePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; // password Bin passwordBin = new Bin("password", Value.get(password)); client.put(writePolicy, userKey, passwordBin);
console.printf("\nINFO: The password has been set to: " + password); } else { console.printf("ERROR: User record not found!"); } } else { console.printf("ERROR: User record not found!"); } } //updatePasswordUsingCAS
public void batchGetUserTweets() throws AerospikeException {
Record userRecord = null; Key userKey = null;
// Get username String username; console.printf("\nEnter username:"); username = console.readLine();
if (username != null && username.length() > 0) { // Check if username exists userKey = new Key("test", "users", username); userRecord = client.get(null, userKey); if (userRecord != null) { // Get how many tweets the user has int tweetCount = userRecord.getInt("tweetcount");
// Create an array of keys so we can initiate batch read // operation Key[] keys = new Key[tweetCount]; for (int i = 0; i < keys.length; i++) { keys[i] = new Key("test", "tweets", (username + ":" + (i + 1))); }
console.printf("\nHere's " + username + "'s tweet(s):\n");
// Initiate batch read operation if (keys.length > 0){ Record[] records = client.get(new BatchPolicy(), keys); for (int j = 0; j < records.length; j++) { console.printf(records[j].getValue("tweet").toString() + "\n"); } } } } else { console.printf("ERROR: User record not found!\n"); } } //batchGetUserTweets
@SuppressWarnings("unchecked") public void aggregateUsersByTweetCountByRegion() throws AerospikeException, InterruptedException { ResultSet rs = null; try { int min; int max; console.printf("\nEnter Min Tweet Count:"); min = Integer.parseInt(console.readLine()); console.printf("Enter Max Tweet Count:"); max = Integer.parseInt(console.readLine());
console.printf("\nAggregating users with " + min + "-" + max + " tweets by region. Hang on...\n");
// NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL LuaConfig.SourceDirectory = "udf"; File udfFile = new File("udf/aggregationByRegion.lua");
RegisterTask rt = client.register(null, udfFile.getPath(), udfFile.getName(), Language.LUA); rt.waitTillComplete(100);
String[] bins = { "tweetcount", "region" }; Statement stmt = new Statement(); stmt.setNamespace("test"); stmt.setSetName("users"); stmt.setIndexName("tweetcount_index"); stmt.setBinNames(bins); stmt.setFilter(Filter.range("tweetcount", min, max));
rs = client.queryAggregate(null, stmt, "aggregationByRegion", "sum");
if (rs.next()) { Map<Object, Object> result = (Map<Object, Object>) rs .getObject(); console.printf("\nTotal Users in North: " + result.get("n") + "\n"); console.printf("Total Users in South: " + result.get("s") + "\n"); console.printf("Total Users in East: " + result.get("e") + "\n"); console.printf("Total Users in West: " + result.get("w") + "\n"); } } finally { if (rs != null) { // Close result set rs.close(); } } } //aggregateUsersByTweetCountByRegion
public void createUsers() throws AerospikeException { String[] genders = { "m", "f" }; String[] regions = { "n", "s", "e", "w" }; String[] randomInterests = { "Music", "Football", "Soccer", "Baseball", "Basketball", "Hockey", "Weekend Warrior", "Hiking", "Camping", "Travel", "Photography"}; String username; ArrayList<Object> userInterests = null; int totalInterests = 0; int start = 1; int end = 100000; int totalUsers = end - start; Random rnd1 = new Random(); Random rnd2 = new Random(); Random rnd3 = new Random();
WritePolicy wPolicy = new WritePolicy(); wPolicy.recordExistsAction = RecordExistsAction.UPDATE;
console.printf("\nCreate " + totalUsers + " users.\n"); // console.readLine();
for (int j = start; j <= end; j++) { // Write user record username = "user" + j; Key key = new Key("test", "users", username); Bin bin1 = new Bin("username", "user" + j); Bin bin2 = new Bin("password", "pwd" + j); Bin bin3 = new Bin("gender", genders[rnd1.nextInt(2)]); Bin bin4 = new Bin("region", regions[rnd2.nextInt(4)]); Bin bin5 = new Bin("lasttweeted", 0); Bin bin6 = new Bin("tweetcount", 0);
totalInterests = rnd3.nextInt(7); userInterests = new ArrayList<Object>(); for(int i = 0; i < totalInterests; i++) { userInterests.add(randomInterests[rnd3.nextInt(randomInterests.length)]); } Bin bin7 = new Bin("interests", userInterests);
client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7); //console.printf("Wrote user record for " + username + "\n"); } console.printf("\nDone creating " + totalUsers + "!\n"); } // createUsers
}
import java.io.PrintWriter;import java.io.StringWriter;import java.util.List;
import org.apache.commons.cli.CommandLine;import org.apache.commons.cli.CommandLineParser;import org.apache.commons.cli.HelpFormatter;import org.apache.commons.cli.Options;import org.apache.commons.cli.PosixParser;import org.apache.log4j.Logger;
import com.aerospike.client.AerospikeClient;import com.aerospike.client.AerospikeException;import com.aerospike.client.Bin;import com.aerospike.client.Host;import com.aerospike.client.Key;import com.aerospike.client.Operation;import com.aerospike.client.Record;import com.aerospike.client.policy.ClientPolicy;import com.aerospike.client.policy.GenerationPolicy;import com.aerospike.client.policy.Policy;import com.aerospike.client.policy.WritePolicy;
/** * @author Dash Desai */public class Program { private AerospikeClient client; private String seedHost; private int port; private String namespace; private String set; private WritePolicy writePolicy; private Policy policy; private EclipseConsole console = new EclipseConsole();
private static Logger log = Logger.getLogger(Program.class);
public Program(String host, int port, String namespace, String set) throws AerospikeException { this.seedHost = host; this.port = port; this.namespace = namespace; this.set = set; this.writePolicy = new WritePolicy(); this.writePolicy.totalTimeout = 100; this.policy = new Policy(); this.policy.totalTimeout = 100; // Establish a connection to Aerospike cluster ClientPolicy cPolicy = new ClientPolicy(); cPolicy.timeout = 500; this.client = new AerospikeClient(cPolicy, this.seedHost, this.port); }
protected void finalize() throws Throwable { if (this.client != null){ this.client.close(); } }; public static void engage(String[] args) throws AerospikeException { try { Options options = new Options(); options.addOption("h", "host", true, "Server hostname (default: 12)"); options.addOption("p", "port", true, "Server port (default: 3000)"); options.addOption("n", "namespace", true, "Namespace (default: test)"); options.addOption("s", "set", true, "Set (default: demo)"); options.addOption("u", "usage", false, "Print usage.");
CommandLineParser parser = new PosixParser(); CommandLine cl = parser.parse(options, args, false);
String host = cl.getOptionValue("h", "127.0.0.1"); String portString = cl.getOptionValue("p", "3000"); int port = Integer.parseInt(portString); String namespace = cl.getOptionValue("n", "test"); String set = cl.getOptionValue("s", "demo"); log.debug("Host: " + host); log.debug("Port: " + port); log.debug("Namespace: " + namespace); log.debug("Set: " + set);
@SuppressWarnings("unchecked") List<String> cmds = cl.getArgList(); if (cmds.size() == 0 && cl.hasOption("u")) { logUsage(options); return; }
Program as = new Program(host, port, namespace, set);
as.work();
} catch (Exception e) { log.error("Critical error", e); } }
/** * Write usage to console. */ private static void logUsage(Options options) { HelpFormatter formatter = new HelpFormatter(); StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); String syntax = Program.class.getName() + " [<options>]"; formatter.printHelp(pw, 100, syntax, "options:", options, 0, 2, null); log.info(sw.toString()); }
public void work() throws Exception { try { console.printf("INFO: Connecting to Aerospike cluster...");
// Establish connection to Aerospike server
if (client == null || !client.isConnected()) { console.printf("\nERROR: Connection to Aerospike cluster failed! Please check the server settings and try again!"); console.readLine(); } else { console.printf("\nINFO: Connection to Aerospike cluster succeeded!\n"); while (true) {
// Create instance of UserService UserService us = new UserService(client); // Create instance of TweetService TweetService ts = new TweetService(client); // Create instance of UtilityService UtilityService util = new UtilityService(client);
// Present options console.printf("\nWhat would you like to do:\n"); console.printf("1> Create A User And A Tweet\n"); console.printf("2> Read A User Record\n"); console.printf("3> Batch Read Tweets For A User\n"); console.printf("4> Scan Sample of Tweets \n"); console.printf("5> Query Tweets By Username And Users By Tweet Count Range\n"); console.printf("6> Create canned users\n"); console.printf("7> Create canned tweets\n"); console.printf("0> Exit\n"); console.printf("\nSelect and hit enter:\n"); int feature = Integer.parseInt(console.readLine());
if (feature != 0) { switch (feature) { case 1: console.printf("\n********** Your Selection: Create User And A Tweet **********\n"); us.createUser(); ts.createTweet(); break; case 2: console.printf("\n********** Your Selection: Read A User Record **********\n"); us.getUser(); break; case 3: console.printf("\n********** Your Selection: Batch Read Tweets For A User **********\n"); us.batchGetUserTweets(); break; case 4: console.printf("\n********** Your Selection: Sample Tweets **********\n"); ts.scanSomeTweetsForSomeUsers(); break; case 5: console.printf("\n********** Your Selection: Query Tweets By Username And Users By Tweet Count Range **********\n"); ts.queryTweets(); break; case 6: console.printf("\n********** Create Users **********\n"); us.createUsers(); break; case 7: console.printf("\n********** Create Tweets **********\n"); ts.createTweets(); break; default: break; } } } } } catch (AerospikeException e) { console.printf("AerospikeException - Message: " + e.getMessage() + "\n"); console.printf("AerospikeException - StackTrace: " + UtilityService.printStackTrace(e) + "\n"); } catch (Exception e) { console.printf("Exception - Message: " + e.getMessage() + "\n"); console.printf("Exception - StackTrace: " + UtilityService.printStackTrace(e) + "\n"); } finally { if (client != null && client.isConnected()) { // Close Aerospike server connection client.close(); } console.printf("\n\nINFO: Press any key to exit...\n"); console.readLine(); }
}
/* * example method calls */ public Record readPartial(String userName) throws AerospikeException { // Java read specific bins Key key = new Key("test", "users", userName); Record record = this.client.get(null, key, "username", "password", "gender", "region"); return record; }
public Record readMeta(String userName) throws AerospikeException { // Java get meta data Key key = new Key("test", "users", userName); Record record = this.client.getHeader(null, key); return record; }
public void write(String username, String password) throws AerospikeException { // Java read-modify-write WritePolicy wPolicy = new WritePolicy(); wPolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
Key key = new Key("test", "users", username); Bin bin1 = new Bin("username", username); Bin bin2 = new Bin("password", password);
client.put(wPolicy, key, bin1, bin2); }
public void delete(String username) throws AerospikeException { // Java Delete record. WritePolicy wPolicy = new WritePolicy(); Key key = new Key("test", "users", username); client.delete(wPolicy, key); }
public boolean exisis(String username) throws AerospikeException { // Java exists Key key = new Key("test", "users", username); boolean itsHere = client.exists(policy, key); return itsHere; }
public void add(String username) throws AerospikeException { // Java add WritePolicy wPolicy = new WritePolicy(); Key key = new Key("test", "users", username); Bin counter = new Bin("tweetcount", 1); client.add(wPolicy, key, counter); }
public void touch(String username) throws AerospikeException { // Java touch WritePolicy wPolicy = new WritePolicy(); Key key = new Key("test", "users", username); client.touch(wPolicy, key); }
public void append(String username) throws AerospikeException { // Java append WritePolicy wPolicy = new WritePolicy(); Key key = new Key("test", "users", username); Bin bin2 = new Bin("interests", "cats"); client.append(wPolicy, key, bin2); }
public void operate(String username) throws AerospikeException { // Java operate WritePolicy wPolicy = new WritePolicy(); Key key = new Key("test", "users", username); client.operate(wPolicy, key, Operation.put(new Bin("tweetcount", 153)), Operation.put(new Bin("lasttweeted", 1406755079L)));
}
@SuppressWarnings("unused") public void batch(String username) throws AerospikeException { // Java batch // Create an array of keys so we can initiate batch read operation Key[] keys = new Key[27]; for (int i = 0; i < keys.length; i++) { keys[i] = new Key("test", "tweets", (username + ":" + (i + 1))); }
// Initiate batch read operation Record[] records = client.get(null, keys);
}
@SuppressWarnings({ "unused", "resource" }) public void multipleSeedNodes() throws AerospikeException { Host[] hosts = new Host[] { new Host("a.host", 3000), new Host("another.host", 3000), new Host("and.another.host", 3000) }; AerospikeClient client = new AerospikeClient(new ClientPolicy(), hosts);
} @SuppressWarnings({ "unused", "resource" }) public void connectWithClientPolicy() throws AerospikeException { // Java connection with Client policy ClientPolicy clientPolicy = new ClientPolicy();// clientPolicy.maxThreads = 200; //200 threads clientPolicy.maxSocketIdle = 3; // 3 seconds AerospikeClient client = new AerospikeClient(clientPolicy, "a.host", 3000);
}
public void deleteBin(String username) throws AerospikeException{ // Java delete a bin WritePolicy wPolicy = new WritePolicy(); Key key = new Key("test", "users", username); Bin bin1 = Bin.asNull("shoe-size"); // Set bin value to null to drop bin. client.put(wPolicy, key, bin1); }
}
Program.engage(null);