Notes on Apache Spark 2
Created 01/17/17
Updated 03/29/17, Updated 04/04/17, Updated 05/26/17, Updated 06/01/17, Updated 07/13/17
Introduction
Apache Spark is an open-source data analytics cluster computing framework originally developed in the AMPLab at UC Berkeley. Spark fits into the Hadoop open-source community, building on top of the Hadoop Distributed File System (HDFS). However, Spark is not tied to the two-stage MapReduce paradigm, and promises performance up to 100 times faster than Hadoop MapReduce, for certain applications. Spark provides primitives for in-memory cluster computing that allows user programs to load data into a cluster's memory and query it repeatedly, making it well suited to machine learning algorithms.
Spark provides a way to carry out distributed calculations and collect the results. Since the work is done largely in memory, it can be much more efficient than Hadoop MapReduce.
There are three levels at which to learn Apache Spark, and we have been carrying these out in a study group:
• As a strong justification for learning Scala, since much of the Spark API is Scala-based (though there are Java and Python versions). Most of the Scala learning sources are from the Coursera class on Functional Programming (see related document)
• As an implementation and deployment facility for distributed calculations on large data sets. With the promise of higher performance compared to Hadoop, and reduced learning curve compared to Hadoop. There are still some Hadoop aspects that are important to learn, such as the Hadoop file system, since that provides a data source.
• As a community that has developed and is developing a set of libraries for running common “big data” analytics and machine learning algorithms on top of the Spark platform, just as Mahout provides a set of libraries for such analytics on the Hadoop platform. Since many of these libraries are less mature than their Mahout/Hadoop counterparts, the emphasis here is the community as a vehicle for support.
There have been two major versions:
• The 2.x versions have added pipelines, and also Java compatibility. In this version, DataFrames and the optimization of functions running over data tables have been greatly enhanced (these are called the Tungsten and Catalyst optimizations). Most of the machine learning libraries have moved to a new version, which uses DataFrames instead of RDDs (feature parity with the prior RDD-based ML libraries is expected as of Spark 3.0).
• The 1.x versions were the initial releases, and created the basic Spark concepts of RDDs and operations on them. The interface was focused on Scala and Python. Starting in release 1.3, the DataFrame object was added as a layer above the RDD, which also included support for columns and database query-like operations.
The current version is 2.2.0, released July 2017.
A prior version was 2.1.1, released earlier in 2017. The first release of 2.x was in July 2016.
The distribution includes the core libraries; the Scala, Java, and Python APIs; a large set of examples; and the Shark, Streaming, and machine learning libraries. Prior to 2017, we primarily worked with the Scala APIs.
[pic]
Spark is similar to Hadoop in ecosystem structure. The MLlib is the most useful part for our projects, and is similar to Mahout in structure.
Features
• Java, Scala, and Python APIs.
• Proven scalability to 100 nodes in the research lab and 80 nodes in production at Yahoo!.
• Ability to cache datasets in memory for interactive data analysis: extract a working set, cache it, query it repeatedly.
• Interactive command line interface (in Scala or Python) for low-latency data exploration at scale.
• Higher level library for stream processing, through Spark Streaming.
• Higher-level libraries for machine learning and graph processing that, because of Spark's distributed memory-based architecture, are ten times as fast as the disk-based Apache Mahout on Hadoop and even scale better than Vowpal Wabbit.[16]
Persistence layers for Spark: Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, Amazon S3, Cassandra, Hive, HBase, etc). Spark supports text files, SequenceFiles, Avro, Parquet, and any other Hadoop InputFormat.
Release Status and History
Spark became an Apache Top-Level Project in February 2014, and was previously an Apache Incubator project since June 2013. It has received code contributions from large companies that use Spark, including Yahoo! and Intel as well as small companies and startups such as Conviva, Quantifind, ClearStoryData, Ooyala and many more. By March 2014, over 150 individual developers had contributed code to Spark, representing over 30 different companies. Prior to joining Apache Incubator, versions 0.7 and earlier were licensed under the BSD license.
Who uses Spark
Because Spark is a general-purpose framework for cluster computing, it is used for a diverse range of applications. The “Learning Spark” book outlines two personas it targets as readers: Data Scientists and Engineers. Let’s take a closer look at each of these personas and how they use Spark. Unsurprisingly, the typical use cases differ across the two personas, but we can roughly classify them into two categories: data science and data applications.
Of course, these are imprecise personas and usage patterns, and many folks have skills from both, sometimes playing the role of the investigating Data Scientist, and then “changing hat” and writing a hardened data processing system. Nonetheless, it can be illuminating to consider the two personas and their respective use cases separately.
The Data Scientist is running experiments and analyzing data. He/She is focused on algorithms.
The Engineer is developing Data Analysis Applications. He/She is focused on development and deployment.
Resources
The main web site is at
This was a very good article, written by the CTO of Databricks.
“Advanced Analytics with Spark (Second Edition)” by Sandy Ryza, Uri Laserson, Sean Owen, Josh Wills. O'Reilly Media, March 2017 (for early edition), 271 pages. List price $49.99, Amazon price $39.34. Printed version expected in July 2017. We bought this in May 2017.
“Spark: The Definitive Guide” by Bill Chambers and Matei Zaharia. O’Reilly Media, January 2017, 450 pages. Not on Amazon yet, only available from O’Reilly. Updated for Spark 2.x. This is a pre-print; the printed copy is expected October 2017.
“Spark in Action” by Petar Zecevic and Marko Bonaci. Manning Press, November 2016, 472 pages. Rated 3.8 out of 5 stars. One of the first books updated for Spark 2.x.
“Learning Spark: Lightning-Fast Big Data Analytics” by Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia. O'Reilly Media, early 2015. List price is $39.99. Rated 4.1 stars on . We have this plus updates.
“Advanced Analytics with Spark: Patterns for Learning from Data at Scale” by Sandy Ryza, Uri Laserson, Sean Owen, Josh Wills. O'Reilly Media, November 2014, 268 pages. List price $49.99, Amazon price $39.34. Rated 4.8 stars on .
From the Spark Summit:
About DataFrames:
meetup has been posted:
2016 version of Spark course, using DataFrames:
Others:
These templates (and tutorials) should be a good way to get set up with Spark if you're not already.
Contributing to Spark
They suggested that we download the source distribution.
Concepts
At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
Think of Spark as a job-execution system. You define a job with a set of scheduling parameters, and information about the resources to use (local cluster or remote cluster, etc.).
The job is then carried out, and you get results and callbacks.
What is useful is that objects created within the Spark context will have operations on them automatically distributed to the cluster processors.
There are also a set of monitoring constructs.
Jobs and Stages
[pic]
Modules in the Distribution
common/sketch
common/network-common
common/network-shuffle
common/unsafe
common/tags
core
graphx
mllib
mllib-local
tools
streaming
sql/catalyst
sql/core
sql/hive
assembly
external/flume
external/flume-sink
external/flume-assembly
examples
repl
launcher
external/kafka-0-8
external/kafka-0-8-assembly
external/kafka-0-10
external/kafka-0-10-assembly
external/kafka-0-10-sql
Getting Started
These exercises give you practice in the following areas:
• Installing and experimenting with the Scala language
• Learning about Scala collections
• Installing Spark and running your first job
• Improving performance through multithreading
• Improving performance through configuration
A good example can be downloaded from the Spark-template repo on GitHub.
Prerequisites
This set of exercises requires some basic knowledge of Linux, including the ability to install new applications.
Knowledge of the Scala language is beneficial but not required. You must perform these exercises in order, as they illustrate the installation of the necessary software packages.
Building for Scala 2.10
To produce a Spark package compiled with Scala 2.10, use the -Dscala-2.10 property:
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
Note that support for Scala 2.10 is deprecated as of Spark 2.1.0 and may be removed in Spark 2.2.0.
Build success in 16 minutes.
First Sample Project
import org.apache.spark.SparkContext
import scala.math.random

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: SparkPi <master> [<slices>]")
      System.exit(1)
    }
    val spark = new SparkContext(args(0), "SparkPi",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
    val slices = if (args.length > 1) args(1).toInt else 2
    val n = 100000 * slices
    // Sample n points in the unit square and count how many fall inside the unit circle
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    // Block so we can look at the web UI on port 4040 before the context shuts down
    Thread.sleep(1000 * 3600)
  }
}
While Spark was running, I looked at localhost:4040, and captured the following:
[pic]
Second Sample Project
import org.apache.spark.SparkContext

/* Finds the average of a list of numbers */
object BasicAvg {
  def main(args: Array[String]) {
    val master = args.length match {
      case x: Int if x > 0 => args(0)
      case _ => "local"
    }
    val sc = new SparkContext(master, "BasicAvg", System.getenv("SPARK_HOME"))
    val input = sc.parallelize(List(1, 2, 3, 4))
    // aggregate keeps a running (sum, count) pair: seqOp folds in each element,
    // combOp merges the per-partition pairs
    val result = input.aggregate((0, 0))(
      (x, y) => Log("seqOp", (x, y), (x._1 + y, x._2 + 1)),
      (x, y) => Log("combOp", (x, y), (x._1 + y._1, x._2 + y._2)))
    val avg = result._1 / result._2.toFloat
    println(s"$result => average $avg")
  }
}
Where Log is defined as:
object Log {
/** Useful for function literals. */
def apply[U, T](name: String, arg: U, result: T): T = {
println(s"$name: $arg => $result")
result
}
def apply[T](name: String, t: T): T = {
println(s"$name: $t")
t
}
}
Spark API: Control Elements
The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. This is done through the following constructor:
new SparkContext(master, appName, [sparkHome], [jars])
or through new SparkContext(conf), which takes a SparkConf object for more advanced configuration.
The master parameter is a string specifying a Spark or Mesos cluster URL to connect to, or a special “local” string to run in local mode, as described below. appName is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the MASTER environment variable, and you can add JARs to the classpath with the ADD_JARS variable. For example, to run bin/spark-shell on four cores, use
$ MASTER=local[4] ./bin/spark-shell
Or, to also add code.jar to its classpath, use:
$ MASTER=local[4] ADD_JARS=code.jar ./bin/spark-shell
Master URLs
The master URL passed to Spark can be in one of the following formats:
|Master URL |Meaning |
|local |Run Spark locally with one worker thread (i.e. no parallelism at all). |
|local[K] |Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
|spark://HOST:PORT |Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
|mesos://HOST:PORT |Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default. |
If no master URL is specified, the spark shell defaults to “local”.
For running on YARN, Spark launches an instance of the standalone deploy cluster within YARN; see running on YARN for details.
Spark Session vs. Spark Context
Generally, a session is an interaction between two or more entities. In computer parlance, its usage is prominent in the realm of networked computers on the Internet. First with TCP session, then with login session, followed by HTTP and user session, so no surprise that we now have SparkSession, introduced in Apache Spark 2.0.
Beyond a time-bounded interaction, SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with DataFrame and Dataset APIs. Most importantly, it curbs the number of concepts and constructs a developer has to juggle while interacting with Spark.
In earlier versions you had to create a SparkConf and SparkContext (and possibly a SQLContext) yourself. In Spark 2.0 the same effects can be achieved through SparkSession, without explicitly creating SparkConf, SparkContext or SQLContext, as they are encapsulated within the SparkSession. Using a builder design pattern, it instantiates a SparkSession object if one does not already exist, along with its associated underlying contexts.
// Create a SparkSession. No need to create SparkContext
// You automatically get it as part of the SparkSession
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder()
.appName("SparkSessionZipsExample")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
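Once the session exists, it is the entry point for reading data and running SQL. A minimal sketch of follow-on usage (the zips.json path is a made-up example):
// Read a DataFrame through the session and query it
val zipsDF = spark.read.json("data/zips.json")
zipsDF.printSchema()
zipsDF.createOrReplaceTempView("zips")
spark.sql("SELECT city, pop FROM zips WHERE pop > 40000").show(10)
// The underlying SparkContext is still reachable when an RDD-level API is needed
val sc = spark.sparkContext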
Programming with Datasets and DataFrames
Good external resource:
Java API
A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.
Operations available on Datasets are divided into transformations and actions. Transformations are the ones that produce new Datasets, and actions are the ones that trigger computation and return results. Example transformations include map, filter, select, and aggregate (groupBy). Example actions include count, show, and writing data out to file systems.
Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner. To explore the logical plan as well as optimized physical plan, use the explain function.
To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain-specific type T to Spark's internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code at runtime to serialize the Person object into a binary structure. This binary structure often has a much lower memory footprint and is optimized for efficiency in data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function.
There are typically two ways to create a Dataset. The most common way is by pointing Spark to some files on storage systems, using the read function available on a SparkSession.
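A sketch of both creation paths in Scala (the people.json path and the Person case class are assumptions for illustration; the implicit encoders come from the session):
import spark.implicits._   // encoders for case classes and common types

case class Person(name: String, age: Long)

// 1. Point Spark at files on storage: read gives a DataFrame (Dataset[Row]),
//    which as[Person] turns into a typed Dataset
val peopleDS = spark.read.json("data/people.json").as[Person]

// 2. Build a Dataset from an in-memory collection
val localDS = Seq(Person("Ann", 34), Person("Bob", 29)).toDS()

peopleDS.filter(_.age > 30).show()
peopleDS.explain(true)   // shows the logical and optimized physical plans mentioned above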
Programming with Resilient Distributed Datasets (RDDs)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.
What are the ways that RDDs can be created?
HDFS
Persist in memory
Failure recovery
Creating RDD’s
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
The simplest way to create RDDs is to take an existing in-memory collection and pass it to SparkContext’s parallelize method. This approach is very useful when learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Keep in mind however, that outside of prototyping and testing, this is not widely used since it requires you have your entire dataset in memory on one machine.
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
Once created, the distributed dataset (distData here) can be operated on in parallel. For example, we might call distData.reduce(_ + _) to add up the elements of the array. We describe operations on distributed datasets later on.
One important parameter for parallel collections is the number of slices to cut the dataset into. Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
Hadoop Datasets
Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, Amazon S3, Hypertable, HBase, etc). Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, kfs://, etc URI). Here is an example invocation:
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(_.size).reduce(_ + _).
The textFile method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks.
For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
Finally, for other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
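A couple of hedged Scala examples of these readers (paths and types are placeholders; hadoopFile is the convenience variant of hadoopRDD that builds the JobConf for you):
// SequenceFile of (IntWritable, Text), read back as native Scala types
val seqData = sc.sequenceFile[Int, String]("hdfs://namenode:9000/data/pairs.seq")

// An arbitrary Hadoop InputFormat, here KeyValueTextInputFormat from the mapred API
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat
val kv = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("hdfs://namenode:9000/data/kv.txt")
  .map { case (k, v) => (k.toString, v.toString) }   // copy out of reused Writable objects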
RDD Operations
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new distributed dataset representing the results. On the other hand, reduce is an action that aggregates all the elements of the dataset using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
Transformations
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
By default, each transformed RDD is recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk, or replicated across the cluster. The next section in this document describes these options.
The following table lists the transformations commonly used (see also the RDD API doc for details):
|Transformation |Meaning |
|map(func) |Return a new distributed dataset formed by passing each element of the source through a function func. |
|filter(func) |Return a new dataset formed by selecting those elements of the source on which func returns true. |
|flatMap(func) |Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
|mapPartitions(func) |Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T. |
|mapPartitionsWithIndex(func) |Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. |
|sample(withReplacement, fraction, seed) |Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
|union(otherDataset) |Return a new dataset that contains the union of the elements in the source dataset and the argument. |
|distinct([numTasks]) |Return a new dataset that contains the distinct elements of the source dataset. |
|groupByKey([numTasks]) |When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. Note: by default, this uses only 8 parallel tasks to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
|reduceByKey(func, [numTasks]) |When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
|sortByKey([ascending], [numTasks]) |When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
|join(otherDataset, [numTasks]) |When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. |
|cogroup(otherDataset, [numTasks]) |When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called groupWith. |
|cartesian(otherDataset) |When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
A complete list of transformations is available in the RDD API doc.
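A short Scala sketch chaining several of these transformations (sc is the SparkContext from earlier; the input strings are made up). Nothing executes until an action such as collect is called:
val lines = sc.parallelize(Seq("spark is fast", "hadoop is robust", "spark scales"))
val wordCounts = lines
  .flatMap(line => line.split(" "))     // one output element per word
  .filter(word => word.nonEmpty)
  .map(word => (word, 1))
  .reduceByKey(_ + _)                   // still a transformation; no work has run yet
wordCounts.collect().foreach(println)   // the action triggers the whole pipeline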
Actions
The most common action on basic RDDs you will likely use is reduce. Reduce takes in a function which operates on two elements of the same type of your RDD and returns a new element of the same type. A simple example of such a function is + , which we can use to sum our RDD. With reduce we can easily sum the elements of our RDD, count the number of elements, and perform other types of aggregations.
Here is an example of a reduce
val sum = rdd.reduce((x, y) => x + y)
This takes a pair of values from the RDD and produces a new value which is the sum.
Similar to reduce is fold which also takes a function with the same signature as needed for reduce, but also takes a “zero value” to be used for the initial call on each partition.
The zero value you provide should be the identity element for your operation, that is applying it multiple times with your function should not change the value, (e.g. 0 for +, 1 for *, or an empty list for concatenation).
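For example, a minimal sketch of fold next to reduce, using 0 as the identity for addition:
val nums = sc.parallelize(List(1, 2, 3, 4))
val viaReduce = nums.reduce((x, y) => x + y)   // 10
val viaFold   = nums.fold(0)((x, y) => x + y)  // 10; the zero value is applied once per partition
// A zero value that is not an identity (say 1) would be added once per partition plus once more
// when partition results are merged, so the answer would depend on the number of partitions.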
Fold and reduce both require that the return type of our result be the same type as that of the RDD we are operating over. This works well for doing things like sum, but sometimes we want to return a different type. For example when computing the running average we need to have a different return type. We could implement this using a map first where we transform every element into the element and the number 1 so that the reduce function can work on pairs.
The aggregate function frees us from the constraint of having the return be the same type as the RDD which we are working on. With aggregate, like fold, we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.
Here is an example of using aggregate (this is directly from the book):
val result = input.aggregate((0, 0))(
(x, y) => (x._1 + y, x._2 + 1),
(x, y) => (x._1 + y._1, x._2 + y._2))
val avg = result._1 / result._2.toDouble
In this case, the first function closure is the code to update an element for a new data point, and the second function closure is the code to merge two elements.
The following table lists the actions commonly used (see also the RDD API doc for details):
|Action |Meaning |
|reduce(func) |Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
|collect() |Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
|count() |Return the number of elements in the dataset. |
|first() |Return the first element of the dataset (similar to take(1)). |
|take(n) |Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements. |
|takeSample(withReplacement, num, seed) |Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed. |
|saveAsTextFile(path) |Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. |
|saveAsSequenceFile(path) |Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). |
|countByKey() |Only available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key. |
|foreach(func) |Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems. |
A complete list of actions is available in the RDD API doc.
Some actions on RDDs return some or all of the data to our driver program in the form of a regular collection or value.
The simplest and most common operation that returns data to our driver program is collect(), which returns the entire RDD's contents. collect() suffers from the restriction that all of your data must fit on a single machine, as it all needs to be copied to the driver.
take(n) returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection. It’s important to note that these operations do not return the elements in the order you might expect.
These operations are useful for unit tests and quick debugging, but may introduce bottlenecks when dealing with large amounts of data.
If there is an ordering defined on our data, we can also extract the top elements from an RDD using top. top will use the default ordering on the data, but we can supply our own comparison function to extract the top elements.
Sometimes we need a sample of our data in our driver program. The takeSample(withReplacement, num, seed) function allows us to take a sample of our data either with or without replacement.
The further standard operations on a basic RDD all behave pretty much exactly as one would imagine from their name. count() returns a count of the elements, and countByValue() returns a map of each unique value to its count.
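A few of these driver-side actions in Scala, with made-up data:
val nums = sc.parallelize(Seq(5, 3, 8, 3, 1, 8, 8))
nums.count()                                     // 7
nums.countByValue()                              // Map(5 -> 1, 3 -> 2, 8 -> 3, 1 -> 1)
nums.take(3)                                     // 3 elements, order not guaranteed
nums.top(2)                                      // Array(8, 8), using the default ordering
nums.top(2)(Ordering[Int].reverse)               // the two smallest, via a custom ordering
nums.takeSample(withReplacement = false, 3, seed = 42L)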
Passing Functions to Spark
Most of Spark's transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark.
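In Scala the usual choices are anonymous functions or methods on singleton objects; the main pitfall is accidentally closing over an enclosing object, which forces Spark to serialize far more than intended. A hedged sketch (SearchFunctions is an invented class, not part of Spark):
import org.apache.spark.rdd.RDD

object WordFunctions {
  def containsSpark(s: String): Boolean = s.contains("spark")   // safe: no outer reference
}

class SearchFunctions(val query: String) extends Serializable {
  // Risky: referring to query really means this.query, so the whole object is shipped
  def getMatchesBad(rdd: RDD[String]): RDD[String] = rdd.filter(x => x.contains(query))

  // Better: copy the field into a local variable so only the String travels with the task
  def getMatchesGood(rdd: RDD[String]): RDD[String] = {
    val localQuery = query
    rdd.filter(x => x.contains(localQuery))
  }
}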
RDD Persistence
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter.
As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times. Another trivial example would be doing a count and then writing out the same RDD.
Example 3-36. Scala double execute example
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))
To avoid computing an RDD multiple times, we can ask Spark to persist the data. When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions. If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown.
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
In addition, each RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing an org.apache.spark.storage.StorageLevel object to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The complete set of available storage levels is:
|Storage Level |Meaning |
|MEMORY_ONLY |Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. |
|MEMORY_AND_DISK |Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed. |
|MEMORY_ONLY_SER |Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. |
|MEMORY_AND_DISK_SER |Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed. |
|DISK_ONLY |Store the RDD partitions only on disk. |
|MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |Same as the levels above, but replicate each partition on two cluster nodes. |
Which Storage Level to Choose?
Spark’s storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency. We recommend going through the following process to select one:
• If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
• If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
• Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from disk.
• Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
If you want to define your own storage level (say, with a replication factor of 3 instead of 2), then use the factory method apply() of the StorageLevel singleton object.
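A small sketch of choosing a non-default level, and of the apply() factory (input is assumed to be an existing RDD of text lines):
import org.apache.spark.storage.StorageLevel

val parsed = input.map(line => line.split(","))
parsed.persist(StorageLevel.MEMORY_ONLY_SER)   // keep it serialized in memory to save space
parsed.count()                                 // first action materializes the cache
parsed.unpersist()                             // release the storage when finished

// Custom level via apply(useDisk, useMemory, useOffHeap, deserialized, replication)
val memSerReplicated3 = StorageLevel(true, true, false, false, 3)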
Working with Key-Value Pairs
This chapter covers how to work with RDDs of key-value pairs, which are a common data type required for many operations in Spark. Key-value RDDs expose new operations such as aggregating data items by key (e.g., counting up reviews for each product), grouping together data with the same key, and grouping together two different RDDs.
Oftentimes, to work with data records in Spark, you will need to turn them into key-value pairs and apply one of these operations.
We also discuss an advanced feature that lets users control the layout of pair RDDs across nodes: partitioning. Using controllable partitioning, applications can sometimes greatly reduce communication costs, by ensuring that data that will be accessed together will be on the same node. This can provide significant speedups. We illustrate partitioning using the PageRank algorithm as an example. Choosing the right partitioning for a distributed dataset is similar to choosing the right data structure for a local one — in both cases, data layout can greatly affect performance.
More to be filled in here from chapter 4 of the Learning Spark book.
Creating Pair RDDs
There are a number of ways to get Pair RDDs in Spark. Many formats we explore in Chapter 5 will directly return Pair RDDs for their key-value data. In other cases we have a regular RDD that we want to turn into a Pair RDD. To illustrate creating a Pair RDD, we will key our data by the first word in each line of the input.
In Python, for the functions on keyed data to work we need to make sure our RDD consists of tuples.
Example 4-1. Python create pair RDD using the first word as the key
input.map(lambda x: (x.split(" ")[0], x))
In Scala, to create Pair RDDs from a regular RDD, we simply need to return a tuple from our function.
Example 4-2. Scala create pair RDD using the first word as the key
input.map(x => (x.split(" ")(0), x))
Java doesn’t have a built-in tuple type, so Spark’s Java API has users create tuples using the scala.Tuple2 class. This class is very simple: Java users can construct a new tuple by writing new Tuple2(elem1, elem2) and can then access the elements with the ._1() and ._2() methods.
Java users also need to call special versions of Spark's functions when creating Pair RDDs. For instance, the mapToPair function should be used in place of the basic map function. This is discussed in more detail in converting between RDD types, but let's look at a simple example below.
PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String x) {
      return new Tuple2<String, String>(x.split(" ")[0], x);
    }
  };
JavaPairRDD<String, String> rdd = input.mapToPair(keyData);
When creating a Pair RDD from an in-memory collection in Scala and Python, we only need to make sure the types of our data are correct, and call parallelize. To create a Pair RDD in Java from an in-memory collection, we need to make sure our collection consists of tuples and also call SparkContext.parallelizePairs instead of SparkContext.parallelize.
Transformations on Pair RDDs
Pair RDDs are allowed to use all the transformations available to standard RDDs. The same rules from passing functions to spark apply. Since Pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements.
In Java and Scala when we run a map or filter or similar over a Pair RDDs, our function will be passed an instance of scala.Tuple2. In Scala pattern matching is a common way of extracting the individual values, whereas in Java we use the ._1() and ._2() values to access the elements. In Python our Pair RDDs consist of standard Python tuple objects that we interact with as normal.
For instance, we can take our Pair RDD from the previous section and filter out lines longer than 20 characters.
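A minimal sketch of that filter in Scala, assuming pairs is the (firstWord, line) RDD built above:
val shortLines = pairs.filter { case (key, value) => value.length < 20 }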
Operations
Aggregate
Group
Join
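Hedged one-line Scala sketches of these pair-RDD operation families (rdd and other are assumed to be RDDs of (String, Int) pairs):
rdd.reduceByKey(_ + _)                       // aggregate values per key
rdd.foldByKey(0)(_ + _)                      // aggregation with a zero value
rdd.mapValues(v => (v, 1))
   .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))   // per-key (sum, count), e.g. for averages
rdd.groupByKey()                             // (K, Iterable[V]); often better expressed as reduceByKey
rdd.join(other)                              // inner join: (K, (V, W))
rdd.leftOuterJoin(other)                     // (K, (V, Option[W]))
rdd.cogroup(other)                           // (K, (Iterable[V], Iterable[W]))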
Data Partitioning
The final Spark feature we will discuss in this chapter is how to control datasets' partitioning across nodes. In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Much like how a single-node program needs to choose the right data structure for a collection of records, Spark programs can choose to control their RDDs' partitioning to reduce communication. Partitioning will not be helpful in all applications; for example, if a given RDD is only scanned once, there is no point in partitioning it in advance. It is only useful when a dataset is reused multiple times in key-oriented operations such as joins. We will give some examples below.
Spark’s partitioning is available on all RDDs of key-value pairs, and causes the system to group together elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node. For example, one might choose to hash-partition an RDD into 100 partitions so that keys that have the same hash value modulo 100 appear on the same node. Or one might range-partition the RDD into sorted ranges of keys so that elements with keys in the same range appear on the same node.
Determining an RDD's Partitioner
In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java). This returns a scala.Option object, which is a Scala class for a container that may or may not contain one item. (It is considered good practice in Scala to use Option for fields that may not be present, instead of setting a field to null if a value is not present, running the risk of a null-pointer exception if the program subsequently tries to use the null as if it were an actual, present value.) You can call isDefined() on the Option to check whether it has a value, and get() to get this value. If present, the value will be a spark.Partitioner object. This is essentially a function telling the RDD which partition each key goes into; more about this later.
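A hedged sketch of setting and inspecting a partitioner in Scala:
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.partitioner                       // None: no partitioner has been set

// Hash-partition into 100 partitions and persist so the layout is reused by later joins
val partitioned = pairs.partitionBy(new HashPartitioner(100)).persist()
partitioned.partitioner.isDefined       // true
partitioned.partitioner.get             // the HashPartitioner instance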
Loading and Saving Data
Hadoop
Database
Flat files
Advanced Programming
Shared Variables
Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
Broadcast Variables
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The interpreter session below shows this:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
Accumulators
Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types.
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the += operator. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
The interpreter session below shows an accumulator being used to add up the elements of an array:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
Running Spark on a Cluster
Before diving into the specifics of running Spark on a cluster, it’s helpful to understand the architecture of Spark when running in a cluster.
When running in cluster mode, Spark utilizes a master-slave architecture with one central coordinator and many distributed workers. The central coordinator is called the driver. The driver communicates with a potentially large number of distributed workers called executors. The driver runs in its own Java process and each executor is a separate Java process. A driver and its executors are together termed a Spark application.
A Spark application is launched on a set of machines using an external service called a cluster manager. Spark is packaged with a built-in cluster manager called the Standalone cluster manager. Spark also works with Apache YARN and Apache Mesos, two popular open source cluster managers.
[pic]
There are several possible cluster managers:
- Standalone server
- YARN
- Mesos
Configuring
If you want to run your application on a cluster, you will need to specify the two optional parameters to SparkContext to let it find your code:
• sparkHome: The path at which Spark is installed on your worker machines (it should be the same on all of them).
• jars: A list of JAR files on the local machine containing your application’s code and any dependencies, which Spark will deploy to all the worker nodes. You’ll need to package your application into a set of JARs using your build system. For example, if you’re using SBT, the sbt-assembly plugin is a good way to make a single JAR with your code and dependencies.
If you run bin/spark-shell on a cluster, you can add JARs to it by specifying the ADD_JARS environment variable before you launch it. This variable should contain a comma-separated list of JARs. For example, ADD_JARS=a.jar,b.jar ./bin/spark-shell will launch a shell with a.jarand b.jar on its classpath. In addition, any new classes you define in the shell will automatically be distributed.
Deploying to a Cluster with Spark-Submit
Spark provides a uniform interface for submitting jobs across all cluster managers, a tool aptly named spark-submit. In Chapter 2 you saw a simple example of submitting a Python program with spark-submit:
bin/spark-submit my_script.py
When spark-submit is called with nothing but the name of a script or jar, it simply runs the supplied Spark program locally. Let’s say we wanted to submit this program to a Spark Standalone cluster. We can provide extra flags with the address of a standalone cluster and a specific size of executor we’d like to launch:
bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py
spark-submit provides a variety of options that let you control specific details about a particular run of your application. These options fall roughly into two categories. The first is the location of the cluster manager along with an amount of resources you’d like to request for your job (as shown above). The second is information about the runtime dependencies of your application, such as libraries or files you want to be present on all worker machines.
The general format for spark-submit is the following:
bin/spark-submit [options] <app jar | python file> [app options]
[options] are a list of flags for spark-submit. You can enumerate all possible flags by passing --help to spark-submit. A list of common flags is enumerated in Table 7-1.
<app jar | python file> refers to the jar or Python script containing the entry point into your application.
[app options] are options that will be passed onto your application. If the main method of your program parses its calling arguments, it will see only [app options] and not the flags specific to spark-submit.
Table 7-1. Common flags for spark-submit
Flag Explanation
--master Indicates the cluster manager to connect to. The options for this flag are described in Table 7-2.
--deploy-mode Whether to launch the driver program locally (“client”) or on one of the worker machines inside the cluster (“cluster”). In client mode spark-submit will run your driver on the same machine where spark-submit is itself being invoked. In cluster mode, the driver will be shipped to execute on a worker node in the cluster. The default is client mode.
--class The “main” class of your application if running a Java or Scala program.
--name A human-readable name for your application. This will be displayed in Spark’s web UI.
--jars A list of jar files to upload and place on the classpath of your application. If your application depends on a small number of third-party jars, you can add them here.
--files A list of files to be placed in the working directory of your application.
--py-files A list of files to be added to the PYTHONPATH of your application.
--executor-memory The amount of memory to use for executors, in bytes. Suffixes can be used to specify larger quantities such as “512m” (512 megabytes) or “15g” (15 gigabytes).
--driver-memory The amount of memory to use for the driver process, in bytes. Suffixes can be used to specify larger quantities such as “512m” (512 megabytes) or “15g” (15 gigabytes).
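Putting several of the flags together, a hedged example of submitting a packaged Scala application to a standalone cluster in cluster mode (the class name, jar names, and host are placeholders):
bin/spark-submit \
  --master spark://host:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --name "My Spark App" \
  --executor-memory 4g \
  --driver-memory 1g \
  --jars dep1.jar,dep2.jar \
  myApp.jar arg1 arg2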
[pic]
Monitoring Spark servers
Ooyala slides - Spark as a service
Ooyala Spark job server - Github
Also will review this, which is evidently an open-source web UI for monitoring Spark jobs:
Streaming
More to be filled in here.
Counting and sorting
Ooops, that was just the counting. Here's the sorting:
val sortedCounts = counts.map { case(tag, count) => (count, tag) }.transform(rdd => rdd.sortByKey(false))
sortedCounts.foreach(rdd => println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))
On Wed, Jul 9, 2014 at 9:08 PM, Richard Walker wrote:
Here's an interesting bit of Spark code related to our consideration of counting and ranking algos we briefly touched on tonight. This is from a Spark Streaming tutorial included in the Spark Summit link I sent last week:
Count the hashtags over a 5 minute window: Next, we’d like to count these hashtags over a 5 minute moving window. A simple way to do this would be to gather together the last 5 minutes of data and process it in the usual map-reduce way — map each tag to a (tag, 1) key-value pair and then reduce by adding the counts. However, in this case, counting over a sliding window can be done more intelligently. As the window moves, the counts of the new data can be added to the previous window’s counts, and the counts of the old data that falls out of the window can be ‘subtracted’ from the previous window’s counts. This can be done using DStreams as follows:
val counts = hashtags.map(tag => (tag, 1))
.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
Shark
This is described as the counterpart of Hive for Spark. It should make it easier to write Spark “jobs”. Documentation is rather limited at this point. The approach seems to be one of creating steps that fetch from the database.
Chronology
Late 2012 to April 2014: Learned about Spark, attended workshops, organized study groups. All of this was about the 1.0 version
February 2017: Downloaded Spark 2.1.0 and Spark 2.1.1, began to rebuild examples.
May 2017: The second edition of “Advanced Analytics Using Spark” came out, and we began to rebuild and re-learn. Created Spark2* examples.
Open Issues/Questions
How to control number of threads used?
How to assign to a Mesos cluster on AWS?
Appendix A: Learning Projects Created
Spark2J01
Uses Java API. Contains examples of loading DataFrames, and some basic ML examples.
Spark2S01
Uses Scala API. Contains examples of loading DataFrames, and some basic ML examples.