Intro

本文大部分内容转载于Spark安装和编程实践(Spark2.4.0),另附上一点本人安装过程中的记录,以备参考。

安装 Spark2.4

首先需要下载Spark安装文件。推荐下载地址:清华源华科源,原文是下载Spark2.4.0,但目前已经更新到2.4.6了,所以就选择spark-2.4.6-bin-without-hadoop.tgz下载即可。

本教程的具体运行环境如下:

  • Hadoop 3.1.3
  • Java JDK 1.8
  • Spark 2.4.0

Hadoop和Java JDK的安装参照Hadoop3.1.3安装教程:单机&伪分布式配置

安装Spark(Local模式)

1
2
3
4
sudo tar -zxf ~/下载/spark-2.4.0-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-2.4.0-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark # 此处的 hadoop 为你的用户名

安装后,还需要修改Spark的配置文件spark-env.sh:

1
2
cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh

编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

1
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

配置完成后就可以直接使用,不需要像Hadoop运行启动命令。
通过运行Spark自带的示例,验证Spark是否安装成功。执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中):

1
2
cd /usr/local/spark
bin/run-example SparkPi 2>&1 | grep "Pi is"

过滤后的运行结果如下图示,这里我得到了π的15位小数近似值(这个程序每次运行结果是不一样的):

1
2
hadoop@cuper-Inspiron-7591:/usr/local/spark$ bin/run-example SparkPi 2>&1 | grep "Pi is"
Pi is roughly 3.137155685778429

使用 Spark Shell 编写代码

学习Spark程序开发,建议首先通过spark-shell交互式学习,加深Spark程序开发的理解。
Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。

启动Spark Shell

1
2
cd /usr/local/spark
bin/spark-shell

启动spark-shell后,会自动创建名为sc的SparkContext对象和名为spark的SparkSession对象,如图:

加载text文件

spark创建sc,可以加载本地文件和HDFS文件创建RDD。这里用Spark自带的本地文件README.md文件测试。

1
val textFile = sc.textFile("file:///usr/local/spark/README.md")

加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file:///)进行标识。

加载完成后:

1
2
scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24

简单RDD操作

1
2
3
4
5
6
7
8
9
10
11
scala> textFile.first()//获取RDD文件textFile的第一行内容
res0: String = # Apache Spark

scala> textFile.count()//获取RDD文件textFile所有项的计数
res1: Long = 104

scala> val lineWithSpark = textFile.filter(line => line.contains("Spark"))//抽取含有“Spark”的行,返回一个新的RDD
lineWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25

scala> lineWithSpark.count()//统计新的RDD的行数
res2: Long = 19

可以通过组合RDD操作进行组合,可以实现简易MapReduce操作:

1
2
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)//找出文本中每行的最多单词数
res3: Int = 16

退出Spark Shell

输入:quit(exit好像不行),即可退出spark shell:

1
:quit