spark原理解析

1. 什么是Spark

Spark是一个基于内存的集群计算系统,是一个分布式的计算框架。Spark可以将计算任务分发到多个机器并行计算。目前Spark集成了SQL查询,图处理,机器学习,流处理等,在计算引擎中生态比较健全,所以其适用范围比较广。Spark主要解决计算的并行化,集群资源的管理与分配,容错与恢复,任务的分发与回收管理等问题。

目前我们对Spark的主要使用场景是基于SQL对大量离线数据进行清洗、转换和聚合,例如仅使用几个G或几十G的内存资源对OSS云存储中的几个数十G数百G大小的数据表做JOIN聚合处理后输出报表或者输出新的数据表给下游应用消费使用,所以这里着重讲SparkSQL相关知识。

如上图所示,在K8S集群下Spark运行架构存在以下几个角色:任务控制节点(Driver Node)、运行作业任务的工作节点(Executor Node)和每个工作节点上负责具体任务的执行进程(Executor)。一段用户提交的执行代码,会经过Spark分析拆解成最小维度的Task后,由Executor执行,如下图所示。

2. Spark的调度执行

2.1 介绍

现在K8S集群中拉起Spark常用方式有spark-operator和spark-submit两种方式:

  1. spark-submit是Spark自带的命令行工具,需要手动指定Driver和Executor的资源配置(如CPU、内存)和K8S集群的相关配置(如命名空间、服务账号),为开发者提供了较低级别的控制;
  2. spark-operator是一个开源的K8S组件,通过CRD的配置文件来管理Spark应用,相对于spark-submit使得配置更加结构化和易于管理,让开发者可以通过声明式的方式来定义和管理Spark应用,就像管理其他Kubernetes资源一样;

spark-submit样例

1
2
3
4
5
6
7
8
./bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:3.5.0 \
local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar

spark-operator样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Scala
mode: cluster
image: "spark:3.5.0"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
sparkVersion: "3.5.0"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.0
serviceAccount: spark-operator-spark
executor:
cores: 1
instances: 2
memory: "512m"
labels:
version: 3.5.0

2.2 流程

当用户提交一个SparkApplication的yaml文件到K8S后,依次执行了以下的流程:

  1. K8S接受并创建了SparkApplication的CRD资源

  2. spark-operator插件监听到SparkApplication的CRD资源创建事件后,解析yaml中的配置,组装成spark-submit命令并执行

    1. spark-submit命令会在spark-operator的pod中拉起一个spark进程
    2. 该spark进程请求K8S创建SparkDriver的Service、Secret、Volume等资源,最后创建SparkDriver的Pod实例
    3. SparkDriver资源创建完成后,该spark进程退出
  3. K8S接收到SparkDriver的Pod创建请求后,拉起SparkDriver的Pod实例,Pod中启动spark进程

    1. SparkDriver的spark进程请求K8S创建SparkExecutor的Secret、Volume等资源,然后创建多个SparkExecutor的Pod实例
    2. 等待所有SparkExecutor启动完成
  4. K8S接收到SparkExecutor的Pod创建请求后,拉起SparkExecutor的Pod实例,Pod中启动spark进程

    1. SparkExecutor的spark进程解析启动参数获取到SparkDriver的Service地址
    2. SparkExecutor的spark进程通过该Service地址主动连到SparkDriver完成注册
  5. SparkDriver等待所有SparkExecutor注册完成后,该Spark集群拉起完成

3. Spark的任务执行

Spark的核心概念之一是弹性分布式数据集(Resilient Distributed Datasets, RDD),这是一种容错的数据结构,允许用户在集群上并行执行操作。RDD可以包含任何类型的数据,并且提供了丰富的API来转换这些数据。

Spark支持多种编程语言如Scala、Java、Python和R操作RDD,下面我举例使用Scala创建和操作RDD:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val lines1 = sc.textFile(inputPath1)
.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
})
.map((k1, k2) => (k1, "pre_" + k2))

