我 们都知道,之前的 CBO,都是基于静态信息来对 执行计划进行优化,静态统计信息大家都懂的,不一定准确,比如hive中的catalog中记录的统计信息可以认为是不可信的,在一个不准确的统计信息的基础上优化出来的执行计划必然不是最优的。 AQE 就是为了解决这个问题而诞生的,随着spark 官方对AQE持续的优化,下面举一些用户使用场景来展示AQE是如何用的。
优化 Shuffles 过程
Spark shuffles 可以认为是影响查询性能最重要的影响因素,在 shuffle 的时候, 配置多少个 reducer 从来都是spark 用户的老大难问题,相信很多使用spark 的朋友在配置 spark.sql.shuffle.partitions 参数的时候,都是多少有些懵逼的,配置大了,会产生很多小的task 影响运行性能, 配置小了,就会导致task数目很少,单个task 拉取大量的数据,从而带来GC,spill磁盘,甚至OOM的问题,相信很多朋友都碰到过 executor lost, fetch failure等等错误,这里的本质问题是我们并不是很清楚真实的数据量到底有多大, 即使知道了,因为这个参数是全局的,我们一个application 里面不同的query 之间,甚至同一个query 同一个job 不同的stage 之间的shuffle read 的数据量并不是相同的,所以很难使用一个固定的值来统一。
现在 AQE 的实现了动态调整 shuffler partition number 的机制,在跑不同的query 不同的stage的时候,会根据 map 端 shuffle write 的实际数据量,来决定启动多少个 reducer 来处理,这样无论数据量怎么变换,都可以通过不同的 reducer 个数来均衡数据,从而保证单个 reducer 拉取的数据量不至于太大。
这里需要说明的是,AQE 并不是万能的,AQE 并不晓得 map 端需要对数据分出来多少份,所以实际使用的时候,可以把 spark.sql.shuffle.partitions 参数往大了设。
调整 Join 策略
在成本优化中,选择 join 的类型是比较重要的一块,因为在合适的时候选择 broadcast join,就直接避免了 shuffle, 会大大提升执行的效率,但是如果静态数据是错误的,对一个比较大的(统计数据看起来比较小)的 relation 进行了broadcast,就会直接把 driver 内存给搞爆。
AQE 中,会在运行时根据真实的数据来进行判断,如果有一个表小于 broadcast join 配置的阈值,就会把执行计划中的 shuffle join 动态修改为 broadcast join。
处理Join 过程中的数据倾斜
数据倾斜历来都是老大难的问题,数据倾斜,顾名思义,就是指数据中某些 key 的数据量特别大,然后按照 hash 分区的时候,某个分区的数据量就特别大,这种数据分布会导致性能严重下降,特别是在 sort merge join 的情况下,在 spark ui 上可以看到,某几个 task 拉取的数据量远远大于其他的task,运行时间也远远超过其他task,从而这个短板拖慢了整体的运行时间。
因为某些task 拉取了大多数的数据量,就会导致 spill 到磁盘,这样的话,就会更慢,更严重的话,直接就把executor 的内存搞爆了。
因为我们很难事先知道数据的特征,所以在join 的时候数据倾斜就很难通过静态统计信息来避免了,即使加上 hint, 在AQE中,通过收集运行时统计信息,我们就可以动态探测出倾斜的分区,从而对倾斜的分区,分裂出来子分区,每个子分区对应一个 reducer, 从而缓解数据倾斜对性能的影响。
从Spark UI 上观察AQE的运行情况
Understand AQE Query Plans
AQE 的执行计划是在运行过程中动态变化的,在 spark 3.0 中,针对 AQE 引入了几个特定的执行计划节点,AQE 会在Spark UI 上同时显示出初始的计划,和最终优化过的计划,下面我们通过图示的方式来展示一下。
The AdaptiveSparkPlan Node
开启了 AQE,查询中会添加一个或者多个 AdaptiveSparkPlan 节点作为query 或者子查询的根节点,在执行前和执行过程中,isFinalPlan 会被标记为false, query 执行完成后,isFinalPlan 会变为true, 一旦被标记为 true 在 AdaptiveSparkPlan 节点下面的计划也就不再变动。
The CustomShuffleReader Node
CustomShuffleReader 是AQE优化中关键的一环,这个算子节点会根据上一个stage 运行后的真实统计数据,动态的调整后一个 stage 分区的数目,在 spark UI 上,鼠标放在上面,如果你看到
coalesced
标记的话,就说明AQE 已经探测出了大量的小分区,根据配置的比较合适的分区数据量,把他们合并在了一起,可以点开 details, 里面可以看到原始的分区数据,已经合并后的分区数目。
当出现
skewed
标记的时候,说明 AQE在 sort-merge 的计算过程中, 探测出了倾斜的分区,details 里面可以看到,有多少个倾斜的分区,已经从这些倾斜分区中分裂出的分区数目。
当然上面两种优化效果是可以叠加的:
Detecting Join Strategy Change
对比执行计划,可以看出来在AQE优化前后的执行计划的区别,执行计划中,会展示出来初始 执行计划,和 Final 执行计划,下面的例子中,可以看出,初始的 SortMergeJoin 被优化为了 BroadcastHashJoin。
在 Spark UI 上面可以更加清楚的看到优化效果,当然spark ui 上只会展示当前的执行计划图,你可以在 query 开始的时候,和query 完成的时候,对比当时的执行计划图的区别。
Detecting Skew Join
下面的图例中可以根据 skew=true 的标记来判断 引擎有没有执行数据倾斜优化:
AQE 还是很强大的,因为依据的是真实数据的统计信息,AQE 可以很准确的选择最合适的 reducer 数目,转化join 策略,以及处理数据倾斜。
大家都在看
spark sql源码系列:
是时候学习真正的spark技术了 丨 从0到1认识 spark sql 丨 spark sql 源码剖析 PushDownPredicate 丨 spark sql 源码剖析 OptimizeIn 篇
structured streaming 系列:
structured streaming 原理剖析 丨 structured streaming 碰上kafka 丨 structured streaming 是如何搞定乱序时间的
spark streaming 系列:
spark streaming 读取kafka各种姿势详解 丨 spark streaming流式计算中的困境与解决之道
spark core 系列:
彻底搞懂spark shuffle过程(1) 丨 彻底搞懂spark shuffle过程(2) 丨 spark内存管理-Tungsten框架探秘
spark 机器学习系列:
学习了 streaming 和 sql,别忘了还有 Mlib
近些年来,在对Spark SQL优化上,CBO是最成功的一个特性之一。
CBO会计算一些和业务数据相关的统计数据,来优化查询,例如行数、去重后的行数、空值、最大最小值等。
Spark根据这些数据,自动选择BHJ或者SMJ,对于多Join场景下的Cost-based Join Reorder,来达到优化执行计划的目的。
但是,由于这些统计数据是需要预先处理的,会过时,所以我们在用过时的数据进行判断,在某些情况下反而会变成负面效果,拉低了SQL执行效率。
AQE在执行过程中统计数据,并动态地调节执行计划,从而解决了这个问题。
对于AQE而言,最重要的问题就是什么时候去重新计算优化执行计划。Spark任务的算子如果管道排列,依次并行执行。然而,shuffle或者broadcast exchange会打断算子的排列执行,我们称其为物化点(Materialization Points),并且用”Query Stages”来代表那些被物化点所分割的小片段。每个Query Stage会产出中间结果,当且仅当该stage及其并行的所有stage都执行完成后,下游的Query Stage才能被执行。所以当上游部分stage执行完成,partitions的统计数据也获取到了,并且下游还未开始执行,这就给AQE提供了reoptimization的机会。
在查询开始时,生成完了执行计划,AQE框架首先会找到并执行那些不存在上游的stages。一旦这些stage有一个或多个完成,AQE框架就会将其在physical plan中标记为完成,并根据已完成的stages提供的执行数据来更新整个logical plan。基于这些新产出的统计数据,AQE框架会执行optimizer,根据一系列的优化规则来进行优化;AQE框架还会执行生成普通physical plan的optimizer以及自适应执行专属的优化规则,例如分区合并、数据倾斜处理等。于是,我们就获得了最新优化过的执行计划和一些已经执行完成的stages,至此为一次循环。接着我们只需要继续重复上面的步骤,直到整个query都跑完。
在Spark 3.0中,AQE框架拥有三大特征:
动态折叠shuffle过程中的partition
动态选择join策略
动态优化存在数据倾斜的join
接下来我们就具体来看看这三大特征。
① 动态合并shuffle partitions
在我们处理的数据量级非常大时,shuffle通常来说是最影响性能的。因为shuffle是一个非常耗时的算子,它需要通过网络移动数据,分发给下游算子。
在shuffle中,partition的数量十分关键。partition的最佳数量取决于数据,而数据大小在不同的query不同stage都会有很大的差异,所以很难去确定一个具体的数目:
如果partition过少,每个partition数据量就会过多,可能就会导致大量数据要落到磁盘上,从而拖慢了查询。
如果partition过多,每个partition数据量就会很少,就会产生很多额外的网络开销,并且影响Spark task scheduler,从而拖慢查询。
为了解决该问题,我们在最开始设置相对较大的shuffle partition个数,通过执行过程中shuffle文件的数据来合并相邻的小partitions。
例如,假设我们执行
SELECT max(i) FROM tbl GROUP BY j
,表tbl只有2个partition并且数据量非常小。我们将初始shuffle partition设为5,因此在分组后会出现5个partitions。若不进行AQE优化,会产生5个tasks来做聚合结果,事实上有3个partitions数据量是非常小的。
然而在这种情况下,AQE只会生成3个reduce task。
② 动态切换join策略
在Spark所支持的众多join中,broadcast hash join性能是最好的。因此,如果需要广播的表的预估大小小于了广播限制阈值,那么我们就应该将其设为BHJ。但是,对于表的大小估计不当会导致决策错误,比如join表有很多的filter(容易把表估大)或者join表有很多其他算子(容易把表估小),而不仅仅是全量扫描一张表。
由于AQE拥有精确的上游统计数据,因此可以解决该问题。比如下面这个例子,右表的实际大小为15M,而在该场景下,经过filter过滤后,实际参与join的数据大小为8M,小于了默认broadcast阈值10M,应该被广播。
在我们执行过程中转化为BHJ的同时,我们甚至可以将传统shuffle优化为本地shuffle(例如shuffle读在mapper而不是基于reducer)来减小网络开销。
③ 动态优化数据倾斜
数据倾斜是由于集群上数据在分区之间分布不均匀所导致的,它会拉慢join场景下整个查询。AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。
我们可以看下这个场景,Table A join Table B,其中Table A的partition A0数据远大于其他分区。
AQE会将partition A0切分成2个子分区,并且让他们独自和Table B的partition B0进行join。
如果不做这个优化,SMJ将会产生4个tasks并且其中一个执行时间远大于其他。经优化,这个join将会有5个tasks,但每个task执行耗时差不多相同,因此个整个查询带来了更好的性能。
我们可以设置参数
spark.sql.adaptive.enabled
为true来开启AQE,在Spark 3.0中默认是false,并满足以下条件:
非流式查询
包含至少一个exchange(如join、聚合、窗口算子)或者一个子查询
AQE通过减少了对静态统计数据的依赖,成功解决了Spark CBO的一个难以处理的trade off(生成统计数据的开销和查询耗时)以及数据精度问题。相比之前具有局限性的CBO,现在就显得非常灵活 – 我们再也不需要提前去分析数据了!
大家都在看
spark sql源码系列:
是时候学习真正的spark技术了 丨 从0到1认识 spark sql 丨 spark sql 源码剖析 PushDownPredicate 丨 spark sql 源码剖析 OptimizeIn 篇
structured streaming 系列:
structured streaming 原理剖析 丨 structured streaming 碰上kafka 丨 structured streaming 是如何搞定乱序时间的
spark streaming 系列:
spark streaming 读取kafka各种姿势详解 丨 spark streaming流式计算中的困境与解决之道
spark core 系列:
彻底搞懂spark shuffle过程(1) 丨 彻底搞懂spark shuffle过程(2) 丨 spark内存管理-Tungsten框架探秘
spark 机器学习系列:
学习了 streaming 和 sql,别忘了还有 Mlib
UDF 只是在sql中简单的处理转换一些字段,类似默认的trim 函数把一个字符串类型的列的头尾空格去掉, 还有一种sql函数叫做UDAF,不同于UDF,这种是在sql聚合语句中使用的sql函数,必须配合 GROUP BY 一同使用,类似默认的count,sum函数,但是还有一种自定义函数叫做 UDWF, 这种一般人就不知道了,这种叫做窗口自定义函数,不了解窗口函数的,可以参考上一篇文章,或者官方的介绍
While aggregate functions work over a group, window functions work over a logical window of record and allow you to produce new columns from the combination of a record and one or more records in the window.
Describing what window functions are is beyond the scope of this article, so for that refer to
the previously mentioned article from Databricks
, but in particular, we are interested at the ‘previous event in time for a user’ in order to figure out sessions.
There is plenty of documentation on how to write UDFs and UDAFs, see for instance This link for UDFs or this link for UDAFs .
I was surprised to find out there’s not much info on how to build an custom window function, so I dug up the source code for spark and started looking at how window functions are implemented. That opened to me a whole new world, since Window functions, although conceptually similar to UDAFs, use a lower level Spark API than UDAFs, they are written using Catalyst expressions .
窗口函数是 SQL 中一类特别的函数。和聚合函数相似,窗口函数的输入也是多行记录。不 同的是,聚合函数的作用于由 GROUP BY 子句聚合的组,而窗口函数则作用于一个窗口
这里怎么理解一个窗口呢,spark君在这里得好好的解释解释,一个窗口是怎么定义的,
窗口语句中, partition by 用来指定分区的列,在同一个分区的行属于同一个窗口
order by 用来指定数据在一个窗口内的多行,如何排序
windowing_clause 用来指定开窗方式,在spark sql 中开窗方式有那么几种
- 一个分区中的所有行作为一个窗口: UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING(上下都没有边界), 这种情况下,spark sql 会把所有行作为一个输入,进行一次求值
-
Growing frame:UNBOUNDED PRECEDING AND ….(上无边界),
这种就是不断的把当前行加入的窗口中,而不删除, 例子:
.rowsBetween(Long.MinValue, 0)
:窗口的大小是按照排序从最小值到当前行,在数据迭代过程中,不断的把当前行加入的窗口中。 - Shrinking frame:… AND UNBOUNDED FOLLOWING(下无边界) 和Growing frame 相反,窗口不断的把迭代到的当前行从窗口中删除掉。
-
Moving frame:滑动的窗口,
举例:
.rowsBetween(-1, 1) 就是指
窗口定义从 -1(当前行前一行)到 1(当前行后一行) ,每一个滑动的窗口总用有3行 -
Offset frame
窗口中只有一条数据,就是偏移当前行一定距离的哪一行,举例:
lag(field, n)
: 就是取从当前字段往前第n个值
这里就针对窗口函数就介绍这么多,如果不懂请参考相关文档,加强理解,我们在平时使用 spark sql 的过程中,会发现有很多教你自定义 UDF 和 UDAF 的教程,却没有针对UDWF的教程,这是为啥呢,这是因为 UDF 和UDAF 都作为上层API暴露给用户了,使用scala很简单就可以写一个函数出来,但是UDWF没有对上层用户暴露,只能使用 Catalyst expressions . 也就是Catalyst框架底层的表达式语句才可以定义,如果没有对源码有很深入的研究,根本就搞不出来。spark 君在工作中写了一些UDWF的函数,但是都比较复杂,不太好单独抽出来作为一个简明的例子给大家讲解,这里翻译一篇文章来做说明。
窗口函数的使用场景
Now, for what kind of problem do we need window functions in the first place?
A common problem when working on any kind of website, is to determine ‘user sessions’, periods of user activity. if an user is inactive for a certain time
T
, then it’s considered a new ‘session’. Statistics over sessions are used to determine for instance if the user is a bot, to find out what pages have the most activity, etc.
Let’s say that we consider a session over if we don’t see any activity for one hour (sixty minutes). Let’s see an example of user activity, where ‘event’ has the name of the page the user visited and time is the time of the event. I simplified it, since the event would be a URL , while the time would be a full timestamp , and the session id would be generated as a random UUID , but I put simpler names/times just to illustrate the logic.
我们来举个实际例子来说明 窗口函数的使用场景,在网站的统计指标中,有一个概念叫做用户会话,什么叫做用户会话呢,我来说明一下,我们在网站服务端使用用户session来管理用户状态,过程如下
1) 服务端session是用户第一次访问应用时,服务器就会创建的对象,代表用户的一次会话过程,可以用来存放数据。服务器为每一个session都分配一个唯一的sessionid,以保证每个用户都有一个不同的session对象。
2)服务器在创建完session后,会把sessionid通过cookie返回给用户所在的浏览器,这样当用户第二次及以后向服务器发送请求的时候,就会通过cookie把sessionid传回给服务器,以便服务器能够根据sessionid找到与该用户对应的session对象。
3)session通常有失效时间的设定,比如1个小时。当失效时间到,服务器会销毁之前的session,并创建新的session返回给用户。但是只要用户在失效时间内,有发送新的请求给服务器,通常服务器都会把他对应的session的失效时间根据当前的请求时间再延长1个小时。
也就是说如果用户在1个超过一个小时不产生用户事件,当前会话就结束了,如果后续再产生用户事件,就当做新的用户会话,我们现在就使用spark sql 来统计用户的会话数,这种场景就很适合使用窗口函数来做统计,因为判断当前是否是一个新会话的依据,需要依赖当前行的前一行的时间戳和当前行的时间戳的间隔来判断,下面的表格可以帮助你理解这个概念,例子中有3列数据,用户,event字段代表用户访问了一个页面,time字段代表访问页面的时间戳:
user | event | time | session |
---|---|---|---|
user1 | page1 | 10:12 | session1 (new session) |
user1 | page2 | 10:20 | session1 (same session, 8 minutes from last event) |
user1 | page1 | 11:13 | session1 (same session, 53 minutes from last event) |
user1 | page3 | 14:12 | session2 (new session, 3 hours after last event) |
Note that this is the activity for one user. We do have many users, and in fact partitioning by user is the job of the window function.
上面只有一个用户,如果多个用户,可以使用 partition by 来进行分区。
It’s better to use an example to illustrate how the function works in respect of the window definition.
Let’s assume we have a very simple user activity data, with a user ID called
user
, while
ts
is a numeric timestamp and
session
is a session ID, that may be already present. While we may start with no session whatsoever, in most practical cases, we may be processing data hourly, so at hour
N + 1
we want to continue the sessions
we calculated at hour
n
.
Let’s create some test data and show what we want to achieve.
我们来构造一些假数据:
val d = Array[UserActivityData]( UserActivityData("user1", st, "ss1"), UserActivityData("user2", st + 5*one_minute, null), UserActivityData("user1", st + 10*one_minute, null), UserActivityData("user1", st + 15*one_minute, null), UserActivityData("user2", st + 15*one_minute, null), UserActivityData("user1", st + 140*one_minute, null), UserActivityData("user1", st + 160*one_minute, null)) // creating the DataFrame val sqlContext = new SQLContext(sc) val df = sqlContext.createDataFrame(sc.parallelize(d)) // Window specification val specs = Window.partitionBy(f.col("user")).orderBy(f.col("ts").asc) // create the session val res = df.withColumn( "newsession", calculateSession(f.col("ts"), f.col("session")) over specs)