Wednesday, November 26, 2014

Parallel Programming With Spark

Spark improves efficiency through:
» In-memory computing primitives
» General computation graphs

Spark was originally written in Scala, which allows
concise function syntax and interactive use.
A Java API was recently added for standalone apps.

Concept: resilient distributed datasets (RDDs)
» Immutable collections of objects spread across a cluster
» Built through parallel transformations (map, filter, etc.)
» Automatically rebuilt on failure
» Controllable persistence (e.g. caching in RAM) for reuse

Main Primitives
Resilient distributed datasets (RDDs)
» Immutable, partitioned collections of objects
Transformations (e.g. map, filter, groupBy, join)
» Lazy operations to build RDDs from other RDDs
Actions (e.g. count, collect, save)
» Return a result or write it to storage
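To see what "lazy" means without a cluster, here is a minimal local analogue, assuming a Scala view can stand in for an RDD. Nothing below is Spark API; `LazyDemo`, `transform`, and `collect` are hypothetical names. Building the chain of transformations runs nothing; only the action (forcing the view with `toList`) does the work.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical local analogue of lazy transformations vs. actions (not Spark API).
object LazyDemo {
  // "Transformations": describe the work on a view; the map function does not run yet.
  // `touched` records every element the map function actually processes.
  def transform(xs: List[Int], touched: ListBuffer[Int]) =
    xs.view
      .map { x => touched += x; x * x }
      .filter(_ % 2 == 0)

  // "Action": forcing the view runs the whole chain and returns a concrete result.
  def collect(xs: List[Int], touched: ListBuffer[Int]): List[Int] =
    transform(xs, touched).toList
}
```

Calling `transform(List(1, 2, 3), log)` leaves `log` empty; calling `.toList` on the result returns `List(4)` and only then fills `log` with 1, 2, 3.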

RDD Fault Tolerance
RDDs track the series of transformations used to
build them (their lineage) to recompute lost data
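A toy model of lineage, assuming a single-machine class in place of a real RDD: each `ToyRDD` keeps only the function that rebuilds it from its parent, so a lost cached copy is recomputed rather than restored from a replica. `ToyRDD`, `cache()`, and `dropCache()` are hypothetical names chosen for illustration, not Spark's implementation.

```scala
// Hypothetical toy model of lineage-based recovery (not Spark's implementation).
// Each dataset stores how to compute itself from its parent, not a replica of its data.
final class ToyRDD[T](computeFn: () => List[T]) {
  private var cached: Option[List[T]] = None
  private var keepInMemory = false
  var computations = 0 // how many times this dataset was (re)built

  // New datasets record the parent (via this.collect()) plus the function to apply.
  def map[U](f: T => U): ToyRDD[U] = new ToyRDD(() => this.collect().map(f))
  def filter(p: T => Boolean): ToyRDD[T] = new ToyRDD(() => this.collect().filter(p))

  // Ask for the result to be kept in memory after the first computation.
  def cache(): this.type = { keepInMemory = true; this }

  // Simulates losing the in-memory copy, e.g. when a node fails.
  def dropCache(): Unit = { cached = None }

  // Serve the cached copy if present; otherwise rebuild from lineage.
  def collect(): List[T] = cached.getOrElse {
    computations += 1
    val result = computeFn()
    if (keepInMemory) cached = Some(result)
    result
  }
}

object ToyRDD {
  def parallelize[T](xs: List[T]): ToyRDD[T] = new ToyRDD(() => xs)
}
```

In the model, caching a dataset and then dropping its cache costs exactly one extra computation: lineage trades cheap bookkeeping up front for recomputation only when data is lost.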

Easiest way to start: the Spark interpreter (spark-shell)
» Modified version of the Scala interpreter for cluster use
It runs in local mode on 1 thread by default, but you
can control this through the MASTER environment variable:
MASTER=local ./spark-shell # local, 1 thread
MASTER=local[2] ./spark-shell # local, 2 threads
MASTER=host:port ./spark-shell # run on Mesos

First Stop: SparkContext
Main entry point to Spark functionality
Created for you in spark-shell as variable sc

Creating RDDs
// Turn a Scala collection into an RDD
sc.parallelize(List(1, 2, 3))
// Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
// Use any existing Hadoop InputFormat
sc.hadoopFile(path, inputFmt, keyClass, valClass)

val nums = sc.parallelize(List(1, 2, 3))
// Pass each element through a function
val squares = nums.map(x => x * x) // {1, 4, 9}
// Keep elements passing a predicate
val even = squares.filter(_ % 2 == 0) // {4}
// Map each element to zero or more others
nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
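The same three transformations on a plain Scala `List`, as a quick local check of the results in the comments above. This is a sketch: a `List` stands in for the RDD (so results come back in order rather than as an unordered set), and `BasicTransforms` is a hypothetical name.

```scala
// Hypothetical local check of map / filter / flatMap on a List (not Spark API).
object BasicTransforms {
  val nums = List(1, 2, 3)
  val squares = nums.map(x => x * x)       // each element through a function
  val even = squares.filter(_ % 2 == 0)    // keep elements passing a predicate
  val expanded = nums.flatMap(x => 1 to x) // each element to zero or more others
}
```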

Working with Key-Value Pairs
Spark’s "distributed reduce" transformations
operate on RDDs of key-value pairs
Scala pair syntax:
val pair = (a, b) // sugar for new Tuple2(a, b)
Accessing pair elements:
pair._1 // => a
pair._2 // => b

Some Key-Value Operations
val pets = sc.parallelize( List(("cat", 1), ("dog", 1), ("cat", 2)))
pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)}
pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1))}
pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey also automatically implements
combiners on the map side
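A local sketch of what the map-side combiner buys, assuming partitions are modeled as a list of lists (`MapSideCombine` and its methods are hypothetical). Each partition is reduced on its own first, so only one record per key per partition would need to be shuffled. It uses `groupMapReduce`, available from Scala 2.13.

```scala
// Hypothetical model of reduceByKey with map-side combining (not Spark API).
object MapSideCombine {
  type Partition[K, V] = List[(K, V)]

  // Map side: reduce the records of one partition locally.
  def combineLocally[K, V](p: Partition[K, V], f: (V, V) => V): Map[K, V] =
    p.groupMapReduce(_._1)(_._2)(f)

  // Reduce side: merge the per-partition partial results.
  def reduceByKey[K, V](parts: List[Partition[K, V]], f: (V, V) => V): Map[K, V] =
    parts.map(p => combineLocally(p, f)).flatten.groupMapReduce(_._1)(_._2)(f)

  // Records that would be shuffled with and without the local combine step.
  def shuffledWithCombiner[K, V](parts: List[Partition[K, V]], f: (V, V) => V): Int =
    parts.map(p => combineLocally(p, f).size).sum

  def shuffledWithoutCombiner[K, V](parts: List[Partition[K, V]]): Int =
    parts.map(_.size).sum
}
```

With partitions `List(("cat", 1), ("dog", 1), ("cat", 2))` and `List(("cat", 4), ("cat", 5))`, the result is `Map("cat" -> 12, "dog" -> 1)`, and 3 records cross the shuffle instead of 5. `groupByKey` has no such local step, since it must keep every value.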

val lines = sc.textFile("hamlet.txt")
val counts = lines.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
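The word count can be checked on a single machine with ordinary collections. In this sketch `groupMapReduce` (Scala 2.13+) plays the role of `reduceByKey`, `WordCount` is a hypothetical name, and the input lines are made up for illustration.

```scala
// Hypothetical single-machine word count mirroring flatMap / map / reduceByKey.
object WordCount {
  def count(lines: List[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))     // split each line into words
      .map(word => (word, 1))               // pair each word with a count of 1
      .groupMapReduce(_._1)(_._2)(_ + _)    // sum the counts per word
}
```

For example, `WordCount.count(List("to be or", "not to be"))` gives `Map("to" -> 2, "be" -> 2, "or" -> 1, "not" -> 1)`.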

val visits = sc.parallelize(List(
  ("index.html", ""),
  ("index.html", ""),
  ("about.html", "")))
val pageNames = sc.parallelize(List(
  ("index.html", "Home"), ("about.html", "About")))
visits.join(pageNames)
// ("index.html", ("", "Home"))
// ("index.html", ("", "Home"))
// ("about.html", ("", "About"))
visits.cogroup(pageNames)
// ("index.html", (Seq("", ""), Seq("Home")))
// ("about.html", (Seq(""), Seq("About")))
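The output comments above follow from the definitions of an inner join and a cogroup. A local sketch of both on lists of pairs, assuming hypothetical helper names (`PairJoins`, `join`, `cogroup`); these are not Spark's implementations.

```scala
// Hypothetical local versions of join and cogroup on lists of pairs (not Spark API).
object PairJoins {
  // Inner join: one output pair for every matching (left, right) combination.
  def join[K, V, W](left: List[(K, V)], right: List[(K, W)]): List[(K, (V, W))] =
    for {
      (k, v) <- left
      (k2, w) <- right
      if k == k2
    } yield (k, (v, w))

  // Cogroup: for every key in either input, all left values and all right values.
  def cogroup[K, V, W](left: List[(K, V)], right: List[(K, W)]): Map[K, (List[V], List[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val vs = left.collect { case (key, v) if key == k => v }
      val ws = right.collect { case (key, w) if key == k => w }
      (k, (vs, ws))
    }.toMap
  }
}
```

For example, with hypothetical visits `("index.html", "v1")`, `("about.html", "v2")`, `("index.html", "v3")` and the `pageNames` above, `join` yields one row per visit, while `cogroup` yields one row per page holding `List("v1", "v3")` and `List("Home")` for `index.html`.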

Controlling The Number of Reduce Tasks
All the pair RDD operations take an optional
second parameter for the number of tasks:
words.reduceByKey(_ + _, 5)
visits.join(pageNames, 5)
You can also set the spark.default.parallelism property.
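One way to picture what the number of tasks controls: a shuffle assigns each key to one of N buckets by hashing it, so all values for a key meet in the same reduce task. The helper below is a sketch of that idea, assuming a non-negative `hashCode` modulo N (the rule Spark's `HashPartitioner` applies); `HashBuckets` itself is hypothetical.

```scala
// Hypothetical sketch of hash partitioning into a fixed number of reduce tasks.
object HashBuckets {
  // Non-negative modulo, since hashCode can be negative.
  def bucketOf(key: Any, numTasks: Int): Int = {
    val raw = key.hashCode % numTasks
    if (raw < 0) raw + numTasks else raw
  }

  // Splits pairs into numTasks buckets; every value for a key lands in one bucket.
  def partition[K, V](pairs: List[(K, V)], numTasks: Int): Vector[List[(K, V)]] = {
    val grouped = pairs.groupBy { case (k, _) => bucketOf(k, numTasks) }
    Vector.tabulate(numTasks)(i => grouped.getOrElse(i, Nil))
  }
}
```

More tasks means smaller buckets per task; fewer tasks means less scheduling overhead but more data per task.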

Using Local Variables
Any external variables you use in a closure will
automatically be shipped to the cluster:
val query = scala.io.StdIn.readLine()
lines.filter(_.contains(query)).count()
Some caveats:
» Each task gets a new copy (updates aren’t sent back)
» Variable must be Serializable
» Don’t use fields of an outer object (ships all of it!)
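The first two caveats follow from closures and their captured variables being serialized before they are shipped. A sketch using plain Java serialization, assuming a one-element `Array[Int]` stands in for a captured mutable variable (`ShipToTask` and `roundTrip` are hypothetical names): the task receives a deserialized copy, so its updates never reach the driver.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical model of shipping a captured value to a task (not Spark's code).
object ShipToTask {
  // Serialize a value and read back a fresh copy, roughly what a worker receives.
  // Throws java.io.NotSerializableException if the value (or anything it
  // references) is not Serializable.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Mutating the copy returned by `roundTrip(Array(0))` leaves the original array at 0. Passing an instance of a plain class that does not implement `java.io.Serializable` throws `java.io.NotSerializableException`, the local counterpart of the second caveat.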

sample(): deterministically sample a subset
union(): merge two RDDs
cartesian(): cross product
pipe(): pass through external program
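Local analogues of `sample()`, `union()`, and `cartesian()` on lists, as a sketch (`MoreOps` and `sampleLocal` are hypothetical; `pipe()` is left out because it needs an external program). Sampling is made deterministic by fixing the random seed, which is also why Spark's `sample()` accepts a seed.

```scala
import scala.util.Random

// Hypothetical local analogues of sample / union / cartesian (not Spark API).
object MoreOps {
  // Bernoulli sample: keep each element with probability `fraction`.
  // A fixed seed makes the result repeatable.
  def sampleLocal[T](xs: List[T], fraction: Double, seed: Long): List[T] = {
    val rng = new Random(seed)
    xs.filter(_ => rng.nextDouble() < fraction)
  }

  // Union keeps duplicates, like concatenation.
  def union[T](a: List[T], b: List[T]): List[T] = a ++ b

  // Cartesian product: every element of a paired with every element of b.
  def cartesian[A, B](a: List[A], b: List[B]): List[(A, B)] =
    for (x <- a; y <- b) yield (x, y)
}
```

For example, `union(List(1, 2), List(2, 3))` gives `List(1, 2, 2, 3)`, and calling `sampleLocal` twice with the same seed returns the same subset.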

Task Scheduler
» Runs general task graphs
» Pipelines functions where possible
» Cache-aware data reuse & locality
» Partitioning-aware to avoid shuffles
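Pipelining can be illustrated with Scala iterators: chained `map` and `filter` calls pass each element through the whole chain before the next one is read, with no intermediate collection. A sketch, assuming a local `Iterator` stands in for one partition; `Pipelining` and the log are hypothetical and exist only to expose the evaluation order.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical illustration of pipelined narrow transformations on one partition.
object Pipelining {
  def runPartition(partition: Iterator[Int], log: ListBuffer[String]): List[Int] =
    partition
      .map { x => log += s"map $x"; x * 10 }         // first stage
      .filter { x => log += s"filter $x"; x > 10 }   // second stage, same pass
      .toList                                        // drives the whole chain
}
```

`runPartition(Iterator(1, 2), log)` returns `List(20)` and logs `map 1, filter 10, map 2, filter 20`: the stages interleave per element instead of running all maps first.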