val lines2 = sc.textFile(inputPath2)
.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
})

val lines3 = sc.textFile(inputPath3)

val unionRdd = lines2.union(lines3)

val joinRdd = lines1.join(unionRdd)

joinRdd.saveAsTextFile(···)

Spark会将用户编写的代码进行解析,依次拆解成dag、stage、task后,最终拆解成最小颗粒度task后,发送给Spark集群的Execu执行。

3.1. dag

在Spark中,DAG(Directed Acyclic Graph,有向无环图)是一种用来表示数据处理流程的模型,帮助Spark有效地组织和优化作业中的各个阶段。

刚刚的Scala样例可以映射成如下的dag图,其中每个顶点表示RDD,而每个边表示数据的流动方向和数据操作。

3.2. stage

3.2.1. shuffle

要理解stage首先要理解什么是shuffle,shuffle是一种数据重组机制。通过以下一个例子来简单理解什么是shuffle以及为什么需要shuffle。

在大数据场景下,数据表的数据量比较大,因此在物理存储上一般是以文件分片形式保存的,例如HDFS中,一个表的数据会默认以128MB为单位拆分成若干的文件存放。当Spark加载一张数据表时,一张数据表就是一个RDD对象,而底层的每一个文件分片就是一个RDD分区(但实际上如果文件分片数量太多,会出现多个文件分片合并成一个RDD分区)。

如下图所示,两张数据表加载到Spark后,每个文件分片对应一个RDD分区。当用户需要对该两张表做join操作时,如果直接将两张表的所有RDD分区数据全部加载到内存可能会因内存不够而执行失败,因此可以将两张表的RDD分区数据以关联键取模后重新分区。这样就可以保证每一条记录与其关联的对象都位于同一个RDD分区内,将一次大数据量join操作拆分成若干个串行执行的小数据量join操作。

而这种将一组任务的输出数据传递给另一组任务的输入数据,并且这些数据需要以某种规则做数据重组分发的过程,就是shuffle。Spark通过shuffle将大数据集拆分执行,提高了并行能力和整体效率。在Spark中只有少数算子才可能需要通过shuffle操作对数据进行重新分布,提高这些算子的执行效率,比如:groupByKey、reduceByKey、sortByKey、join、distinct等。

3.2.2. stage拆解

首先要理解什么是Stage,在Spark中Stage是其物理执行计划的一个执行单元,是一组相关的、没有shuffle依赖关系的任务集合,每个dag可以拆分成若干个Stage。可能还是难以理解,因此从下面的图例中来理解。

如上图,将之前的DAG视角下的RDD有向无环图转成RDD分区的有向无环图,由于join是个需要shuffle操作的算子,因此在join算子所在的环节,代表数据流向的箭头格外复杂和突出。从DAG末端开始逆向遍历整个有向无环图:

  1. 如果遇到非shuffle操作的算子,则继续向上遍历,直到遇到shuffle操作的算子位置,这一段路径的所有RDD打包成一个Stage;
  2. 如果遇到shuffle操作的算子,则作为上一个Stage的边界,并产生一个新的Stage向上遍历

最终效果如下图,整个DAG基于shuffle的算子被划分成3个Stage。至于Spark为什么需要设计并划分Stage,了解Spark的Task概念后就能理解。

3.3. task

Spark的Stage是基于shuffle的算子进行划分的,因此每个Stage中不存在任何shuffle操作,如下图所示,Stage中的每个RDD分区都是串行执行并相互独立的,Spark将Stage中每个串行执行的数据链路叫做Task。

在Spark中Task是执行的最基本的单位,通过将DAG划分成多个Stage,整个计算流程可以很有效地拆解成多个具有上下游依赖关系的任务集,其中每个Stage就是一个任务集。在Stage中的所有Task可以独立并行执行,这极大提高了计算流程的执行效率。

4. SparkSQL执行

