class sagemaker.feature_store.feature_group.FeatureGroup(name=_Nothing.NOTHING, sagemaker_session=_Nothing.NOTHING, feature_definitions=_Nothing.NOTHING)
Bases: object
FeatureGroup definition.
This class instantiates a FeatureGroup object that comprises a name for the FeatureGroup,
a session instance, and a list of feature definition objects, i.e., FeatureDefinition.
Parameters
name (str) –
sagemaker_session (Session) – session instance to perform boto calls.
If None, a new Session will be created.
feature_definitions (Sequence[FeatureDefinition]) –
create(s3_uri, record_identifier_name, event_time_feature_name, role_arn=None, online_store_kms_key_id=None, enable_online_store=False, ttl_duration=None, offline_store_kms_key_id=None, disable_glue_table_creation=False, data_catalog_config=None, description=None, tags=None, table_format=None, online_store_storage_type=None, throughput_config=None)
Create a SageMaker FeatureStore FeatureGroup.
Parameters
s3_uri (Union[str, bool]) – S3 URI of the offline store; set to False to disable the offline store.
record_identifier_name (str) – name of the record identifier feature.
event_time_feature_name (str) – name of the event time feature.
role_arn (str) – ARN of the role used to call CreateFeatureGroup.
online_store_kms_key_id (str) – KMS key ARN for the online store (default: None).
ttl_duration (TtlDuration) – default time-to-live duration for records (default: None).
enable_online_store (bool) – whether to enable the online store (default: False).
offline_store_kms_key_id (str) – KMS key ARN for the offline store (default: None).
If a KMS encryption key is not specified, SageMaker encrypts all data at
rest using the default AWS KMS key. By defining your bucket-level key for
SSE, you can reduce the cost of AWS KMS requests.
For more information, see Bucket Key in the Amazon S3 User Guide.
disable_glue_table_creation (bool) – whether to turn off Glue table creation (default: False).
data_catalog_config (DataCatalogConfig) – configuration for the metadata store (default: None).
description (str) – description of the FeatureGroup (default: None).
tags (Optional[Tags]) – tags for labeling a FeatureGroup (default: None).
table_format (TableFormatEnum) – format of the offline store table (default: None).
online_store_storage_type (OnlineStoreStorageTypeEnum) – storage type for the online store
(default: None).
throughput_config (ThroughputConfig) – throughput configuration of the feature group
(default: None).
Returns
Response dict from service.
Return type
Dict[str, Any]
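A minimal sketch of creating a feature group; the bucket, role ARN, and column names are placeholders, and AWS credentials are assumed to be configured:
import pandas as pd
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup

# Hypothetical data; column names become feature names and dtypes drive type inference.
df = pd.DataFrame({
    "customer_id": pd.Series(["c1", "c2"], dtype="string"),
    "spend": pd.Series([10.5, 3.2], dtype="float64"),
    "event_time": pd.Series([1700000000.0, 1700000001.0], dtype="float64"),
})

fg = FeatureGroup(name="customers", sagemaker_session=Session())
fg.load_feature_definitions(data_frame=df)

fg.create(
    s3_uri="s3://my-bucket/offline-store",             # placeholder bucket
    record_identifier_name="customer_id",
    event_time_feature_name="event_time",
    role_arn="arn:aws:iam::123456789012:role/MyRole",  # placeholder role
    enable_online_store=True,
)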
update(feature_additions=None, online_store_config=None, throughput_config=None)
Update a FeatureGroup and add new features from the given feature definitions.
Parameters
feature_additions (Sequence[Dict[str, str]]) – list of feature definitions to be added.
online_store_config (OnlineStoreConfigUpdate) – online store config to be updated.
throughput_config (ThroughputConfigUpdate) – target throughput configuration.
Returns
Response dict from service.
Return type
Dict[str, Any]
update_feature_metadata(feature_name, description=None, parameter_additions=None, parameter_removals=None)
Update a feature's metadata: its description and added/removed parameters.
Parameters
feature_name (str) – name of the feature to update.
description (str) – description of the feature to update.
parameter_additions (Sequence[Dict[str, str]]) – list of feature parameters to be added.
parameter_removals (Sequence[str]) – list of feature parameter keys to be removed.
Returns
Response dict from service.
Return type
Dict[str, Any]
list_parameters_for_feature_metadata(feature_name)
List all parameters of a feature's metadata.
Parameters
feature_name (str) – name of the feature.
Returns
list of key-value pairs of the parameters.
Return type
Sequence[Dict[str, str]]
load_feature_definitions(data_frame, online_storage_type=None)
Load feature definitions from a Pandas DataFrame.
The column name is used as the feature name, and the feature type is inferred from the dtype
of the column. Dtypes int_, int8, int16, int32, int64, uint8, uint16, uint32, and uint64 are
mapped to the Integral feature type. Dtypes float_, float16, float32, and float64 are mapped
to the Fractional feature type. The string dtype is mapped to the String feature type.
No feature definitions will be loaded if the given data_frame contains unsupported dtypes.
For the IN_MEMORY online_storage_type, all collection type columns within the DataFrame are
inferred as a List instead of a String. Due to performance limitations, only the first 1,000
values of a column are sampled when inferring a collection type. You can manually update the
inferred collection type as needed.
Parameters
data_frame (DataFrame) – A Pandas DataFrame containing features.
online_storage_type (OnlineStoreStorageTypeEnum) – Optional. Online storage type for the
feature group; either STANDARD or IN_MEMORY. If not specified, STANDARD is used by
default. If specified as IN_MEMORY, any collection type column within the DataFrame is
inferred as a List instead of a String; all collection types (List, Set, and Vector)
are inferred as List, and only the first 1,000 values of the column are sampled when
inferring the collection type.
Returns
list of FeatureDefinition
Return type
Sequence[FeatureDefinition]
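A short sketch of the dtype-based inference described above, using placeholder columns:
import pandas as pd
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup

df = pd.DataFrame({
    "id": pd.Series(["a", "b"], dtype="string"),      # -> String
    "clicks": pd.Series([3, 7], dtype="int64"),       # -> Integral
    "score": pd.Series([0.1, 0.9], dtype="float64"),  # -> Fractional
})

fg = FeatureGroup(name="example-fg", sagemaker_session=Session())
for definition in fg.load_feature_definitions(data_frame=df):
    print(definition.feature_name, definition.feature_type)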
get_record(record_identifier_value_as_string, feature_names=None)
Get a single record from a FeatureGroup.
Parameters
record_identifier_value_as_string (str) – a string representing the value of the record identifier.
feature_names (Sequence[str]) – a list of strings representing feature names.
Return type
Sequence[Dict[str, str]]
put_record(record, target_stores=None, ttl_duration=None)
Put a single record in the FeatureGroup.
Parameters
record (Sequence[FeatureValue]) – a list of feature values.
target_stores (Sequence[str]) – a list of target stores.
ttl_duration (TtlDuration) – customer-specified TTL duration.
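A minimal sketch of writing one record, reusing the fg feature group from the earlier sketch (values are placeholders):
from sagemaker.feature_store.inputs import FeatureValue

fg.put_record(
    record=[
        FeatureValue(feature_name="customer_id", value_as_string="c1"),
        FeatureValue(feature_name="spend", value_as_string="10.5"),
        FeatureValue(feature_name="event_time", value_as_string="1700000000.0"),
    ]
)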
delete_record(record_identifier_value_as_string, event_time, deletion_mode=DeletionModeEnum.SOFT_DELETE)
Delete a single record from a FeatureGroup.
Parameters
record_identifier_value_as_string (str) – a string representing the value of the record identifier.
event_time (str) – a timestamp-format string indicating when the deletion event occurred.
deletion_mode (DeletionModeEnum) – deletion mode for deleting the record
(default: DeletionModeEnum.SOFT_DELETE).
ingest(data_frame, target_stores=None, max_workers=1, max_processes=1, wait=True, timeout=None, profile_name=None)
Ingest the content of a pandas DataFrame to feature store.
max_workers is the number of threads created to work on different partitions of the
data_frame in parallel. max_processes is the number of processes created to work on
different partitions of the data_frame in parallel, each with max_workers threads.
The ingest function attempts to ingest all records in the data frame. SageMaker
Feature Store throws an exception if it fails to ingest any records.
If wait is True, Feature Store runs the ingest function synchronously. You receive an
IngestionError if there are any records that can't be ingested. If wait is False,
Feature Store runs the ingest function asynchronously. Instead of setting wait to True
in the ingest function, you can invoke the wait function on the returned instance of
IngestionManagerPandas to run the ingest function synchronously.
To access the rows that failed to ingest, set wait to False. The
IngestionError.failed_rows object saves all the rows that failed to ingest.
The profile_name argument is optional. The default credentials are used if None is
passed. This profile_name is used in the sagemaker_featurestore_runtime client only. See
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
for more about the default credentials.
Parameters
data_frame (DataFrame) – data_frame to be ingested to feature store.
target_stores (Sequence[TargetStoreEnum]) – target stores to be used for
ingestion (default: None).
max_workers (int) – number of threads to be created.
max_processes (int) – number of processes to be created. Each process spawns
max_workers threads.
wait (bool) – whether to wait for the ingestion to finish.
timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised
if the timeout is reached.
profile_name (str) – the profile credential to be used for PutRecord (default: None).
Returns
An instance of IngestionManagerPandas.
Return type
IngestionManagerPandas
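A sketch of asynchronous ingestion that inspects failed rows, reusing the fg and df placeholders from the earlier sketches:
from sagemaker.feature_store.feature_group import IngestionError

manager = fg.ingest(data_frame=df, max_workers=3, wait=False)
try:
    manager.wait()  # block until all partitions finish
except IngestionError as err:
    print("Rows that failed to ingest:", err.failed_rows)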
as_hive_ddl(database='sagemaker_featurestore', table_name=None)
Generate Hive DDL commands to define or change the structure of tables or databases in Hive.
The schema of the table is generated based on the feature definitions. Columns are named
after the feature names, and data types are inferred from the feature types: the Integral
feature type is mapped to the INT data type, Fractional to FLOAT, and String to STRING.
Parameters
database (str) – name of the database. If not set, "sagemaker_featurestore" will be used.
table_name (Optional[str]) – name of the table. If not set, the name of this feature group
will be used.
Returns
Generated create table DDL string.
Return type
str
class sagemaker.feature_store.feature_group.AthenaQuery(catalog, database, table_name, sagemaker_session)
Bases: object
Class to manage querying of feature store data with AWS Athena.
This class instantiates an AthenaQuery object that is used to retrieve data from the feature
store via standard SQL queries.
Parameters
catalog (str) –
database (str) –
table_name (str) –
sagemaker_session (Session) –
run(query_string, output_location, kms_key=None, workgroup=None)
Execute a SQL query given a query string, output location, and KMS key.
This method executes the SQL query using Athena, outputs the results to output_location,
and returns the execution ID of the query.
Parameters
query_string (str) – SQL query string.
output_location (str) – S3 URI of the query result.
kms_key (Optional[str]) – KMS key ID. If set, it will be used to encrypt the query result file.
workgroup (str) – The name of the workgroup in which the query is being started.
Returns
Execution ID of the query.
Return type
str
as_dataframe(**kwargs)
Download the result of the current query and load it into a DataFrame.
Parameters
**kwargs (object) – keyword arguments passed through to pandas.read_csv for finer control
over how the result data is loaded. For more info, see:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
Returns
A pandas DataFrame containing the query result.
Return type
DataFrame
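A sketch of querying the offline store; the catalog, database, and table values are placeholders (in practice they come from the feature group's data catalog configuration), and wait() is an assumed helper that blocks until the query finishes:
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import AthenaQuery

query = AthenaQuery(
    catalog="AwsDataCatalog",            # placeholder catalog
    database="sagemaker_featurestore",   # default offline-store database
    table_name="customers_1700000000",   # placeholder offline-store table
    sagemaker_session=Session(),
)
query.run(
    query_string='SELECT * FROM "customers_1700000000" LIMIT 10',
    output_location="s3://my-bucket/athena-results",  # placeholder bucket
)
query.wait()                             # assumed helper
result_df = query.as_dataframe()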
class sagemaker.feature_store.feature_group.IngestionManagerPandas(feature_group_name, feature_definitions, sagemaker_fs_runtime_client_config=None, sagemaker_session=None, max_workers=1, max_processes=1, profile_name=None, async_result=None, processing_pool=None, failed_indices=_Nothing.NOTHING)
Bases: object
Class to manage the multi-threaded data ingestion process.
This class manages the data ingestion process, which is multi-threaded.
Parameters
feature_group_name (str) –
feature_definitions (Dict[str, Dict[Any, Any]]) – dictionary of feature definitions, where
the key is the feature name and the value is the FeatureDefinition. The
FeatureDefinition contains the data type of the feature.
sagemaker_fs_runtime_client_config (Config) –
sagemaker_session (Session) –
max_workers (int) –
max_processes (int) –
profile_name (str) –
async_result (AsyncResult) –
processing_pool (ProcessingPool) –
failed_indices (List[int]) –
wait(timeout=None)
Wait for the ingestion process to finish.
Parameters
timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised
if the timeout is reached.
run(data_frame, target_stores=None, wait=True, timeout=None)
Start the ingestion process.
Parameters
data_frame (DataFrame) – source DataFrame to be ingested.
target_stores (Sequence[TargetStoreEnum]) – list of target stores to be used for
the ingestion. If None, the default target store is used.
wait (bool) – whether to wait for the ingestion to finish.
timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised
if the timeout is reached.
class sagemaker.feature_store.feature_definition.FeatureDefinition(feature_name, feature_type, collection_type=None)
Bases: Config
Feature definition.
This class instantiates a FeatureDefinition object, where FeatureDefinition is a subclass of Config.
Parameters
feature_name (str) –
feature_type (FeatureTypeEnum) –
collection_type (CollectionType) –
class sagemaker.feature_store.feature_definition.FractionalFeatureDefinition(feature_name, collection_type=None)
Bases: FeatureDefinition
Fractional feature definition.
This class instantiates a FractionalFeatureDefinition object, a subclass of FeatureDefinition
where the data type of the feature being defined is Fractional.
feature_name
The name of the feature.
class sagemaker.feature_store.feature_definition.IntegralFeatureDefinition(feature_name, collection_type=None)
Bases: FeatureDefinition
Integral feature definition.
This class instantiates an IntegralFeatureDefinition object, a subclass of FeatureDefinition
where the data type of the feature being defined is Integral.
feature_name
The name of the feature.
class sagemaker.feature_store.feature_definition.StringFeatureDefinition(feature_name, collection_type=None)
Bases: FeatureDefinition
String feature definition.
This class instantiates a StringFeatureDefinition object, a subclass of FeatureDefinition
where the data type of the feature being defined is String.
feature_name
The name of the feature.
class sagemaker.feature_store.feature_definition.FeatureTypeEnum(value)
Bases: Enum
Enum of feature types.
The data type of a feature can be Fractional, Integral, or String.
class sagemaker.feature_store.feature_definition.CollectionTypeEnum(value)
Bases: Enum
Enum of collection types.
The collection type of a feature can be List, Set, or Vector.
class sagemaker.feature_store.feature_definition.CollectionType(collection_type, collection_config)
Bases: Config
Collection type and its configuration.
This class instantiates a CollectionType object, where CollectionType is a subclass of Config.
Parameters
collection_type (CollectionTypeEnum) –
collection_config (Dict[str, Any]) –
class sagemaker.feature_store.feature_definition.ListCollectionType
Bases: CollectionType
List collection type.
This class instantiates a ListCollectionType object, a subclass of CollectionType
where the collection type is defined as List.
class sagemaker.feature_store.feature_definition.SetCollectionType
Bases: CollectionType
Set collection type.
This class instantiates a SetCollectionType object, a subclass of CollectionType
where the collection type is defined as Set.
class sagemaker.feature_store.feature_definition.VectorCollectionType(dimension)
Bases: CollectionType
Vector collection type.
This class instantiates a VectorCollectionType object, a subclass of CollectionType
where the collection type is defined as Vector.
Parameters
dimension (int) –
class sagemaker.feature_store.inputs.Config
Bases: ABC
Base config object for FeatureStore.
Configs must implement the to_dict method.
abstract to_dict()
Get the dictionary from attributes.
Returns
dict containing the attributes.
Return type
Dict[str, Any]
class sagemaker.feature_store.inputs.DataCatalogConfig(table_name=_Nothing.NOTHING, catalog=_Nothing.NOTHING, database=_Nothing.NOTHING)
Bases: Config
DataCatalogConfig for FeatureStore.
Parameters
table_name (str) –
catalog (str) –
database (str) –
class sagemaker.feature_store.inputs.OfflineStoreConfig(s3_storage_config, disable_glue_table_creation=False, data_catalog_config=None, table_format=None)
Bases: Config
OfflineStoreConfig for FeatureStore.
Parameters
s3_storage_config (S3StorageConfig) –
disable_glue_table_creation (bool) –
data_catalog_config (DataCatalogConfig) –
table_format (TableFormatEnum) –
class sagemaker.feature_store.inputs.ThroughputConfig(mode=None, provisioned_read_capacity_units=None, provisioned_write_capacity_units=None)
Bases: Config
Throughput configuration of the feature group.
Throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for
read and write capacity units. ON_DEMAND works best for less predictable traffic,
while PROVISIONED works best for consistent and predictable traffic.
Parameters
mode (ThroughputModeEnum) –
provisioned_read_capacity_units (int) – For provisioned feature groups, this indicates
the read throughput you are billed for and can consume without throttling.
provisioned_write_capacity_units (int) – For provisioned feature groups, this indicates
the write throughput you are billed for and can consume without throttling.
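A small sketch of a provisioned throughput configuration; the capacity values are placeholders, and the config would be passed to FeatureGroup.create via its throughput_config parameter (enum member name as assumed):
from sagemaker.feature_store.inputs import ThroughputConfig, ThroughputModeEnum

throughput = ThroughputConfig(
    mode=ThroughputModeEnum.PROVISIONED,
    provisioned_read_capacity_units=100,  # placeholder capacity values
    provisioned_write_capacity_units=50,
)
# e.g. fg.create(..., throughput_config=throughput)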
class sagemaker.feature_store.inputs.ThroughputConfigUpdate(mode=None, provisioned_read_capacity_units=None, provisioned_write_capacity_units=None)
Bases: Config
Target throughput configuration for the feature group.
Target throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for
read and write capacity units. ON_DEMAND works best for less predictable traffic,
while PROVISIONED works best for consistent and predictable traffic.
Parameters
mode (ThroughputModeEnum) –
provisioned_read_capacity_units (int) – For provisioned feature groups, this indicates
the read throughput you are billed for and can consume without throttling.
provisioned_write_capacity_units (int) – For provisioned feature groups, this indicates
the write throughput you are billed for and can consume without throttling.
class sagemaker.feature_store.inputs.OnlineStoreConfig(enable_online_store=True, online_store_security_config=None, ttl_duration=None, storage_type=None)
Bases: Config
OnlineStoreConfig for FeatureStore.
Parameters
enable_online_store (bool) –
online_store_security_config (OnlineStoreSecurityConfig) –
ttl_duration (TtlDuration) –
storage_type (OnlineStoreStorageTypeEnum) –
class sagemaker.feature_store.inputs.OnlineStoreSecurityConfig(kms_key_id=_Nothing.NOTHING)
Bases: Config
OnlineStoreSecurityConfig for FeatureStore.
Parameters
kms_key_id (str) –
class sagemaker.feature_store.inputs.TtlDuration(unit, value)
Bases: Config
TtlDuration for records in the online FeatureStore.
Parameters
unit (str) –
value (int) –
class sagemaker.feature_store.inputs.S3StorageConfig(s3_uri, kms_key_id=None)
Bases: Config
S3StorageConfig for FeatureStore.
Parameters
s3_uri (str) –
kms_key_id (str) –
class sagemaker.feature_store.inputs.FeatureValue(feature_name=None, value_as_string=None, value_as_string_list=None)
Bases: Config
FeatureValue for FeatureStore.
Parameters
feature_name (str) –
value_as_string (str) –
value_as_string_list (List[str]) –
class sagemaker.feature_store.inputs.TableFormatEnum(value)
Bases: Enum
Enum of table formats.
The offline store table format can be Glue or Iceberg.
class sagemaker.feature_store.inputs.OnlineStoreStorageTypeEnum(value)
Bases: Enum
Enum of storage types for the online store.
The online store storage type can be Standard or InMemory.
class sagemaker.feature_store.inputs.ThroughputModeEnum(value)
Bases: Enum
Enum of throughput modes supported by a feature group.
The throughput mode of a feature group can be ON_DEMAND or PROVISIONED.
class sagemaker.feature_store.inputs.ResourceEnum(value)
Bases: Enum
Enum of resources.
The data type of a resource can be FeatureGroup or FeatureMetadata.
class sagemaker.feature_store.inputs.SearchOperatorEnum(value)
Bases: Enum
Enum of search operators.
The data type of a search operator can be And or Or.
class sagemaker.feature_store.inputs.SortOrderEnum(value)
Bases: Enum
Enum of sort orders.
The data type of a sort order can be Ascending or Descending.
class sagemaker.feature_store.inputs.FilterOperatorEnum(value)
Bases: Enum
Enum of filter operators.
The data type of a filter operator can be Equals, NotEquals, GreaterThan,
GreaterThanOrEqualTo, LessThan, LessThanOrEqualTo, Contains, Exists, NotExists, or In.
class sagemaker.feature_store.inputs.Filter(name, value, operator=None)
Bases: Config
Filter for FeatureStore search.
Parameters
name (str) –
value (str) – A value used with Name and Operator to determine which resources
satisfy the filter's condition.
operator (FilterOperatorEnum) –
class sagemaker.feature_store.inputs.Identifier(feature_group_name, record_identifiers_value_as_string, feature_names=None)
Bases: Config
Identifier of the batch get record API.
Parameters
feature_group_name (str) –
record_identifiers_value_as_string (List[str]) –
feature_names (List[str]) –
class sagemaker.feature_store.inputs.FeatureParameter(key=None, value=None)
Bases: Config
FeatureParameter for FeatureStore.
Parameters
key (str) –
value (str) –
class sagemaker.feature_store.dataset_builder.DatasetBuilder(sagemaker_session, base, output_path, record_identifier_feature_name=None, event_time_identifier_feature_name=None, included_feature_names=None, kms_key_id=None, event_time_identifier_feature_type=None)
Bases: object
DatasetBuilder definition.
This class instantiates a DatasetBuilder object that comprises a base, a list of feature names,
an output path, and a KMS key ID.
Parameters
sagemaker_session (Session) –
base (Union[FeatureGroup, DataFrame]) –
output_path (str) –
record_identifier_feature_name (str) –
event_time_identifier_feature_name (str) –
included_feature_names (List[str]) –
kms_key_id (str) –
event_time_identifier_feature_type (FeatureTypeEnum) –
_base
A base which can be either a FeatureGroup or a pandas.DataFrame and will be used to
merge other FeatureGroups and generate a dataset.
Union[FeatureGroup, DataFrame]
_record_identifier_feature_name
A string representing the record identifier feature if base is a DataFrame
(default: None).
_event_time_identifier_feature_name
A string representing the event time identifier feature if base is a DataFrame
(default: None).
_included_feature_names
A list of strings representing features to be included in the output. If not set, all
features will be included in the output (default: None).
List[str]
_kms_key_id
A KMS key ID. If set, it will be used to encrypt the result file (default: None).
_point_in_time_accurate_join
A boolean representing whether a point-in-time join is applied to the resulting
dataframe when calling "to_dataframe". When set to True, users can retrieve data using
"row-level time travel" according to the event times provided to the DatasetBuilder.
This requires that the entity dataframe with event times is submitted as the base in
the constructor (default: False).
_include_duplicated_records
A boolean representing whether the resulting dataframe when calling "to_dataframe"
should include duplicated records (default: False).
_include_deleted_records
A boolean representing whether the resulting dataframe when calling "to_dataframe"
should include deleted records (default: False).
_number_of_recent_records
An integer representing how many records will be returned for each record identifier
(default: 1).
_number_of_records
An integer representing the number of records that should be returned in the resulting
dataframe when calling "to_dataframe" (default: None).
_write_time_ending_timestamp
A datetime that represents the latest write time for a record to be included in the
resulting dataset. Records with a newer write time will be omitted from the resulting
dataset (default: None).
datetime.datetime
_event_time_starting_timestamp
A datetime that represents the earliest event time for a record to be included in the
resulting dataset. Records with an older event time will be omitted from the resulting
dataset (default: None).
datetime.datetime
_event_time_ending_timestamp
A datetime that represents the latest event time for a record to be included in the
resulting dataset. Records with a newer event time will be omitted from the resulting
dataset (default: None).
datetime.datetime
_feature_groups_to_be_merged
A list of FeatureGroupToBeMerged which will be joined to the base (default: []).
List[FeatureGroupToBeMerged]
_event_time_identifier_feature_type
A FeatureTypeEnum representing the type of the event time identifier feature
(default: None).
FeatureTypeEnum
with_feature_group(feature_group, target_feature_name_in_base=None, included_feature_names=None, feature_name_in_target=None, join_comparator=JoinComparatorEnum.EQUALS, join_type=JoinTypeEnum.INNER_JOIN)
Join a FeatureGroup with the base.
Parameters
feature_group (FeatureGroup) – A target FeatureGroup which will be joined to the base.
target_feature_name_in_base (str) – A string representing the feature name in the base
which will be used as a join key (default: None).
included_feature_names (List[str]) – A list of strings representing features to be
included in the output (default: None).
feature_name_in_target (str) – A string representing the feature name in the target
feature group that will be compared to the target feature in the base feature group.
If None is provided, the record identifier feature will be used in the
SQL join (default: None).
join_comparator (JoinComparatorEnum) – A JoinComparatorEnum representing the comparator
used when joining the target feature in the base feature group and the feature
in the target feature group (default: JoinComparatorEnum.EQUALS).
join_type (JoinTypeEnum) – A JoinTypeEnum representing the type of join between
the base and target feature groups (default: JoinTypeEnum.INNER_JOIN).
Returns
This DatasetBuilder object.
with_number_of_recent_records_by_record_identifier(number_of_recent_records)
Set the number_of_recent_records field with the provided input.
Parameters
number_of_recent_records (int) – An int indicating how many recent records will be
returned for each record identifier.
Returns
This DatasetBuilder object.
with_number_of_records_from_query_results(number_of_records)
Set the number_of_records field with the provided input.
Parameters
number_of_records (int) – An int indicating how many records will be returned.
Returns
This DatasetBuilder object.
as_of(timestamp)
Set the write_time_ending_timestamp field with the provided input.
Parameters
timestamp (datetime.datetime) – A datetime such that all records in the dataset will have
a write time before it.
Returns
This DatasetBuilder object.
with_event_time_range(starting_timestamp=None, ending_timestamp=None)
Set event_time_starting_timestamp and event_time_ending_timestamp with the provided inputs.
Parameters
starting_timestamp (datetime.datetime) – A datetime such that all records in the dataset
will have an event time after it (default: None).
ending_timestamp (datetime.datetime) – A datetime such that all records in the dataset
will have an event time before it (default: None).
Returns
This DatasetBuilder object.
class sagemaker.feature_store.feature_store.FeatureStore(sagemaker_session=<class 'sagemaker.session.Session'>)
Bases: object
FeatureStore definition.
This class instantiates a FeatureStore object that comprises a SageMaker session instance.
Parameters
sagemaker_session (Session) –
create_dataset(base, output_path, record_identifier_feature_name=None, event_time_identifier_feature_name=None, included_feature_names=None, kms_key_id=None)
Create a DatasetBuilder for generating a dataset.
Parameters
base (Union[FeatureGroup, DataFrame]) – A base which can be either a FeatureGroup or a
pandas.DataFrame and will be used to merge other FeatureGroups and generate a
dataset.
output_path (str) – An S3 URI which stores the output .csv file.
record_identifier_feature_name (str) – A string representing the record identifier
feature if base is a DataFrame (default: None).
event_time_identifier_feature_name (str) – A string representing the event time
identifier feature if base is a DataFrame (default: None).
included_feature_names (List[str]) – A list of features to be included in the output
(default: None).
kms_key_id (str) – A KMS key ID. If set, it will be used to encrypt the result file
(default: None).
Raises
ValueError – Base is a pandas DataFrame but neither a record identifier feature name
nor an event time identifier feature name is provided.
Return type
DatasetBuilder
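A sketch of building a dataset from the fg placeholder above joined with a hypothetical second feature group; to_dataframe() is the assumed materialization step returning the dataset and the query string used to produce it:
from datetime import datetime
from sagemaker.session import Session
from sagemaker.feature_store.feature_store import FeatureStore

feature_store = FeatureStore(sagemaker_session=Session())
builder = feature_store.create_dataset(
    base=fg,
    output_path="s3://my-bucket/datasets",  # placeholder bucket
)
builder.with_feature_group(orders_fg)       # hypothetical second FeatureGroup
builder.with_event_time_range(
    starting_timestamp=datetime(2024, 1, 1),
    ending_timestamp=datetime(2024, 6, 30),
)
df, query_string = builder.to_dataframe()   # assumed method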
list_feature_groups(name_contains=None, feature_group_status_equals=None, offline_store_status_equals=None, creation_time_after=None, creation_time_before=None, sort_order=None, sort_by=None, max_results=None, next_token=None)
List all FeatureGroups satisfying the given filters.
Parameters
name_contains (str) – A string that partially matches one or more FeatureGroups' names.
Filters FeatureGroups by name.
feature_group_status_equals (str) – A FeatureGroup status.
Filters FeatureGroups by FeatureGroup status.
offline_store_status_equals (str) – An OfflineStore status.
Filters FeatureGroups by OfflineStore status.
creation_time_after (datetime.datetime) – Use this parameter to search for FeatureGroups
created after a specific date and time.
creation_time_before (datetime.datetime) – Use this parameter to search for FeatureGroups
created before a specific date and time.
sort_order (str) – The order in which FeatureGroups are listed.
sort_by (str) – The value on which the FeatureGroup list is sorted.
max_results (int) – The maximum number of results returned by ListFeatureGroups.
next_token (str) – A token to resume pagination of ListFeatureGroups results.
Returns
Response dict from service.
Return type
Dict[str, Any]
batch_get_record(identifiers, expiration_time_response=None)
Get records in batch from FeatureStore.
Parameters
identifiers (Sequence[Identifier]) – A list of identifiers to uniquely identify records
in FeatureStore.
expiration_time_response (str) – the expiration time response field that toggles
returning of expiresAt.
Returns
Response dict from service.
Return type
Dict[str, Any]
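A sketch of a batch read, reusing the feature_store object above with placeholder names and IDs:
from sagemaker.feature_store.inputs import Identifier

response = feature_store.batch_get_record(
    identifiers=[
        Identifier(
            feature_group_name="customers",  # placeholder group
            record_identifiers_value_as_string=["c1", "c2"],
            feature_names=["customer_id", "spend"],
        )
    ]
)
for record in response["Records"]:
    print(record)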
search(resource, filters=None, operator=None, sort_by=None, sort_order=None, next_token=None, max_results=None)
Search for FeatureGroups or FeatureMetadata satisfying the given filters.
Parameters
resource (ResourceEnum) – The name of the Amazon SageMaker resource to search for.
Valid values are FeatureGroup or FeatureMetadata.
filters (Sequence[Filter]) – A list of filter objects (default: None).
operator (SearchOperatorEnum) – A Boolean operator used to evaluate the filters.
Valid values are And or Or. The default is And (default: None).
sort_by (str) – The name of the resource property used to sort the SearchResults.
The default is LastModifiedTime.
sort_order (SortOrderEnum) – How SearchResults are ordered.
Valid values are Ascending or Descending. The default is Descending.
next_token (str) – If more than MaxResults resources match the specified filters, the
response includes a NextToken. The NextToken can be passed to the next SearchRequest
to continue retrieving results (default: None).
max_results (int) – The maximum number of results to return (default: None).
Returns
Response dict from service.
Return type
Dict[str, Any]
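A sketch that finds feature groups whose name contains a placeholder substring (enum member names as assumed):
from sagemaker.feature_store.inputs import Filter, FilterOperatorEnum, ResourceEnum

response = feature_store.search(
    resource=ResourceEnum.FEATURE_GROUP,
    filters=[
        Filter(
            name="FeatureGroupName",
            value="customers",  # placeholder substring
            operator=FilterOperatorEnum.CONTAINS,
        )
    ],
)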
@sagemaker.feature_store.feature_processor.feature_processor(inputs, output, target_stores=None, parameters=None, enable_ingestion=True, spark_config=None)
Decorator to facilitate feature engineering for Feature Groups.
If the decorated function is executed without arguments, then the decorated function's
arguments are automatically loaded from the input data sources. Outputs are ingested to the
output Feature Group. If arguments are provided to this function, then arguments are not
automatically loaded (for testing).
Decorated functions must conform to the expected signature. Parameters: one parameter of type
pyspark.sql.DataFrame for each DataSource in 'inputs', followed by the optional parameters with
names and types in [params: Dict[str, Any], spark: SparkSession]. Outputs: a single return
value of type pyspark.sql.DataFrame. The function can have any name.
Example:
@feature_processor(
    inputs=[FeatureGroupDataSource("input-fg"), CSVDataSource("s3://bucket/prefix")],
    output="arn:aws:sagemaker:us-west-2:123456789012:feature-group/output-fg",
)
def transform(
    input_feature_group: DataFrame, input_csv: DataFrame, params: Dict[str, Any],
    spark: SparkSession
) -> DataFrame:
    return ...
More concisely:
@feature_processor(
    inputs=[FeatureGroupDataSource("input-fg"), CSVDataSource("s3://bucket/prefix")],
    output="arn:aws:sagemaker:us-west-2:123456789012:feature-group/output-fg",
)
def transform(input_feature_group, input_csv):
    return ...
Parameters
inputs (Sequence[Union[FeatureGroupDataSource, CSVDataSource, ParquetDataSource, BaseDataSource]]) – A list of data sources.
output (str) – A Feature Group ARN to write the results of this function to.
target_stores (Optional[list[str]], optional) – A list containing at least one of
'OnlineStore' or 'OfflineStore'. If unspecified, data will be ingested to the enabled
stores of the output feature group. Defaults to None.
parameters (Optional[Dict[str, Union[str, Dict]]], optional) – Parameters to be provided to
the decorated function, available as the 'params' argument. Useful for parameterized
functions. The params argument also contains the set of system-provided parameters
under the key 'system', e.g. 'scheduled_time': a timestamp representing the time that
the execution was scheduled to execute at, if triggered by a Scheduler, otherwise the
current time.
enable_ingestion (bool, optional) – A boolean indicating whether the decorated function's
return value is ingested to the 'output' Feature Group. This flag is useful during the
development phase to ensure that data is not used until the function is ready. It is
also useful for users who want to manage their own data ingestion. Defaults to True.
spark_config (Dict[str, str]) – A dict containing the key-value pairs for Spark
configurations.
Raises
IngestionError – If any rows are not ingested successfully, then a sample of the records,
with failure reasons, is logged.
Returns
The decorated function.
Return type
Callable
class sagemaker.feature_store.feature_processor.FeatureGroupDataSource(name, input_start_offset=None, input_end_offset=None)
Bases: object
A Feature Group data source definition for a FeatureProcessor.
Parameters
name (str) –
input_start_offset (Optional[str]) –
input_end_offset (Optional[str]) –
input_start_offset
A duration specified as a string in the
format ‘<no> <unit>’ where ‘no’ is a number and ‘unit’ is a unit of time in [‘hours’,
‘days’, ‘weeks’, ‘months’, ‘years’] (plural and singular forms). Inputs contain data
with event times no earlier than input_start_offset in the past. Offsets are relative
to the function execution time. If the function is executed by a Schedule, then the
offset is relative to the scheduled start time. Defaults to None.
Optional[str], optional
input_end_offset
The ‘end’ (as opposed to start) counterpart for
the ‘input_start_offset’. Inputs will contain records with event times no later than
‘input_end_offset’ in the past. Defaults to None.
Optional[str], optional
class sagemaker.feature_store.feature_processor.CSVDataSource(s3_uri, csv_header=True, csv_infer_schema=False)
Bases: object
A CSV data source definition for a FeatureProcessor.
Parameters
s3_uri (str) –
csv_header (bool) –
csv_infer_schema (bool) –
csv_header
Whether to read the first line of the CSV file as column names. This
option is only valid when file_format is set to csv. By default the value of this
option is true, and all column types are assumed to be strings.
csv_infer_schema
Whether to infer the schema of the CSV data source. This option is only
valid when file_format is set to csv. If set to true, two passes over the data are
required to load and infer the schema.
class sagemaker.feature_store.feature_processor.ParquetDataSource(s3_uri)
Bases: object
A Parquet data source definition for a FeatureProcessor.
Parameters
s3_uri (str) –
class sagemaker.feature_store.feature_processor.PySparkDataSource
Bases: BaseDataSource[DataFrame], ABC
Abstract base class for feature processor data sources.
Provides a skeleton for customization, requiring the override of the method that reads data
from the data source and returns a Spark DataFrame.
abstract read_data(spark, params=None)
Read data from the data source and convert it to a Spark DataFrame.
Parameters
spark (SparkSession) – The Spark session to read the data.
params (Optional[Dict[str, Union[str, Dict]]]) – Parameters provided to the
feature_processor decorator.
Returns
The Spark DataFrame as an abstraction on the data source.
Return type
DataFrame
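A sketch of a custom data source that reads newline-delimited JSON from S3; the data_source_name and data_source_unique_id attributes are assumed to be required by the base class, and the URI is a placeholder:
from typing import Dict, Optional, Union
from pyspark.sql import DataFrame, SparkSession
from sagemaker.feature_store.feature_processor import PySparkDataSource

class JsonLinesDataSource(PySparkDataSource):
    """Hypothetical data source for JSON Lines files on S3."""

    data_source_name = "json-lines"                   # assumed attribute
    data_source_unique_id = "s3://my-bucket/events/"  # placeholder URI

    def read_data(self, spark: SparkSession,
                  params: Optional[Dict[str, Union[str, Dict]]] = None) -> DataFrame:
        # spark.read.json handles newline-delimited JSON natively.
        return spark.read.json(self.data_source_unique_id)

An instance can then be passed in the inputs list of a feature_processor-decorated function alongside the built-in sources.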
feature_processor.to_pipeline(pipeline_name, step, role=None, transformation_code=None, max_retries=None, tags=None, sagemaker_session=None)
Creates a SageMaker Pipeline that takes in a callable as a training step.
To configure the training step used in the SageMaker Pipeline, the input argument step needs
to be wrapped by the remote decorator in module sagemaker.remote_function. If not wrapped by
the remote decorator, default configurations in sagemaker.remote_function.job._JobSettings
will be used to create the training step.
Parameters
pipeline_name (str) – The name of the pipeline.
step (Callable) – A user-provided function wrapped by feature_processor and optionally
wrapped by remote_decorator.
role (Optional[str]) – The Amazon Resource Name (ARN) of the role used by the pipeline to
access and create resources. If not specified, it will default to the credentials
provided by the AWS configuration chain.
transformation_code (Optional[str]) – The data source for a reference to the transformation
code for Lineage tracking. This code is not used for the actual transformation.
max_retries (Optional[int]) – The number of times to retry the SageMaker Pipeline step.
If not specified, the SageMaker Pipeline step will not retry.
tags (List[Tuple[str, str]]) – A list of tags attached to the pipeline and all corresponding
lineage resources that support tags. If not specified, no custom tags will be attached.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Returns
SageMaker Pipeline ARN.
Return type
str
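A sketch wiring a decorated transform into a scheduled pipeline; the names and rate expression are placeholders, transform is the decorated function from the feature_processor example above, and to_pipeline and schedule are assumed to be importable from the feature_processor module:
from sagemaker.feature_store.feature_processor import to_pipeline, schedule

pipeline_arn = to_pipeline(
    pipeline_name="customers-transform",   # placeholder pipeline name
    step=transform,
)
schedule(
    pipeline_name="customers-transform",
    schedule_expression="rate(12 hours)",  # placeholder rate expression
)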
feature_processor.schedule(pipeline_name, schedule_expression, role_arn=None, state='ENABLED', start_date=None, sagemaker_session=None)
Creates an EventBridge Schedule that schedules executions of a SageMaker Pipeline.
The pipeline created will also have a pipeline parameter scheduled-time indicating when the
pipeline is scheduled to run.
Parameters
pipeline_name (str) – The SageMaker Pipeline name that will be scheduled.
schedule_expression (str) – The expression that defines when the schedule runs. It supports
at expressions, rate expressions, and cron expressions. See the CreateSchedule API
for more details.
state (str) – Specifies whether the schedule is enabled or disabled. Valid values are
ENABLED and DISABLED. See the State request parameter for more details. If not
specified, it will default to ENABLED.
start_date (Optional[datetime]) – The date, in UTC, after which the schedule can begin
invoking its target. Depending on the schedule's recurrence expression, invocations
might occur on, or after, the StartDate you specify.
role_arn (Optional[str]) – The Amazon Resource Name (ARN) of the IAM role that EventBridge
Scheduler will assume for this target when the schedule is invoked.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Returns
The EventBridge Schedule ARN.
Return type
str
feature_processor.execute(pipeline_name, execution_time=None, sagemaker_session=None)
Starts an execution of a SageMaker Pipeline created by feature_processor.
Parameters
pipeline_name (str) – The SageMaker Pipeline name that will be executed.
execution_time (datetime) – The date, in UTC, to be used as a SageMaker Pipeline parameter
indicating the time at which the execution is scheduled to execute. If not
specified, it will default to the current timestamp.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Returns
The pipeline execution ARN.
Return type
str
feature_processor.delete_schedule(pipeline_name, sagemaker_session=None)
Delete the EventBridge Schedule corresponding to a SageMaker Pipeline, if there is one.
Parameters
pipeline_name (str) – The name of the SageMaker Pipeline whose schedule needs to be deleted.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Return type
None
feature_processor.describe(pipeline_name, sagemaker_session=None)
Describe a feature processor and other related resources.
This API includes details related to the feature processor, including the SageMaker Pipeline
and EventBridge Schedule.
Parameters
pipeline_name (str) – Name of the pipeline.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Returns
Information for resources related to the feature processor.
Return type
Dict[str, Union[int, str]]
feature_processor.list_pipelines(sagemaker_session=None)
Lists all SageMaker Pipelines created by the Feature Processor SDK.
Parameters
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Returns
A list of SageMaker Pipeline metadata created for feature_processor.
feature_processor.put_trigger(source_pipeline_events, target_pipeline, target_pipeline_parameters=None, state='ENABLED', event_pattern=None, role_arn=None, sagemaker_session=None)
Creates an event-based trigger that triggers executions of a SageMaker Pipeline.
Parameters
source_pipeline_events (List[FeatureProcessorPipelineEvents]) – The list of
FeatureProcessorPipelineEvents that will trigger the target_pipeline.
target_pipeline (str) – The name of the SageMaker Pipeline that will be triggered.
target_pipeline_parameters (Optional[Dict[str, str]]) – The list of parameters to start
execution of a pipeline.
state (Optional[str]) – Indicates whether the rule is enabled or disabled.
If not specified, it will default to ENABLED.
event_pattern (Optional[str]) – The EventBridge EventPattern that triggers the
target_pipeline. If specified, it will override source_pipeline_events. For more
information, see
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html
in the Amazon EventBridge User Guide.
role_arn (Optional[str]) – The Amazon Resource Name (ARN) of the IAM role that EventBridge
will assume for this target when the rule is triggered.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Returns
The EventBridge Rule ARN.
Return type
str
feature_processor.enable_trigger(pipeline_name, sagemaker_session=None)
Enable the EventBridge Rule that is associated with the pipeline.
Parameters
pipeline_name (str) – The name of the SageMaker Pipeline whose trigger will be enabled.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Return type
None
feature_processor.disable_trigger(pipeline_name, sagemaker_session=None)
Disable the EventBridge Rule that is associated with the pipeline.
Parameters
pipeline_name (str) – The name of the SageMaker Pipeline whose trigger will be disabled.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Return type
None
feature_processor.delete_trigger(pipeline_name, sagemaker_session=None)
Delete the EventBridge Rule corresponding to a SageMaker Pipeline, if there is one.
Parameters
pipeline_name (str) – The name of the SageMaker Pipeline whose trigger needs to be deleted.
sagemaker_session (Optional[Session]) – Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Return type
None