添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

使用joblib库,通过并发提升pandas计算效率

对于计算密集型在使用了apply等效率优化方式后,通过并发进行优化,可以进一步提升效率。 joblib是一种同步阻塞式的多进程库,可以实现并发和大量磁盘数据的读写。适用于分块后各块相对独立的场景。

一、使用场景:

由于建立多进程存在开销和各进程间的协助以及通信问题问题,因此满足以下场景使用多进程使用才能加速,否则效率可能低于非多进程方法或者结果错误。

1、数据集较大。通常大于1w行/列,至少大于5000行才有必要使用多进程加速。

2、数据集可以分割,且分割后的各个块计算逻辑不交叉,典型的应用场景是按行分割或者按列分割。按行列分块后,如果各块计算逻辑独立也可以应用多进程。

3、计算逻辑耗时。对于简单的判断、四则运算、内置函数计算等,在数据集不是超大的情况下,使用多进程无法加速,甚至会降低效率。

二、使用方法

多进程流程如下图

joblib库多进程使用方法为

详细版:

list_np = df_dflist(df1,9)
tasks = [delayed(函数)(参数) for 参数 in list_np]
res=Parallel(n_jobs=3)(tasks)
data = pd.concat(res)

简化版:

res = Parallel(n_jobs=3)([delayed(f)(x,y) for x,y in zip(x_list,y_list)] ) 
data = pd.concat(res)

函数版:

#以3个核分为9个块为例
def func_pal(df,func):
    list_np = df_dflist(df,9)
    tasks = [delayed(func)(参数) for 参数 in list_np]
    works_pal = Parallel(n_jobs=3)
    res = works_pal(tasks)
    data = pd.concat(res)
    return data

具体步骤如下,其中步骤一和二实现拆分任务功能,步骤三实现多cpu并行分别完成任务功能,步骤四完成结果合并功能。

步骤一、数据集分割,形成可以并发的数据集列表。使用numpy的array_split函数,尽量对原始数据进行均匀切割,否则这种同步阻塞式的多进程库的时间要等到子任务中耗时最长的任务的结束后,所有子任务才会一并返回结果。

函数1、将DataFrame分割后,将各块数据放入列表,列表元素为DataFrame。

def df_dflist(pf_sp,n):
    col=df.columns
    Z_array=pf_sp.values
    ls_np=np.array_split(Z_array,n,axis=0)   
    ls_df=[pd.DataFrame(i,columns=col) for i in ls_np]
    return ls_df  


函数2、将DataFrame分割后,将各块数据放入列表,列表元素为np。用于后续处理数据为np的场景。

def df_nplist(pf_sp,n):
    Z_array=pf_sp.values
    ls_np=np.array_split(Z_array,n,axis=0)
    return ls_np

步骤二、建立任务列表。使用delayed函数函数建立任务列表。

tasks = [delayed(func)(i) for i in data_list ]

delayed(func)(i) :func为要多进程执行的函数,i为func的参数。delayed(func)(i) 为链式调用,即delayed(func)返回一个可迭代对象(这里的可迭代对象是函数,结果还可以包括其他不可迭代的返回值),i是返回的可迭代对象的参数。delayed(func)(i)通过内部构造返回一个(函数,args,kwargs)这样的元组,但是并不执行函数,留待后续执行,猜测这也是函数命名为delayed的意义。

[delayed(func)(i) for i in data_list ]通过列表推导式,我们可以为列表中的每一个元素(func,args,kwargs),通过Parallel分配给不同的进程,在每个进程里面执行一个func(args,kwargs)。其中data_list表示存放步骤一种各分块数据集的列表。

步骤三、并发计算。启动并执行并发

通过Parallel函数完成初始并发配置、启动(进程的创建)和执行并发。

启动并发

works_pal = Parallel(n_jobs=3)

执行并发

res = works_pal(tasks)

启动和执行并发可以合并为:

res=Parallel(n_jobs=3)(tasks)

步骤四、合并结果

data = pd.concat(res)


三、参数选择

参数的选择决定了并发是否能够执行以及效果,重要的参数有以下几个

3.1、任务数量。任务列表中的任务数量要与cpu数量匹配效果最好,例如我的windows系统i5 4核处理器经过测试,使用joblib一般要设置为3*并发cpu数时效果最好(与默认的pre_dispatch数量一致)。