SparkSQL是Spark的一个模块,如下图所示,它提供了SQL语法支持,可以让用户通过SQL语句来查询和操作大数据集,降低用户的使用门槛。并且还实现了Catalyst优化器,让SparkSQL能够自动对用户输入的SQL进行性能优化提高执行效率。

其主要的任务执行流程如下图,一段SQL提交到SparkSQL后,经过其最核心的组件Catalyst处理后转化为底层的RDD操作,最终提交给SparkCore执行。其中Catalyst的主要逻辑有:

  1. SQL解析成AST语法树
  2. AST转换成逻辑计划,并对逻辑计划进行优化
  3. 基于逻辑计划生成多个物理计划,基于计算成本决策最优的物理计划
  4. 物理计划转换成底层RDD操作

4.1. AST解析

SQL在Catalyst中,先通过词法分析解析成一系列tokens,然后在根据SQL的方言语法构建出一个AST树,如下范例:

1
2
3
4
5
SELECT SUM(v) FROM (
SELECT score.id, 100 + 80 + score.math_score + score.english_score AS v
FROM people JOIN score
WHERE people.id=score.id AND people.age >10
) tmp

4.2. 逻辑执行计划

4.2.1. spark优化

SparkSQL在逻辑执行计划步骤中,会将上一步的AST语法树转换成逻辑执行计划,然后对该逻辑执行计划尝试执行一系列的优化规则。如下样例所示,将一个原始AST语法树转换成逻辑执行计划后,再应用优化规则,最终生成一份优化后的逻辑执行计划。

常见的规则有:

  • 常量折叠:将查询中的常量表达式计算并替换为其结果,例如SELECT 10 + 20 AS total转换成SELECT 30 AS total
  • 列裁剪:查询中只提取后续需要的列,减少数据传输量,降低 I/O 成本
  • 谓词下推:尽可能将过滤条件推向数据源,以减少处理的数据量
  • 子查询扁平化:将嵌套的子查询转换为 JOIN 操作,从而提升执行效率
  • 连接重排序:根据表的大小和连接条件的选择性调整连接顺序,以减少中间结果集大小,例如A表joinB表,优先使用小表作为join的驱动表

4.2.2. 列式存储

在Spark中默认使用的存储格式为Parquet,用于支持对复杂数据的快速数据处理。与CSV等基于行的格式不同,Parquet是面向列的如下图。

Parquet列式存储对列裁剪和谓词下推两种优化有着天然的支持:

  • 列裁剪:每个Parquet文件中存在元数据,存储了所有列的信息如位置、编码方式等,这部分信息使得读取指定列能够直接跳转到该列的位置。例如只读取上图中的ID和age字段的值,仅需要元数据就可以计算出需要读取的文件偏移量范围,从而避免读取整个文件的数据减少了IO消耗
  • 谓词下推:Parquet中还设计了页的概念,每个列的所有记录存储在一个或多个页中。并且每个页还存储了当前页数据的统计信息,包括该页中的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该页是否需要扫描,进一步降低IO消耗

4.3. 物理执行计划

根据上面的步骤,逻辑执行计划已经得到了比较完善的优化,然而,逻辑执行计划依然没办法真正执行,他们只是逻辑上可行,实际上Spark并不知道如何去执行这个东西。比如join是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现合并,逻辑执行计划并没有说明。

此时就需要将逻辑执行计划转化为物理执行计划,也就是将逻辑上可行的执行计划变为Spark可以真正执行的计划。比如join算子,Spark根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoinShuffleHashJoin以及SortMergejoin等,物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现。

  • BroadcastHashJoin:当一个表足够小可以放入所有工作节点的内存时,会选择Broadcast Join,避免了Shuffle过程,提高效率。
  • SortMergejoin:适用于两个大表且已经排序的情况,通过合并排序后的数据来进行Join,适合于数据量大但Join键上有良好分布的情况。
  • ShuffleHashJoin:通用的Join方式,尤其适用于数据量较大且无法广播的情况,通过哈希表在Shuffle阶段匹配Join键。

