Environment Preparation

Spark: 3.3.2

Installing Spark

Download the installation package from the Spark official website.

tar -zxvf spark-3.3.2-bin-hadoop3.tgz -C /mysoft/

Configure environment variables

vi /etc/profile
#Spark environment variables
export SPARK_HOME=/mysoft/spark-3.3.2-bin-hadoop3/
export PATH=$PATH:$SPARK_HOME/bin
Apply the environment variables:
source /etc/profile
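
To confirm that the variables took effect, a quick optional check:

echo $SPARK_HOME        # should print /mysoft/spark-3.3.2-bin-hadoop3/
spark-submit --version  # should report Spark 3.3.2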

Standalone Cluster Mode

cd $SPARK_HOME
cd conf/

Configure spark-env.sh

cp spark-env.sh.template spark-env.sh
vi spark-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_341
export HADOOP_CONF_DIR=/usr/local/hadoop-3.3.4/etc/hadoop
export SPARK_MASTER_HOST=hsq01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1

The virtual machines have 2 GB+ of memory by default; if yours have less, lower the worker memory and executor memory to 900 MB or below:

export SPARK_EXECUTOR_MEMORY=512M
export SPARK_WORKER_MEMORY=512M
export SPARK_DRIVER_MEMORY=512M

Configure workers

cp workers.template workers
vi workers
hsq01
hsq02
hsq03

Delete the localhost entry.

Distribute the Spark directory and the environment variables to the other nodes

for i in {2..3}; do scp -r /mysoft/spark-3.3.2-bin-hadoop3/ hsq0$i:/mysoft/;done
for i in {2..3}; do scp /etc/profile hsq0$i:/etc/profile;done

Start the Spark cluster (standalone mode)

cd $SPARK_HOME
sbin/start-all.sh
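
If startup succeeded, jps should show the standalone daemons on each node (a quick sanity check; the Master runs on the host where start-all.sh was executed, Workers on every host listed in workers):

jps
# hsq01: Master and Worker (hsq01 is also listed in workers)
# hsq02 / hsq03: Worker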

Open hsq01:8081 to view the Spark web UI.

(Screenshot: Spark web UI)

Enabling HA

Modify `spark-env.sh`

Comment out `export SPARK_MASTER_HOST=hsq01` and `export SPARK_MASTER_PORT=7077` by prefixing them with `#`.
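
That is, after editing, the two lines read:

#export SPARK_MASTER_HOST=hsq01
#export SPARK_MASTER_PORT=7077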

Then add the following:

vi spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsq01:2181,hsq02:2181,hsq03:2181 -Dspark.deploy.zookeeper.dir=/spark"

Distribute it to the other nodes:
for i in {2..3}; do scp /mysoft/spark-3.3.2-bin-hadoop3/conf/spark-env.sh hsq0$i:/mysoft/spark-3.3.2-bin-hadoop3/conf/spark-env.sh;done
Restart the Spark cluster (stop it, then start it again):
sbin/stop-all.sh
sbin/start-all.sh

Run jps on all three nodes to check that a QuorumPeerMain process exists; if it does not, restart ZooKeeper.
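
A minimal sketch of that check, assuming the ZooKeeper scripts are on each node's PATH:

# run on each of hsq01, hsq02, hsq03
zkServer.sh status   # prints the mode (leader/follower) when ZooKeeper is running
zkServer.sh start    # start it if it is not running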

Check whether HA is configured successfully (on hsq01 only):
zkCli.sh
ls /
ls -s /spark
quit

The configuration is successful only if you see both the spark and zookeeper znodes.

Set up the standby master (on hsq02 only):
cd $SPARK_HOME
sbin/start-master.sh
tail -n 100 /mysoft/spark-3.3.2-bin-hadoop3/logs/spark-root-org.apache.spark.deploy.master.Master-1-hsq02.out

Find the web UI port number in that log, then open the standby master's web page.
(Screenshot: standby master web UI)
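
One way to pull the port out of the log (a sketch; the exact wording of the log line can differ between Spark versions):

grep -i "MasterWebUI" /mysoft/spark-3.3.2-bin-hadoop3/logs/spark-root-org.apache.spark.deploy.master.Master-1-hsq02.out
# look for a line ending in something like "started at http://hsq02:<port>"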

Failover Test

Simulate a crash of the master process on hsq01 (at this point hsq02 is the standby master):

jps
kill -9 <Master PID>

Keep refreshing the browser pages. The hsq01 page can no longer be opened, while the hsq02 page needs a few more refreshes (wait a moment).
(Screenshot: hsq02 web UI)
In other words, hsq02 (the standby master) has now completely taken over the role of the original master hsq01!

Check whether the original roles can be restored (on hsq01):
cd $SPARK_HOME
sbin/start-master.sh

No matter how often you refresh, the pages never switch back: the original standby (hsq02, formerly STANDBY) has become the de facto master, and the original master (hsq01, formerly ALIVE) has become the de facto standby. The inherited role is not handed back when the original master recovers. This is simply how a Spark HA cluster behaves.

Example: Estimating π with Monte Carlo

bin/spark-submit --master spark://hsq01:7077,hsq02:7077,hsq03:7077 --class org.apache.spark.examples.SparkPi  /mysoft/spark-3.3.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.3.2.jar 10000

(Screenshot: computing π)
Check the job in the web UI (watch whichever master is currently the de facto master).
(Screenshot: the job in the web UI)

A natural question here is whether you need to know which master is currently alive before submitting. The answer is no.
The master only manages jobs and produces the scheduling plan; the actual work is carried out by the executor processes on the workers. There is no need to track which master is ALIVE (the de facto master): the system automatically locates the alive master and submits the job to it.
Going a step further, HA was enabled above with only hsq01 and hsq02 for ease of testing, so strictly speaking the option could have been --master spark://hsq01:7077,hsq02:7077. In a real deployment, hsq03 would be listed as well (one of them ALIVE, the rest STANDBY). Listing all of them means you never have to pin a specific master and can let the system find the alive one, which matters because a client usually has no permission to inspect the state of the Spark masters.
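
As an optional convenience (an assumption about workflow, not something the steps above require), the master list can also be set once in conf/spark-defaults.conf so that --master does not have to be typed for every submission:

# conf/spark-defaults.conf
spark.master    spark://hsq01:7077,hsq02:7077,hsq03:7077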

Quick Start

Interactive Analysis with the Spark Shell

Basics

Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively.

cd $SPARK_HOME
./bin/spark-shell

(Screenshot: spark-shell startup)

Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Let’s make a new Dataset from the text of the README file in the Spark source directory:

val textFile = spark.read.textFile("file:///mysoft/spark-3.3.2-bin-hadoop3/README.md")

(Screenshot: creating a Dataset from the file)

You can get values from Dataset directly, by calling some actions, or transform the Dataset to get a new one. For more details, please read the API doc.

textFile.count() //Number of items in this Dataset
textFile.first() // First item in this Dataset

Now let’s transform this Dataset into a new one. We call filter to return a new Dataset with a subset of the items in the file.

val linesWithSpark = textFile.filter(line => line.contains("Spark"))

We can chain together transformations and actions:

textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?

更多Dataset操作

Dataset actions and transformations can be used for more complex computations. If we want to find the line with the most words:

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

This first maps a line to an integer value, creating a new Dataset.
reduce is called on that Dataset to find the largest word count.
The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library.
For example, we can easily call functions declared elsewhere. We’ll use Math.max() function to make this code easier to understand:

import java.lang.Math
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:

val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

Here, we call flatMap to transform a Dataset of lines to a Dataset of words, and then combine groupByKey and count to compute the per-word counts in the file as a Dataset of (String, Long) pairs.
To collect the word counts in our shell, we can call collect:

wordCounts.collect()

Caching

Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let’s mark our linesWithSpark dataset to be cached:

linesWithSpark.cache()
linesWithSpark.count()

It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting bin/spark-shell to a cluster, as described in the RDD programming guide.
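
For example, the shell can be attached to the standalone cluster built above, using the same HA master list as for spark-submit:

./bin/spark-shell --master spark://hsq01:7077,hsq02:7077,hsq03:7077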

Creating Your Own Application

Suppose we wish to write a self-contained application using the Spark API. We will walk through a simple application in Scala (with sbt), Java (with Maven), and Python (pip).

This example will use Maven to compile an application JAR, but any similar build system will work.

This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README.md.
Unlike the earlier examples with the Spark shell, which initializes its own SparkSession, we initialize a SparkSession as part of the program.

To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.

<dependencies>
  <dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.2</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
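
For completeness, a minimal sketch of the surrounding pom.xml, assuming the coordinates implied by the package name and by the jar name used later (com.hsq / testSpark / 1.0-SNAPSHOT); adapt as needed:

<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hsq</groupId>
  <artifactId>testSpark</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <!-- match the JDK configured on the cluster (jdk1.8.0_341 above) -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- the Spark dependency block shown above goes here -->
  </dependencies>
</project>

The application code (SimpleApp.java) follows.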
package com.hsq;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class SimpleApp {
    public static void main(String[] args) {
        // Paths default to HDFS, so prefix with file:///
        String logFile = "file:///mysoft/spark-3.3.2-bin-hadoop3/README.md"; // Should be some file on your system
        SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
        Dataset<String> logData = spark.read().textFile(logFile).cache();

        long numAs = logData.filter((FilterFunction<String>) s -> s.contains("a")).count();
        long numBs = logData.filter((FilterFunction<String>) s -> s.contains("b")).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

        spark.stop();
    }
}

Now, we can package the application using Maven and execute it with ./bin/spark-submit.
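
A typical packaging step (run from the project root containing pom.xml):

mvn clean package
# the jar is produced under target/, e.g. target/testSpark-1.0-SNAPSHOT.jar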

bin/spark-submit --master spark://hsq01:7077,hsq02:7077,hsq03:7077 --class "com.hsq.SimpleApp" testSpark-1.0-SNAPSHOT.jar

(Screenshot: the result)

More Options

Finally, Spark ships several example programs in the examples directory (Scala, Java, Python, R). They can be run as follows:

#For Scala and Java, use run-example:
./bin/run-example SparkPi

#For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

#For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R   # (could not be run in this environment)
