分布式数据库期中作业说明
厦门大学林子雨编著
《大数据技术原理与应用》
进阶学习自学教程
Spark快速入门指南 – Spark安装与基础使用
主讲教师:林子雨
厦门大学数据库实验室
二零一六年一月
目录
1 前言 1
2 准备工作 1
3 安装Spark 1
4 运行Spark示例 2
5 通过Spark Shell进行交互分析 3
5.1 基础操作 4
5.2 RDD的更多操作 5
5.3 缓存 6
6 SparkSQL和DataFrames 6
7 Spark Streaming 8
8 独立应用程序(Self-Contained Application) 9
8.1 应用程序代码 9
8.2 安装sbt 10
8.3 使用sbt打包Scala程序 12
8.4 通过spark-submit运行程序 13
9 进阶学习 13
附录1:任课教师介绍 13
附录2:课程教材介绍 14
附录3:中国高校大数据课程公共服务平台介绍 15
《大数据技术原理与应用》
Spark快速入门指南 – Spark安装与基础使用
主讲教师:林子雨
E-mail: ziyulin@xmu. 个人主页:
前言
Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark 正如其名,最大的特点就是快(Lightning-fast),可比 Hadoop MapReduce 的处理速度快 100 倍。此外,Spark 提供了简单易用的 API,几行代码就能实现 WordCount。本教程主要参考官网快速入门教程,介绍了 Spark 的安装,Spark shell 、RDD、Spark SQL、Spark Streaming 等的基本使用。
[pic]
本教程的具体运行环境如下:
• CentOS 6.4
• Spark 1.6
• Hadoop 2.6.0
• Java JDK 1.7
• Scala 2.10.5
准备工作
运行 Spark 需要 Java JDK 1.7,CentOS 6.x 系统默认只安装了 Java JRE,还需要安装 Java JDK,并配置好 JAVA_HOME 变量。此外,Spark 会用到 HDFS 与 YARN,因此请先安装 Hadoop,具体请浏览Hadoop安装教程,在此就不再复述。
安装Spark
待 Hadoop 安装好之后,我们再开始安装 Spark。
官网下载地址:
本教程选择的是 Spark 1.6.0 版本,选择 package type 为 “Pre-build with user-provided Hadoop [can use with most Hadoop distributions]”,再点击给出的下载连接 就可以下载了,如下图所示:
[pic]
Package type
• Source code: Spark 源码,需要编译才能使用,另外 Scala 2.11 需要使用源码编译才可使用
• Pre-build with user-provided Hadoop: “Hadoop free” 版,可应用到任意 Hadoop 版本
• Pre-build for Hadoop 2.6 and later: 基于 Hadoop 2.6 的预先编译版,需要与本机安装的 Hadoop 版本对应。可选的还有 Hadoop 2.4 and later、Hadoop 2.3、Hadoop 1.x,以及 CDH 4。
为方便,本教程选择的是 Pre-build with user-provided Hadoop,简单配置后可应用到任意 Hadoop 版本。
下载后,执行如下命令进行安装:
|sudo tar -zxf ~/下载/spark-1.6.0-bin-without-hadoop.tgz -C /usr/local/ |
|cd /usr/local |
|sudo mv ./spark-1.6.0-bin-without-hadoop/ ./spark |
|sudo chown -R hadoop:hadoop ./spark # 此处的 hadoop 为你的用户名 |
安装后,需要在 ./conf/spark-env.sh 中修改 Spark 的 Classpath,执行如下命令拷贝一个配置文件:
|cd /usr/local/spark |
|cp ./conf/spark-env.sh.template ./conf/spark-env.sh |
编辑 ./conf/spark-env.sh(vim ./conf/spark-env.sh) ,在最后面加上如下一行:
|export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath) |
保存后,Spark 就可以启动、运行了。
运行Spark示例
注意,必须安装 Hadoop 才能使用 Spark,但如果使用 Spark 过程中没用到 HDFS,不启动 Hadoop 也是可以的。此外,接下来教程中出现的命令、目录,若无说明,则一般以 Spark 的安装目录(/usr/local/spark)为当前路径,请注意区分。
在 ./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。我们可以先运行一个示例程序 SparkPi(即计算 π 的近似值),执行如下命令:
|cd /usr/local/spark |
|./bin/run-example SparkPi |
执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中):
|./bin/run-example SparkPi 2>&1 | grep "Pi is roughly" |
过滤后的运行结果如下图所示,可以得到 π 的 5 位小数近似值 :
[pic]
Python 版本的 SparkPi 则需要通过 spark-submit 运行:
|./bin/spark-submit examples/src/main/python/pi.py |
通过Spark Shell进行交互分析
Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本教程选择使用 Scala 来进行介绍。
Scala 是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。
Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。
执行如下命令启动 Spark Shell:
|./bin/spark-shell |
启动成功后如图所示,会有 “scala >” 的命令提示符。
[pic]
6 基础操作
Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。
我们从 ./README 文件新建一个 RDD,代码如下(代码中 // 后的内容为注释,本教程以 //// 开头的内容表示交互式输出结果):
|val textFile = sc.textFile("") |
|//// textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :27 |
代码中通过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md”的错误。
上述命令的输出结果如下图所示:
[pic]
RDDs 支持两种类型的操作:
• actions: 在数据集上运行计算后返回值
• transformations: 转换, 从现有数据集创建一个新的数据集
下面我们就来演示 count() 和 first() 操作:
|textFile.count() // RDD 中的 item 数量,对于文本文件,就是总行数 |
|//// res0: Long = 95 |
|textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容 |
|//// res1: String = # Apache Spark |
接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,代码如下:
|val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行 |
|linesWithSpark.count() // 统计行数 |
|//// res4: Long = 17 |
可以看到一共有 17 行内容包含 Spark,这与通过 Linux 命令 cat ./README.md | grep "Spark" -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:
|textFile.filter(line => line.contains("Spark")).count() // 统计包含 Spark 的行数 |
|//// res4: Long = 17 |
7 RDD的更多操作
RDD 的 actions 和 transformations 可用在更复杂的计算中,例如通过如下代码可以找到包含单词最多的那一行内容共有几个单词:
|textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) |
|//// res5: Int = 14 |
代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解:
|import java.lang.Math |
| |
|textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) |
|//// res6: Int = 14 |
Hadoop MapReduce 是常见的数据流模式,在 Spark 中同样可以实现(下面这个例子也就是 WordCount):
|val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // |
|实现单词统计 |
|//// wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :29 |
| |
|wordCounts.collect() // 输出单词统计结果 |
|//// res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...) |
8 缓存
Spark 支持在集群范围内将数据集缓存至每一个节点的内存中,可避免数据传输,当数据需要重复访问时这个特征非常有用,例如查询体积小的“热”数据集,或是运行如 PageRank 的迭代算法。调用 cache(),就可以将数据集进行缓存:
|linesWithSpark.cache() |
SparkSQL和DataFrames
Spark SQL 是 Spark 内嵌的模块,用于结构化数据。在 Spark 程序中可以使用 SQL 查询语句或 DataFrame API。DataFrames 和 SQL 提供了通用的方式来连接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,并且可以在多种数据源之间执行 join 操作。
下面仍在 Spark shell 中演示一下 Spark SQL 的基本操作,该部分内容主要参考了 Spark SQL、DataFrames 和 Datasets 指南。
Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。在 Spark shell 启动时,输出日志的最后有这么几条信息:
16/01/16 13:25:41 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
16/01/16 13:25:41 INFO repl.SparkILoop: Created sql context..
SQL context available as sqlContext.
这些信息表明 SparkContent 和 SQLContext 都已经初始化好了,可通过对应的 sc、sqlContext 变量直接进行访问。
使用 SQLContext 可以从现有的 RDD 或数据源创建 DataFrames。作为示例,我们通过 Spark 提供的 JSON 格式的数据源文件 ./examples/src/main/resources/people.json 来进行演示,该数据源内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
执行如下命令导入数据源,并输出内容:
|val df = sqlContext.read.json("") |
|//// df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] |
| |
|df.show() // 输出数据源内容 |
|//// +----+-------+ |
|//// | age| name| |
|//// +----+-------+ |
|//// |null|Michael| |
|//// | 30| Andy| |
|//// | 19| Justin| |
|//// +----+-------+ |
接着,我们来演示 DataFrames 处理结构化数据的一些基本操作:
|df.select("name").show() // 只显示 "name" 列 |
|//// +-------+ |
|//// | name| |
|//// +-------+ |
|//// |Michael| |
|//// | Andy| |
|//// | Justin| |
|//// +-------+ |
| |
|df.select(df("name"), df("age") + 1).show() // 将 "age" 加 1 |
|//// +-------+---------+ |
|//// | name|(age + 1)| |
|//// +-------+---------+ |
|//// |Michael| null| |
|//// | Andy| 31| |
|//// | Justin| 20| |
|//// +-------+---------+ |
| |
|df.filter(df("age") > 21).show() # 条件语句 |
|//// +---+----+ |
|//// |age|name| |
|//// +---+----+ |
|//// | 30|Andy| |
|//// +---+----+ |
| |
|df.groupBy("age").count().show() // groupBy 操作 |
|//// +----+-----+ |
|//// | age|count| |
|//// +----+-----+ |
|//// |null| 1| |
|//// | 19| 1| |
|//// | 30| 1| |
|//// +----+-----+ |
当然,我们也可以使用 SQL 语句来进行操作:
|df.registerTempTable("people") // 将 DataFrame 注册为临时表 people |
|val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age ................
................
In order to avoid copyright disputes, this page is only a partial summary.
To fulfill the demand for quickly locating and searching documents.
It is intelligent file search solution for home and business.