比如当前SQL下,对优化后的逻辑计划进行转换后,生成了多个可以执行的物理计划,接着CBO(基于代价优化)优化策略会根据Cost Model算出每个物理计划的代价,并选取代价最小的物理计划用于真正执行。

4.4. RDD转换

SparkSQL按上述步骤解析出物理计划后,不会直接执行,而是将物理计划转化为DAG,目的是为了进一步优化执行流程,例如通过DAG结构解析任务的依赖关系,尽可能降低节点之间数据的传输,并尽可能对无依赖的任务做并行执行,提高执行效率。然后SparkSQL将最终优化过的DAG提交给SparkCore执行,由SparkCore负责将DAG再拆解成Task分配给集群节点执行。

5. 其他竞品

5.1. Hadoop

5.1.1. 介绍

Hadoop是一个能够对大量数据进行分布式处理的软件框架。它允许使用简单的编程模型,在计算机集群上并行处理大规模的数据集。其核心组件有

  • HDFS:一个分布式文件系统,为大规模数据集提供了一个高容错性的存储解决方案
  • MapReduce:一种用于处理大规模数据集的编程模型,通过将任务分解成多个子任务来实现并行计算

Hadoop项目起源于2005年左右,由Apache软件基金会开发。它受到了Google发布的MapReduce论文的影响而诞生。后来随着数据量的增长及数据分析需求的变化,人们开始寻求比MapReduce更高效的数据处理框架。特别是在需要快速迭代或交互式查询的情况下,MapReduce显得不够灵活且效率较低。在2010年前后,加州大学伯克利分校的AMPLab推出了Spark项目,并很快得到了广泛的关注和发展。

5.1.2. MapReduce

MapReduce是一种编程模型,用于处理和生成大数据集。MapReduce的核心思想是将复杂的、运行于大规模集群上的并行计算过程高度抽象到两个函数:Map和Reduce。

  1. 输入分割:首先,MapReduce框架会将输入数据切分成多个小的数据块,这些数据块可以被独立地处理。每个数据块通常称为一个分片,并且大小是可以配置的。
  2. Map阶段:
    1. 每个分片都会被分配给一个Mapper任务
    2. Mapper读取输入数据,并根据用户定义的map()函数执行处理。这个过程主要是对原始数据进行过滤、转换等操作,并由用户指定每条数据的key
    3. 处理后的结果会被组织成键值对的形式输出
  3. Shuffle & Sort阶段:
    1. 在所有Mapper完成其工作之后,接下来是一个中间步骤,叫做Shuffle和Sort
    2. Shuffle负责收集来自不同mapper的所有相同key的value列表,并将它们发送给同一个reducer
    3. 如果用户配置了Sort,则确保了对于每个key来说,其对应的values都是有序排列的
  4. Reduce阶段:
    1. Reducer接收一组具有相同key的键值对作为输入
    2. 用户定义的reduce()函数会对这组值进行汇总或聚合等操作
    3. 最终,reducer会产生新的键值对作为最终输出
  5. 输出:最后,由Reducer产生的结果会被写入到分布式文件系统中或其他存储介质里

5.1.3. 差异比较

  • 架构设计:

    • Hadoop基于MapReduce模型,这是一种批处理模型,它将计算任务分解为Map和Reduce两个阶段来执行,并且这两个阶段之间需要通过磁盘进行数据交换,对于需要多次计算的任务来说效率较低
    • Spark则采用了更先进的DAG调度器和内存管理机制,让任务能够在内存中完成大部分计算过程,极大地提高了处理速度,但对内存要求较高
  • 易用性:

    • Hadoop主要使用Java编写,编程接口相对较为底层,开发者需要手动编写大量的代码来实现逻辑,对于不熟悉Java语言的使用者来说不够友好
    • Spark提供了丰富的语言支持(如Scala、Java、Python、SQL等),此外SparkSQL提供了一个类似于SQL的标准接口,方便了非程序员背景的分析师使用
  • 容错机制:

    • Hadoop MapReduce中,如果某个节点失败,则整个任务会被重新执行
    • Spark则基于Stage和Task维度拆分任务,实现细粒度的数据恢复。当发生故障时,只有丢失的部分才会被重新计算,而不是整个作业

