
I am getting a KeyError while using the read_parquet method. First, I save a Dask DataFrame by calling the to_parquet method, and then I try to read the Parquet dataset back by calling the read_parquet method. The error occurs only when the column I filter on in read_parquet is one of the columns I used to construct the directory-based partitioning in to_parquet. Below, I show part of the code:

from datetime import date, datetime, time, timezone
from typing import Any, ClassVar, List, Tuple, Type, Optional, Literal, Union
import dask.dataframe as dd
import pandas as pd
import pyarrow
import sqlalchemy.types as sqlatypes
from pyarrow import Schema
from pyarrow.dataset import Partitioning, partitioning
from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    Time,
)

LoadFilterOperator = Literal["=", "==", "!=", ">", "<", ">=", "<=", "in", "not in"]
LoadFilterTerm = Tuple[str, LoadFilterOperator, Union[Any, List[Any]]]
LoadFilter = Union[List[LoadFilterTerm], List[List[LoadFilterTerm]]]
class Dataset:
    name: ClassVar[str]
    table: ClassVar[Table]
    index_column: ClassVar[str]
    partitions: ClassVar[List[str]]
class _dataset_test(Dataset):
    name = "Test_dataset"
    table = Table(
        name,
        MetaData(),
        Column("id", Integer, primary_key=True),
        Column("integer", Integer),
        Column("integer_null", Integer),
        Column("string", String),
        Column("string_null", String),
        Column("float", Float),
        Column("float_null", Float),
        Column("boolean", Boolean),
        Column("boolean_null", Boolean),
        Column("date", Date),
        Column("date_null", Date),
        Column("time", Time),
        Column("time_null", Time),
        Column("datetime", DateTime),
        Column("datetime_null", DateTime),
    )
    index_column = "id"
    partitions = ["integer", "string", "float"]
_df_test = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "integer": [1, 2, 3],
            "integer_null": [1, 2, None],
            "string": ["a", "b", "c"],
            "string_null": ["a", "b", None],
            "float": [1.0, 2.0, 3.8],
            "float_null": [1.0, 2.4, None],
            "boolean": [True, False, True],
            "boolean_null": [True, False, None],
            "date": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "date_null": [date(2020, 1, 1), date(2020, 1, 2), None],
            "time": [time(1, 0), time(2, 0), time(3, 0)],
            "time_null": [time(1, 0), time(2, 0), None],
            "datetime": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 1, 2, 0, 0),
                datetime(2020, 1, 3, 0, 0),
            ],
            "datetime_null": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 1, 2, 0, 0),
                None,
            ],
        }
    )
    .convert_dtypes()
    .set_index("id")
)
_df_test["datetime"] = _df_test["datetime"].dt.tz_localize("UTC")
_df_test["datetime_null"] = _df_test["datetime_null"].dt.tz_localize("UTC")
def extract_pyarrow_schema(
    table: Table, partitions: List[str] | None = None
) -> Tuple[Schema, Partitioning | None]:
    pyarrow_types = [
        pyarrow.field(column.name, from_sqlalchemy_to_pyarrow(column.type))
        for column in table.columns
    ]
    pyarrow_schema = pyarrow.schema(pyarrow_types)
    if partitions:
        partition_types = [
            pyarrow.field(field.name, field.type)
            for field in pyarrow_types
            if field.name in partitions
        ]
        partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
    else:
        partition_schema = None
    return pyarrow_schema, partition_schema
def from_sqlalchemy_to_pyarrow(sqltype: sqlatypes.TypeEngine) -> pyarrow.DataType:
    if isinstance(sqltype, sqlatypes.Integer):
        return pyarrow.int64()
    if isinstance(sqltype, sqlatypes.String):
        return pyarrow.string()
    if isinstance(sqltype, sqlatypes.Float):
        return pyarrow.float64()
    if isinstance(sqltype, sqlatypes.Boolean):
        return pyarrow.bool_()
    if isinstance(sqltype, sqlatypes.DateTime):
        return pyarrow.timestamp("ns", tz=timezone.utc)
    if isinstance(sqltype, sqlatypes.Date):
        return pyarrow.date32()
    if isinstance(sqltype, sqlatypes.Time):
        return pyarrow.time64("ns")
    raise NotImplementedError(f"Unsupported type: {sqltype}")
def save(df: pd.DataFrame, dataset: Type[Dataset]) -> None:
    dataframe = dd.from_pandas(df, npartitions=3)
    dataframe = dataframe.reset_index()
    dataframe = dataframe.persist()
    kwargs = dict(
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
    )
    if dataset.partitions:
        kwargs["partition_on"] = dataset.partitions
    if dataset.table is not None:
        schema, _ = extract_pyarrow_schema(dataset.table)
        kwargs["schema"] = schema
    try:
        dataframe.to_parquet("path/to/output", **kwargs)
    except FileNotFoundError:
        # Fall back to a fresh (non-append) write when the dataset does not exist yet.
        kwargs["append"] = False
        dataframe.to_parquet("path/to/output", **kwargs)
def read(
    dataset: Type[Dataset],
    filters: Optional[LoadFilter] = None,
    columns: Optional[List[str]] = None,
) -> dd.DataFrame:
    kwargs = dict(
        engine="pyarrow",
    )
    if filters:
        kwargs["filters"] = filters
    if columns:
        kwargs["columns"] = columns
    if dataset.table is not None:
        _, partition_schema = extract_pyarrow_schema(dataset.table, dataset.partitions)
        if partition_schema:
            kwargs["dataset"] = {
                "partitioning": {"flavor": "hive", "schema": partition_schema.schema}
            }
    try:
        df = dd.read_parquet("path/to/output", **kwargs)
    except (FileNotFoundError, ValueError) as e:
        print("The error is: ", e)
    return df

The save and read functions are the most important parts.

When I run the following code:

save(_df_test, _dataset_test)

I get the following directory structure:

path/to/output/
├── integer=1/
│   ├── string=a/
│   │   ├── float=1.0/
│   │   │   ├── part.0.parquet
├── integer=2/
│   ├── string=b/
│   │   ├── float=2.0/
│   │   │   ├── part.1.parquet
├── integer=3/
│   ├── string=c/
│   │   ├── float=3.8/
│   │   │   ├── part.3.parquet

But, when I try to read the Parquet Dataset, I get the KeyError:

df = read(_dataset_test, [("integer", ">", 2)], ["id"])
df = df.compute()

I should receive the following dataframe:

pd.DataFrame({"id": [3]}, dtype="Int64").set_index("id")

The error occurs in the _get_rg_statistics(row_group, col_name) method, which is in the dask/dataframe/io/parquet/arrow.py file.

Note that the integer field was used both to filter the data and to construct the directory-based partitioning. I am using version 2023.4.1 of the dask library.
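
To illustrate what read_parquet has to work with, here is a minimal sketch (assuming the directory layout shown above exists on disk): with partition_on, the partitioned columns are encoded in the directory names and are not stored inside the Parquet files, so they carry no per-row-group statistics.

import pyarrow.parquet as pq

# Open one of the written files directly (path taken from the tree above).
pf = pq.ParquetFile("path/to/output/integer=1/string=a/float=1.0/part.0.parquet")
print(pf.schema_arrow.names)     # the partition columns "integer", "string", "float" are absent
print(pf.metadata.row_group(0))  # statistics exist only for the physical columns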

Hi @lgfc, welcome to Dask community!

Thanks for the detailed post and the effort you put into it. Unfortunately, when copy-pasting your code, I’m running into syntax problems due to indentation. Moreover, it’s a bit long. Could you extract a minimal reproducer from it so that we can play with it?

It would be nice to have the entire stack trace too.

And finally, does it work as expected using Pandas only?

Hi @guillaumeeb!

I’d like to thank you for your help. I’ve changed some parts of the code in order to make it more readable.

Below, I show the new code

import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning
_df_test = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "integer": [1, 2, 3],
            "integer_null": [1, 2, None],
            "string": ["a", "b", "c"],
            "string_null": ["a", "b", None],
            "float": [1.0, 2.0, 3.8],
            "float_null": [1.0, 2.4, None],
        }
    )
    .convert_dtypes()
    .set_index("id")
)
def extract_pyarrow_schema(partitions=None):
    pyarrow_types = [
        pyarrow.field("id", pyarrow.int64()),
        pyarrow.field("integer", pyarrow.int64()),
        pyarrow.field("integer_null", pyarrow.int64()),
        pyarrow.field("string", pyarrow.string()),
        pyarrow.field("string_null", pyarrow.string()),
        pyarrow.field("float", pyarrow.float64()),
        pyarrow.field("float_null", pyarrow.float64()),
    ]
    pyarrow_schema = pyarrow.schema(pyarrow_types)
    if partitions:
        partition_types = [
            pyarrow.field(field.name, field.type)
            for field in pyarrow_types
            if field.name in partitions
        ]
        partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
    else:
        partition_schema = None
    return pyarrow_schema, partition_schema
def save(df):
    data = dd.from_pandas(df, npartitions=3)
    data = data.reset_index()
    data = data.persist()
    kwargs = dict(
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
    )
    kwargs["partition_on"] = ["integer", "string", "float"]
    schema, _ = extract_pyarrow_schema()
    kwargs["schema"] = schema
    try:
        data.to_parquet("path/to/output/", **kwargs)
    except FileNotFoundError:
        kwargs["append"] = False
        data.to_parquet("path/to/output/", **kwargs)
def read(filters=None, columns=None):
    kwargs = dict(engine="pyarrow")
    if filters:
        kwargs["filters"] = filters
    if columns:
        kwargs["columns"] = columns
    _, partition_schema = extract_pyarrow_schema(["integer", "string", "float"])
    if partition_schema:
        kwargs["dataset"] = {
            "partitioning": {"flavor": "hive", "schema": partition_schema.schema}
        }
    try:
        df = dd.read_parquet("path/to/output", **kwargs)
    except (FileNotFoundError, ValueError) as e:
        print("The error is ", e)
    return df
save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df.compute())

Below, I show the entire stack trace

The code works as expected using Pandas only.

Hi, @guillaumeeb! The code was a little bit long. I’ve tried to simplify it as much as possible. Below, I’ll show the code with Pandas and the code with Dask again.

The code with Dask:

import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning
_df_test = pd.DataFrame({"id": [1, 2, 3], "integer": [1, 2, 3], "string": ["a", "b", "c"], "float": [1.0, 2.0, 3.8],}).convert_dtypes()
partitions = ["integer", "string", "float"]
pyarrow_types = [pyarrow.field("id", pyarrow.int64()), pyarrow.field("integer", pyarrow.int64()), pyarrow.field("string", pyarrow.string()), pyarrow.field("float", pyarrow.float64()),]
partition_types = [pyarrow.field(field.name, field.type) for field in pyarrow_types if field.name in partitions]
pyarrow_schema = pyarrow.schema(pyarrow_types)
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
def save(df):
    data = dd.from_pandas(df, npartitions=3)
    data.to_parquet("path/to/output/", engine="pyarrow", write_index=False, append=True, write_metadata_file=False, partition_on=partitions, schema=pyarrow_schema)
def read(filters, columns):
    df = dd.read_parquet("path/to/output", engine="pyarrow", filters=filters, columns=columns, dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}})
    return df
save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df.compute())

Now, the code using Pandas.

import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning
_df_test = pd.DataFrame({"id": [1, 2, 3], "integer": [1, 2, 3], "string": ["a", "b", "c"], "float": [1.0, 2.0, 3.8],}).convert_dtypes()
partitions = ["integer", "string", "float"]
pyarrow_types = [pyarrow.field("id", pyarrow.int64()), pyarrow.field("integer", pyarrow.int64()), pyarrow.field("string", pyarrow.string()), pyarrow.field("float", pyarrow.float64()),]
partition_types = [pyarrow.field(field.name, field.type) for field in pyarrow_types if field.name in partitions]
pyarrow_schema = pyarrow.schema(pyarrow_types)
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
def save(df):
    data = dd.from_pandas(df, npartitions=1)
    data.to_parquet("path/to/output/", engine="pyarrow", write_index=False, append=True, write_metadata_file=False, partition_on=partitions, schema=partition_schema)
def read(filters=None, columns=None):
    df = pd.read_parquet("path/to/output", engine="pyarrow", filters=filters, columns=columns, partitioning=partition_schema)
    return df
save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df)

In the code with Pandas, I still use Dask to save the dataset, to ensure that the to_parquet step works the same way in both cases. The code with Pandas is very similar to the code with Dask; the two important differences are both in the read method: first, the way I build the partitioning parameter is slightly different, and second, which read_parquet method I call, as shown side by side below.
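
For reference, just the two read calls extracted from the blocks above:

# Pandas: the Partitioning object is passed directly.
df_pandas = pd.read_parquet("path/to/output", engine="pyarrow",
                            filters=[("integer", ">", 2)], columns=["id"],
                            partitioning=partition_schema)

# Dask: the partitioning is described through the dataset options.
df_dask = dd.read_parquet("path/to/output", engine="pyarrow",
                          filters=[("integer", ">", 2)], columns=["id"],
                          dataset={"partitioning": {"flavor": "hive",
                                                    "schema": partition_schema.schema}})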

There seems to be a typo in the save function on the Pandas side: I just replaced schema=partition_schema with the value used in the Dask code block.

Then, I’m not sure why you are not using the same set of parameters for the Dask read_parquet method as in Pandas. I just did that (replacing the dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}} argument) and it worked, just as with Pandas.

Did I miss something?
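
A sketch of what that might look like (assuming the hive partition columns can simply be inferred from the directory names, without a custom partitioning schema):

# Minimal Dask read relying on the default hive-partition handling.
df = dd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=[("integer", ">", 2)],
    columns=["id"],
)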

guillaumeeb:

There seems to be a typo in the save function on the Pandas side: I just replaced schema=partition_schema with the value used in the Dask code block.

Yes, there is a typo. The correct argument would be schema=pyarrow_schema.
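
That is, the save call on the Pandas side should use the full pyarrow schema, the same as in the Dask block:

data.to_parquet("path/to/output/", engine="pyarrow", write_index=False, append=True,
                write_metadata_file=False, partition_on=partitions, schema=pyarrow_schema)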

guillaumeeb:

Then, I’m not sure why you are not using the same set of parameters for the Dask read_parquet method as in Pandas. I just did that (replacing the dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}} argument) and it worked, just as with Pandas.

Did I miss something?

I’m not using the same set of parameters for Dask as in Pandas because doing that gives me another error: Dask converts the partitioned columns to categorical dtypes even when they are not categorical. For example, if I replace df = read([("integer", ">", 2)], ["id"]) with df = read([("float", ">", 1.0), ("float", ">", 3.0)], ["id"]), I get the following error:

The Dask documentation says that partitioned columns will not be converted to categorical dtypes when a custom partitioning schema is specified in this way, i.e., dataset={"partitioning": {"flavor": "hive", "schema": ...}}.
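
A quick sketch to see the dtype difference being described (assuming the dataset written above, and reusing partition_schema from the Dask block):

# Without a custom partitioning schema, the hive partition columns may come back
# as category dtype; with the custom schema they keep their original types.
df_default = dd.read_parquet("path/to/output", engine="pyarrow")
df_custom = dd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}},
)
print(df_default.dtypes)
print(df_custom.dtypes)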

I’m having the exact same issue; it seems that Dask fails to filter by columns that are partition keys when we pass dataset={"partitioning": {"flavor": "hive", "schema": pa.schema(...)}} to read_parquet.

Have you found any solution, @guillaumeeb?

@lgfc @bressanmarcos - Do you still have problems with the latest version of dask? I ran the reproducer above and got the answer I’d expect:

0 3

Handling of hive partitions has certainly changed a lot since the April release, but I’ll be happy to investigate/fix anything that is still broken.
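
For anyone who wants to re-check on their side, a small sketch (assuming pip as the installer):

# pip install -U "dask[dataframe]"
import dask
print(dask.__version__)  # should report a release newer than 2023.4.1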