Feast
zenml.integrations.feast
special
Initialization for Feast integration.
The Feast integration offers a way to connect to a Feast Feature Store. ZenML implements a dedicated stack component that you can access as part of your ZenML steps in the usual ways.
FeastIntegration (Integration)
Definition of Feast integration for ZenML.
Source code in zenml/integrations/feast/__init__.py
class FeastIntegration(Integration):
    """Definition of the Feast integration for ZenML.

    Declares the integration name, the requirement specifiers it needs and
    the stack component flavors it provides.
    """

    NAME = FEAST
    REQUIREMENTS = [
        "feast[redis]>=0.26.0",
        "redis-server>=6.0.9",
    ]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors provided by the Feast integration.

        Returns:
            A one-element list containing the Feast feature store flavor
            class.
        """
        # Imported at call time rather than at module import time
        # (presumably to keep Feast-dependent modules from loading eagerly
        # -- TODO confirm).
        from zenml.integrations.feast.flavors import FeastFeatureStoreFlavor

        feast_flavors: List[Type[Flavor]] = [FeastFeatureStoreFlavor]
        return feast_flavors
flavors()
classmethod
Declare the stack component flavors for the Feast integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/feast/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors provided by the Feast integration.

    Returns:
        A one-element list containing the Feast feature store flavor class.
    """
    # Imported at call time rather than at module import time (presumably
    # to keep Feast-dependent modules from loading eagerly -- TODO confirm).
    from zenml.integrations.feast.flavors import FeastFeatureStoreFlavor

    feast_flavors: List[Type[Flavor]] = [FeastFeatureStoreFlavor]
    return feast_flavors
feature_stores
special
Feast Feature Store integration for ZenML.
Feature stores allow data teams to serve data via an offline store and an online low-latency store, where data is kept in sync between the two. They also offer a centralized registry where features (and feature schemas) are stored for use within a team or wider organization. Feature stores are a relatively recent addition to commonly-used machine learning stacks. Feast is a leading open-source feature store, first developed by Gojek in collaboration with Google.
feast_feature_store
Implementation of the Feast Feature Store for ZenML.
FeastFeatureStore (BaseFeatureStore)
Class to interact with the Feast feature store.
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
class FeastFeatureStore(BaseFeatureStore):
    """Class to interact with the Feast feature store.

    Each `get_*` method builds a fresh Feast `FeatureStore` from the
    configured `feast_repo` path. Methods that call `_validate_connection`
    first ping the Redis server at the configured `online_host` and
    `online_port`; the remaining methods do not check Redis at all.
    """

    @property
    def config(self) -> FeastFeatureStoreConfig:
        """Returns the `FeastFeatureStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(FeastFeatureStoreConfig, self._config)

    def _validate_connection(self) -> None:
        """Validates the connection to the feature store.

        Pings Redis at the configured host and port using a throwaway
        client, then disconnects that client's connection pool.

        Raises:
            redis.exceptions.ConnectionError: If the online component
                (Redis) is not available.
        """
        client = redis.Redis(
            host=self.config.online_host, port=self.config.online_port
        )
        try:
            client.ping()
        except redis.exceptions.ConnectionError as e:
            raise redis.exceptions.ConnectionError(
                "Could not connect to feature store's online component. "
                "Please make sure that Redis is running."
            ) from e
        finally:
            # The client only exists for this probe: release its pooled
            # connection explicitly instead of leaving it to garbage
            # collection.
            client.connection_pool.disconnect()

    def get_historical_features(
        self,
        entity_df: Union[pd.DataFrame, str],
        features: List[str],
        full_feature_names: bool = False,
    ) -> pd.DataFrame:
        """Returns the historical features for training or batch scoring.

        This method does not check the Redis connection.

        Args:
            entity_df: The entity DataFrame, or a string that is forwarded
                to Feast unchanged (Feast documents the string form as a
                SQL query for the offline store -- confirm against the
                installed Feast version).
            features: The features to retrieve.
            full_feature_names: Whether to return the full feature names.

        Returns:
            The historical features as a Pandas DataFrame.
        """
        fs = FeatureStore(repo_path=self.config.feast_repo)

        return fs.get_historical_features(
            entity_df=entity_df,
            features=features,
            full_feature_names=full_feature_names,
        ).to_df()

    def get_online_features(
        self,
        entity_rows: List[Dict[str, Any]],
        features: List[str],
        full_feature_names: bool = False,
    ) -> Dict[str, Any]:
        """Returns the latest online feature data.

        Args:
            entity_rows: The entity rows to retrieve.
            features: The features to retrieve.
            full_feature_names: Whether to return the full feature names.

        Returns:
            The latest online feature data as a dictionary.

        Raises:
            redis.exceptions.ConnectionError: If the online component
                (Redis) is not available.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.config.feast_repo)

        return fs.get_online_features(  # type: ignore[no-any-return]
            entity_rows=entity_rows,
            features=features,
            full_feature_names=full_feature_names,
        ).to_dict()

    def get_data_sources(self) -> List[str]:
        """Returns the data sources' names.

        Returns:
            The data sources' names.

        Raises:
            redis.exceptions.ConnectionError: If the online component
                (Redis) is not available.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.config.feast_repo)
        return [source.name for source in fs.list_data_sources()]

    def get_entities(self) -> List[str]:
        """Returns the entity names.

        Returns:
            The entity names.

        Raises:
            redis.exceptions.ConnectionError: If the online component
                (Redis) is not available.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.config.feast_repo)
        return [entity.name for entity in fs.list_entities()]

    def get_feature_services(self) -> List[str]:
        """Returns the feature service names.

        Returns:
            The feature service names.

        Raises:
            redis.exceptions.ConnectionError: If the online component
                (Redis) is not available.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.config.feast_repo)
        return [service.name for service in fs.list_feature_services()]

    def get_feature_views(self) -> List[str]:
        """Returns the feature view names.

        Returns:
            The feature view names.

        Raises:
            redis.exceptions.ConnectionError: If the online component
                (Redis) is not available.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.config.feast_repo)
        return [view.name for view in fs.list_feature_views()]

    def get_project(self) -> str:
        """Returns the project name.

        This method does not check the Redis connection.

        Returns:
            The project name.
        """
        fs = FeatureStore(repo_path=self.config.feast_repo)
        return str(fs.project)

    def get_registry(self) -> BaseRegistry:
        """Returns the feature store registry.

        This method does not check the Redis connection.

        Returns:
            The registry.
        """
        fs: FeatureStore = FeatureStore(repo_path=self.config.feast_repo)
        return fs.registry

    def get_feast_version(self) -> str:
        """Returns the version of Feast used.

        This method does not check the Redis connection.

        Returns:
            The version of Feast currently being used.
        """
        fs = FeatureStore(repo_path=self.config.feast_repo)
        return str(fs.version())
