>>> iris.sort('sepalwidth').head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.2 2.2 4.5 1.5 Iris-versicolor
2 6.0 2.2 5.0 1.5 Iris-virginica
3 6.0 2.2 4.0 1.0 Iris-versicolor
4 5.5 2.3 4.0 1.3 Iris-versicolor
如果想要降序排列,则可以使用参数ascending
,并设为False。
>>> iris.sort('sepalwidth', ascending=False).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.7 4.4 1.5 0.4 Iris-setosa
1 5.5 4.2 1.4 0.2 Iris-setosa
2 5.2 4.1 1.5 0.1 Iris-setosa
3 5.8 4.0 1.2 0.2 Iris-setosa
4 5.4 3.9 1.3 0.4 Iris-setosa
也可以这样调用,来进行降序排列:
>>> iris.sort(-iris.sepalwidth).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.7 4.4 1.5 0.4 Iris-setosa
1 5.5 4.2 1.4 0.2 Iris-setosa
2 5.2 4.1 1.5 0.1 Iris-setosa
3 5.8 4.0 1.2 0.2 Iris-setosa
4 5.4 3.9 1.3 0.4 Iris-setosa
多字段排序也很简单:
>>> iris.sort(['sepalwidth', 'petallength']).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.0 2.2 4.0 1.0 Iris-versicolor
2 6.2 2.2 4.5 1.5 Iris-versicolor
3 6.0 2.2 5.0 1.5 Iris-virginica
4 4.5 2.3 1.3 0.3 Iris-setosa
多字段排序时,如果是升序降序不同,ascending
参数可以传入一个列表,长度必须等同于排序的字段,它们的值都是boolean类型
>>> iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.0 2.2 5.0 1.5 Iris-virginica
2 6.2 2.2 4.5 1.5 Iris-versicolor
3 6.0 2.2 4.0 1.0 Iris-versicolor
4 6.3 2.3 4.4 1.3 Iris-versicolor
下面效果是一样的:
>>> iris.sort(['sepalwidth', -iris.petallength]).head(5)
sepallength sepalwidth petallength petalwidth name
0 5.0 2.0 3.5 1.0 Iris-versicolor
1 6.0 2.2 5.0 1.5 Iris-virginica
2 6.2 2.2 4.5 1.5 Iris-versicolor
3 6.0 2.2 4.0 1.0 Iris-versicolor
4 6.3 2.3 4.4 1.3 Iris-versicolor
由于 ODPS 要求排序必须指定个数,所以在 ODPS 后端执行时,
会通过 options.df.odps.sort.limit
指定排序个数,这个值默认是 10000,
如果要排序尽量多的数据,可以把这个值设到较大的值。不过注意,此时可能会导致 OOM。
去重在Collection上,用户可以调用distinct方法。
>>> iris[['name']].distinct()
0 Iris-setosa
1 Iris-versicolor
2 Iris-virginica
>>> iris.distinct('name')
0 Iris-setosa
1 Iris-versicolor
2 Iris-virginica
>>> iris.distinct('name', 'sepallength').head(3)
name sepallength
0 Iris-setosa 4.3
1 Iris-setosa 4.4
2 Iris-setosa 4.5
在Sequence上,用户可以调用unique,但是记住,调用unique的Sequence不能用在列选择中。
>>> iris.name.unique()
0 Iris-setosa
1 Iris-versicolor
2 Iris-virginica
下面的代码是错误的用法。
>>> iris[iris.name, iris.name.unique()] # 错误的
除了按份数采样外,其余方法如果要在 ODPS DataFrame 上执行,需要 Project 支持 XFlow,否则,这些方法只能在
Pandas DataFrame 后端上执行。
按份数采样
在这种采样方式下,数据被分为 parts
份,可选择选取的份数序号。
>>> iris.sample(parts=10) # 分成10份,默认取第0份
>>> iris.sample(parts=10, i=0) # 手动指定取第0份
>>> iris.sample(parts=10, i=[2, 5]) # 分成10份,取第2和第5份
>>> iris.sample(parts=10, columns=['name', 'sepalwidth']) # 根据name和sepalwidth的值做采样
按比例 / 条数采样
在这种采样方式下,用户指定需要采样的数据条数或采样比例。指定 replace
参数为 True 可启用放回采样。
>>> iris.sample(n=100) # 选取100条数据
>>> iris.sample(frac=0.3) # 采样30%的数据
按权重列采样
在这种采样方式下,用户指定权重列和数据条数 / 采样比例。指定 replace
参数为 True 可启用放回采样。
>>> iris.sample(n=100, weights='sepal_length')
>>> iris.sample(n=100, weights='sepal_width', replace=True)
在这种采样方式下,用户指定用于分层的标签列,同时为需要采样的每个标签指定采样比例( frac
参数)或条数
( n
参数)。暂不支持放回采样。
>>> iris.sample(strata='category', n={'Iris Setosa': 10, 'Iris Versicolour': 10})
>>> iris.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4})
min_max_scale 还支持使用 feature_range 参数指定输出值的范围,例如,如果我们需要使输出值在 (-1, 1)
范围内,可使用
>>> df.min_max_scale(columns=['fid'], feature_range=(-1, 1))
name id fid
0 name1 4 1.000000
1 name2 2 0.052632
2 name2 3 -1.000000
3 name1 4 0.421053
4 name1 3 -0.631579
5 name1 3 0.368421
如果需要保留原始值,可以使用 preserve 参数。此时,缩放后的数据将会以新增列的形式追加到数据中,
列名默认为原列名追加“_scaled”后缀,该后缀可使用 suffix 参数更改。例如,
>>> df.min_max_scale(columns=['fid'], preserve=True)
name id fid fid_scaled
0 name1 4 5.3 1.000000
1 name2 2 3.5 0.526316
2 name2 3 1.5 0.000000
3 name1 4 4.2 0.710526
4 name1 3 2.2 0.184211
5 name1 3 4.1 0.684211
min_max_scale 也支持使用 group 参数指定一个或多个分组列,在分组列中分别取最值进行缩放。例如,
>>> df.min_max_scale(columns=['fid'], group=['name'])
name id fid
0 name1 4 1.000000
1 name1 4 0.645161
2 name1 3 0.000000
3 name1 3 0.612903
4 name2 2 1.000000
5 name2 3 0.000000
可见结果中,name1 和 name2 两组均按组中的最值进行了缩放。
std_scale 可依照标准正态分布对数据进行调整。例如,
>>> df.std_scale(columns=['fid'])
name id fid
0 name1 4 1.436467
1 name2 2 0.026118
2 name2 3 -1.540938
3 name1 4 0.574587
4 name1 3 -0.992468
5 name1 3 0.496234
std_scale 同样支持 preserve 参数保留原始列以及使用 group 进行分组,具体请参考 min_max_scale,此处不再赘述。
空值处理
DataFrame 支持筛去空值以及填充空值的功能。例如,对数据
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
1 1 name1 2.0 NaN NaN 1.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
使用 dropna 可删除 subset 中包含空值的行:
>>> df.dropna(subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 5 name1 1.0 2.0 3.0 4.0
如果行中包含非空值则不删除,可以使用 how=’all’:
>>> df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
1 1 name1 2.0 NaN NaN 1.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
你也可以使用 thresh 参数来指定行中至少要有多少个非空值。例如:
>>> df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
使用 fillna 可使用常数或已有的列填充未知值。下面给出了使用常数填充的例子:
>>> df.fillna(100, subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 100.0 3.0 4.0
1 1 name1 2.0 100.0 100.0 1.0
2 2 name1 3.0 4.0 1.0 100.0
3 3 name1 100.0 1.0 2.0 3.0
4 4 name1 1.0 100.0 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 100.0 100.0 100.0 100.0
你也可以使用一个已有的列来填充未知值。例如:
>>> df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 NaN 3.0 4.0
1 1 name1 2.0 NaN NaN 1.0
2 2 name1 3.0 4.0 1.0 4.0
3 3 name1 1.0 1.0 2.0 3.0
4 4 name1 1.0 NaN 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
特别地,DataFrame 提供了向前 / 向后填充的功能。通过指定 method 参数为下列值可以达到目的:
>>> df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 3.0 3.0 4.0
1 1 name1 2.0 1.0 1.0 1.0
2 2 name1 3.0 4.0 1.0 NaN
3 3 name1 1.0 1.0 2.0 3.0
4 4 name1 1.0 3.0 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
>>> df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
id name f1 f2 f3 f4
0 0 name1 1.0 1.0 3.0 4.0
1 1 name1 2.0 2.0 2.0 1.0
2 2 name1 3.0 4.0 1.0 1.0
3 3 name1 NaN 1.0 2.0 3.0
4 4 name1 1.0 1.0 3.0 4.0
5 5 name1 1.0 2.0 3.0 4.0
6 6 name1 NaN NaN NaN NaN
你也可以使用 ffill / bfill 函数来简化代码。ffill 等价于 fillna(method=’ffill’),
bfill 等价于 fillna(method=’bfill’)
对所有行/列调用自定义函数
对一行数据使用自定义函数
要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。
apply 的自定义函数接收一个参数,为上一步 Collection 的一行数据,用户可以通过属性、或者偏移取得一个字段的数据。
>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
sepaladd
0 8.6
1 7.9
2 7.9
reduce
为 True 时,表示返回结果为Sequence,否则返回结果为Collection。
names
和 types
参数分别指定返回的Sequence或Collection的字段名和类型。
如果类型不指定,将会默认为string类型。
在 apply 的自定义函数中,reduce 为 False 时,也可以使用 yield
关键字来返回多行结果。
>>> iris.count()
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).head(5)
iris_add iris_sub
0 1.6 8.6
1 1.2 1.6
2 1.9 7.9
3 1.2 1.6
4 1.5 7.9
我们也可以在函数上来注释返回的字段和类型,这样就不需要在函数调用时再指定。
>>> from odps.df import output
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>> iris.apply(handle, axis=1).count()
>>> iris.apply(handle, axis=1).head(5)
iris_add iris_sub
0 1.6 8.6
1 1.2 1.6
2 1.9 7.9
3 1.2 1.6
4 1.5 7.9
也可以使用 map-only 的 map_reduce,和 axis=1 的apply操作是等价的。
>>> iris.map_reduce(mapper=handle).count()
>>> iris.map_reduce(mapper=handle).head(5)
iris_add iris_sub
0 1.6 8.6
1 1.2 1.6
2 1.9 7.9
3 1.2 1.6
4 1.5 7.9
如果想调用 ODPS 上已经存在的 UDTF,则函数指定为函数名即可。
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
使用 apply 对行操作,且 reduce
为 False 时,可以使用 并列多行输出
与已有的行结合,用于后续聚合等操作。
>>> from odps.df import output
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>> iris[iris.category, iris.apply(handle, axis=1)]
对所有列调用自定义聚合
调用apply方法,当我们不指定axis,或者axis为0的时候,我们可以通过传入一个自定义聚合类来对所有sequence进行聚合操作。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
目前,受限于 Python UDF,自定义函数无法支持将 list / dict 类型作为初始输入或最终输出结果。
由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。
请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源,如下文所述。
引用资源
类似于对 map 方法的resources参数,每个resource可以是ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。
对于axis为1,也就是在行上操作,我们需要写一个函数闭包或者callable的类。
而对于列上的聚合操作,我们只需在 __init__ 函数里读取资源即可。
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
可以看到这里的stop_words是存放于本地,但在真正执行时会被上传到ODPS作为资源引用。
使用第三方Python库
现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件,
即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。
PyODPS 提供了 pyodps-pack
工具,可在安装完 PyODPS 后打包三方包及其依赖。同时,execute / persist / to_pandas
方法支持增加 libraries
参数以使用这些资源。如何制作及使用三方包的说明请参考 这里。
由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from
)时,代码在使用 Python 2.7 的 ODPS
Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常
MapReduce API
PyODPS DataFrame也支持MapReduce API,用户可以分别编写map和reduce函数(map_reduce可以只有mapper或者reducer过程)。
我们来看个简单的wordcount的例子。
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>> def reducer(keys):
>>> # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>> cnt = [0]
>>> def h(row, done): # done表示这个key已经迭代结束
>>> cnt[0] += row[1]
>>> if done:
>>> yield keys[0], cnt[0]
>>> return h
>>> words_df.map_reduce(mapper, reducer, group=['word', ],
>>> mapper_output_names=['word', 'cnt'],
>>> mapper_output_types=['string', 'int'],
>>> reducer_output_names=['word', 'cnt'],
>>> reducer_output_types=['string', 'int'])
word cnt
0 hello 2
1 i 1
2 is 1
3 life 1
4 python 2
5 short 1
6 use 1
7 world 1
group参数用来指定reduce按哪些字段做分组,如果不指定,会按全部字段做分组。
其中对于reducer来说,会稍微有些不同。它需要接收聚合的keys初始化,并能继续处理按这些keys聚合的每行数据。
第2个参数表示这些keys相关的所有行是不是都迭代完成。
这里写成函数闭包的方式,主要为了方便,当然我们也能写成callable的类。
class reducer(object):
def __init__(self, keys):
self.cnt = 0
def __call__(self, row, done): # done表示这个key已经迭代结束
self.cnt += row.cnt
if done:
yield row.word, self.cnt
使用 output
来注释会让代码更简单些。
>>> from odps.df import output
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>> cnt = [0]
>>> def h(row, done):
# done表示这个key已经迭代结束
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>> words_df.map_reduce(mapper, reducer, group='word')
word cnt
0 hello 2
1 i 1
2 is 1
3 life 1
4 python 2
5 short 1
6 use 1
7 world 1
有时候我们在迭代的时候需要按某些列排序,则可以使用 sort
参数,来指定按哪些列排序,升序降序则通过 ascending
参数指定。
ascending
参数可以是一个bool值,表示所有的 sort
字段是相同升序或降序,
也可以是一个列表,长度必须和 sort
字段长度相同。
指定combiner
combiner表示在map_reduce API里表示在mapper端,就先对数据进行聚合操作,它的用法和reducer是完全一致的,但不能引用资源。
并且,combiner的输出的字段名和字段类型必须和mapper完全一致。
上面的例子,我们就可以使用reducer作为combiner来先在mapper端对数据做初步的聚合,减少shuffle出去的数据量。
>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
使用第三方Python库
现在用户可以把第三方 Wheel 包作为资源上传到 MaxCompute。在全局或者在立即执行的方法时,指定需要使用的包文件,
即可以在自定义函数中使用第三方库。值得注意的是,第三方库的依赖库也必须指定,否则依然会有导入错误。
PyODPS 提供了 pyodps-pack
工具,可在安装完 PyODPS 后打包三方包及其依赖。同时,execute / persist / to_pandas
方法支持增加 libraries
参数以使用这些资源。如何制作及使用三方包的说明请参考 这里。
由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from
)时,代码在使用 Python 2.7 的 ODPS
Worker 上执行时会发生错误。因而建议在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常
由于 PyODPS DataFrame 默认 Collection / Sequence 等对象均为分布式对象,故不支持在自定义函数内部引用这些对象。
请考虑改用 Join 等方法 引用多个 DataFrame 的数据,或者引用 Collection 作为资源。
引用资源
在MapReduce API里,我们能分别指定mapper和reducer所要引用的资源。
如下面的例子,我们对mapper里的单词做停词过滤,在reducer里对白名单的单词数量加5。
>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>> stop_words = set(r[0].strip() for r in resources[0])
>>> def h(row):
>>> for word in row[0].split():
>>> if word not in stop_words:
>>> yield word, 1
>>> return h
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>> d = dict()
>>> d['white_list'] = set(word.strip() for word in resources[0])
>>> d['cnt'] = 0
>>> def inner(keys):
>>> d['cnt'] = 0
>>> def h(row, done):
>>> d['cnt'] += row.cnt
>>> if done:
>>> if row.word in d['white_list']:
>>> d['cnt'] += 5
>>> yield keys.word, d['cnt']
>>> return h
>>> return inner
>>> words_df.map_reduce(mapper, reducer, group='word',
>>> mapper_resources=[stop_words], reducer_resources=[white_list_file])
word cnt
0 hello 2
1 life 1
2 python 7
3 world 6
4 short 1
5 use 1
布隆过滤器
PyODPS DataFrame提供了 bloom_filter
接口来进行布隆过滤器的计算。
给定某个collection,和它的某个列计算的sequence1,我们能对另外一个sequence2进行布隆过滤,sequence1不在sequence2中的一定会过滤,
但可能不能完全过滤掉不存在于sequence2中的数据,这也是一种近似的方法。
这样的好处是能快速对collection进行快速过滤一些无用数据。
这在大规模join的时候,一边数据量远大过另一边数据,而大部分并不会join上的场景很有用。
比如,我们在join用户的浏览数据和交易数据时,用户的浏览大部分不会带来交易,我们可以利用交易数据先对浏览数据进行布隆过滤,
然后再join能很好提升性能。
>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
0 name1 1
1 name2 2
2 name3 3
3 name1 4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
0 name1
>>> df1.bloom_filter('a', df2.a) # 这里第0个参数可以是个计算表达式如: df1.a + '1'
0 name1 1
1 name1 4
这里由于数据量很小,df1中的a为name2和name3的行都被正确过滤掉了,当数据量很大的时候,可能会有一定的数据不能被过滤。
如之前提的join场景中,少量不能过滤并不能并不会影响正确性,但能较大提升join的性能。
我们可以传入 capacity
和 error_rate
来设置数据的量以及错误率,默认值是 3000
和 0.01
。
要注意,调大 capacity
或者减小 error_rate
会增加内存的使用,所以应当根据实际情况选择一个合理的值。
透视表(pivot_table)
PyODPS DataFrame提供透视表的功能。我们通过几个例子来看使用。
A B C D E
0 foo one small 1 3
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 6
4 foo two small 3 4
5 bar one large 4 5
6 bar one small 5 3
7 bar two small 6 2
8 bar two large 7 1
最简单的透视表必须提供一个 rows
参数,表示按一个或者多个字段做取平均值的操作。
>>> df['A', 'D', 'E'].pivot_table(rows='A')
A D_mean E_mean
0 bar 5.5 2.75
1 foo 2.2 4.40
rows可以提供多个,表示按多个字段做聚合。
>>> df.pivot_table(rows=['A', 'B', 'C'])
A B C D_mean E_mean
0 bar one large 4.0 5.0
1 bar one small 5.0 3.0
2 bar two large 7.0 1.0
3 bar two small 6.0 2.0
4 foo one large 2.0 4.5
5 foo one small 1.0 3.0
6 foo two small 3.0 5.0
我们可以指定 values
来显示指定要计算的列。
>>> df.pivot_table(rows=['A', 'B'], values='D')
A B D_mean
0 bar one 4.500000
1 bar two 6.500000
2 foo one 1.666667
3 foo two 3.000000
计算值列时,默认会计算平均值,用户可以指定一个或者多个聚合函数。
>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
A B D_mean D_count D_sum
0 bar one 4.500000 2 9
1 bar two 6.500000 2 13
2 foo one 1.666667 3 5
3 foo two 3.000000 2 6
我们也可以把原始数据的某一列的值,作为新的collection的列。 这也是透视表最强大的地方。
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
A B large_D_mean small_D_mean
0 bar one 4.0 5.0
1 bar two 7.0 6.0
2 foo one 2.0 1.0
3 foo two NaN 3.0
我们可以提供 fill_value
来填充空值。
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
A B large_D_mean small_D_mean
0 bar one 4 5
1 bar two 7 6
2 foo one 2 1
3 foo two 0 3
可以通过 extract_kv 方法将 Key-Value 字段展开:
>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name1 7.0 NaN NaN NaN 8.2 NaN
2 name2 NaN 1.2 1.5 NaN NaN NaN
3 name2 NaN 1.0 NaN NaN NaN 1.1
其中,需要展开的字段名由 columns 指定,Key 和 Value 之间的分隔符,以及 Key-Value 对之间的分隔符分别由
kv_delim 和 item_delim 这两个参数指定,默认分别为半角冒号和半角逗号。输出的字段名为原字段名和 Key
值的组合,通过“_”相连。缺失值默认为 None,可通过 fill_value
选择需要填充的值。例如,相同的 df,
>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9
0 name1 1.0 3.0 0.0 10.0 0.0 0.0
1 name1 7.0 0.0 0.0 0.0 8.2 0.0
2 name2 0.0 1.2 1.5 0.0 0.0 0.0
3 name2 0.0 1.0 0.0 0.0 0.0 1.1
extract_kv 默认输出类型为 float
。如果需要输出其他类型,可以指定 dtype
参数,例如
df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0, dtype='string')
DataFrame 也支持将多列数据转换为一个 Key-Value 列。例如,
name k1 k2 k3 k5 k7 k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name1 7.0 NaN NaN NaN 8.2 NaN
2 name2 NaN 1.2 1.5 NaN NaN NaN
3 name2 NaN 1.0 NaN NaN NaN 1.1
可通过 to_kv 方法转换为 Key-Value 表示的格式:
>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
name kv
0 name1 k1=1,k2=3,k5=10
1 name1 k1=7.1,k7=8.2
2 name2 k2=1.2,k3=1.5
3 name2 k9=1.1,k2=1