Classifying documents using Naive Bayes on Apache Spark / MLlib | Chimpler
In recent years, Apache Spark has gained popularity as a faster alternative to Hadoop, and it reached a major milestone last month with the release of the production-ready version 1.0.0. It claims to be up to 100 times faster by leveraging the distributed memory of the cluster and by not being tied to the multi-stage execution of Map/Reduce. Like Hadoop, it offers an ecosystem of tools built on top of it, including a database (Shark SQL), a machine learning library (MLlib) and a graph library (GraphX). Finally, Spark integrates well with Scala: one can manipulate distributed collections just like regular Scala collections, and Spark takes care of distributing the processing to the different workers. In this post, we describe how we used Spark / MLlib to classify HTML documents, using as a training set the popular Reuters 21578 collection of documents that appeared on the Reuters newswire in 1987. The Reuters collection can be obtained from http://archive.ics.uci.
Read full article from Classifying documents using Naive Bayes on Apache Spark / MLlib | Chimpler
We will go through the following steps:
- parse XML documents (extract topic and content)
- tokenize and stem the documents
- create a dictionary out of all the words in the collection of documents and compute the IDF (Inverse Document Frequency) for each term
- vectorize documents using TF-IDF scores
- train the Naive Bayes classifier
- classify HTML documents
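The snippets in this post operate on `TermDoc` values whose definition is not shown here. As a rough sketch of the first two steps, the code below assumes a `TermDoc` shape inferred from the fields the later code reads (`doc`, `labels`, `terms`) and a hypothetical `tokenize` helper. It only lowercases, splits on non-letters and drops a tiny made-up stop-word list. It does no stemming, which a real pipeline would more likely delegate to an analyzer library.

```scala
// Hypothetical shape, inferred from the fields used later in the post.
case class TermDoc(doc: String, labels: Set[String], terms: Seq[String])

object Tokenizer {
  // Tiny illustrative stop-word list (an assumption, not the post's list).
  private val stopWords = Set("a", "an", "and", "in", "of", "on", "the", "to")

  // Lowercase, split on runs of non-letters, drop empties and stop words.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase
      .split("[^a-z]+")
      .toSeq
      .filter(t => t.nonEmpty && !stopWords.contains(t))

  // Bundle a document id, its topic labels and its tokenized body.
  def toTermDoc(id: String, labels: Set[String], body: String): TermDoc =
    TermDoc(id, labels, tokenize(body))
}
```

For instance, `Tokenizer.toTermDoc("doc1", Set("grain"), "The price of Wheat")` yields a `TermDoc` whose `terms` are `price` and `wheat`.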
We initialize the SparkContext to run locally using 4 worker threads:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[4]", "naivebayes")
Then we convert the collection of documents to a Resilient Distributed Dataset (RDD) using sc.parallelize():

    val termDocsRdd = sc.parallelize[TermDoc](termDocs.toSeq)
Then, in order to vectorize the documents, we create a dictionary of all the distinct words that appear across the documents. A single chain of transformations is enough:

    val terms = termDocsRdd.flatMap(_.terms).distinct().collect().sortBy(identity)
Spark takes care of distributing the work across the workers, and collect() gathers the results from the workers back into a local array.
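Because RDDs mirror the regular collections API, the same dictionary logic can be tried on plain Scala collections without a cluster. The following is a small sketch along those lines. The sample `docs` data and the `indexOf` helper are made up for illustration.

```scala
object DictionaryExample {
  // Each inner Seq stands in for the `terms` of one document (made-up data).
  val docs: Seq[Seq[String]] = Seq(
    Seq("wheat", "price", "rise"),
    Seq("corn", "price", "fall"),
    Seq("wheat", "export")
  )

  // Same idea as on the RDD, minus collect(): flatten, dedupe, sort.
  val terms: Seq[String] = docs.flatten.distinct.sorted

  // In this sketch, a term's id is its position in the sorted dictionary.
  def indexOf(term: String): Int = terms.indexOf(term)
}
```

Sorting the dictionary gives every term a stable integer id, which is what the vectorization step needs later on.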
Based on the dictionary, we compute the IDF score for each term. There are several formulas for computing IDF scores; the most common one is:
idf(term, docs) = log[(number of documents) / (number of documents containing term)]
However, the Naive Bayes implementation in MLlib already applies a log, so we can drop it from the formula:
idf(term, docs) = (number of documents) / (number of documents containing term)
We also exclude words that are present in 3 or fewer documents (an arbitrary threshold) to remove terms that are too specific:

    // numDocs is the total number of documents (defined elsewhere).
    val idfs = (termDocsRdd.flatMap(termDoc => termDoc.terms.map((termDoc.doc, _))).distinct().groupBy(_._2) collect {
      // keep only the terms that appear in more than 3 documents
      case (term, docs) if docs.size > 3 =>
        term -> (numDocs.toDouble / docs.size.toDouble)
    }).collect.toMap
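To make the arithmetic concrete, here is a sketch of the same log-free IDF computation on plain Scala collections. The `IdfExample` object and its `minDocs` parameter are illustrative names, not part of the original code. The threshold is a parameter so that the toy data below can use a smaller cut-off.

```scala
object IdfExample {
  // Compute the log-free IDF (numDocs / docFreq) for terms that occur in
  // more than `minDocs` documents, mirroring the guard in the RDD version.
  def idfs(docs: Seq[Seq[String]], minDocs: Int): Map[String, Double] = {
    val numDocs = docs.size
    docs
      .flatMap(_.distinct) // count each term at most once per document
      .groupBy(identity)
      .collect { case (term, occurrences) if occurrences.size > minDocs =>
        term -> (numDocs.toDouble / occurrences.size)
      }
  }
}
```

With four documents, a term found in all four gets an IDF of 4 / 4 = 1.0, and a term found in two of them gets 4 / 2 = 2.0. With `minDocs = 1`, a term found in only one document is dropped.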
We then vectorize each document by computing the TF-IDF score of each term it contains:

    (filteredTerms.groupBy(identity).map {
      // term frequency within the document, weighted by the term's IDF
      case (term, instances) =>
        (indexOf(term), (instances.size.toDouble / filteredTerms.size.toDouble) * idfs(term))
    }).toSeq.sortBy(_._1) // sort by termId
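The snippet above relies on names defined elsewhere (`filteredTerms`, `indexOf`). A self-contained sketch of the same computation follows. It assumes that `filteredTerms` means the document's terms restricted to those with an IDF entry, and it uses a hypothetical `termIds` map in place of `indexOf`.

```scala
object TfIdfExample {
  // Returns (termId, tf * idf) pairs sorted by termId, skipping terms that
  // have no IDF entry (i.e. were filtered out of the dictionary).
  def tfIdfs(terms: Seq[String],
             termIds: Map[String, Int],
             idfs: Map[String, Double]): Seq[(Int, Double)] = {
    val filteredTerms = terms.filter(idfs.contains)
    filteredTerms
      .groupBy(identity)
      .map { case (term, instances) =>
        // term frequency relative to the number of retained terms
        val tf = instances.size.toDouble / filteredTerms.size
        (termIds(term), tf * idfs(term))
      }
      .toSeq
      .sortBy(_._1)
  }
}
```

Take a document with terms `wheat, price, wheat, rare`, where `rare` has no IDF entry. The term `wheat` then has a term frequency of 2/3 and `price` of 1/3, and each is multiplied by the term's IDF.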
We then convert the vectorized documents into a collection of LabeledPoints. Each LabeledPoint represents a training document associated with a label id (a Double) and a sparse vector:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val tfidfs = termDocsRdd flatMap {
      termDoc =>
        // (termId, tf-idf) pairs for this document
        val termPairs = termDict.tfIdfs(termDoc.terms, idfs)
        // documents without any label are skipped; otherwise the first label is used
        termDoc.labels.headOption.map {
          label =>
            val labelId = labelDict.indexOf(label).toDouble
            val vector = Vectors.sparse(termDict.count, termPairs)
            LabeledPoint(labelId, vector)
        }
    }
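A sparse vector stores only its size and the (index, value) pairs of its non-zero entries. This suits TF-IDF vectors, since a document contains only a small fraction of the dictionary. The plain-Scala sketch below uses a hypothetical `toDense` helper, not an MLlib function, to show what such a list of pairs encodes.

```scala
object SparseVectorExample {
  // Expand (index, value) pairs into a dense array of length `size`;
  // every index that is not listed is implicitly 0.0.
  def toDense(size: Int, pairs: Seq[(Int, Double)]): Array[Double] = {
    val dense = Array.fill(size)(0.0)
    pairs.foreach { case (i, v) => dense(i) = v }
    dense
  }
}
```

With a dictionary of 5 terms, the pairs `(1, 0.5)` and `(3, 2.0)` stand for the dense vector `0.0, 0.5, 0.0, 2.0, 0.0`.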