添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

排序操作只能作用于Collection。我们只需要调用sort或者sort_values方法。

>>> iris.sort('sepalwidth').head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.2         2.2          4.5         1.5  Iris-versicolor
2          6.0         2.2          5.0         1.5   Iris-virginica
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          5.5         2.3          4.0         1.3  Iris-versicolor

如果想要降序排列,则可以使用参数ascending,并设为False。

>>> iris.sort('sepalwidth', ascending=False).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.7         4.4          1.5         0.4  Iris-setosa
1          5.5         4.2          1.4         0.2  Iris-setosa
2          5.2         4.1          1.5         0.1  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.4         3.9          1.3         0.4  Iris-setosa

也可以这样调用,来进行降序排列:

>>> iris.sort(-iris.sepalwidth).head(5)
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.7         4.4          1.5         0.4  Iris-setosa
1          5.5         4.2          1.4         0.2  Iris-setosa
2          5.2         4.1          1.5         0.1  Iris-setosa
3          5.8         4.0          1.2         0.2  Iris-setosa
4          5.4         3.9          1.3         0.4  Iris-setosa

多字段排序也很简单:

>>> iris.sort(['sepalwidth', 'petallength']).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          4.0         1.0  Iris-versicolor
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          5.0         1.5   Iris-virginica
4          4.5         2.3          1.3         0.3      Iris-setosa

多字段排序时,如果是升序降序不同,ascending参数可以传入一个列表,长度必须等同于排序的字段,它们的值都是boolean类型

>>> iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          5.0         1.5   Iris-virginica
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          6.3         2.3          4.4         1.3  Iris-versicolor

下面效果是一样的:

>>> iris.sort(['sepalwidth', -iris.petallength]).head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          5.0         2.0          3.5         1.0  Iris-versicolor
1          6.0         2.2          5.0         1.5   Iris-virginica
2          6.2         2.2          4.5         1.5  Iris-versicolor
3          6.0         2.2          4.0         1.0  Iris-versicolor
4          6.3         2.3          4.4         1.3  Iris-versicolor

由于 ODPS 要求排序必须指定个数,所以在 ODPS 后端执行时, 会通过 options.df.odps.sort.limit 指定排序个数,这个值默认是 10000, 如果要排序尽量多的数据,可以把这个值设到较大的值。不过注意,此时可能会导致 OOM。

去重在Collection上,用户可以调用distinct方法。

>>> iris[['name']].distinct()
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica
>>> iris.distinct('name')
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica
>>> iris.distinct('name', 'sepallength').head(3)
          name  sepallength
0  Iris-setosa          4.3
1  Iris-setosa          4.4
2  Iris-setosa          4.5

在Sequence上,用户可以调用unique,但是记住,调用unique的Sequence不能用在列选择中。

>>> iris.name.unique()
0      Iris-setosa
1  Iris-versicolor
2   Iris-virginica

下面的代码是错误的用法。

>>> iris[iris.name, iris.name.unique()]  # 错误的

