I am getting a `KeyError` while using the `read_parquet` method. First, I save a Dask DataFrame by calling the `to_parquet` method, and then I try to read the Parquet dataset back by calling the `read_parquet` method. The error occurs only when the `read_parquet` filter uses a column that I also used to construct the directory-based partitioning in the `to_parquet` call. Below, I show part of the code:
```python
from datetime import date, datetime, time, timezone
from typing import Any, ClassVar, List, Tuple, Type, Optional, Literal, Union

import dask.dataframe as dd
import pandas as pd
import pyarrow
import sqlalchemy.types as sqlatypes
from pyarrow import Schema
from pyarrow.dataset import Partitioning, partitioning
from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    Time,
)

LoadFilterOperator = Literal["=", "==", "!=", ">", "<", ">=", "<=", "in", "not in"]
LoadFilterTerm = Tuple[str, LoadFilterOperator, Union[Any, List[Any]]]
LoadFilter = Union[List[LoadFilterTerm], List[List[LoadFilterTerm]]]


class Dataset:
    name: ClassVar[str]
    table: ClassVar[Table]
    index_column: ClassVar[str]
    partitions: ClassVar[List[str]]


class _dataset_test(Dataset):
    name = "Test_dataset"
    table = Table(
        name,
        MetaData(),
        Column("id", Integer, primary_key=True),
        Column("integer", Integer),
        Column("integer_null", Integer),
        Column("string", String),
        Column("string_null", String),
        Column("float", Float),
        Column("float_null", Float),
        Column("boolean", Boolean),
        Column("boolean_null", Boolean),
        Column("date", Date),
        Column("date_null", Date),
        Column("time", Time),
        Column("time_null", Time),
        Column("datetime", DateTime),
        Column("datetime_null", DateTime),
    )
    index_column = "id"
    partitions = ["integer", "string", "float"]


_df_test = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "integer": [1, 2, 3],
            "integer_null": [1, 2, None],
            "string": ["a", "b", "c"],
            "string_null": ["a", "b", None],
            "float": [1.0, 2.0, 3.8],
            "float_null": [1.0, 2.4, None],
            "boolean": [True, False, True],
            "boolean_null": [True, False, None],
            "date": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "date_null": [date(2020, 1, 1), date(2020, 1, 2), None],
            "time": [time(1, 0), time(2, 0), time(3, 0)],
            "time_null": [time(1, 0), time(2, 0), None],
            "datetime": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 1, 2, 0, 0),
                datetime(2020, 1, 3, 0, 0),
            ],
            "datetime_null": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 1, 2, 0, 0),
                None,
            ],
        }
    )
    .convert_dtypes()
    .set_index("id")
)
_df_test["datetime"] = _df_test["datetime"].dt.tz_localize("UTC")
_df_test["datetime_null"] = _df_test["datetime_null"].dt.tz_localize("UTC")


def extract_pyarrow_schema(
    table: Table, partitions: List[str] | None = None
) -> Tuple[Schema, Partitioning | None]:
    pyarrow_types = [
        pyarrow.field(column.name, from_sqlalchemy_to_pyarrow(column.type))
        for column in table.columns
    ]
    pyarrow_schema = pyarrow.schema(pyarrow_types)
    if partitions:
        partition_types = [
            pyarrow.field(field.name, field.type)
            for field in pyarrow_types
            if field.name in partitions
        ]
        partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
    else:
        partition_schema = None
    return pyarrow_schema, partition_schema


def from_sqlalchemy_to_pyarrow(sqltype: sqlatypes.TypeEngine) -> pyarrow.DataType:
    if isinstance(sqltype, sqlatypes.Integer):
        return pyarrow.int64()
    if isinstance(sqltype, sqlatypes.String):
        return pyarrow.string()
    if isinstance(sqltype, sqlatypes.Float):
        return pyarrow.float64()
    if isinstance(sqltype, sqlatypes.Boolean):
        return pyarrow.bool_()
    if isinstance(sqltype, sqlatypes.DateTime):
        return pyarrow.timestamp("ns", tz=timezone.utc)
    if isinstance(sqltype, sqlatypes.Date):
        return pyarrow.date32()
    if isinstance(sqltype, sqlatypes.Time):
        return pyarrow.time64("ns")
    raise NotImplementedError(f"Unsupported type: {sqltype}")


def save(df: pd.DataFrame, dataset: Type[Dataset]) -> None:
    dataframe = dd.from_pandas(df, npartitions=3)
    dataframe = dataframe.reset_index()
    dataframe = dataframe.persist()
    kwargs = dict(
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
    )
    if dataset.partitions:
        kwargs["partition_on"] = dataset.partitions
    if dataset.table is not None:
        schema, _ = extract_pyarrow_schema(dataset.table)
        kwargs["schema"] = schema
    try:
        dataframe.to_parquet("path/to/output", **kwargs)
    except FileNotFoundError:
        kwargs["append"] = False
        dataframe.to_parquet("path/to/output", **kwargs)


def read(
    dataset: Type[Dataset],
    filters: Optional[LoadFilter] = None,
    columns: Optional[List[str]] = None,
) -> dd.DataFrame:
    kwargs = dict(engine="pyarrow")
    if filters:
        kwargs["filters"] = filters
    if columns:
        kwargs["columns"] = columns
    if dataset.table is not None:
        _, partition_schema = extract_pyarrow_schema(dataset.table, dataset.partitions)
        if partition_schema:
            kwargs["dataset"] = {
                "partitioning": {"flavor": "hive", "schema": partition_schema.schema}
            }
    try:
        df = dd.read_parquet("path/to/output", **kwargs)
    except (FileNotFoundError, ValueError) as e:
        print("The error is:", e)
    return df
```
The `save` and `read` functions are the most important part. When I run the following code:

```python
save(_df_test, _dataset_test)
```

I get the following directory structure:
```
path/to/output/
├── integer=1/
│   ├── string=a/
│   │   ├── float=1.0/
│   │   │   ├── part.0.parquet
├── integer=2/
│   ├── string=b/
│   │   ├── float=2.0/
│   │   │   ├── part.1.parquet
├── integer=3/
│   ├── string=c/
│   │   ├── float=3.8/
│   │   │   ├── part.2.parquet
```
But when I try to read the Parquet dataset, I get the `KeyError`:

```python
df = read(_dataset_test, [("integer", ">", 2)], ["id"])
df = df.compute()
```

I should receive the following dataframe:

```python
pd.DataFrame({"id": [3]}, dtype="Int64").set_index("id")
```

The error occurs in the `_get_rg_statistics(row_group, col_name)` function, located in the `dask/dataframe/io/parquet/arrow.py` file. Note that the `integer` field was used both to filter the data and to construct the directory-based partitioning. I am using version 2023.4.1 of the Dask library.
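To show where a statistics lookup on a partition column can go wrong, here is a minimal sketch (the file path is taken from the directory layout above): with hive partitioning, the partition columns are encoded only in the directory names, so the Parquet files themselves carry no values, and hence no row-group statistics, for those columns.

```python
import pyarrow.parquet as pq

# The partition columns ("integer", "string", "float") live in the
# directory names, not in the file itself, so no row-group statistics
# exist for them inside the file.
pf = pq.ParquetFile("path/to/output/integer=1/string=a/float=1.0/part.0.parquet")
print(pf.schema_arrow.names)  # the partition column names are absent
```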
Hi @lgfc, welcome to the Dask community!

Thanks for the detailed post and the effort you put into it. Unfortunately, when copy-pasting your code, I'm running into syntax problems due to indentation. Moreover, it's a bit long. Could you extract a minimal reproducer from it so that we can play with it?

It would be nice to have the entire stack trace too.

And finally, does it work as expected using Pandas only?
Hi @guillaumeeb!

I'd like to thank you for your help. I've changed some parts of the code to make it more readable. Below, I show the new code:
```python
import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning

_df_test = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "integer": [1, 2, 3],
            "integer_null": [1, 2, None],
            "string": ["a", "b", "c"],
            "string_null": ["a", "b", None],
            "float": [1.0, 2.0, 3.8],
            "float_null": [1.0, 2.4, None],
        }
    )
    .convert_dtypes()
    .set_index("id")
)


def extract_pyarrow_schema(partitions=None):
    pyarrow_types = [
        pyarrow.field("id", pyarrow.int64()),
        pyarrow.field("integer", pyarrow.int64()),
        pyarrow.field("integer_null", pyarrow.int64()),
        pyarrow.field("string", pyarrow.string()),
        pyarrow.field("string_null", pyarrow.string()),
        pyarrow.field("float", pyarrow.float64()),
        pyarrow.field("float_null", pyarrow.float64()),
    ]
    pyarrow_schema = pyarrow.schema(pyarrow_types)
    if partitions:
        partition_types = [
            pyarrow.field(field.name, field.type)
            for field in pyarrow_types
            if field.name in partitions
        ]
        partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
    else:
        partition_schema = None
    return pyarrow_schema, partition_schema


def save(df):
    data = dd.from_pandas(df, npartitions=3)
    data = data.reset_index()
    data = data.persist()
    kwargs = dict(
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
    )
    kwargs["partition_on"] = ["integer", "string", "float"]
    schema, _ = extract_pyarrow_schema()
    kwargs["schema"] = schema
    try:
        data.to_parquet("path/to/output/", **kwargs)
    except FileNotFoundError:
        kwargs["append"] = False
        data.to_parquet("path/to/output/", **kwargs)


def read(filters=None, columns=None):
    kwargs = dict(engine="pyarrow")
    if filters:
        kwargs["filters"] = filters
    if columns:
        kwargs["columns"] = columns
    _, partition_schema = extract_pyarrow_schema(["integer", "string", "float"])
    if partition_schema:
        kwargs["dataset"] = {
            "partitioning": {"flavor": "hive", "schema": partition_schema.schema}
        }
    try:
        df = dd.read_parquet("path/to/output", **kwargs)
    except (FileNotFoundError, ValueError) as e:
        print("The error is ", e)
    return df


save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df.compute())
```
Below, I show the entire stack trace:
The code works as expected using Pandas only.
Hi, @guillaumeeb! The code was a little bit long, so I've tried to simplify it as much as possible. Below, I'll show the code with Pandas and the code with Dask again.

The code with Dask:
```python
import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning

_df_test = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "integer": [1, 2, 3],
        "string": ["a", "b", "c"],
        "float": [1.0, 2.0, 3.8],
    }
).convert_dtypes()

partitions = ["integer", "string", "float"]
pyarrow_types = [
    pyarrow.field("id", pyarrow.int64()),
    pyarrow.field("integer", pyarrow.int64()),
    pyarrow.field("string", pyarrow.string()),
    pyarrow.field("float", pyarrow.float64()),
]
partition_types = [
    pyarrow.field(field.name, field.type)
    for field in pyarrow_types
    if field.name in partitions
]
pyarrow_schema = pyarrow.schema(pyarrow_types)
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")


def save(df):
    data = dd.from_pandas(df, npartitions=3)
    data.to_parquet(
        "path/to/output/",
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
        partition_on=partitions,
        schema=pyarrow_schema,
    )


def read(filters, columns):
    df = dd.read_parquet(
        "path/to/output",
        engine="pyarrow",
        filters=filters,
        columns=columns,
        dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}},
    )
    return df


save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df.compute())
```
Now, the code using Pandas:

```python
import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning

_df_test = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "integer": [1, 2, 3],
        "string": ["a", "b", "c"],
        "float": [1.0, 2.0, 3.8],
    }
).convert_dtypes()

partitions = ["integer", "string", "float"]
pyarrow_types = [
    pyarrow.field("id", pyarrow.int64()),
    pyarrow.field("integer", pyarrow.int64()),
    pyarrow.field("string", pyarrow.string()),
    pyarrow.field("float", pyarrow.float64()),
]
partition_types = [
    pyarrow.field(field.name, field.type)
    for field in pyarrow_types
    if field.name in partitions
]
pyarrow_schema = pyarrow.schema(pyarrow_types)
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")


def save(df):
    data = dd.from_pandas(df, npartitions=1)
    data.to_parquet(
        "path/to/output/",
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
        partition_on=partitions,
        schema=partition_schema,
    )


def read(filters=None, columns=None):
    df = pd.read_parquet(
        "path/to/output",
        engine="pyarrow",
        filters=filters,
        columns=columns,
        partitioning=partition_schema,
    )
    return df


save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df)
```
In the code with Pandas, I use Dask to save the dataset, to ensure that the `to_parquet` step works the same way for both the Pandas and Dask DataFrames. The code with Pandas is very similar to the code with Dask. There are two important differences between them, both in the `read` function: first, the way I build the partitioning parameter is slightly different, and second, which `read_parquet` method I call.
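To make the two differences concrete, the two read calls compare like this:

```python
# Dask: the partitioning schema goes through the `dataset` kwargs
df_dask = dd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=[("integer", ">", 2)],
    columns=["id"],
    dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}},
)

# Pandas: the partitioning schema is passed directly through to pyarrow
df_pd = pd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=[("integer", ">", 2)],
    columns=["id"],
    partitioning=partition_schema,
)
```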
There seems to be a typo in the `save` function on the Pandas side: I just replaced `schema=partition_schema` with the code from the Dask code block.

Then, I'm not sure why you are not using the same set of parameters for the Dask `read_parquet` method as in Pandas? I just did that (replacing the `dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}}` argument), and it worked as with Pandas.
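The read call I tested looked roughly like this (a sketch, assuming "replacing" means simply dropping the explicit `dataset` partitioning so that pyarrow infers the hive scheme from the directory names):

```python
def read(filters=None, columns=None):
    # No explicit partitioning schema is passed; pyarrow infers the hive
    # layout ("integer=.../string=.../float=...") from the directory names.
    return dd.read_parquet(
        "path/to/output",
        engine="pyarrow",
        filters=filters,
        columns=columns,
    )
```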
Did I miss something?
> **guillaumeeb:** There seems to be a typo in the `save` function on the Pandas side: I just replaced `schema=partition_schema` with the code from the Dask code block.

Yes, there is a typo. The correct argument would be `schema=pyarrow_schema`.
> **guillaumeeb:** Then, I'm not sure why you are not using the same set of parameters for the Dask `read_parquet` method as in Pandas? I just did that (replacing the `dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}}` argument), and it worked as with Pandas. Did I miss something?
I'm not using the same set of parameters for Dask as in Pandas because doing that gives me another error: Dask converts the partitioned columns to categorical dtypes even when they are not categorical. For example, if I replace `df = read([("integer", ">", 2)], ["id"])` with `df = read([("float", ">", 1.0), ("float", ">", 3.0)], ["id"])`, I get the following error:

The Dask documentation states that partitioned columns will not be converted to categorical dtypes when a custom partitioning schema is specified in this way, i.e., `dataset={"partitioning": {"flavor": "hive", "schema": ...}}`.
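For reference, this is how the conversion shows up (a sketch; the exact dtypes may vary with the Dask version):

```python
# Reading back without the explicit partitioning schema: the partition
# columns ("integer", "string", "float") come back as categorical dtypes
# instead of their original int64/string/float64 dtypes.
df = dd.read_parquet("path/to/output", engine="pyarrow")
print(df.dtypes)
```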
I'm having the exact same issue: it seems that Dask fails to filter by partition columns when we pass `dataset={"partitioning": {"flavor": "hive", "schema": pa.schema(...)}}` to `read_parquet`.

Have you found any solution, @guillaumeeb?
@lgfc @bressanmarcos - Do you still have problems with the latest version of Dask? I ran the reproducer above and got the answer I'd expect:

```
   id
0   3
```

Handling of hive partitions has certainly changed a lot since the April release, but I'll be happy to investigate/fix anything that is still broken.
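If it helps to compare, a quick way to confirm the installed version (a trivial sketch):

```python
import dask

# Hive-partition handling changed quite a bit after the 2023.4.1 release,
# so it is worth checking which version is actually installed.
print(dask.__version__)
```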