import dask
dask.config.set({'dataframe.query-planning': False})
import dask.dataframe as dd
import pandas as pd
from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 51899 instead
  warnings.warn(
df = pd.DataFrame({
   'A': ['foo', 'bar', 'baz', 'qux'],
   'B': ['one', 'one', 'two', 'three'],
   'C': [1, 2, 3, 4],
   'D': [10, 20, 30, 40]
})

Ordered Row Index

A Dask DataFrame is composed of multiple pandas DataFrames, but maintaining the row labels and row order of the whole Dask DataFrame globally is a major challenge. Dask DataFrame does not try to preserve a global ordering, which is also why it cannot support every feature of a pandas DataFrame.

As shown in Fig. 4.3, when a Dask DataFrame is split into Partitions, the boundaries between them are recorded as divisions.

Fig. 4.3 The divisions of a Dask DataFrame

Take dask.datasets.timeseries, a sample-data function provided by Dask, as an example: it generates a time series that uses timestamps as row labels, and the boundary of every Partition is recorded and stored in .divisions. len(divisions) equals npartitions + 1.

ts_df = dask.datasets.timeseries("2018-01-01", "2023-01-01")
print(f"df.npartitions: {ts_df.npartitions}")
print(f"df.divisions: {len(ts_df.divisions)}")
By contrast, data read from files such as CSV usually has no recorded divisions. Take the NYC flights dataset as an example:

import os

# Assumption: nyc_flights() is the dataset-download helper used elsewhere in
# this book; the import path below is illustrative.
from utils import nyc_flights

folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")
flights_ddf = dd.read_csv(file_path,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
flights_ddf.divisions
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:640: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
(None, None, None, None, None, None, None)
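
Whether the divisions are known can also be checked with the known_divisions attribute: it is False for the CSV-backed DataFrame above, while the generated time series has its timestamps recorded as divisions. A quick check:

# known_divisions is True only when every Partition boundary is recorded.
print(flights_ddf.known_divisions)  # False: read from CSV, boundaries unknown
print(ts_df.known_divisions)        # True: timestamps recorded as divisions
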

Because the number of rows in each Partition is not recorded, Dask DataFrame struggles to support some operations, for example percentile operations such as median(), since they require: (1) sorting the data; (2) locating specific rows.

try:
    flights_ddf['DepDelay'].median()
except Exception as e:
    print(f"{type(e).__name__}, {e}")

set_index()

In Dask DataFrame, the set_index() method can be used to manually set a column as the index column. Besides designating a field as the index, this operation also sorts the global data by that field; it disrupts the existing ordering within each Partition and therefore comes at a high cost.

The following example shows the changes that set_index() brings about:

def print_partitions(ddf):
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())
df = pd.DataFrame(
    {"col1": ["01", "05", "02", "03", "04"], "col2": ["a", "b", "c", "d", "e"]}
)
ddf = dd.from_pandas(df, npartitions=2)
print_partitions(ddf)
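
The shuffled output below comes from setting col1 as the index and printing the Partitions again; a minimal sketch of that step (the variable name ddf2 is illustrative):

ddf2 = ddf.set_index("col1")
print_partitions(ddf2)
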
2024-04-23 16:05:06,483 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 initialized by task ('shuffle-transfer-008ee90768895dabe7a3e94389222068', 0) executed on worker tcp://127.0.0.1:51911
2024-04-23 16:05:06,505 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 deactivated due to stimulus 'task-finished-1713859506.50483'
01      a
2024-04-23 16:05:06,545 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 initialized by task ('shuffle-transfer-01fddf4f11082a43a6075f7888029dd3', 1) executed on worker tcp://127.0.0.1:51912
2024-04-23 16:05:06,604 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 deactivated due to stimulus 'task-finished-1713859506.6028118'
02      c
03      d
04      e
05      b

This example sets the col1 column as the index column, and the data in the 2 Partitions is shuffled and rearranged. When the data volume is large, globally sorting and redistributing the data is extremely expensive, so this operation should be avoided where possible. set_index() does have its advantages, though: it can speed up downstream computation. Data redistribution is also known as Shuffle; we will cover the computation process and cost of Shuffle in Section 4.4.

Returning to the time-series data, which uses timestamps as its index column: below, set_index() is applied to this data in two ways, the first without setting divisions and the second with divisions set.

The first approach, without setting divisions, takes a long time, because Dask DataFrame computes the data distribution of all Partitions and rearranges all Partitions according to that distribution; as the output shows, the number of Partitions also changes.

%%time
ts_df1 = ts_df.set_index("id")
nu =  ts_df1.loc[[1001]].name.nunique().compute()
print(f"before set_index npartitions: {ts_df.npartitions}")
print(f"after set_index npartitions: {ts_df1.npartitions}")
2024-04-23 16:05:16,522 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914
2024-04-23 16:05:27,101 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859527.100699'
before set_index npartitions: 1826
after set_index npartitions: 165
CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s
Wall time: 20.6 s
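
The second approach passes divisions to set_index() explicitly, so Dask does not have to sample the data to work out the Partition boundaries itself, and the timing below is noticeably shorter. A sketch of that second cell, reusing (and de-duplicating) the divisions computed by the first call; the variable names are illustrative, and in practice the divisions could also come from prior knowledge of the data:

%%time
# Passing divisions skips the sampling step that set_index() would otherwise
# run to estimate Partition boundaries.
unique_divisions = list(dict.fromkeys(ts_df1.divisions))
ts_df2 = ts_df.set_index("id", divisions=unique_divisions)
nu2 = ts_df2.loc[[1001]].name.nunique().compute()
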
2024-04-23 16:05:38,056 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914
2024-04-23 16:05:49,629 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859549.629161'
CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s
Wall time: 11.9 s

So set_index() should be used with care in Dask DataFrame. However, if the computation that follows involves many of the operations below, paying its one-time cost can be worthwhile (a sketch follows the list):

  • Filtering on the index column with loc

  • Merging two Dask DataFrames on the index column (merge())

  • Grouping and aggregating on the index column (groupby())
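
For instance, once id is the sorted index (as in ts_df1 above), a loc lookup only needs to read the Partitions whose divisions cover the requested keys, and merge() or groupby() on the index column can reuse the existing partitioning instead of triggering another Shuffle. A minimal sketch:

# loc on the index column: only the Partitions whose divisions cover the
# requested keys are read, so no full scan is needed.
sub = ts_df1.loc[[1000, 1001]]
print(f"partitions touched: {sub.npartitions}")
print(sub.head())
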

reset_index()

In pandas, groupby() defaults to as_index=True, so the grouping field becomes the index column after groupby(). The index column is not a "formal" field of the DataFrame; if, apart from the index column, only one "formal" field remains after the group-by aggregation, the result becomes a Series. For example, in the pandas code below, the Origin column is the grouping field: without as_index=False, groupby("Origin")["DepDelay"].mean() produces a Series.

# pandas
file_path = os.path.join(folder_path, "1991.csv")
pdf = pd.read_csv(file_path,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
uncancelled_pdf = pdf[pdf["Cancelled"] == False]
avg_pdf = uncancelled_pdf.groupby("Origin", as_index=False)["DepDelay"].mean()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
    
/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_76150/639704942.py:3: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  pdf = pd.read_csv(file_path,
    
Alternatively, reset_index() can be called after the aggregation to turn the index column back into a regular column:

avg_pdf = uncancelled_pdf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
    
The same pattern applies to the Dask DataFrame, this time over all years of the flights data:

uncancelled_ddf = flights_ddf[flights_ddf["Cancelled"] == False]
avg_ddf = uncancelled_ddf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_ddf.columns = ["Origin", "AvgDepDelay"]
avg_ddf = avg_ddf.compute()
# pandas used only one year of data above, so the results differ
avg_ddf.sort_values("AvgDepDelay")
    
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  df = reader(bio, **kwargs)