除了按份数采样外,其余方法如果要在 ODPS DataFrame 上执行,需要 Project 支持 XFlow,否则,这些方法只能在 Pandas DataFrame 后端上执行。

  • 按份数采样
  • 在这种采样方式下,数据被分为 parts 份,可选择选取的份数序号。

    >>> iris.sample(parts=10)  # 分成10份,默认取第0份
    >>> iris.sample(parts=10, i=0)  # 手动指定取第0份
    >>> iris.sample(parts=10, i=[2, 5])   # 分成10份,取第2和第5份
    >>> iris.sample(parts=10, columns=['name', 'sepalwidth'])  # 根据name和sepalwidth的值做采样
    
  • 按比例 / 条数采样
  • 在这种采样方式下,用户指定需要采样的数据条数或采样比例。指定 replace 参数为 True 可启用放回采样。

    >>> iris.sample(n=100)  # 选取100条数据
    >>> iris.sample(frac=0.3)  # 采样30%的数据
    
  • 按权重列采样
  • 在这种采样方式下,用户指定权重列和数据条数 / 采样比例。指定 replace 参数为 True 可启用放回采样。

    >>> iris.sample(n=100, weights='sepal_length')
    >>> iris.sample(n=100, weights='sepal_width', replace=True)
    

    在这种采样方式下,用户指定用于分层的标签列,同时为需要采样的每个标签指定采样比例( frac 参数)或条数 ( n 参数)。暂不支持放回采样。

    >>> iris.sample(strata='category', n={'Iris Setosa': 10, 'Iris Versicolour': 10})
    >>> iris.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4})
    

    min_max_scale 还支持使用 feature_range 参数指定输出值的范围,例如,如果我们需要使输出值在 (-1, 1) 范围内,可使用

    >>> df.min_max_scale(columns=['fid'], feature_range=(-1, 1))
        name  id       fid
    0  name1   4  1.000000
    1  name2   2  0.052632
    2  name2   3 -1.000000
    3  name1   4  0.421053
    4  name1   3 -0.631579
    5  name1   3  0.368421
    

    如果需要保留原始值,可以使用 preserve 参数。此时,缩放后的数据将会以新增列的形式追加到数据中, 列名默认为原列名追加“_scaled”后缀,该后缀可使用 suffix 参数更改。例如,

    >>> df.min_max_scale(columns=['fid'], preserve=True)
        name  id  fid  fid_scaled
    0  name1   4  5.3    1.000000
    1  name2   2  3.5    0.526316
    2  name2   3  1.5    0.000000
    3  name1   4  4.2    0.710526
    4  name1   3  2.2    0.184211
    5  name1   3  4.1    0.684211
    

    min_max_scale 也支持使用 group 参数指定一个或多个分组列,在分组列中分别取最值进行缩放。例如,

    >>> df.min_max_scale(columns=['fid'], group=['name'])
        name  id       fid
    0  name1   4  1.000000
    1  name1   4  0.645161
    2  name1   3  0.000000
    3  name1   3  0.612903
    4  name2   2  1.000000
    5  name2   3  0.000000
    

    可见结果中,name1 和 name2 两组均按组中的最值进行了缩放。

    std_scale 可依照标准正态分布对数据进行调整。例如,

    >>> df.std_scale(columns=['fid'])
        name  id       fid
    0  name1   4  1.436467
    1  name2   2  0.026118
    2  name2   3 -1.540938
    3  name1   4  0.574587
    4  name1   3 -0.992468
    5  name1   3  0.496234
    

    std_scale 同样支持 preserve 参数保留原始列以及使用 group 进行分组,具体请参考 min_max_scale,此处不再赘述。

    空值处理

    DataFrame 支持筛去空值以及填充空值的功能。例如,对数据

       id   name   f1   f2   f3   f4
    
    
    
    
        
    
    0   0  name1  1.0  NaN  3.0  4.0
    1   1  name1  2.0  NaN  NaN  1.0
    2   2  name1  3.0  4.0  1.0  NaN
    3   3  name1  NaN  1.0  2.0  3.0
    4   4  name1  1.0  NaN  3.0  4.0
    5   5  name1  1.0  2.0  3.0  4.0
    6   6  name1  NaN  NaN  NaN  NaN
    

    使用 dropna 可删除 subset 中包含空值的行:

    >>> df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
       id   name   f1   f2   f3   f4
    0   5  name1  1.0  2.0  3.0  4.0
    

    如果行中包含非空值则不删除,可以使用 how=’all’:

    >>> df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
       id   name   f1   f2   f3   f4
    0   0  name1  1.0  NaN  3.0  4.0
    1   1  name1  2.0  NaN  NaN  1.0
    2   2  name1  3.0  4.0  1.0  NaN
    3   3  name1  NaN  1.0  2.0  3.0
    4   4  name1  1.0  NaN  3.0  4.0
    5   5  name1  1.0  2.0  3.0  4.0
    

    你也可以使用 thresh 参数来指定行中至少要有多少个非空值。例如:

    >>> df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4'])
       id   name   f1   f2   f3   f4
    0   0  name1  1.0  NaN  3.0  4.0
    2   2  name1  3.0  4.0  1.0  NaN
    3   3  name1  NaN  1.0  2.0  3.0
    4   4  name1  1.0  NaN  3.0  4.0
    5   5  name1  1.0  2.0  3.0  4.0
    

    使用 fillna 可使用常数或已有的列填充未知值。下面给出了使用常数填充的例子:

    >>> df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
       id   name     f1     f2     f3     f4
    0   0  name1    1.0  100.0    3.0    4.0
    1   1  name1    2.0  100.0  100.0    1.0
    2   2  name1    3.0    4.0    1.0  100.0
    3   3  name1  100.0    1.0    2.0    3.0
    4   4  name1    1.0  100.0    3.0    4.0
    5   5  name1    1.0    2.0    3.0    4.0
    6   6  name1  100.0  100.0  100.0  100.0
    

    你也可以使用一个已有的列来填充未知值。例如:

    >>> df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4'])
       id   name   f1   f2   f3   f4
    0   0  name1  1.0  NaN  3.0  4.0
    1   1  name1  2.0  NaN  NaN  1.0
    2   2  name1  3.0  4.0  1.0  4.0
    3   3  name1  1.0  1.0  2.0  3.0
    4   4  name1  1.0  NaN  3.0  4.0
    5   5  name1  1.0  2.0  3.0  4.0
    6   6  name1  NaN  NaN  NaN  NaN
    

    特别地,DataFrame 提供了向前 / 向后填充的功能。通过指定 method 参数为下列值可以达到目的:

    >>> df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
       id   name   f1   f2   f3   f4
    0   0  name1  1.0  3.0  3.0  4.0
    1   1  name1  2.0  1.0  1.0  1.0
    2   2  name1  3.0  4.0  1.0  NaN
    3   3  name1  1.0  1.0  2.0  3.0
    4   4  name1  1.0  3.0  3.0  4.0
    5   5  name1  1.0  2.0  3.0  4.0
    6   6  name1  NaN  NaN  NaN  NaN
    >>> df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
       id   name   f1   f2   f3   f4
    0   0  name1  1.0  1.0  3.0  4.0
    1   1  name1  2.0  2.0  2.0  1.0
    2   2  name1  3.0  4.0  1.0  1.0
    3   3  name1  NaN  1.0  2.0  3.0
    4   4  name1  1.0  1.0  3.0  4.0
    5   5  name1  1.0  2.0  3.0  4.0
    6   6  name1  NaN  NaN  NaN  NaN
    

    你也可以使用 ffill / bfill 函数来简化代码。ffill 等价于 fillna(method=’ffill’), bfill 等价于 fillna(method=’bfill’)

    对所有行/列调用自定义函数

    对一行数据使用自定义函数

    要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。

    apply 的自定义函数接收一个参数,为上一步 Collection 的一行数据,用户可以通过属性、或者偏移取得一个字段的数据。

    >>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
       sepaladd
    0       8.6
    1       7.9
    2       7.9
    

    reduce为 True 时,表示返回结果为Sequence,否则返回结果为Collection。 namestypes参数分别指定返回的Sequence或Collection的字段名和类型。 如果类型不指定,将会默认为string类型。

    在 apply 的自定义函数中,reduce 为 False 时,也可以使用 yield关键字来返回多行结果。

    >>> iris.count()
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
    >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).head(5)
       iris_add  iris_sub
    0       1.6       8.6
    1       1.2       1.6
    2       1.9       7.9
    3       1.2       1.6
    4       1.5       7.9
    

    我们也可以在函数上来注释返回的字段和类型,这样就不需要在函数调用时再指定。

    >>> from odps.df import output
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>> iris.apply(handle, axis=1).count()
    >>> iris.apply(handle, axis=1).head(5)
       iris_add  iris_sub
    0       1.6       8.6
    1       1.2       1.6
    2       1.9       7.9
    3       1.2       1.6
    4       1.5       7.9
    

    也可以使用 map-only 的 map_reduce,和 axis=1 的apply操作是等价的。

    >>> iris.map_reduce(mapper=handle).count()
    >>> iris.map_reduce(mapper=handle).head(5)
       iris_add  iris_sub
    0       1.6       8.6
    1       1.2       1.6
    2       1.9       7.9
    3       1.2       1.6
    4       1.5       7.9
    

    如果想调用 ODPS 上已经存在的 UDTF,则函数指定为函数名即可。

    >>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
    

    使用 apply 对行操作,且 reduce为 False 时,可以使用 并列多行输出 与已有的行结合,用于后续聚合等操作。

    >>> from odps.df import output
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>> iris[iris.category, iris.apply(handle, axis=1)]
    

    对所有列调用自定义聚合

    调用apply方法,当我们不指定axis,或者axis为0的时候,我们可以通过传入一个自定义聚合类来对所有sequence进行聚合操作。

    class Agg(object):
        def buffer(self):
            return [0.0, 0]
        def __call__(self, buffer, val):
            buffer[0] += val
            buffer[1] += 1
        def merge(self, buffer, pbuffer):
            buffer[0] += pbuffer[0]
            buffer[1] += pbuffer[1]
        def getvalue(self, buffer):
            if buffer[1] == 0:
                return 0.0
            return buffer[0] / buffer[1]
    
    >>> iris.exclude('name').apply(Agg)
       sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
    0                 5.843333                   3.054                 3.758667                1.198667
    

    目前,受限于 Python UDF,自定义函数无法支持将 list / dict 类型作为初始输入或最终输出结果。

    由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。 请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源,如下文所述。

    引用资源

    类似于对 map 方法的resources参数,每个resource可以是ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。

    对于axis为1,也就是在行上操作,我们需要写一个函数闭包或者callable的类。 而对于列上的聚合操作,我们只需在 __init__ 函数里读取资源即可。

    >>> words_df
                         sentence
    0                 Hello World
    1                Hello Python
    2  Life is short I use Python
    >>> import pandas as pd
    >>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
    >>> @output(['sentence'], ['string'])
    >>> def filter_stops(resources):
    >>>     stop_words = set([r[0] for r in resources[0]])
    >>>     def h(row):
    >>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
    >>>     return h
    >>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                    sentence
    0            Hello World
    1           Hello Python
    2  Life short use Python
    

    可以看到这里的stop_words是存放于本地,但在真正执行时会被上传到ODPS作为资源引用。

    使用第三方Python库

    现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件, 即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。

    PyODPS 提供了 pyodps-pack 工具,可在安装完 PyODPS 后打包三方包及其依赖。同时,execute / persist / to_pandas 方法支持增加 libraries 参数以使用这些资源。如何制作及使用三方包的说明请参考 这里

    由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 的 ODPS Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常

    MapReduce API

    PyODPS DataFrame也支持MapReduce API,用户可以分别编写map和reduce函数(map_reduce可以只有mapper或者reducer过程)。 我们来看个简单的wordcount的例子。

    >>> def mapper(row):
    >>>     for word in row[0].split():
    >>>         yield word.lower(), 1
    >>> def reducer(keys):
    >>>     # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
    >>>     cnt = [0]
    >>>     def h(row, done):  # done表示这个key已经迭代结束
    >>>         cnt[0] += row[1]
    >>>         if done:
    >>>             yield keys[0], cnt[0]
    >>>     return h
    >>> words_df.map_reduce(mapper, reducer, group=['word', ],
    >>>                     mapper_output_names=['word', 'cnt'],
    >>>                     mapper_output_types=['string', 'int'],
    >>>                     reducer_output_names=['word', 'cnt'],
    >>>                     reducer_output_types=['string', 'int'])
         word  cnt
    0   hello    2
    1       i    1
    2      is    1
    3    life    1
    4  python    2
    5   short    1
    6     use    1
    7   world    1
    

    group参数用来指定reduce按哪些字段做分组,如果不指定,会按全部字段做分组。

    其中对于reducer来说,会稍微有些不同。它需要接收聚合的keys初始化,并能继续处理按这些keys聚合的每行数据。 第2个参数表示这些keys相关的所有行是不是都迭代完成。

    这里写成函数闭包的方式,主要为了方便,当然我们也能写成callable的类。

    class reducer(object):
        def __init__(self, keys):
            self.cnt = 0
        def __call__(self, row, done):  # done表示这个key已经迭代结束
            self.cnt += row.cnt
            if done:
                yield row.word, self.cnt
    

    使用 output来注释会让代码更简单些。

    >>> from odps.df import output
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def mapper(row):
    >>>     for word in row[0].split():
    >>>         yield word.lower(), 1
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def reducer(keys):
    >>>     # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
    >>>     cnt = [0]
    >>>     def h(row, done):
    
    
    
    
        
      # done表示这个key已经迭代结束
    >>>         cnt[0] += row.cnt
    >>>         if done:
    >>>             yield keys.word, cnt[0]
    >>>     return h
    >>> words_df.map_reduce(mapper, reducer, group='word')
         word  cnt
    0   hello    2
    1       i    1
    2      is    1
    3    life    1
    4  python    2
    5   short    1
    6     use    1
    7   world    1
    

    有时候我们在迭代的时候需要按某些列排序,则可以使用 sort参数,来指定按哪些列排序,升序降序则通过 ascending参数指定。 ascending 参数可以是一个bool值,表示所有的 sort字段是相同升序或降序, 也可以是一个列表,长度必须和 sort字段长度相同。

    指定combiner

    combiner表示在map_reduce API里表示在mapper端,就先对数据进行聚合操作,它的用法和reducer是完全一致的,但不能引用资源。 并且,combiner的输出的字段名和字段类型必须和mapper完全一致。

    上面的例子,我们就可以使用reducer作为combiner来先在mapper端对数据做初步的聚合,减少shuffle出去的数据量。

    >>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
    

    使用第三方Python库

    现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件, 即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。

    PyODPS 提供了 pyodps-pack 工具,可在安装完 PyODPS 后打包三方包及其依赖。同时,execute / persist / to_pandas 方法支持增加 libraries 参数以使用这些资源。如何制作及使用三方包的说明请参考 这里

    由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 的 ODPS Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常

    由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。 请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源。

    引用资源

    在MapReduce API里,我们能分别指定mapper和reducer所要引用的资源。

    如下面的例子,我们对mapper里的单词做停词过滤,在reducer里对白名单的单词数量加5。

    >>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def mapper(resources):
    >>>     stop_words = set(r[0].strip() for r in resources[0])
    >>>     def h(row):
    >>>         for word in row[0].split():
    >>>             if word not in stop_words:
    >>>                 yield word, 1
    >>>     return h
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def reducer(resources):
    >>>     d = dict()
    >>>     d['white_list'] = set(word.strip() for word in resources[0])
    >>>     d['cnt'] = 0
    >>>     def inner(keys):
    >>>         d['cnt'] = 0
    >>>         def h(row, done):
    >>>             d['cnt'] += row.cnt
    >>>             if done:
    >>>                 if row.word in d['white_list']:
    >>>                     d['cnt'] += 5
    >>>                 yield keys.word, d['cnt']
    >>>         return h
    >>>     return inner
    >>> words_df.map_reduce(mapper, reducer, group='word',
    >>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
         word  cnt
    0   hello    2
    1    life    1
    2  python    7
    3   world    6
    4   short    1
    5     use    1
    

    布隆过滤器

    PyODPS DataFrame提供了 bloom_filter 接口来进行布隆过滤器的计算。

    给定某个collection,和它的某个列计算的sequence1,我们能对另外一个sequence2进行布隆过滤,sequence1不在sequence2中的一定会过滤, 但可能不能完全过滤掉不存在于sequence2中的数据,这也是一种近似的方法。

    这样的好处是能快速对collection进行快速过滤一些无用数据。

    这在大规模join的时候,一边数据量远大过另一边数据,而大部分并不会join上的场景很有用。 比如,我们在join用户的浏览数据和交易数据时,用户的浏览大部分不会带来交易,我们可以利用交易数据先对浏览数据进行布隆过滤, 然后再join能很好提升性能。

    >>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
    0  name1  1
    1  name2  2
    2  name3  3
    3  name1  4
    >>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
    0  name1
    >>> df1.bloom_filter('a', df2.a) # 这里第0个参数可以是个计算表达式如: df1.a + '1'
    0  name1  1
    1  name1  4
    

    这里由于数据量很小,df1中的a为name2和name3的行都被正确过滤掉了,当数据量很大的时候,可能会有一定的数据不能被过滤。

    如之前提的join场景中,少量不能过滤并不能并不会影响正确性,但能较大提升join的性能。

    我们可以传入 capacityerror_rate 来设置数据的量以及错误率,默认值是 30000.01

    要注意,调大 capacity 或者减小 error_rate 会增加内存的使用,所以应当根据实际情况选择一个合理的值。

    透视表(pivot_table)

    PyODPS DataFrame提供透视表的功能。我们通过几个例子来看使用。

    A B C D E 0 foo one small 1 3 1 foo one large 2 4 2 foo one large 2 5 3 foo two small 3 6 4 foo two small 3 4 5 bar one large 4 5 6 bar one small 5 3 7 bar two small 6 2 8 bar two large 7 1

    最简单的透视表必须提供一个 rows 参数,表示按一个或者多个字段做取平均值的操作。

    >>> df['A', 'D', 'E'].pivot_table(rows='A')
         A  D_mean  E_mean
    0  bar     5.5    2.75
    1  foo     2.2    4.40
    

    rows可以提供多个,表示按多个字段做聚合。

    >>> df.pivot_table(rows=['A', 'B', 'C'])
         A    B      C  D_mean  E_mean
    0  bar  one  large     4.0     5.0
    1  bar  one  small     5.0     3.0
    2  bar  two  large     7.0     1.0
    3  bar  two  small     6.0     2.0
    4  foo  one  large     2.0     4.5
    5  foo  one  small     1.0     3.0
    6  foo  two  small     3.0     5.0
    

    我们可以指定 values 来显示指定要计算的列。

    >>> df.pivot_table(rows=['A', 'B'], values='D')
         A    B    D_mean
    0  bar  one  4.500000
    1  bar  two  6.500000
    2  foo  one  1.666667
    3  foo  two  3.000000
    

    计算值列时,默认会计算平均值,用户可以指定一个或者多个聚合函数。

    >>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
         A    B    D_mean  D_count  D_sum
    0  bar  one  4.500000        2      9
    1  bar  two  6.500000        2     13
    2  foo  one  1.666667        3      5
    3  foo  two  3.000000        2      6
    

    我们也可以把原始数据的某一列的值,作为新的collection的列。 这也是透视表最强大的地方。

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
         A    B  large_D_mean  small_D_mean
    0  bar  one           4.0           5.0
    1  bar  two           7.0           6.0
    2  foo  one           2.0           1.0
    3  foo  two           NaN           3.0
    

    我们可以提供 fill_value 来填充空值。

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
         A    B  large_D_mean  small_D_mean
    0  bar  one             4             5
    1  bar  two             7             6
    2  foo  one             2             1
    3  foo  two             0             3
    

    可以通过 extract_kv 方法将 Key-Value 字段展开:

    >>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    NaN   10.0    NaN    NaN
    1  name1    7.0    NaN    NaN    NaN    8.2    NaN
    2  name2    NaN    1.2    1.5    NaN    NaN    NaN
    3  name2    NaN    1.0    NaN    NaN    NaN    1.1
    

    其中,需要展开的字段名由 columns 指定,Key 和 Value 之间的分隔符,以及 Key-Value 对之间的分隔符分别由 kv_delim 和 item_delim 这两个参数指定,默认分别为半角冒号和半角逗号。输出的字段名为原字段名和 Key 值的组合,通过“_”相连。缺失值默认为 None,可通过 fill_value 选择需要填充的值。例如,相同的 df,

    >>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    0.0   10.0    0.0    0.0
    1  name1    7.0    0.0    0.0    0.0    8.2    0.0
    2  name2    0.0    1.2    1.5    0.0    0.0    0.0
    3  name2    0.0    1.0    0.0    0.0    0.0    1.1
    

    extract_kv 默认输出类型为 float。如果需要输出其他类型,可以指定 dtype 参数,例如

    df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0, dtype='string')
    

    DataFrame 也支持将多列数据转换为一个 Key-Value 列。例如,

    name k1 k2 k3 k5 k7 k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1

    可通过 to_kv 方法转换为 Key-Value 表示的格式:

    >>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1