config: FeastFeatureStoreConfig
property
readonly
Returns the FeastFeatureStoreConfig
config.
Returns:
Type | Description |
---|---|
FeastFeatureStoreConfig |
The configuration. |
get_data_sources(self)
Returns the data sources' names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The data sources' names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_data_sources(self) -> List[str]:
    """List the names of the data sources known to the Feast repository.

    The Redis connection is checked before the Feast repository is opened.

    Returns:
        The data sources' names, one per data source reported by Feast.

    Raises:
        redis.exceptions.ConnectionError: If the online component (Redis)
            is not available.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    names: List[str] = []
    for source in store.list_data_sources():
        names.append(source.name)
    return names
get_entities(self)
Returns the entity names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The entity names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_entities(self) -> List[str]:
    """List the names of the entities known to the Feast repository.

    The Redis connection is checked before the Feast repository is opened.

    Returns:
        The entity names, one per entity reported by Feast.

    Raises:
        redis.exceptions.ConnectionError: If the online component (Redis)
            is not available.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    names: List[str] = []
    for entity in store.list_entities():
        names.append(entity.name)
    return names
get_feast_version(self)
Returns the version of Feast used.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str |
The version of Feast currently being used. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feast_version(self) -> str:
    """Report the version of Feast in use.

    This method does not check the Redis connection.

    Returns:
        The version reported by the Feast `FeatureStore`, as a string.
    """
    store = FeatureStore(repo_path=self.config.feast_repo)
    feast_version = store.version()
    return str(feast_version)
get_feature_services(self)
Returns the feature service names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The feature service names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_services(self) -> List[str]:
    """List the names of the feature services in the Feast repository.

    The Redis connection is checked before the Feast repository is opened.

    Returns:
        The feature service names, one per service reported by Feast.

    Raises:
        redis.exceptions.ConnectionError: If the online component (Redis)
            is not available.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    names: List[str] = []
    for service in store.list_feature_services():
        names.append(service.name)
    return names
get_feature_views(self)
Returns the feature view names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The feature view names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_views(self) -> List[str]:
    """List the names of the feature views in the Feast repository.

    The Redis connection is checked before the Feast repository is opened.

    Returns:
        The feature view names, one per view reported by Feast.

    Raises:
        redis.exceptions.ConnectionError: If the online component (Redis)
            is not available.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    names: List[str] = []
    for view in store.list_feature_views():
        names.append(view.name)
    return names