3.2、并发cpu数量。Parallel函数中n_jobs设置了并发cpu数量,n_jobs为正表示用几个cpu并发,n_jobs=-1表示使用全部cpu,n_jobs=-2表示使用比全部cpu少一个,依次类推。

3.3、并行实现方式。通过backend(backend: str, default: 'loky') 参数指定并行化后端的实现方法。本机测试时,默认值loky效果最好;取值'threading'时没有启动多进程,为多线程,对于计算密集型任务执行时间与单进程一致或者更低;取值multiprocessing时进程卡死无法执行。

3.4、pre_dispatch和batch_size。均使用默认值,其中pre_dispatch默认值是3*n_jobs(经过测试3*n_jobs效果最好,这也是任务数量要设置成3*n_jobs的原因),batch_size默认是'auto'。

四、多个参数时使用zip打包函数,输入参数

多个参数都并行

results = Parallel(n_jobs=num_cores)(delayed(function)(i,j) for i,j in zip(a,b))

只有部分参数并行

在链式调用的参数元组中(i,j,常量)写上全部参数,在生成列表推导式的for 后面只加上需要并行的参数。

results = Parallel(n_jobs=num_cores)(delayed(function)(i,j,常量) for i,j in zip(a,b))


五、典型应用

使用多进程加速pandas中按行/列遍历的效率。

应用一:加速apply函数

本案例中,在一个4万行100列的数据集上通过曼—肯德尔法计算各列数据的变化趋势。本场景满足以下3个条件,可以用多进程进行加速

1、数据集较大;

2、数据集可以分割,按行分块后,各块计算逻辑独立。

3、计算逻辑耗时,曼—肯德尔法计算需要消耗一定的时间。

具体步骤为:

1、构建数据集分块函数

2、构建业务函数,包括业务逻辑函数、apply应用函数以及将apply过程并行的函数

3、五步法实现并行计算,其中第一步分割数据,后四部进行并行计算。

import pandas as pd
import math
from scipy.stats import norm, mstats
import numpy as np
import pymannkendall as mk
import time
from joblib import Parallel, delayed
#构建数据集分块函数
def df_dflist(pf_sp,n):
    Z_array=pf_sp.values
    ls_np=np.array_split(Z_array,n,axis=0)   
    ls_df=[pd.DataFrame(i) for i in ls_np]
    return ls_df
#曼—肯德尔法计算
def mk_(data):
    result=mk.original_test(data)
    return (result.trend,result.z,result.slope)
#构建apply中需要在各行应用的函数。
def srmk(sr):
    sr_1 = sr[1:100]
    n=len(sr_1)
    if n>1:    #只有一行的数据大于等于2时才可以计算趋势和相关性等。空行或单值不计算。
        srnp=sr_1.values
        trd=mk_(srnp)
        ls=[trd[0],trd[1],trd[2]]
        return ls
#构建apply多进程函数
def srmk_apply_pal(df):
    df[["趋势","强度","斜率"]]=df.apply(lambda x:srmk(x),axis=1,result_type="expand")
    return df
#读取数据
df1=pd.read_excel(r"D:\data\xx.xlsx")
#五步法实现并行计算
list_np = df_dflist(df1,9)
tasks = [delayed(srmk_apply_pal)(group) for group in list_np]
works_2 = Parallel(n_jobs=3)
res = works_2(tasks)
data = pd.concat(res)
#写入数据
data.to_excel(r"D:\data\xx.xlsx")

在以上案例中,单核约运行150s,使用4核时可以达到40s左右,使用3核时非最佳配置约为55s左后,使用最佳配置 任务数=3*n_jobs 也即分为9个块时,执行最快可以达到45s左右。


六、与multiprocessing pool的对比

multiprocessing pool

优点:使用更灵活,性能基本与joblib相同,略有优势。

缺点:代码繁琐。需要注意的点多,如get()阻塞进程,需要统一获取值 close()需要放在jion前等。

multiprocessing代码:

pool = Pool(4) #线程池中的同时执行的进程数为3
#res=pool.map_async(read_excel_pal,lst_r).get()
res=[]
for i in lst_r:
    p1=pool.apply_async(read_excel_pal,args=(path_xlsx,i,))
    res.append(p1)
#统一get获得结果,否则会get会阻塞,只有一个核心运行。