虽然Spark在很多方面都优于Hadoop,但它并没有完全取代Hadoop:

  1. 一方面,Hadoop拥有更加成熟完善的生态系统,在某些特定领域或应用场景下,Hadoop提供的解决方案可能更为成熟稳定
  2. 另一方面,使用Spark进行快速数据处理时,由于其主要依赖于内存来提高性能,因此对于硬件配置有较高要求,在一些对实时性要求不高或者预算有限的情况下,基于磁盘的Hadoop可能是更具成本效益的选择

5.2. odps

5.2.1. 介绍

odps(Open Data Processing Service,又称 MaxCompute)和Spark都是大规模数据处理的工具,于2009年飞天项目正式启动,于2014年正式对外提供服务。odps作为阿里云的核心大数据处理平台,设计之初便是为了解决海量数据的存储、查询和计算需求,优化点深入到存储、计算、调度等多个层面。

5.2.2. odps的优化

相对于Spark,odps做了非常多的特定优化,在某些场景下(尤其是批处理和海量数据处理场景)显著快于Spark。TPCx-BB是一个大数据基准测试,衡量大数据系统在处理大规模数据集时的性能表现,odps在该项测试中成绩突出:

  • 在TPCx-BB 100TB标准测试中,连续6次获得全球冠军,保持性能和性价比第一
  • 连年获得TPCx-BB 30TB规模和TPC-DS 10TB规模,性能与性价比双料冠军
5.2.2.1. 存储优化

Spark是一个通用的分布式计算框架,通常依赖于外部的分布式存储系统,如HDFS或OSS,难以做针对性的优化。odps则在存储层专门设计了分布式存储系统,使得计算和数据的传输、读取都在同一个平台内进行。这种设计减少了数据在网络中传输的次数,降低了IO开销。另外对列式存储进行了深度优化,使得查询和计算性能更高效,尤其在处理超大规模数据时优势明显。

5.2.2.2. shuffle优化

Spark和odps在计算过程中都需要依赖shuffle做数据重新分布,但shuffle操作的成本很高,往往是整个计算中的性能瓶颈。对此odps相对于Spark额外做了很多针对性的优化:

  • 通过定制化的网络协议和高效的数据压缩/解压缩机制,在shuffle阶段减少网络传输和IO成本
  • 根据任务的特性动态调整shuffle策略,减少数据移动,或者在某些情况下完全跳过shuffle操作
5.2.2.3. sql优化

Spark的SQL引擎Catalyst是一个功能强大的优化器,但在面对极其庞大的数据集或复杂的查询条件下,确实存在一些问题,例如优化规则不足导致很多复杂SQL中无法得出最优解,对任务并行度不能自动调整导致集群资源难以高效利用等。

odps则内置了一个高度优化的SQL查询引擎,专门针对大规模数据处理进行了多项优化,使得SQL查询能够高效运行,并且可以基于每个任务的历史执行情况自动调整优化方案找到最优解。

5.2.2.4. 物理执行计划优化

Spark的执行计划依赖于Catalyst优化器,但它的优化更多是逻辑层面的,对于底层执行的物理优化相对较少,因此在物理执行计划方面优化难以得出最优解。

而odps则在这里做了许多针对性优化,能够在提交任务时自动进行代码优化,将用户提交的任务代码转换为更高效的执行计划:合并任务、移除不必要的中间步骤,并采用专有的分布式执行计划生成机制,从而减少任务的执行时间。

>