Tuesday, November 18, 2014

Building a food recommendation engine with Spark / MLlib and Play | Chimpler

In this post, we're going to implement a food recommender using Apache Spark and MLlib. For instance, if a user is interested in coffee products, we might recommend other coffee brands, coffee filters, or related products that other users also like. To make it more interactive, the recommender is exposed through a Play web application.

The parsing of the CSV is pretty straightforward:
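// each line has the form userId,productId,score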
val rawTrainingRatings = sc.textFile(ratingFile).map {
  line =>
    val Array(userId, productId, scoreStr) = line.split(",")
    AmazonRating(userId, productId, scoreStr.toDouble)
}
 
// only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
val trainingRatings = rawTrainingRatings.groupBy(_.userId)
                                        .filter(r => MinRecommendationsPerUser <= r._2.size  && r._2.size < MaxRecommendationsPerUser)
                                        .flatMap(_._2)
                                        .repartition(NumPartitions)
                                        .cache()
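
The AmazonRating case class and the tuning constants are not shown in this excerpt; a minimal sketch consistent with how they are used above (the constant values below are illustrative, not the article's) would be:

// Assumed definitions, inferred from usage; constant values are illustrative
case class AmazonRating(userId: String, productId: String, rating: Double)

val MinRecommendationsPerUser = 10   // drop users with too few ratings
val MaxRecommendationsPerUser = 1000 // drop users with suspiciously many ratings
val NumPartitions = 20               // partitions for the training RDD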

The ALS recommender accepts as input an RDD of Rating(user: Int, product: Int, rating: Double). Since all the IDs in the CSV are Strings, we create a simple Dictionary class to map each String to its position in an index.
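The Dictionary class itself is not part of this excerpt; a minimal sketch that supports the getIndex, getWord, and size calls used below could look like this:

// Minimal sketch of the assumed Dictionary: a bidirectional mapping
// between words and their positions in the input sequence
class Dictionary(words: Seq[String]) {
  private val wordToIndex: Map[String, Int] = words.zipWithIndex.toMap
  private val indexToWord: Map[Int, String] = wordToIndex.map(_.swap)

  val size: Int = words.size
  def getIndex(word: String): Int = wordToIndex(word)
  def getWord(index: Int): String = indexToWord(index)
}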
The ratings from the existing reviews can then be created as follows:
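// prepend the current user so they get an index even before rating anything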
val userDict = new Dictionary(MyUsername +: trainingRatings.map(_.userId).distinct.collect)
val productDict = new Dictionary(trainingRatings.map(_.productId).distinct.collect)
 
private def toSparkRating(amazonRating: AmazonRating) = {
  Rating(userDict.getIndex(amazonRating.userId),
    productDict.getIndex(amazonRating.productId),
    amazonRating.rating)
}
 
private def toAmazonRating(rating: Rating) = {
  AmazonRating(userDict.getWord(rating.user),
    productDict.getWord(rating.product),
    rating.rating
  )
}
 
// convert to Spark Ratings using the dictionaries
val sparkRatings = trainingRatings.map(toSparkRating)
To train the recommender, we combine the existing ratings from the CSV with the ratings you just entered, then predict your rating for every product you haven't rated yet. We order the predictions by rating and keep the top 10.
def predict(ratings: Seq[AmazonRating]) = {
  // train model
  val myRatings = ratings.map(toSparkRating)
  val myRatingRDD = sc.parallelize(myRatings)
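  // ALS.train(ratings, rank, iterations, lambda): 10 latent factors,
  // 20 iterations, regularization weight 0.01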
  val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions),
                        10, 20, 0.01)
 
  val myProducts = myRatings.map(_.product).toSet
  val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
 
  // get all products not in my history ordered by rating (higher first)
  val myUserId = userDict.getIndex(MyUsername)
  val recommendations = model.predict(candidates.map((myUserId, _))).collect()
  recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
}
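
A hypothetical usage sketch (MyUsername and NumRecommendations are assumed to be defined elsewhere in the article, and the product IDs below are made up):

// Illustrative only: rate a couple of products, then ask for the top recommendations
val myRatings = Seq(
  AmazonRating(MyUsername, "product-1", 5.0),  // hypothetical product IDs
  AmazonRating(MyUsername, "product-2", 1.0)
)
predict(myRatings).foreach(println)

Each returned AmazonRating pairs a product ID with the model's predicted score for the current user.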
In the past 10 years, recommendation algorithms have improved to support incremental updates (Incremental Collaborative Filtering, or ICF), enabling real-time recommendations that are particularly useful in advertising for real-time intent targeting. Indeed, presenting relevant ads to a user based on her immediate history has more impact than presenting ads based on her history from two hours ago.
Read full article from Building a food recommendation engine with Spark / MLlib and Play | Chimpler
