张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 店铺 关于

通过一个方法(aggregate)理解spark的rdd

2018-9-14 作者: 张子阳 分类: 大数据处理

RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的基本抽象和核心对象。它代表了一个具有容错性的,可以并行运算的元素集合。有两种途径创建RDD:将Driver Program中的集合类型转换成RDD,或者是引用外部存储系统中的数据集,例如文件系统、HDFS、HBase、数据库等。

在实际的应用中,可以认识到Spark是一个独立的计算引擎。先从外部加载数据,然后执行分布式的并行运算,最后将运算结果进行显示、保存、或发给其他应用。其中执行分布式的并行运算位于核心的位置,理解RDD也就很关键,这篇文章想通过一个例子来讲述RDD。

这篇文章假设你已经搭建好了一个多节点的Spark集群,并理解Spark Driver Progame和Spark集群的区别。

一个简单的问题

先考虑一个简单的问题:假设有一个数组[1, 2, 3, 4, 5, 6],要计算出两个值:数组元素个数、数组元素之和。假设将结果保存为一个元组,初始值为(0,0),那么只要像下面这样遍历求和就可以了。其逻辑如下:

运算过程
循环次数 输入值 结果
初始 - (0,0)
第1次遍历 (0,0), 1 (1,1)
第2次遍历 (1,1), 2 (3,2)
第3次遍历 (3,2), 3 (6,3)
第4次遍历 (6,3), 4 (10,4)
第5次遍历 (10,4), 5 (15,5)
第6次遍历 (15,5), 6 (21,6)

每一次循环的输入值是:上一次循环的结果 和 本次循环到的数组元素。其运算的逻辑为:对元组的第1个元素进行结果累加,第2个元素进行+1。

如果这个数组有1000万个元素,这样算可能很慢,我们可能想到的一个办法是什么?将数组分成两个,[1,2,3] 和 [4,5,6] 然后分别进行运算,再将结果累加起来。和上面这种方式不同的是,多了一个步骤:汇总两个拆分后数组的运算结果。

数组[1,2,3]的运算过程
循环次数 输入值 结果
初始 - (0,0)
第1次遍历 (0,0), 1 (1,1)
第2次遍历 (1,1), 2 (3,2)
第3次遍历 (3,2), 3 (6,3)
数组[4,5,6]的运算过程
循环次数 输入值 结果
初始 - (0,0)
第1次遍历 (0,0), 4 (4,1)
第2次遍历 (4,1), 5 (9,2)
第3次遍历 (9,2), 6 (15,3)

再对两个数组进行分别运算后,为了得到最终结果,还要进行一次合并结果的操作:

合并操作的运算过程
循环次数 输入值 结果
初始 - (0,0)
第1次遍历 (0,0), (6,3) (6,3)
第2次遍历 (6,3), (15,3) (21,6)

显然,这是一个CPU密集型(CPU-Bound)运算,对于多核CPU而言,比如说4核CPU,将数组分为4个,然后在本机进行运算是完全可以的。

它依然存在两个问题:

  1. 没有统一的编程模型,我们需要手动地去拆分数据,分别运算,再汇总结果。
  2. 当运算量超出单台计算机的能力,需要将数据和运算分布到多台计算机上时,则异常复杂。

此时,就是Spark出来救场的时候了。可以看到,从原理上来讲,Spark所解决的事情很简单;从实现过程上来讲,则非常复杂。

RDD的aggregate方法

上面计算数据元素个数和之和的例子,实际上来自Spark/Rdd/aggregate方法的官方文档。我提前将它引出,只是为了说明aggregate方法解决一个什么问题。

aggregate接受3个参数:

  1. 第1个参数是一个元组,保存元素个数和元素之和的初始值,显然,为(0,0);
  2. 第2个参数是一个函数(或Lambda表达式),表示对每个拆分后的数组执行的操作;
  3. 第3个参数也是一个函数(或Lamda表达式),表示对拆分后数组的运算结果执行的操作。

这里先看第2个参数的写法:

def seqOp(t, x): 
    print "seqOp: ", [t, x], [t[0] + x, t[1] + 1]
    return (t[0] + x, t[1] + 1)

其中t表示每次上一次循环的结果,由初始元组(0,0)演化而来,x表示初识数组的值。官方文档这里一个最大的问题是将变量名命名成了x,y,像下面这样:

seqOp = (lambda x, y: (x[0] + y, x[1] + 1))

这样很容易造成一个误会,就是以为两个参数x, y,分别代表了元组的第一个值和第二个值,因为x,y成对出现时,一般都以为它们存在关联。

再看第3个参数的写法:

def comOp(x, y):
    print "comOp: ", [x, y],  [x[0] + y[0], x[1]+y[1]]
    return (x[0] + y[0], x[1] + y[1])

这里的x和y均为元组,x为之前所有拆分数组的运算结果累加,y为当前正在处理的数组运算结果。显然,只需要将它们分别累加就可以了:return (x[0] + y[0], x[1] + y[1])。

Spark的集群模式和部署模式

Spark有好几种集群模式,包括 local、standalone、yarn、mesos和Kubernetes;有两种部署模式,client和cluster。上面代码的运行方式,在不同的部署模式上会有所不同。为了更好地了解这个例子,我们先采用 local[1]/client来运行这段代码:

local[1]/client

当使用local[1]时,表示只使用1个CPU核心,这种方式在生产环境并不常用。它在本机运行,并只使用一个CPU核心,因此实际上没有并行运算。

from pyspark import SparkContext, SparkConf

# $SPARK_HOME/bin/spark-submit /data/pyjobs/test/aggregate.py

conf = SparkConf().setAppName("test_aggregate").setMaster("local[1]")
sc = SparkContext(conf=conf)

def seqOp(x, y):
    print "seqOp: ", [x, y], [x[0] + y, x[1] + 1]
    return x[0] + y, x[1] + 1

def comOp(x, y):
    print "comOp: ", [x, y],  [x[0] + y[0], x[1]+y[1]]
    return x[0] + y[0], x[1] + y[1]

c = sc.parallelize([1, 2, 3, 4, 5, 6]).aggregate((0, 0), seqOp, comOp)
print c

输出如下,可以看到这个结果和我们第一节中表格中的例子完全相同,并且注意到只执行了一次comOp:

seqOp:  [(0, 0), 1] [1, 1]
seqOp:  [(1, 1), 2] [3, 2]
seqOp:  [(3, 2), 3] [6, 3]
seqOp:  [(6, 3), 4] [10, 4]
seqOp:  [(10, 4), 5] [15, 5]
seqOp:  [(15, 5), 6] [21, 6]
comOp:  [(0, 0), (21, 6)] [21, 6]
(21, 6)

local[2]/client

代码只需要修改一处,意思是采用2个CPU核心:

conf = SparkConf().setAppName("test_aggregate").setMaster("local[2]")

运算的结果如下:

seqOp:  [(0, 0), 1] [1, 1]
seqOp:  [(1, 1), 2] [3, 2]
seqOp:  [(3, 2), 3] [6, 3]
seqOp:  [(0, 0), 4] [4, 1]
seqOp:  [(4, 1), 5] [9, 2]
seqOp:  [(9, 2), 6] [15, 3]
comOp:  [(0, 0), (6, 3)] [6, 3]
comOp:  [(6, 3), (15, 3)] [21, 6]
(21, 6)

可以看到这段代码的seqOp分为了两部分,一部分从[(0, 0), 1]开始,一部分从[(0, 0), 4]开始。同时,执行了两次comOp,因为数组被分为了两个。同时,可以看到这个输出和第一节下面的表格完全一致。

类似地,可以改为local[3],结果如下:

seqOp:  [(0, 0), 1] [1, 1]
seqOp:  [(1, 1), 2] [3, 2]
seqOp:  [(0, 0), 3] [3, 1]
seqOp:  [(3, 1), 4] [7, 2]
seqOp:  [(0, 0), 5] [5, 1]
seqOp:  [(5, 1), 6] [11, 2]
combOp:  [(0, 0), (3, 2)] [3, 2]
combOp:  [(3, 2), (7, 2)] [10, 4]
combOp:  [(10, 4), (11, 2)] [21, 6]
(21, 6)

可见,借助于Spark,我们可以轻松地改变并行运算的数目。

本来这个例子到此就结束了,但还可以延伸一下,看下将部署模式改为cluster,以及提交到集群会有什么区别。

local[2]/cluster

当部署模式改为cluster时,配置如下:

conf = SparkConf().setAppName("test_aggregate").setMaster("local[2]")\
    .set("spark.submit.deployMode", "cluster")

运算结果如下:

seqOp: [seqOp: ( 0, 0), 4][( [4, 1] seqOp: 0[(4, , 01)), , 15]] [9 , [21], 1]seqOp: [(9, seqOp: 2)[, (16, ]1 )[, 215], 3 ][ 3, 2] seqOp: [(3, 2), 3] [6, 3] comOp: [(0, 0), (6, 3)] [6, 3] comOp: [(6, 3), (15, 3)] [21, 6] (21, 6)

可以看到上面输出的次序变乱,因为seqOp函数并行执行,同时在控制台进行输出。在实际应用中,由于local集群模式几乎只用于开发和测试,因此,部署模式选用client就好了。

standalone/client

当集群模式为standalone时,配置如下:

conf = SparkConf().setAppName("test_aggregate").setMaster("spark://hadoop01:7077")\
    .set("spark.submit.deployMode", "client")
当前,对于Python应用,Standalone集群模式不支持Cluster部署。

此时,再次运行,因为seqOp是在不同的服务器上运行的,当执行print函数时,是输出到当前运行这段代码的主机上。因此,运行在Driver Programe的机器上,即执行spark-submit提交python代码的机器上,没有任何的显示。输出如下所示:

comOp: [(0, 0), (6, 3)] [6, 3] comOp: [(6, 3), (15, 3)] [21, 6] (21, 6)

至此,我们已经完成了这个例子。可以看到,Spark是一个分布式的运算引擎,通过它,我们可以将自各种数据源的数据,在不同的机器上进行并行运算,并获得最终结果。总体上,是一种分而治之的思想,和MapReduce的解决思路是类似地。只不过,Spark的执行效能要高出MapReduce很多(官方参考10~100倍)。

感谢阅读,希望这篇文章能给你带来帮助!