添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
温柔的豆浆  ·  An Experimental ...·  1 周前    · 
狂野的白开水  ·  Backup job completes ...·  1 周前    · 
高大的骆驼  ·  万方数据知识服务平台·  1 月前    · 
鬼畜的铁链  ·  GitHub - ...·  4 月前    · 
发财的脆皮肠  ·  Perforce DTG ...·  4 月前    · 
爱旅游的荔枝  ·  GitHub - ...·  5 月前    · 
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
row = Row("name", "date", "score")
rdd = sc.parallelize([
    row("Ali", "2020-01-01", 10.0),
    row("Ali", "2020-01-02", 15.0),
    row("Ali", "2020-01-03", 20.0),
    row("Ali", "2020-01-04", 25.0),
    row("Ali", "2020-01-05", 30.0),
    row("Bob", "2020-01-01", 15.0),
    row("Bob", "2020-01-02", 20.0),
    row("Bob", "2020-01-03", 30.0)
df = rdd.toDF().withColumn("date", col("date").cast("date"))

我们使用分组的形式计算每个人的平均分,其他数据保留,则可用如下代码:

w1 = Window().partitionBy(col("name"))
df.withColumn("mean1", mean("score").over(w1)).show()
+----+----------+-----+------------------+
|name|      date|score|             mean1|
+----+----------+-----+------------------+
| Bob|2020-01-01| 15.0|21.666666666666668|
| Bob|2020-01-02| 20.0|21.666666666666668|
| Bob|2020-01-03| 30.0|21.666666666666668|
| Ali|2020-01-02| 15.0|              20.0|
| Ali|2020-01-05| 30.0|              20.0|
| Ali|2020-01-01| 10.0|              20.0|
| Ali|2020-01-03| 20.0|              20.0|
| Ali|2020-01-04| 25.0|              20.0|
+----+----------+-----+------------------+

从结果来看,新增加的一列mean1表示每个人所在的分组中所有分数的平均值。当然,你也可以求最大值、最小值或者方差之类的统计值。

下面我们来看一组变形的分组窗:

days = lambda i: i * 86400  # 一天转化为秒单位
w1 = Window().partitionBy(col("name"))
w2 = Window().partitionBy(col("name")).orderBy("date")
w3 = Window().partitionBy(col("name")).orderBy((col("date").cast("timestamp").cast("bigint")/3600/24)).rangeBetween(-4, 0)
w4 = Window().partitionBy(col("name")).orderBy("date").rowsBetween(Window.currentRow, 1)

w1就是常规的按照名字进行分组;w2在按照名字分组的基础上,对其组内的日期按照从早到晚进行排序;w3是在w2的基础上,增加了范围限制,限制在从前4天到当前日期的范围内;w4则是在w2的基础上增加了行参数的限制,在当前行到下一行范围内。

是不是还是有些迷糊,不慌,来看下按照这些分组窗统计的结果:

df.withColumn("mean1", mean("score").over(w1))\
  .withColumn("mean2", mean("score").over(w2))\
  .withColumn("mean3", mean("score").over(w3))\
  .withColumn("mean4", mean("score").over(w4))\
  .show()
+----+----------+-----+-----+------------------+------------------+-----+
|name|      date|score|mean1|             mean2|             mean3|mean4|
+----+----------+-----+-----+------------------+------------------+-----+
| Bob|2020-01-01| 15.0| 30.0|              15.0|              15.0| 17.5|
| Bob|2020-01-02| 20.0| 30.0|              17.5|              17.5| 25.0|
| Bob|2020-01-03| 30.0| 30.0|21.666666666666668|21.666666666666668| 32.5|
| Bob|2020-01-04| 35.0| 30.0|              25.0|              25.0| 37.5|
| Bob|2020-01-05| 40.0| 30.0|              28.0|              28.0| 40.0|
| Bob|2020-01-06| 40.0| 30.0|              30.0|              33.0| 40.0|
| Ali|2020-01-01| 10.0| 20.0|              10.0|              10.0| 12.5|
| Ali|2020-01-02| 15.0| 20.0|              12.5|              12.5| 17.5|
| Ali|2020-01-03| 20.0| 20.0|              15.0|              15.0| 22.5|
| Ali|2020-01-04| 25.0| 20.0|              17.5|              17.5| 27.5|
| Ali|2020-01-05| 30.0| 20.0|              20.0|              20.0| 30.0|
+----+----------+-----+-----+------------------+------------------+-----+

我们来逐个分析一下,首先mean1列很简单,就是每个name分组内所有分数的平均值。mean2比较有意思,分组窗是按照name分组后按照日期进行了排序,于是均值是在当前行及前面所有行的范围内进行计算,这个可以看每组最后一个mean2均值,都与mean1均值相等。

mean3列是在当前行及往前数4天范围内计算均值,如Bob的最后一个mean3值是33,就是从2020-01-02开始计算的。

mean4列每次只统计当前行和下一行的数值,如果没有下一行则是其本身。
Window.unboundedPreceding, Window.unboundedFollowing, 以及Window.currentRow分别用来表示前面所有行、后面所有行以及当前行,而数值的正负表示往前或往后,大小表示行数。

到这里可以稍微做一个总结:
1 单独的Window做聚合统计,仅对分组内所有数值进行计算;
2 添加orderBy排序的Window分组窗,统计时默认是从前面所有行到当前行进行计算;
3 rangeBetween结合orderBy可用来限制指定范围内的数据,例如统计一周内数据的场景;
4 rowsBetween用来限定前后指定行范围内的数据进行统计

Spark SQL 中,Window 函数是一种用于在查询结果集中执行聚合、排序和分析操作的强大工具。它允许你在查询中创建一个口,然后对口内的数据进行聚合计算。 文章目录spark1 累加历史1.1 spark sql 使用口函数累加历史数据1.2 使用Column提供的over 函数,传入口操作1.3 累加一段时间范围内2 统计全部2.1 spark sql 使用rollup添加all统计2.2 spark sql 使用rollup添加all统计3 行转列 ->pivot4 空值处理4.1 对指定的列空值填充4.2 删除某列的非空且非NaN的低... 口函数的主要作用是对数据进行分组排序、求和、求平均值、计数等。 1.口函数的基本语法 <分析函数> OVER ([PARTITION BY <列清单>] ORDER BY <排序用列清单> [ROWS BETWEEN 开始位置 AND 结束位置]) 理解口函数的基本语法: ​ over()函数中包括三个函数:包括分区 partition by 列名、排序 order by 列名、指定 口范围 rows between 开始位置 and 结束位置。我们在使用ove 项目github地址:bitcarmanlee easy-algorithm-interview-and-practice 欢迎大家star,留言,一起学习进步 1.为什么需要口函数 在1.4以前,Spark SQL支持两种类型的函数用来计算单个的返回值。第一种是内置函数或者UDF函数,他们将单个行中的值作为输入,并且他们为每个输入行生成单个返回值。另外一种是聚合函数,典型的是SUM, MAX, AVG这种,是对一组行数据进行操作,并且为每个组计算一个返回值。 上面提到的两种函数,实际当中使用非常广泛,但 口函数(Window Functions)中的Window子句用于指定计算的范围,即函数对哪些行进行操作。其默认范围取决于是否同时使用PARTITION BY和ORDER BY子句。 当只有ORDER BY子句,没有PARTITION BY和Window子句时: 默认的Window范围是从整个结果集的第一行到当前...