Recommendation Engine Using Apache Mahout & Cassandra
As part of our R&D work, we are trying to build recommendations for users using Apache Mahout and Cassandra. Mahout offers a direct Cassandra data model, CassandraDataModel, backed by a Cassandra keyspace. Working with it involved many methods and functions and caused a lot of confusion, so we tried to simplify the setup by combining Cassandra indexes with Mahout's FileDataModel.
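FileDataModel reads a plain text file with one preference per line: a user ID, an item ID and a preference value, separated by commas or tabs. Mahout also allows the value to be omitted and accepts an optional trailing timestamp, but this sketch handles only the three-field form. The class below is a hypothetical helper and is not part of Mahout. It shows the line format that query1() writes, so the file is easy to inspect or generate by other means.

```java
/**
 * Hypothetical helper (not part of Mahout): formats and parses one
 * "userID, itemID, value" line of the text file that FileDataModel reads.
 */
public class PreferenceLine {
    public final long userId;
    public final long itemId;
    public final float value;

    public PreferenceLine(long userId, long itemId, float value) {
        this.userId = userId;
        this.itemId = itemId;
        this.value = value;
    }

    /** One tab-separated line, the same layout query1() writes (no trailing newline). */
    public String format() {
        return userId + "\t" + itemId + "\t" + value;
    }

    /** Parses "userID,itemID,value" or the tab-separated equivalent. */
    public static PreferenceLine parse(String line) {
        String[] fields = line.trim().split("[\t,]");
        if (fields.length < 3) {
            throw new IllegalArgumentException("expected userID, itemID and value: " + line);
        }
        return new PreferenceLine(Long.parseLong(fields[0].trim()),
                Long.parseLong(fields[1].trim()),
                Float.parseFloat(fields[2].trim()));
    }
}
```

For example, `new PreferenceLine(10L, 914L, 4.5f).format()` produces `10\t914\t4.5`, and `PreferenceLine.parse("10,914,4.5")` reads the comma-separated form back.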
We have 1 million records in a Cassandra keyspace named testkeyspace, stored in two tables with the following schemas:
CREATE TABLE SampleUser (
  user_id bigint,
  ratings float,
  item_id bigint,
  PRIMARY KEY (user_id, ratings, item_id)
);
CREATE TABLE SampleItems (
  item_id bigint,
  user_id bigint,
  ratings float,
  PRIMARY KEY (item_id, user_id)
);
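The two tables hold the same ratings keyed in two ways. sampleitems is keyed by item_id and answers "which users rated this item". sampleuser is keyed by user_id and answers "what did this user rate". Because of this layout, each query in the Java code reads by partition key. The hypothetical class below is an in-memory stand-in for illustration only. It mirrors that denormalized layout and the two-step lookup the code performs against Cassandra. It assumes one rating per user/item pair, whereas sampleuser's primary key also includes ratings, so Cassandra can store several rows for the same pair.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical in-memory model of the sampleitems / sampleuser tables. */
public class DenormalizedRatings {
    // sampleitems: item_id -> (user_id -> rating)
    private final Map<Long, Map<Long, Float>> byItem = new HashMap<>();
    // sampleuser: user_id -> (item_id -> rating)
    private final Map<Long, Map<Long, Float>> byUser = new HashMap<>();

    /** Every rating is written to both "tables". */
    public void addRating(long userId, long itemId, float rating) {
        byItem.computeIfAbsent(itemId, k -> new HashMap<>()).put(userId, rating);
        byUser.computeIfAbsent(userId, k -> new HashMap<>()).put(itemId, rating);
    }

    /** Step 1, like: SELECT user_id FROM sampleitems WHERE item_id = ? */
    public Set<Long> usersWhoRated(long itemId) {
        return byItem.getOrDefault(itemId, Collections.emptyMap()).keySet();
    }

    /**
     * Step 2, like: SELECT user_id, item_id, ratings FROM sampleuser WHERE user_id IN (...).
     * Returns every rating of every user found in step 1 as "user\titem\trating",
     * sorted by user and then item so the output is deterministic.
     */
    public List<String> exportLines(long itemId) {
        List<String> lines = new ArrayList<>();
        for (long userId : new TreeSet<>(usersWhoRated(itemId))) {
            for (Map.Entry<Long, Float> e : new TreeMap<>(byUser.get(userId)).entrySet()) {
                lines.add(userId + "\t" + e.getKey() + "\t" + e.getValue());
            }
        }
        return lines;
    }
}
```

The exported lines include items other than 914, because the recommender needs each neighbour's full rating history to find items worth suggesting.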
We want to provide user-based and item-based recommendations with the help of similar users (for example, based on the ratings given by other users in the same locality or by some other similarity measure).
These include:
1) Customers Who Bought This Item Also Bought
2) Customers Who Viewed This Item Also Viewed
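Both features can be expressed as a co-occurrence count. Take the users who bought (or viewed) a given item, count which other items those same users bought (or viewed), and rank the other items by that count. The class below is a minimal stdlib sketch of this simple co-occurrence technique. It is not the Mahout user-based recommender used in the code that follows. The names and the purchases map are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical co-occurrence sketch for "Customers Who Bought This Item Also Bought". */
public class AlsoBought {
    /**
     * @param purchases user_id -> item_ids that user bought (or viewed)
     * @param itemId    the item being looked at
     * @param howMany   maximum number of items to return
     * @return other items, most frequently co-bought first
     */
    public static List<Long> alsoBought(Map<Long, Set<Long>> purchases, long itemId, int howMany) {
        Map<Long, Integer> counts = new HashMap<>();
        for (Set<Long> items : purchases.values()) {
            if (!items.contains(itemId)) {
                continue; // only users who bought itemId contribute
            }
            for (long other : items) {
                if (other != itemId) {
                    counts.merge(other, 1, Integer::sum);
                }
            }
        }
        List<Long> ranked = new ArrayList<>(counts.keySet());
        ranked.sort((a, b) -> {
            int byCount = Integer.compare(counts.get(b), counts.get(a)); // higher count first
            return byCount != 0 ? byCount : Long.compare(a, b);        // then smaller id first
        });
        return new ArrayList<>(ranked.subList(0, Math.min(howMany, ranked.size())));
    }
}
```

"Customers Who Viewed This Item Also Viewed" is the same call with a map of views instead of purchases. Tie-breaking by item ID is only there to make the order deterministic.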
The Java code for generating Mahout recommendations from the Cassandra database:
[html]
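import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

public class RecommendationsFromCassandra {
    // Java does not expand "${env:HOME}", so resolve the home directory explicitly
    private static final File DATA_FILE =
            new File(System.getProperty("user.home"), "thetextfile.txt");

    private Cluster cluster;
    private Session session;

    /*
     * Connection to the Cassandra database
     */
    public void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        session = cluster.connect();
    }

    /*
     * Query the ids of all users who rated a particular item (item 914)
     */
    public void query1() throws IOException {
        StringBuilder userids = new StringBuilder();
        ResultSet results = session
                .execute("SELECT user_id FROM testkeyspace.sampleitems WHERE item_id = 914");
        for (Row row : results) {
            if (userids.length() > 0) {
                userids.append(',');
            }
            userids.append(row.getLong("user_id"));
        }
        // An empty IN () list is invalid CQL, so stop if nobody rated the item
        if (userids.length() == 0) {
            System.out.println("No users found for item 914");
            return;
        }
        /*
         * Pass all user ids to the second query to get every item id and
         * rating of those users, and write them as tab-separated
         * "userID itemID rating" lines that FileDataModel can read
         */
        String stmt = "SELECT user_id,item_id,ratings FROM testkeyspace.sampleuser WHERE user_id IN ("
                + userids + ");";
        Statement s1 = new SimpleStatement(stmt);
        s1.setFetchSize(Integer.MAX_VALUE); // fetch everything in a single page
        ResultSet results1 = session.execute(s1);
        try (Writer fw = new FileWriter(DATA_FILE)) {
            for (Row row2 : results1) {
                fw.write(row2.getLong("user_id") + "\t" + row2.getLong("item_id")
                        + "\t" + row2.getFloat("ratings") + "\n");
            }
        }
        /*
         * Recommendation part: user-based recommender with Pearson
         * similarity and a neighbourhood of the 100 most similar users
         */
        try {
            DataModel datamodel = new FileDataModel(DATA_FILE);
            UserSimilarity similarity = new PearsonCorrelationSimilarity(datamodel);
            UserNeighborhood neighbourhood = new NearestNUserNeighborhood(100,
                    similarity, datamodel);
            Recommender recommender = new GenericUserBasedRecommender(
                    datamodel, neighbourhood, similarity);
            long start = System.currentTimeMillis();
            // recommend(userID, howMany): top 10 items for user 10. User 10 must be
            // in the exported file, otherwise Mahout throws NoSuchUserException
            List<RecommendedItem> recommendations = recommender.recommend(10, 10);
            for (RecommendedItem recommendation : recommendations) {
                System.out.println(recommendation);
            }
            long stop = System.currentTimeMillis();
            System.out.println("Took: " + (stop - start) + " millis");
        } catch (TasteException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        cluster.close();
    }

    public static void main(String[] args) throws IOException {
        RecommendationsFromCassandra client = new RecommendationsFromCassandra();
        client.connect("127.0.0.1");
        try {
            client.query1();
        } finally {
            // Always release the connection and remove the temporary data file
            client.close();
            DATA_FILE.delete();
        }
    }
}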
[/html]
Generating recommendations for a particular user takes 2.40 sec with 1 million records.
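The recommender above ranks neighbours with PearsonCorrelationSimilarity. To show roughly what that measures, here is a minimal stdlib sketch of the textbook Pearson correlation between two users, computed only over the items both users rated. It returns values from -1 (opposite tastes) to 1 (identical tastes). This is an illustrative re-implementation, not Mahout's code. Mahout's class may differ in details (for example, it offers an optional weighting), and the `Pearson` name and map layout are assumptions.

```java
import java.util.Map;

/** Hypothetical sketch of Pearson correlation between two users' ratings. */
public class Pearson {
    /**
     * @param a item_id -> rating for the first user
     * @param b item_id -> rating for the second user
     * @return correlation over co-rated items, or NaN if it is undefined
     *         (no common items, or one user's ratings do not vary)
     */
    public static double correlation(Map<Long, Float> a, Map<Long, Float> b) {
        int n = 0;
        double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0, sumY2 = 0;
        for (Map.Entry<Long, Float> e : a.entrySet()) {
            Float other = b.get(e.getKey());
            if (other == null) {
                continue; // only items rated by both users count
            }
            double x = e.getValue();
            double y = other;
            n++;
            sumX += x;
            sumY += y;
            sumXY += x * y;
            sumX2 += x * x;
            sumY2 += y * y;
        }
        if (n == 0) {
            return Double.NaN;
        }
        // Centered sums: covariance and variances times n
        double cov = sumXY - sumX * sumY / n;
        double varX = sumX2 - sumX * sumX / n;
        double varY = sumY2 - sumY * sumY / n;
        double denom = Math.sqrt(varX * varY);
        return denom == 0 ? Double.NaN : cov / denom;
    }
}
```

Two users whose ratings on shared items differ by a constant (for example, 5, 3, 4 versus 4, 2, 3) correlate at exactly 1. NearestNUserNeighborhood then keeps the 100 users with the highest such scores.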