In [1]: dm = store_dataframes_as_dataset(
...: store=store_url, dataset_uuid="partitioned_dataset", dfs=[df], partition_on="B"
...: )
In [2]: sorted(dm.partitions.keys())
Out[2]:
['B=2013-01-02%2000%3A00%3A00/7fcca366cdae4f1383a034ee586b0d6f',
'B=2013-01-03%2000%3A00%3A00/7fcca366cdae4f1383a034ee586b0d6f']
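Each of these partition labels encodes the partition column and its value (URL-quoted), followed by a unique identifier for the written data. A minimal sketch for decoding them with nothing but the standard library:
from urllib.parse import unquote

for key in sorted(dm.partitions.keys()):
    print(unquote(key))
# B=2013-01-02 00:00:00/7fcca366cdae4f1383a034ee586b0d6f
# B=2013-01-03 00:00:00/7fcca366cdae4f1383a034ee586b0d6f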
Appending Data
Now, we create another_df with the same schema as our initial dataframe df and update the dataset using the eager backend by calling update_dataset_from_dataframes():
In [3]: from kartothek.api.dataset import update_dataset_from_dataframes
In [4]: another_df = pd.DataFrame(
...: {
...: "A": 5.0,
...: "B": [
...: pd.Timestamp("20110103"),
...: pd.Timestamp("20110103"),
...: pd.Timestamp("20110104"),
...: pd.Timestamp("20110104"),
...: ],
...: "C": pd.Series(2, index=list(range(4)), dtype="float32"),
...: "D": np.array([6] * 4, dtype="int32"),
...: "E": pd.Categorical(["prod", "dev", "prod", "dev"]),
...: "F": "bar",
...: }
...: )
In [5]: dm = update_dataset_from_dataframes([another_df], store=store_url, dataset_uuid=dm.uuid)
In [6]: sorted(dm.partitions.keys())
Out[6]:
['B=2011-01-03%2000%3A00%3A00/e3f533fed3154814974061bb409c33a9',
'B=2011-01-04%2000%3A00%3A00/e3f533fed3154814974061bb409c33a9',
'B=2013-01-02%2000%3A00%3A00/7fcca366cdae4f1383a034ee586b0d6f',
'B=2013-01-03%2000%3A00%3A00/7fcca366cdae4f1383a034ee586b0d6f']
Looking at dm.partitions, we can see that another partition has been added.
If we read the data again, we can see that another_df has been appended to the previous contents.
In [7]: from kartothek.api.dataset import read_table
In [8]: updated_df = read_table(dataset_uuid=dm.uuid, store=store_url, table="table")
In [9]: updated_df
Out[9]:
B A C D E F
0 2013-01-02 1.0 1.0 3 test foo
1 2013-01-02 1.0 1.0 3 train foo
2 2013-01-03 1.0 1.0 3 test foo
3 2013-01-03 1.0 1.0 3 train foo
4 2011-01-03 5.0 2.0 6 prod bar
5 2011-01-03 5.0 2.0 6 dev bar
6 2011-01-04 5.0 2.0 6 prod bar
7 2011-01-04 5.0 2.0 6 dev bar
The way dataset updates work is that new partitions are added to a dataset
as long as they have the same tables as the existing partitions. A different
table cannot be introduced into an existing dataset with an update.
To illustrate this point better, let’s first create a dataset with two tables:
In [10]: df2 = pd.DataFrame(
....: {
....: "G": "foo",
....: "H": pd.Categorical(["test", "train", "test", "train"]),
....: "I": np.array([9] * 4, dtype="int32"),
....: "J": pd.Series(3, index=list(range(4)), dtype="float32"),
....: "K": pd.Timestamp("20190604"),
....: "L": 2.0,
....: }
....: )
....:
In [11]: df2
Out[11]:
G H I J K L
0 foo test 9 3.0 2019-06-04 2.0
1 foo train 9 3.0 2019-06-04 2.0
2 foo test 9 3.0 2019-06-04 2.0
3 foo train 9 3.0 2019-06-04 2.0
In [12]: dm_two_tables = store_dataframes_as_dataset(
....: store_url, "two_tables", dfs=[{"data": {"table1": df, "table2": df2}}]
....: )
....:
In [13]: dm_two_tables.tables
Out[13]: ['table1', 'table2']
In [14]: sorted(dm_two_tables.partitions.keys())
Out[14]: ['1b62765eb484468c8622056c47927279']
Partition identifiers
In the previous example a dictionary was used to pass the desired data to the store function. To label each partition, Kartothek by default uses UUIDs so that each partition is named uniquely. This is necessary so that updates can work properly using copy-on-write principles.
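As a rough illustration of this copy-on-write behaviour (reusing store_url and the two_tables dataset from above, with the same storefact store access used in the garbage collection example further below), the uniquely labelled Parquet files can be listed directly from the store; an update only ever adds new files and never rewrites existing ones:
from storefact import get_store_from_url

store = get_store_from_url(store_url)
# one uniquely named file per table and per partition label
parquet_files = sorted(
    key for key in store.keys()
    if key.startswith("two_tables/") and key.endswith(".parquet")
)
print(parquet_files)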
Below is an example where we update the existing two_tables dataset with new data for table1 and table2:
In [15]: another_df2 = pd.DataFrame(
....: {
....: "G": "bar",
....: "H": pd.Categorical(["prod", "dev", "prod", "dev"]),
....: "I": np.array([12] * 4, dtype="int32"),
....: "J": pd.Series(4, index=list(range(4)), dtype="float32"),
....: "K": pd.Timestamp("20190614"),
....: "L": 10.0,
....: }
....: )
....:
In [16]: another_df2
Out[16]:
G H I J K L
0 bar prod 12 4.0 2019-06-14 10.0
1 bar dev 12 4.0 2019-06-14 10.0
2 bar prod 12 4.0 2019-06-14 10.0
3 bar dev 12 4.0 2019-06-14 10.0
In [17]: dm_two_tables = update_dataset_from_dataframes(
....: {"data": {"table1": another_df, "table2": another_df2}},
....: store=store_url,
....: dataset_uuid=dm_two_tables.uuid,
....: )
....:
In [18]: dm_two_tables.tables
Out[18]: ['table1', 'table2']
In [19]: sorted(dm_two_tables.partitions.keys())
Out[19]: ['1b62765eb484468c8622056c47927279', '97f4fc6be39f401b84f6d4c6be4f61f9']
Trying to update only a subset of the tables throws a ValueError:
In [20]: update_dataset_from_dataframes(
....: {
....: "data":
....: {
....: "table2": another_df2
....: }
....: },
....: store=store_url,
....: dataset_uuid=dm_two_tables.uuid
....: )
....:
---------------------------------------------------------------------------
ValueError: Input partitions for update have different tables than dataset:
Input partition tables: {'table2'}
Tables of existing dataset: ['table1', 'table2']
Deleting Data
Adding data to an existing dataset is not the only thing an update operation can do; it can also be used to remove data.
To do so, we use the delete_scope keyword argument, as shown in the example below:
In [21]: dm = update_dataset_from_dataframes(
....: None,
....: store=store_url,
....: dataset_uuid=dm.uuid,
....: partition_on="B",
....: delete_scope=[{"B": pd.Timestamp("20130102")}],
....: )
....:
In [22]: sorted(dm.partitions.keys())
Out[22]:
['B=2011-01-03%2000%3A00%3A00/e3f533fed3154814974061bb409c33a9',
'B=2011-01-04%2000%3A00%3A00/e3f533fed3154814974061bb409c33a9',
'B=2013-01-03%2000%3A00%3A00/7fcca366cdae4f1383a034ee586b0d6f']
As we can see, we specified, using a dictionary, that data where the column B has the value pd.Timestamp("20130102") should be removed. Looking at the partitions after the update, we see that the partition B=2013-01-02[...] has in fact been removed.
Warning
We defined delete_scope over a value of B, which is the column we partitioned on: delete_scope only works on partitioned columns.
Thus, delete_scope should only be used on partitioned columns, because of their one-to-one mapping to partitions; without that guarantee, using delete_scope could have unwanted effects, like accidentally removing data with different values.
delete_scope will technically also work on datasets that are not partitioned on any column(s); however, this is not advised, since the effect will simply be to remove all previous partitions and replace them with the ones in the update.
If the intention is to delete the entire dataset, kartothek.io.eager.delete_dataset() is a much better, cleaner and safer way to do so.
When using delete_scope, multiple values for the same column cannot be passed as a list; they have to be specified as individual dictionaries, i.e. [{"E": ["test", "train"]}] will not work but [{"E": "test"}, {"E": "train"}] will, as illustrated in the sketch below.
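For instance, a delete_scope covering several values of the same column can be built with a short comprehension (just a sketch, using the column E values from this guide):
values_to_remove = ["test", "train"]  # values of the partition column "E" to delete
delete_scope = [{"E": value} for value in values_to_remove]
# delete_scope == [{"E": "test"}, {"E": "train"}]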
In [23]: duplicate_df = df.copy()
In [24]: duplicate_df.F = "bar"
In [25]: dm = store_dataframes_as_dataset(
....: store_url,
....: "another_partitioned_dataset",
....: [df, duplicate_df],
....: partition_on=["E", "F"],
....: )
....:
In [26]: sorted(dm.partitions.keys())
Out[26]:
['E=test/F=bar/3cfeecf714e1403696b86ff80548cfbe',
'E=test/F=foo/56fc9c2d24684753896d5517e33db639',
'E=train/F=bar/3cfeecf714e1403696b86ff80548cfbe',
'E=train/F=foo/56fc9c2d24684753896d5517e33db639']
In [27]: dm = update_dataset_from_dataframes(
....: None,
....: store=store_url,
....: dataset_uuid=dm.uuid,
....: partition_on=["E", "F"],
....: delete_scope=[{"E": "train", "F": "foo"}, {"E": "test", "F": "bar"}],
....: )
....:
In [28]: sorted(dm.partitions.keys()) # `E=train/F=foo` and `E=test/F=bar` are deleted
Out[28]:
['E=test/F=foo/56fc9c2d24684753896d5517e33db639',
'E=train/F=bar/3cfeecf714e1403696b86ff80548cfbe']
Replacing Data
Finally, an update step can be used to perform the two operations above, i.e. deleting and appending, together in a single atomic operation. This is done by specifying the data to be appended while also defining a delete_scope over the partition to be removed. The following example illustrates how both can be performed with one update:
In [29]: df # Column B has two rows with '2013-01-02' and another two with '2013-01-03'
Out[29]:
A B C D E F
0 1.0 2013-01-02 1.0 3 test foo
1 1.0 2013-01-02 1.0 3 train foo
2 1.0 2013-01-03 1.0 3 test foo
3 1.0 2013-01-03 1.0 3 train foo
In [30]: dm = store_dataframes_as_dataset(store_url, "replace_partition", [df], partition_on="B")
In [31]: sorted(dm.partitions.keys()) # two partitions, one for each value of `B`
Out[31]:
['B=2013-01-02%2000%3A00%3A00/bf94326e100946018b08ebf0af40e4f3',
'B=2013-01-03%2000%3A00%3A00/bf94326e100946018b08ebf0af40e4f3']
In [32]: modified_df = another_df.copy()
# set column B to the value '2013-01-03' for all rows in this dataframe
In [33]: modified_df.B = pd.Timestamp("20130103")
In [34]: dm = update_dataset_from_dataframes(
....: [
....: modified_df
....: ], # specify dataframe which has 'new' data for partition to be replaced
....: store=store_url,
....: dataset_uuid=dm.uuid,
....: partition_on="B", # don't forget to specify the partitioning column
....: delete_scope=[
....: {"B": pd.Timestamp("2013-01-03")}
....: ], # specify the partition to be deleted
....: )
....:
In [35]: sorted(dm.partitions.keys())
Out[35]:
['B=2013-01-02%2000%3A00%3A00/bf94326e100946018b08ebf0af40e4f3',
'B=2013-01-03%2000%3A00%3A00/95c4e15869214a23a8bd8cbce9a5f123']
In [36]: read_table(dm.uuid, store_url, table="table")
Out[36]:
B A C D E F
0 2013-01-02 1.0 1.0 3 test foo
1 2013-01-02 1.0 1.0 3 train foo
2 2013-01-03 5.0 2.0 6 prod bar
3 2013-01-03 5.0 2.0 6 dev bar
4 2013-01-03 5.0 2.0 6 prod bar
5 2013-01-03 5.0 2.0 6 dev bar
As can be seen in the example above, the resulting dataframe from read_table() consists of two rows corresponding to B=2013-01-02 (from df) and four rows corresponding to B=2013-01-03 (from modified_df).
Thus, the original partition with the two rows corresponding to B=2013-01-03 from df has been completely replaced.
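The pattern above can be wrapped in a small helper. The function below is only a sketch of the idea and not part of the Kartothek API; it assumes the dataset is partitioned on a single column:
def replace_partition(new_df, store, dataset_uuid, partition_column, partition_value):
    # Atomically drop the old partition for `partition_value` and write
    # `new_df` in its place within a single update operation.
    return update_dataset_from_dataframes(
        [new_df],
        store=store,
        dataset_uuid=dataset_uuid,
        partition_on=partition_column,
        delete_scope=[{partition_column: partition_value}],
    )

# Equivalent to the update in In [34]:
# dm = replace_partition(modified_df, store_url, dm.uuid, "B", pd.Timestamp("2013-01-03"))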
Garbage collection
When Kartothek executes an operation, it makes sure not to commit changes to the dataset until the operation has completed successfully. If a write operation fails for any reason, new files may have been written to storage, but they will not be used by the dataset because they are not referenced in the Kartothek metadata. Thus, when the user reads the dataset, no new data will appear in the output.
Similarly, when deleting a partition, Kartothek only removes the reference to that file from the metadata.
These temporary files will remain in storage until a Kartothek garbage collection
function is called on the dataset.
If a dataset is updated on a regular basis, it may be useful to run garbage collection
periodically to decrease unnecessary storage use.
An example of garbage collection is shown below.
A little above, in the Replacing Data example, we replaced the partition for B=2013-01-03 in the dataset with uuid replace_partition.
The file backing the replaced partition remains in storage but is untracked by Kartothek.
When garbage collection is called, the file is removed.
In [37]: from kartothek.api.dataset import garbage_collect_dataset
In [38]: from storefact import get_store_from_url
In [39]: store = get_store_from_url(store_url)
In [40]: files_before = set(store.keys())
In [41]: garbage_collect_dataset(store=store, dataset_uuid=dm.uuid)
In [42]: files_before.difference(store.keys()) # Show files removed
Out[42]: {'replace_partition/table/B=2013-01-03%2000%3A00%3A00/bf94326e100946018b08ebf0af40e4f3.parquet'}
Mutating indexed datasets
A mutating operation will update all indices that currently exist for the dataset. This holds true even if the update function specifies none of the indices, or only some of them. Consider the following example:
In [43]: df = pd.DataFrame({"payload": range(10), "i1": 0, "i2": ["a"] * 5 + ["b"] * 5})
In [44]: dm = store_dataframes_as_dataset(
....: store_url, "indexed_dataset", [df], secondary_indices=["i1", "i2"]
....: )
....:
In [45]: dm = dm.load_all_indices(store_url)
In [46]: dm.indices["i1"].observed_values()
Out[46]: array([0])
In [47]: dm.indices["i2"].observed_values()
Out[47]: array(['b', 'a'], dtype=object)
In [48]: new_df = pd.DataFrame({"payload": range(10), "i1": 1, "i2": "c"})
If we do not specify anything, Kartothek will infer the indices and update them correctly:
In [49]: dm = update_dataset_from_dataframes([new_df], store=store_url, dataset_uuid=dm.uuid)
In [50]: dm = dm.load_all_indices(store_url)
In [51]: dm.indices["i1"].observed_values()
Out[51]: array([0, 1])
In [52]: dm.indices["i2"].observed_values()
Out[52]: array(['b', 'a', 'c'], dtype=object)
This holds even if only a subset of the indices is given:
In [53]: new_df = pd.DataFrame({"payload": range(10), "i1": 2, "i2": "d"})
In [54]: dm = update_dataset_from_dataframes(
....: [new_df], store=store_url, dataset_uuid=dm.uuid, secondary_indices="i1"
....: )
....:
In [55]: dm = dm.load_all_indices(store_url)
In [56]: dm.indices["i1"].observed_values()
Out[56]: array([0, 1, 2])
In [57]: dm.indices["i2"].observed_values()
Out[57]: array(['b', 'a', 'c', 'd'], dtype=object)
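These indices are what allows reads to skip irrelevant data. As a sketch of how they pay off (this relies on the predicates keyword of read_table, expressed in disjunctive normal form, and reuses the indexed_dataset from above):
subset = read_table(
    dataset_uuid=dm.uuid,
    store=store_url,
    table="table",
    predicates=[[("i1", "==", 1)]],  # only the rows that were written with i1 == 1
)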