Wednesday, November 26, 2014

Apache Spark RDD API Examples

An RDD is more than just a collection of data items. On cluster installations, separate data partitions can reside on separate nodes. Using the RDD as a handle, one can access all partitions and perform computations and transformations on the contained data. Whenever a part of an RDD or an entire RDD is lost, the system is able to reconstruct the data of the lost partitions by using lineage information. Lineage refers to the sequence of transformations used to produce the current RDD. As a result, Spark is able to recover automatically from most failures.
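
A small illustrative sketch (the names base, doubled and evens are arbitrary): each transformation adds a step to the lineage, which can be inspected with toDebugString.
val base = sc.parallelize(1 to 10, 2)      // original data in two partitions
val doubled = base.map(_ * 2)              // lineage: map <- parallelize
val evens = doubled.filter(_ % 4 == 0)     // lineage: filter <- map <- parallelize
println(evens.toDebugString)               // prints the recorded lineage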

All RDDs available in Spark derive either directly or indirectly from the class RDD. This class comes with a large set of methods that perform operations on the data within the associated partitions. The class RDD is abstract: whenever one uses an RDD, one is actually using a concrete implementation of RDD. These implementations have to override some core functions to make the RDD behave as expected.
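
For illustration only, a hypothetical minimal subclass (ConstantRDD is not part of Spark) sketching the two core functions every concrete RDD must provide, getPartitions and compute:
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical example class: each partition simply yields five copies of a constant.
class ConstantRDD(sc: SparkContext, value: Int, numParts: Int) extends RDD[Int](sc, Nil) {
  // Describe the partitions this RDD consists of.
  override protected def getPartitions: Array[Partition] =
    (0 until numParts).map(i => new Partition { override def index: Int = i }).toArray
  // Compute the data of a single partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.fill(5)(value)
}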

Spark does not impose restrictions on what data can be stored within RDD partitions.
The basic RDD API considers each data item as a single value. However, users often want to work with key-value pairs. Therefore, Spark extended the interface of RDD to provide additional functions (PairRDDFunctions), which explicitly work on key-value pairs. Currently, there are four extensions to the RDD API available in Spark (a brief example of how they come into play is shown after the list):
DoubleRDDFunctions 
This extension contains many useful methods for aggregating numeric values. They become available if the data items of an RDD are implicitly convertible to the Scala data type Double.
PairRDDFunctions 
Methods defined in this interface extension become available when the data items have a two-component tuple structure. Spark will interpret the first tuple item (i.e. tuplename._1) as the key and the second item (i.e. tuplename._2) as the associated value.
OrderedRDDFunctions 
Methods defined in this interface extension become available if the data items are two-component tuples where the key is implicitly sortable.
SequenceFileRDDFunctions 
This extension contains several methods that allow users to create Hadoop sequence files from RDDs. The data items must be two-component key-value tuples as required by the PairRDDFunctions. However, there are additional requirements considering the convertibility of the tuple components to Writable types.
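
A short sketch of how these extensions come into play implicitly (the sample values are arbitrary):
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _).collect   // PairRDDFunctions: items are two-component tuples
pairs.sortByKey().collect          // OrderedRDDFunctions: String keys are implicitly sortable
val nums = sc.parallelize(List(1.0, 2.0, 3.0))
nums.mean                          // DoubleRDDFunctions: items are convertible to Double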

aggregate

The aggregate function lets the user apply two different reduce functions to an RDD: the first (seqOp) reduces the data within each partition, and the second (combOp) merges the per-partition results. The supplied zero value is used as the initial value in both steps. In the example below, with two partitions, the result is max(0,1,2,3) + max(0,4,5,6) = 3 + 6 = 9.
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.aggregate(0)(math.max(_, _), _ + _)

sortBy

This function sorts the input RDD's data and stores it in a new RDD. The first parameter is a function that maps each input item to the key you want to sort by. The second parameter (optional) specifies whether the data should be sorted in ascending or descending order.
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
z.sortBy(c => c._2, true).collect

stats [Double]

Simultaneously computes the count, mean, variance, standard deviation, maximum and minimum of all values contained in the RDD.
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.stats

sortByKey [Ordered]

This function sorts the input RDD's data and stores it in a new RDD. The output RDD is a shuffled RDD, because the data has to be redistributed (shuffled) by key before it can be sorted. The implementation is quite clever: first, it uses a range partitioner to partition the data into key ranges within the shuffled RDD; then it sorts these ranges individually with mapPartitions, using standard sort mechanisms.
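
A rough sketch of this range-partition-then-sort idea, for illustration only and not Spark's actual code (RangePartitioner is a real Spark class, the rest is our own):
import org.apache.spark.RangePartitioner
val pairs = sc.parallelize(List(("owl", 3), ("ant", 1), ("dog", 2)), 2)
val ranged = pairs.partitionBy(new RangePartitioner(2, pairs))             // shuffle keys into ranges
val sorted = ranged.mapPartitions(_.toArray.sortBy(_._1).iterator, true)   // sort each range locally
sorted.collect
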
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
d.collect
res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))

sum [Double]

Computes the sum of all values contained in the RDD. The data items must be implicitly convertible to the Scala data type Double.
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.sum
takeOrdered

Orders the data items of the RDD using their inherent implicit ordering function and returns the first n items as an array.
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(2)
takeSample

Behaves differently from sample in the following respects:
  •   It returns an exact number of samples (hint: the 2nd parameter).
  •   It returns an Array instead of an RDD.
  •   It internally randomizes the order of the items returned.
val x = sc.parallelize(1 to 1000, 3)
x.takeSample(true, 100, 1)
toDebugString

Returns a string that contains debug information about the RDD and its recursive dependencies, i.e. its lineage.
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toDebugString
top

Utilizes the implicit ordering of T to determine the top k values and returns them as an array.
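
A minimal example (the values are arbitrary); top returns the k largest items in descending order:
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)   // Array(9, 8)
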
union, ++

Performs the standard set operation: A union B. Note that duplicates are not removed; use distinct afterwards if a true set union is needed.
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect

unpersist

Dematerializes the RDD (i.e. erases all of its data items from hard disk and memory). However, the RDD object remains; if it is referenced in a later computation, Spark will regenerate it automatically using the stored dependency graph.
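
A small illustrative sketch of this life cycle:
val y = sc.parallelize(1 to 10, 2)
y.persist()     // mark the RDD for materialization
y.count         // the first action materializes it in memory
y.unpersist()   // drop the materialized copies; the RDD handle stays valid
y.count         // recomputed from the stored dependency graph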

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.values.collect

zip

Joins two RDDs by pairing the i-th element of one with the i-th element of the other. The two RDDs must have the same number of partitions and the same number of elements per partition. The resulting RDD consists of two-component tuples, which are interpreted as key-value pairs by the methods provided by the PairRDDFunctions extension.
val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
val c = sc.parallelize(201 to 300, 3)
a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2)).collect

zipWithIndex

Zips the elements of the RDD with their element indexes. The indexes start at 0. If the RDD is spread across multiple partitions, then a Spark job is started to perform this operation.
val z = sc.parallelize(Array("A", "B", "C", "D"))
val r = z.zipWithIndex
zipWithUniqueId

This is different from zipWithIndex, since it just assigns a unique id to each data element; the ids may not match the index number of the element. Items in the k-th partition receive the ids k, n+k, 2*n+k, and so on, where n is the number of partitions. This operation does not start a Spark job even if the RDD is spread across multiple partitions.
val z = sc.parallelize(100 to 120, 5)
val r = z.zipWithUniqueId
r.collect

Read full article from Apache Spark RDD API Examples
