Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively.
cd $SPARK_HOME
./bin/spark-shell
Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Let’s make a new Dataset from the text of the README file in the Spark source directory:
val textFile = spark.read.textFile("file:///mysoft/spark-3.3.2-bin-hadoop3/README.md")
You can get values from a Dataset directly by calling some actions, or transform the Dataset to get a new one. For more details, please read the API docs.
textFile.count() // Number of items in this Dataset
textFile.first() // First item in this Dataset
Now let’s transform this Dataset into a new one. We call filter to return a new Dataset with a subset of the items in the file.
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
We can chain together transformations and actions:
textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
More Dataset Operations
Dataset actions and transformations can be used for more complex computations. If we want to find the line with the most words:
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
This first maps a line to an integer value, creating a new Dataset. reduce is called on that Dataset to find the largest word count. The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the Math.max() function to make this code easier to understand:
import java.lang.Math
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
Here, we call flatMap to transform a Dataset of lines to a Dataset of words, and then combine groupByKey and count to compute the per-word counts in the file as a Dataset of (String, Long) pairs. To collect the word counts in our shell, we can call collect:
wordCounts.collect()
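If the full list of word counts is long, a handy follow-up (my own sketch, not part of the original walkthrough; the column names word and count are introduced here) is to rename the pair columns and show only the most frequent words:

import org.apache.spark.sql.functions.desc
wordCounts.toDF("word", "count").orderBy(desc("count")).show(10) // top 10 words by count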
Caching
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let’s mark our linesWithSpark dataset to be cached:
linesWithSpark.cache()
linesWithSpark.count()
It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting bin/spark-shell to a cluster, as described in the RDD programming guide.
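For example, assuming a standalone master is running at spark://<master-host>:7077 (the host is a placeholder for your own cluster), the shell can be attached to it with the --master option:

./bin/spark-shell --master spark://<master-host>:7077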
Creating Your Own Application
Suppose we wish to write a self-contained application using the Spark API. We will walk through a simple application in Scala (with sbt), Java (with Maven), and Python (pip).
This example will use Maven to compile an application JAR, but any similar build system will work.
This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README.md. Unlike the earlier examples with the Spark shell, which initializes its own SparkSession, we initialize a SparkSession as part of the program.
To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.
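As a rough sketch, the dependency section of such a pom.xml might look like the following, assuming Spark 3.3.2 built for Scala 2.12 (matching the spark-3.3.2-bin-hadoop3 download used above); the provided scope is a common choice because Spark is supplied at runtime by spark-submit:

<dependencies>
  <!-- Spark SQL brings in the SparkSession/Dataset API; the _2.12 suffix is the Scala version tag -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.2</version>
    <scope>provided</scope>
  </dependency>
</dependencies>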
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class SimpleApp {
  public static void main(String[] args) {
    // Paths default to HDFS, so prefix the local path with file:///
    String logFile = "file:///mysoft/spark-3.3.2-bin-hadoop3/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter((FilterFunction<String>) s -> s.contains("a")).count();
    long numBs = logData.filter((FilterFunction<String>) s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}
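To package and run the application, a typical sequence is sketched below; the JAR name simple-project-1.0.jar and the class name SimpleApp are assumptions based on this example and depend on your own pom.xml settings:

# Package a JAR containing the application
mvn package
# Submit it to Spark, running locally on 4 cores
./bin/spark-submit --class "SimpleApp" --master "local[4]" target/simple-project-1.0.jar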
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly (this one could not be run on my machine):
./bin/spark-submit examples/src/main/r/dataframe.R