Spark 通过一个方法(aggregate)理解的rdd
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的基本抽象和核心对象。它代表了一个具有容错性的,可以并行运算的元素集合。有两种途径创建RDD:将Driver Program中的集合类型转换成RDD,或者是引用外部存储系统中的数据集,例如文件系统、HDFS、HBase、数据库等。
在实际的应用中,可以认识到Spark是一个独立的计算引擎。先从外部加载数据,然后执行分布式的并行运算,最后将运算结果进行显示、保存、或发给其他应用。其中执行分布式的并行运算位于核心的位置,理解RDD也就很关键,这篇文章想通过一个例子来讲述RDD。
这篇文章假设你已经搭建好了一个多节点的Spark集群,并理解Spark Driver Progame和Spark集群的区别。
一个简单的问题
先考虑一个简单的问题:假设有一个数组[1, 2, 3, 4, 5, 6],要计算出两个值:数组元素个数、数组元素之和。假设将结果保存为一个元组,初始值为(0,0),那么只要像下面这样遍历求和就可以了。其逻辑如下:
循环次数 | 输入值 | 结果 |
---|---|---|
初始 | - | (0,0) |
第1次遍历 | (0,0), 1 | (1,1) |
第2次遍历 | (1,1), 2 | (3,2) |
第3次遍历 | (3,2), 3 | (6,3) |
第4次遍历 | (6,3), 4 | (10,4) |
第5次遍历 | (10,4), 5 | (15,5) |
第6次遍历 | (15,5), 6 | (21,6) |
每一次循环的输入值是:上一次循环的结果 和 本次循环到的数组元素。其运算的逻辑为:对元组的第1个元素进行结果累加,第2个元素进行+1。
如果这个数组有1000万个元素,这样算可能很慢,我们可能想到的一个办法是什么?将数组分成两个,[1,2,3] 和 [4,5,6] 然后分别进行运算,再将结果累加起来。和上面这种方式不同的是,多了一个步骤:汇总两个拆分后数组的运算结果。
循环次数 | 输入值 | 结果 |
---|---|---|
初始 | - | (0,0) |
第1次遍历 | (0,0), 1 | (1,1) |
第2次遍历 | (1,1), 2 | (3,2) |
第3次遍历 | (3,2), 3 | (6,3) |
循环次数 | 输入值 | 结果 |
---|---|---|
初始 | - | (0,0) |
第1次遍历 | (0,0), 4 | (4,1) |
第2次遍历 | (4,1), 5 | (9,2) |
第3次遍历 | (9,2), 6 | (15,3) |
再对两个数组进行分别运算后,为了得到最终结果,还要进行一次合并结果的操作:
循环次数 | 输入值 | 结果 |
---|---|---|
初始 | - | (0,0) |
第1次遍历 | (0,0), (6,3) | (6,3) |
第2次遍历 | (6,3), (15,3) | (21,6) |
显然,这是一个CPU密集型(CPU-Bound)运算,对于多核CPU而言,比如说4核CPU,将数组分为4个,然后在本机进行运算是完全可以的。
它依然存在两个问题:
- 没有统一的编程模型,我们需要手动地去拆分数据,分别运算,再汇总结果。
- 当运算量超出单台计算机的能力,需要将数据和运算分布到多台计算机上时,则异常复杂。
此时,就是Spark出来救场的时候了。可以看到,从原理上来讲,Spark所解决的事情很简单;从实现过程上来讲,则非常复杂。
RDD的aggregate方法
上面计算数据元素个数和之和的例子,实际上来自Spark/Rdd/aggregate方法的官方文档。我提前将它引出,只是为了说明aggregate方法解决一个什么问题。
aggregate接受3个参数:
- 第1个参数是一个元组,保存元素个数和元素之和的初始值,显然,为(0,0);
- 第2个参数是一个函数(或Lambda表达式),表示对每个拆分后的数组执行的操作;
- 第3个参数也是一个函数(或Lamda表达式),表示对拆分后数组的运算结果执行的操作。
这里先看第2个参数的写法:
def seqOp(t, x): print "seqOp: ", [t, x], [t[0] + x, t[1] + 1] return (t[0] + x, t[1] + 1)
其中t表示每次上一次循环的结果,由初始元组(0,0)演化而来,x表示初识数组的值。官方文档这里一个最大的问题是将变量名命名成了x,y,像下面这样:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
这样很容易造成一个误会,就是以为两个参数x, y,分别代表了元组的第一个值和第二个值,因为x,y成对出现时,一般都以为它们存在关联。
再看第3个参数的写法:
def comOp(x, y): print "comOp: ", [x, y], [x[0] + y[0], x[1]+y[1]] return (x[0] + y[0], x[1] + y[1])
这里的x和y均为元组,x为之前所有拆分数组的运算结果累加,y为当前正在处理的数组运算结果。显然,只需要将它们分别累加就可以了:return (x[0] + y[0], x[1] + y[1])。
Spark的集群模式和部署模式
Spark有好几种集群模式,包括 local、standalone、yarn、mesos和Kubernetes;有两种部署模式,client和cluster。上面代码的运行方式,在不同的部署模式上会有所不同。为了更好地了解这个例子,我们先采用 local[1]/client来运行这段代码:
local[1]/client
当使用local[1]时,表示只使用1个CPU核心,这种方式在生产环境并不常用。它在本机运行,并只使用一个CPU核心,因此实际上没有并行运算。
from pyspark import SparkContext, SparkConf # $SPARK_HOME/bin/spark-submit /data/pyjobs/test/aggregate.py conf = SparkConf().setAppName("test_aggregate").setMaster("local[1]") sc = SparkContext(conf=conf) def seqOp(x, y): print "seqOp: ", [x, y], [x[0] + y, x[1] + 1] return x[0] + y, x[1] + 1 def comOp(x, y): print "comOp: ", [x, y], [x[0] + y[0], x[1]+y[1]] return x[0] + y[0], x[1] + y[1] c = sc.parallelize([1, 2, 3, 4, 5, 6]).aggregate((0, 0), seqOp, comOp) print c
输出如下,可以看到这个结果和我们第一节中表格中的例子完全相同,并且注意到只执行了一次comOp:
seqOp: [(0, 0), 1] [1, 1] seqOp: [(1, 1), 2] [3, 2] seqOp: [(3, 2), 3] [6, 3] seqOp: [(6, 3), 4] [10, 4] seqOp: [(10, 4), 5] [15, 5] seqOp: [(15, 5), 6] [21, 6] comOp: [(0, 0), (21, 6)] [21, 6] (21, 6)
local[2]/client
代码只需要修改一处,意思是采用2个CPU核心:
conf = SparkConf().setAppName("test_aggregate").setMaster("local[2]")
运算的结果如下:
seqOp: [(0, 0), 1] [1, 1] seqOp: [(1, 1), 2] [3, 2] seqOp: [(3, 2), 3] [6, 3] seqOp: [(0, 0), 4] [4, 1] seqOp: [(4, 1), 5] [9, 2] seqOp: [(9, 2), 6] [15, 3] comOp: [(0, 0), (6, 3)] [6, 3] comOp: [(6, 3), (15, 3)] [21, 6] (21, 6)
可以看到这段代码的seqOp分为了两部分,一部分从[(0, 0), 1]开始,一部分从[(0, 0), 4]开始。同时,执行了两次comOp,因为数组被分为了两个。同时,可以看到这个输出和第一节下面的表格完全一致。
类似地,可以改为local[3],结果如下:
seqOp: [(0, 0), 1] [1, 1] seqOp: [(1, 1), 2] [3, 2] seqOp: [(0, 0), 3] [3, 1] seqOp: [(3, 1), 4] [7, 2] seqOp: [(0, 0), 5] [5, 1] seqOp: [(5, 1), 6] [11, 2] combOp: [(0, 0), (3, 2)] [3, 2] combOp: [(3, 2), (7, 2)] [10, 4] combOp: [(10, 4), (11, 2)] [21, 6] (21, 6)
可见,借助于Spark,我们可以轻松地改变并行运算的数目。
本来这个例子到此就结束了,但还可以延伸一下,看下将部署模式改为cluster,以及提交到集群会有什么区别。
local[2]/cluster
当部署模式改为cluster时,配置如下:
conf = SparkConf().setAppName("test_aggregate").setMaster("local[2]")\ .set("spark.submit.deployMode", "cluster")
运算结果如下:
seqOp: [seqOp: ( 0, 0), 4][( [4, 1] seqOp: 0[(4, , 01)), , 15]] [9 , [21], 1]seqOp: [(9, seqOp: 2)[, (16, ]1 )[, 215], 3 ][ 3, 2] seqOp: [(3, 2), 3] [6, 3] comOp: [(0, 0), (6, 3)] [6, 3] comOp: [(6, 3), (15, 3)] [21, 6] (21, 6)
可以看到上面输出的次序变乱,因为seqOp函数并行执行,同时在控制台进行输出。在实际应用中,由于local集群模式几乎只用于开发和测试,因此,部署模式选用client就好了。
standalone/client
当集群模式为standalone时,配置如下:
conf = SparkConf().setAppName("test_aggregate").setMaster("spark://hadoop01:7077")\ .set("spark.submit.deployMode", "client")
当前,对于Python应用,Standalone集群模式不支持Cluster部署。
此时,再次运行,因为seqOp是在不同的服务器上运行的,当执行print函数时,是输出到当前运行这段代码的主机上。因此,运行在Driver Programe的机器上,即执行spark-submit提交python代码的机器上,没有任何的显示。输出如下所示:
comOp: [(0, 0), (6, 3)] [6, 3] comOp: [(6, 3), (15, 3)] [21, 6] (21, 6)
至此,我们已经完成了这个例子。可以看到,Spark是一个分布式的运算引擎,通过它,我们可以将自各种数据源的数据,在不同的机器上进行并行运算,并获得最终结果。总体上,是一种分而治之的思想,和MapReduce的解决思路是类似地。只不过,Spark的执行效能要高出MapReduce很多(官方参考10~100倍)。
感谢阅读,希望这篇文章能给你带来帮助!