添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

聚合操作

首先,我们可以使用 describe 函数,来查看DataFrame里数字列的数量、最大值、最小值、平均值以及标准差是多少。

>>> print(iris.describe())
    type  sepal_length  sepal_width  petal_length  petal_width
0  count    150.000000   150.000000    150.000000   150.000000
1   mean      5.843333     3.054000      3.758667     1.198667
2    std      0.828066     0.433594      1.764420     0.763161
3    min      4.300000     2.000000      1.000000     0.100000
4    max      7.900000     4.400000      6.900000     2.500000

我们可以使用单列来执行聚合操作:

>>> iris.sepallength.max()

如果要在消除重复后的列上进行聚合,可以先调用 unique 方法,再调用相应的聚合函数:

>>> iris.name.unique().cat(sep=',')
u'Iris-setosa,Iris-versicolor,Iris-virginica'

如果所有列支持同一种聚合操作,也可以直接在整个 DataFrame 上执行聚合操作:

>>> iris.exclude('category').mean()
   sepal_length  sepal_width  petal_length  petal_width
1      5.843333     3.054000      3.758667     1.198667

需要注意的是,在 DataFrame 上执行 count 获取的是 DataFrame 的总行数:

>>> iris.count()

PyODPS 支持的聚合操作包括:

count(或size)

nunique

不重复值数量

median

quantile(p)

p分位数,仅在整数值下可取得准确值

moment

n 阶中心矩(或 n 阶矩)

样本偏度(无偏估计)

kurtosis

样本峰度(无偏估计)

按sep做字符串连接操作

tolist

组合为 list

需要注意的是,与 Pandas 不同,对于列上的聚合操作,不论是在 ODPS 还是 Pandas 后端下,PyODPS DataFrame 都会忽略空值。这一逻辑与 SQL 类似。

分组聚合

DataFrame API提供了groupby来执行分组操作,分组后的一个主要操作就是通过调用agg或者aggregate方法,来执行聚合操作。

>>> iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
              name  sepallength_max  smin
0      Iris-setosa              5.8   4.3
1  Iris-versicolor              7.0   4.9
2   Iris-virginica              7.9   4.9

最终的结果列中会包含分组的列,以及聚合的列。

DataFrame 提供了一个value_counts操作,能返回按某列分组后,每个组的个数从大到小排列的操作。

我们使用 groupby 表达式可以写成:

>>> iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

使用value_counts就很简单了:

>>> iris['name'].value_counts().head(5)
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

需要注意的是,该方法返回的行数大小受到 ODPS 排序返回结果大小的限制,默认为 10000 行,可通过 options.df.odps.sort.limit 配置,详见 配置选项

对于聚合后的单列操作,我们也可以直接取出列名。但此时只能使用聚合函数。

>>> iris.groupby('name').petallength.sum()
   petallength_sum
0             73.2
1            213.0
2            277.6
>>> iris.groupby('name').agg(iris.petallength.notnull().sum())
              name  petallength_sum
0      Iris-setosa               50
1  Iris-versicolor               50
2   Iris-virginica               50

分组时也支持对常量进行分组,但是需要使用Scalar初始化。

>>> from odps.df import Scalar
>>> iris.groupby(Scalar(1)).petallength.sum()
   petallength_sum
0            563.8

对字段调用agg或者aggregate方法来调用自定义聚合。自定义聚合需要提供一个类,这个类需要提供以下方法:

  • buffer():返回一个mutable的object(比如 list、dict),buffer大小不应随数据而递增。

  • __call__(buffer, *val):将值聚合到中间 buffer。

  • merge(buffer, pbuffer):将 pbuffer 聚合到 buffer 中。

  • getvalue(buffer):返回最终值。

  • 让我们看一个计算平均值的例子。

    class Agg(object):
        def buffer(self):
            return [0.0, 0]
        def __call__(self, buffer, val):
            buffer[0] += val
            buffer[1] += 1
        def merge(self, buffer, pbuffer):
            buffer[0] += pbuffer[0]
            buffer[1] += pbuffer[1]
        def getvalue(self, buffer):
            if buffer[1] == 0:
                return 0.0
            return buffer[0] / buffer[1]
    
    >>> iris.sepalwidth.agg(Agg)
    3.0540000000000007
    

    如果最终类型和输入类型发生了变化,则需要指定类型。

    >>> iris.sepalwidth.agg(Agg, 'float')
    

    自定义聚合也可以用在分组聚合中。

    >>> iris.groupby('name').sepalwidth.agg(Agg)
       petallength_aggregation
    0                    3.418
    1                    2.770
    2                    2.974
    

    当对多列调用自定义聚合,可以使用agg方法。

    class Agg(object):
        def buffer(self):
            return [0.0, 0.0]
        def __call__(self, buffer, val1, val2):
            buffer[0] += val1
            buffer[1] += val2
        def merge(self, buffer, pbuffer):
            buffer[0] += pbuffer[0]
            buffer[1] += pbuffer[1]
        def getvalue(self, buffer):
            if buffer[1] == 0:
                return 0.0
            return buffer[0] / buffer[1]
    
    >>> from odps.df import agg
    >>> to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # 对两列调用自定义聚合
    >>> iris.groupby('name').agg(val=to_agg)
                  name       val
    0      Iris-setosa  0.682781
    1  Iris-versicolor  0.466644
    2   Iris-virginica  0.451427
    

    要调用 ODPS 上已经存在的 UDAF,指定函数名即可。

    >>> iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # 对单列聚合
    >>> to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
    >>> iris.groupby('name').agg(to_agg.rename('val'))  # 对多列聚合
    

    目前,受限于 Python UDF,自定义聚合无法支持将 list / dict 类型作为初始输入或最终输出结果。

    HyperLogLog 计数

    DataFrame 提供了对列进行 HyperLogLog 计数的接口 hll_count,这个接口是个近似的估计接口, 当数据量很大时,能较快的对数据的唯一个数进行估计。

    这个接口在对比如海量用户UV进行计算时,能很快得出估计值。

    >>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
    >>> df.a.hll_count()
    63270
    >>> df.a.nunique()
    63250
    

    提供 splitter 参数会对每个字段进行分隔,再计算唯一数。