df = pd.DataFrame({
    'A': ['foo', 'bar', 'baz', 'qux'],
    'B': ['one', 'one', 'two', 'three'],
    'C': [1, 2, 3, 4],
    'D': [10, 20, 30, 40]
})
Ordered Row Index
A Dask DataFrame consists of multiple pandas DataFrames, but maintaining the row labels and the row order of the whole Dask DataFrame globally is a major challenge. Dask DataFrame does not deliberately preserve global ordering, which is also why it cannot support every feature of a pandas DataFrame.

As shown in Figure 4.3, a Dask DataFrame records divisions when it is split into partitions.

Figure 4.3: Divisions of a Dask DataFrame
Take dask.datasets.timeseries, the sample-data generator that Dask provides, as an example: it produces a time series that uses timestamps as row labels, and the boundary of every partition is recorded and stored in .divisions. len(divisions) equals npartitions + 1.
ts_df = dask.datasets.timeseries("2018-01-01", "2023-01-01")
print(f"df.npartitions: {ts_df.npartitions}")
print(f"df.divisions: {len(ts_df.divisions)}")
folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")
flights_ddf = dd.read_csv(file_path,
                          parse_dates={'Date': [0, 1, 2]},
                          dtype={'TailNum': object,
                                 'CRSElapsedTime': float,
                                 'Cancelled': bool})
flights_ddf.divisions
(None, None, None, None, None, None, None)
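The divisions here are all None: no partition boundaries were recorded when the CSV files were read. As a quick check, Dask exposes this through the known_divisions attribute; a minimal sketch:

# No boundary values were recorded while reading the CSV files,
# so Dask reports the divisions as unknown.
print(flights_ddf.known_divisions)  # False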
Because the number of rows in each partition is not recorded, Dask DataFrame does not support some operations well, for example percentile operations such as median(), since these operations need to: (1) sort the data; (2) locate specific rows.
try:
    flights_ddf['DepDelay'].median()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
set_index()
In Dask DataFrame, the set_index() method can be used to manually designate a column as the index. Besides marking that field as the index column, this operation also sorts the entire dataset globally by that field, which disrupts the existing row order inside each partition and therefore comes at a high cost.

The following example shows the changes that set_index() brings about:
def print_partitions(ddf):
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

df = pd.DataFrame(
    {"col1": ["01", "05", "02", "03", "04"], "col2": ["a", "b", "c", "d", "e"]}
)
ddf = dd.from_pandas(df, npartitions=2)
print_partitions(ddf)

# set col1 as the index; this sorts and shuffles the rows across partitions
ddf2 = ddf.set_index("col1")
print_partitions(ddf2)
01 a
02 c
03 d
04 e
05 b
This example sets the col1 column as the index, and the data in the two partitions is shuffled and rearranged. With a large amount of data, sorting and redistributing everything globally is extremely expensive, so this operation should be avoided whenever possible. set_index() also has its advantages: it can speed up downstream computation. Data redistribution is also called a Shuffle; we will look at the computation and cost of a Shuffle in Section 4.4.
Returning to the time series data, which uses a timestamp as its index column. Below, set_index() is applied to this data in two ways: the first without providing divisions, the second with divisions provided.

The first approach, without divisions, takes a long time, because Dask DataFrame computes the distribution of the data across all partitions and rearranges every partition according to that distribution. As you can see, the number of partitions also changes.
%%time
ts_df1 = ts_df.set_index("id")
nu = ts_df1.loc[[1001]].name.nunique().compute()
print(f"before set_index npartitions: {ts_df.npartitions}")
print(f"after set_index npartitions: {ts_df1.npartitions}")
before set_index npartitions: 1826
after set_index npartitions: 165
CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s
Wall time: 20.6 s
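The second approach passes divisions to set_index() explicitly, so Dask can skip sampling the data to determine partition boundaries and only needs to shuffle the rows. The cell itself is not reproduced above; the following is a minimal sketch of what it may look like, assuming the divisions already computed by the first run are reused (the variable name computed_divisions is hypothetical):

%%time
# Assumption: reuse the divisions that the first set_index() already computed,
# so this run skips the sampling step and goes straight to the shuffle.
computed_divisions = ts_df1.divisions
ts_df2 = ts_df.set_index("id", divisions=computed_divisions)
nu = ts_df2.loc[[1001]].name.nunique().compute()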
CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s
Wall time: 11.9 s
Therefore, set_index() should be used with caution in Dask DataFrame. However, if many of the following operations come after it, set_index() is worth considering (see the sketch after this list):

- Filtering on the index column with loc
- Merging two Dask DataFrames on their index columns (merge())
- Grouping and aggregating on the index column (groupby())
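As an example of the first case, a minimal sketch that reuses the ts_df1 indexed by id above: because the divisions of ts_df1 are known, loc only has to touch the partitions whose ranges can contain the requested ids.

# Slice on the index column: Dask consults the known divisions and keeps only
# the partitions overlapping the requested range, instead of scanning all of them.
subset = ts_df1.loc[1000:1005]
print(subset.npartitions)  # typically much smaller than ts_df1.npartitions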
reset_index()
In pandas, groupby defaults to as_index=True: the grouping column becomes the index column after groupby(). The index column is not a "formal" column of the DataFrame, so if only one "formal" column remains after the group-by aggregation (not counting the index), the result of the aggregation is a Series. For example, in the pandas code below, Origin is the grouping column: if as_index=False is not set, groupby("Origin")["DepDelay"].mean() produces a Series.
# pandas
file_path = os.path.join(folder_path, "1991.csv")
pdf = pd.read_csv(file_path,
                  parse_dates={'Date': [0, 1, 2]},
                  dtype={'TailNum': object,
                         'CRSElapsedTime': float,
                         'Cancelled': bool})
uncancelled_pdf = pdf[pdf["Cancelled"] == False]
avg_pdf = uncancelled_pdf.groupby("Origin", as_index=False)["DepDelay"].mean()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
# pandas: the same result with the default as_index=True followed by reset_index()
avg_pdf = uncancelled_pdf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
# Dask DataFrame: groupby followed by reset_index()
uncancelled_ddf = flights_ddf[flights_ddf["Cancelled"] == False]
avg_ddf = uncancelled_ddf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_ddf.columns = ["Origin", "AvgDepDelay"]
avg_ddf = avg_ddf.compute()
# The pandas example above used only one year of data, so the results differ
avg_ddf.sort_values("AvgDepDelay")