get_historical_features(self, entity_df, features, full_feature_names=False)
Returns the historical features for training or batch scoring.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_df |
Union[pandas.core.frame.DataFrame, str] |
The entity DataFrame or entity name. |
required |
features |
List[str] |
The features to retrieve. |
required |
full_feature_names |
bool |
Whether to return the full feature names. |
False |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
DataFrame |
The historical features as a Pandas DataFrame. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_historical_features(
    self,
    entity_df: Union[pd.DataFrame, str],
    features: List[str],
    full_feature_names: bool = False,
) -> pd.DataFrame:
    """Fetch historical features for training or batch scoring.

    This method does not check the Redis connection.

    Args:
        entity_df: The entity DataFrame, or a string that is forwarded to
            Feast unchanged (Feast documents the string form as a SQL
            query for the offline store -- confirm against the installed
            Feast version).
        features: The features to retrieve.
        full_feature_names: Whether to return the full feature names.

    Returns:
        The historical features as a Pandas DataFrame.
    """
    store = FeatureStore(repo_path=self.config.feast_repo)
    retrieval_job = store.get_historical_features(
        entity_df=entity_df,
        features=features,
        full_feature_names=full_feature_names,
    )
    return retrieval_job.to_df()
get_online_features(self, entity_rows, features, full_feature_names=False)
Returns the latest online feature data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_rows |
List[Dict[str, Any]] |
The entity rows to retrieve. |
required |
features |
List[str] |
The features to retrieve. |
required |
full_feature_names |
bool |
Whether to return the full feature names. |
False |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The latest online feature data as a dictionary. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_online_features(
    self,
    entity_rows: List[Dict[str, Any]],
    features: List[str],
    full_feature_names: bool = False,
) -> Dict[str, Any]:
    """Fetch the latest online feature data.

    The Redis connection is checked before the Feast repository is opened.

    Args:
        entity_rows: The entity rows to retrieve.
        features: The features to retrieve.
        full_feature_names: Whether to return the full feature names.

    Returns:
        The latest online feature data as a dictionary.

    Raises:
        redis.exceptions.ConnectionError: If the online component (Redis)
            is not available.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    online_response = store.get_online_features(
        entity_rows=entity_rows,
        features=features,
        full_feature_names=full_feature_names,
    )
    return online_response.to_dict()  # type: ignore[no-any-return]
get_project(self)
Returns the project name.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str |
The project name. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_project(self) -> str:
    """Report the Feast project name.

    This method does not check the Redis connection.

    Returns:
        The `project` attribute of the Feast `FeatureStore`, as a string.
    """
    store = FeatureStore(repo_path=self.config.feast_repo)
    project_name = str(store.project)
    return project_name
get_registry(self)
Returns the feature store registry.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
BaseRegistry |
The registry. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_registry(self) -> BaseRegistry:
    """Return the registry of the Feast feature store.

    This method does not check the Redis connection.

    Returns:
        The `registry` attribute of the Feast `FeatureStore`.
    """
    store: FeatureStore = FeatureStore(repo_path=self.config.feast_repo)
    registry = store.registry
    return registry
flavors
special
Feast integration flavors.
feast_feature_store_flavor
Feast feature store flavor.
FeastFeatureStoreConfig (BaseFeatureStoreConfig)
pydantic-model
Config for Feast feature store.
Source code in zenml/integrations/feast/flavors/feast_feature_store_flavor.py
class FeastFeatureStoreConfig(BaseFeatureStoreConfig):
    """Config for Feast feature store.

    Attributes:
        online_host: Host of the online store (Redis); defaults to
            "localhost".
        online_port: Port of the online store (Redis); defaults to 6379.
        feast_repo: Path of the Feast repository; required, no default.
    """

    online_host: str = "localhost"
    online_port: int = 6379
    feast_repo: str

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        Returns:
            True if `online_host` is "localhost" or "127.0.0.1", False
            otherwise.
        """
        local_hosts = ("localhost", "127.0.0.1")
        return self.online_host in local_hosts
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
FeastFeatureStoreFlavor (BaseFeatureStoreFlavor)
Feast Feature store flavor.
Source code in zenml/integrations/feast/flavors/feast_feature_store_flavor.py
class FeastFeatureStoreFlavor(BaseFeatureStoreFlavor):
    """Feast Feature store flavor.

    Exposes the flavor's name, documentation URLs, logo URL, config class
    and implementation class as read-only properties.
    """

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return FEAST_FEATURE_STORE_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/feature_store/feast.png"

    @property
    def config_class(self) -> Type[FeastFeatureStoreConfig]:
        """Returns FeastFeatureStoreConfig config class.

        Returns:
            The config class.
        """
        # The stray second string literal that followed the docstring was a
        # no-op expression statement and has been removed.
        return FeastFeatureStoreConfig

    @property
    def implementation_class(self) -> Type["FeastFeatureStore"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # Imported at call time rather than at module import time
        # (presumably so the Feast dependencies are only needed when the
        # implementation is requested -- TODO confirm).
        from zenml.integrations.feast.feature_stores import FeastFeatureStore

        return FeastFeatureStore
config_class: Type[zenml.integrations.feast.flavors.feast_feature_store_flavor.FeastFeatureStoreConfig]
property
readonly
Returns FeastFeatureStoreConfig config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.feast.flavors.feast_feature_store_flavor.FeastFeatureStoreConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[FeastFeatureStore]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[FeastFeatureStore] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |