Spark
        zenml.integrations.spark
  
      special
  
    The Spark integration module to enable distributed processing for steps.
        
SparkIntegration            (Integration)
        
    Definition of Spark integration for ZenML.
Source code in zenml/integrations/spark/__init__.py
          class SparkIntegration(Integration):
    """Definition of Spark integration for ZenML.
    Declares the integration name and its package requirement, and exposes
    the hooks that import the Spark materializers and list the Spark stack
    component flavors.
    """
    NAME = SPARK
    REQUIREMENTS = ["pyspark==3.2.1"]
    @classmethod
    def activate(cls) -> None:
        """Activates the integration by importing the Spark materializers."""
        # The import is done only for its side effects (the imported name is
        # unused, hence the `noqa`); presumably the materializers register
        # themselves on import -- TODO confirm.
        from zenml.integrations.spark import materializers  # noqa
    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Spark integration.
        Returns:
            A list containing the Kubernetes Spark step operator flavor class.
        """
        # Imported inside the method rather than at module level.
        from zenml.integrations.spark.flavors import (
            KubernetesSparkStepOperatorFlavor,
        )
        return [KubernetesSparkStepOperatorFlavor]
activate()
  
      classmethod
  
    Activates the integration by importing the corresponding Spark materializers.
Source code in zenml/integrations/spark/__init__.py
          @classmethod
def activate(cls) -> None:
    """Activates the integration by importing the Spark materializers."""
    # Imported only for its side effects; the name itself is unused (`noqa`).
    from zenml.integrations.spark import materializers  # noqa
flavors()
  
      classmethod
  
    Declare the stack component flavors for the Spark integration.
Returns:
| Type | Description | 
|---|---|
| List[Type[zenml.stack.flavor.Flavor]] | The flavor wrapper for the step operator flavor | 
Source code in zenml/integrations/spark/__init__.py
          @classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Spark integration.
    Returns:
        A list containing the Kubernetes Spark step operator flavor class.
    """
    # Imported inside the method rather than at module level.
    from zenml.integrations.spark.flavors import (
        KubernetesSparkStepOperatorFlavor,
    )
    return [KubernetesSparkStepOperatorFlavor]
        flavors
  
      special
  
    Spark integration flavors.
        spark_on_kubernetes_step_operator_flavor
    Spark on Kubernetes step operator flavor.
        
KubernetesSparkStepOperatorConfig            (SparkStepOperatorConfig)
        
    Config for the Kubernetes Spark step operator.
Attributes:
| Name | Type | Description | 
|---|---|---|
| namespace | Optional[str] | The namespace under which the driver and executor pods will run. Defaults to `None`. | 
| service_account | Optional[str] | The service account that will be used by various Spark components (to create and watch the pods). Defaults to `None`. | 
Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
          class KubernetesSparkStepOperatorConfig(SparkStepOperatorConfig):
    """Config for the Kubernetes Spark step operator.
    Extends `SparkStepOperatorConfig` with two optional, Kubernetes-specific
    fields.
    Attributes:
        namespace: The namespace under which the driver and executor pods
            will run. Defaults to `None`.
        service_account: The service account that will be used by various
            Spark components (to create and watch the pods). Defaults to
            `None`.
    """
    namespace: Optional[str] = None
    service_account: Optional[str] = None
        
KubernetesSparkStepOperatorFlavor            (SparkStepOperatorFlavor)
        
    Flavor for the Kubernetes Spark step operator.
Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
          class KubernetesSparkStepOperatorFlavor(SparkStepOperatorFlavor):
    """Flavor for the Kubernetes Spark step operator.
    Overrides the properties of `SparkStepOperatorFlavor` so that they point
    at the Kubernetes-specific name, config class and implementation class.
    """
    @property
    def name(self) -> str:
        """Name of the flavor.
        Returns:
            The name of the flavor (the `SPARK_KUBERNETES_STEP_OPERATOR`
            constant).
        """
        return SPARK_KUBERNETES_STEP_OPERATOR
    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.
        Returns:
            A flavor docs url, as produced by `generate_default_docs_url`.
        """
        return self.generate_default_docs_url()
    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.
        Returns:
            A flavor SDK docs url, as produced by
            `generate_default_sdk_docs_url`.
        """
        return self.generate_default_sdk_docs_url()
    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.
        Returns:
            The url of the flavor logo image.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/step_operator/spark.png"
    @property
    def config_class(self) -> Type[KubernetesSparkStepOperatorConfig]:
        """Returns `KubernetesSparkStepOperatorConfig` config class.
        Returns:
            The config class.
        """
        return KubernetesSparkStepOperatorConfig
    @property
    def implementation_class(self) -> Type["KubernetesSparkStepOperator"]:
        """Implementation class for this flavor.
        Returns:
            The implementation class.
        """
        # The implementation is imported only when the property is accessed,
        # which is why the return annotation is a string forward reference.
        from zenml.integrations.spark.step_operators import (
            KubernetesSparkStepOperator,
        )
        return KubernetesSparkStepOperator
config_class: Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig]
  
      property
      readonly
  
    Returns KubernetesSparkStepOperatorConfig config class.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig] | The config class. | 
docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor docs url. | 
implementation_class: Type[KubernetesSparkStepOperator]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[KubernetesSparkStepOperator] | The implementation class. | 
logo_url: str
  
      property
      readonly
  
    A url to represent the flavor in the dashboard.
Returns:
| Type | Description | 
|---|---|
| str | The flavor logo. | 
name: str
  
      property
      readonly
  
    Name of the flavor.
Returns:
| Type | Description | 
|---|---|
| str | The name of the flavor. | 
sdk_docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at SDK docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor SDK docs url. | 
        spark_step_operator_flavor
    Spark step operator flavor.
        
SparkStepOperatorConfig            (BaseStepOperatorConfig, SparkStepOperatorSettings)
        
    Spark step operator config.
Attributes:
| Name | Type | Description | 
|---|---|---|
| master | str | The master URL for the cluster. Spark supports different URL schemes for the different cluster managers it can run on, such as Mesos, YARN, or Kubernetes. The current implementation supports Kubernetes as a cluster manager. | 
Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
          class SparkStepOperatorConfig(
    BaseStepOperatorConfig, SparkStepOperatorSettings
):
    """Spark step operator config.
    Combines the base step operator config with the Spark step operator
    settings and adds the required `master` field.
    Attributes:
        master: The master URL for the cluster. Spark supports different URL
            schemes for the different cluster managers it can run on, such
            as Mesos, YARN, or Kubernetes. The current implementation
            supports Kubernetes as a cluster manager.
    """
    master: str
        
SparkStepOperatorFlavor            (BaseStepOperatorFlavor)
        
    Spark step operator flavor.
Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
          class SparkStepOperatorFlavor(BaseStepOperatorFlavor):
    """Spark step operator flavor.
    Ties the flavor name `"spark"` to `SparkStepOperatorConfig` and to the
    `SparkStepOperator` implementation.
    """
    @property
    def name(self) -> str:
        """Name of the flavor.
        Returns:
            The name of the flavor, `"spark"`.
        """
        return "spark"
    @property
    def config_class(self) -> Type[SparkStepOperatorConfig]:
        """Returns `SparkStepOperatorConfig` config class.
        Returns:
            The config class.
        """
        return SparkStepOperatorConfig
    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.
        Returns:
            A flavor docs url, as produced by `generate_default_docs_url`.
        """
        return self.generate_default_docs_url()
    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.
        Returns:
            A flavor SDK docs url, as produced by
            `generate_default_sdk_docs_url`.
        """
        return self.generate_default_sdk_docs_url()
    @property
    def implementation_class(self) -> Type["SparkStepOperator"]:
        """Implementation class for this flavor.
        Returns:
            The implementation class.
        """
        # The implementation is imported only when the property is accessed,
        # which is why the return annotation is a string forward reference.
        from zenml.integrations.spark.step_operators.spark_step_operator import (
            SparkStepOperator,
        )
        return SparkStepOperator
config_class: Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig]
  
      property
      readonly
  
    Returns SparkStepOperatorConfig config class.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig] | The config class. | 
docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor docs url. | 
implementation_class: Type[SparkStepOperator]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[SparkStepOperator] | The implementation class. | 
name: str
  
      property
      readonly
  
    Name of the flavor.
Returns:
| Type | Description | 
|---|---|
| str | The name of the flavor. | 
sdk_docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at SDK docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor SDK docs url. | 
        
SparkStepOperatorSettings            (BaseSettings)
        
    Spark step operator settings.
Attributes:
| Name | Type | Description | 
|---|---|---|
| deploy_mode | str | Can either be 'cluster' (default) or 'client'; it decides where the driver node of the application will run. | 
| submit_kwargs | Optional[Dict[str, Any]] | A dict of additional Spark configuration parameters, applied key by key if provided (Spark has quite a lot of different parameters, so including them all in the step operator was not implemented). | 
Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
          class SparkStepOperatorSettings(BaseSettings):
    """Spark step operator settings.
    Attributes:
        deploy_mode: Can either be 'cluster' (default) or 'client'; it
            decides where the driver node of the application will run.
        submit_kwargs: An optional dict of additional Spark configuration
            parameters (Spark has quite a lot of different parameters, so
            including them all in the step operator was not implemented).
            Defaults to `None`.
    """
    deploy_mode: str = "cluster"
    submit_kwargs: Optional[Dict[str, Any]] = None
        materializers
  
      special
  
    Spark Materializers.
        spark_dataframe_materializer
    Implementation of the Spark Dataframe Materializer.
        
SparkDataFrameMaterializer            (BaseMaterializer)
        
    Materializer to read/write Spark dataframes.
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
          class SparkDataFrameMaterializer(BaseMaterializer):
    """Materializer to read/write Spark dataframes.
    Dataframes are stored as parquet under `DEFAULT_FILEPATH` inside the
    materializer's `uri`.
    """
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (DataFrame,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA
    def load(self, data_type: Type[Any]) -> DataFrame:
        """Reads and returns a spark dataframe.
        Args:
            data_type: The type of the data to read. Not used by this
                implementation.
        Returns:
            A loaded spark dataframe.
        """
        # Reuse the active Spark session or create a new one
        spark = SparkSession.builder.getOrCreate()
        # Read the parquet data written by `save`
        path = os.path.join(self.uri, DEFAULT_FILEPATH)
        return spark.read.parquet(path)
    def save(self, df: DataFrame) -> None:
        """Writes a spark dataframe.
        Args:
            df: A spark dataframe object.
        """
        # Write the dataframe to the artifact store as parquet
        path = os.path.join(self.uri, DEFAULT_FILEPATH)
        df.write.parquet(path)
    def extract_metadata(self, df: DataFrame) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `DataFrame` object.
        Args:
            df: The `DataFrame` object to extract metadata from.
        Returns:
            The extracted metadata as a dictionary with a single `"shape"`
            entry holding `(row count, column count)`.
        """
        # NOTE(review): `df.count()` presumably evaluates the dataframe to
        # count its rows -- confirm the cost is acceptable for large inputs.
        return {
            "shape": (df.count(), len(df.columns)),
        }
extract_metadata(self, df)
    Extract metadata from the given DataFrame object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| df | pyspark.sql.DataFrame | The DataFrame object to extract metadata from. | required | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, MetadataType] | The extracted metadata as a dictionary. | 
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
          def extract_metadata(self, df: DataFrame) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `DataFrame` object.
    Args:
        df: The `DataFrame` object to extract metadata from.
    Returns:
        The extracted metadata as a dictionary with a single `"shape"`
        entry holding `(row count, column count)`.
    """
    # NOTE(review): `df.count()` presumably evaluates the dataframe to count
    # its rows -- confirm the cost is acceptable for large inputs.
    return {
        "shape": (df.count(), len(df.columns)),
    }
load(self, data_type)
    Reads and returns a spark dataframe.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| data_type | Type[Any] | The type of the data to read. | required | 
Returns:
| Type | Description | 
|---|---|
| pyspark.sql.DataFrame | A loaded spark dataframe. | 
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
          def load(self, data_type: Type[Any]) -> DataFrame:
    """Reads and returns a spark dataframe.
    Args:
        data_type: The type of the data to read. Not used by this
            implementation.
    Returns:
        A loaded spark dataframe.
    """
    # Reuse the active Spark session or create a new one
    spark = SparkSession.builder.getOrCreate()
    # Read the parquet data stored under the materializer's uri
    path = os.path.join(self.uri, DEFAULT_FILEPATH)
    return spark.read.parquet(path)
save(self, df)
    Writes a spark dataframe.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| df | pyspark.sql.DataFrame | A spark dataframe object. | required | 
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
          def save(self, df: DataFrame) -> None:
    """Writes a spark dataframe.
    Args:
        df: A spark dataframe object.
    """
    # Write the dataframe to the artifact store as parquet
    path = os.path.join(self.uri, DEFAULT_FILEPATH)
    df.write.parquet(path)
        spark_model_materializer
    Implementation of the Spark Model Materializer.
        
SparkModelMaterializer            (BaseMaterializer)
        
    Materializer to read/write Spark models.
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
          class SparkModelMaterializer(BaseMaterializer):
    """Materializer to read/write Spark models.
    Handles `Transformer`, `Estimator` and `Model` objects, stored under
    `DEFAULT_FILEPATH` inside the materializer's `uri`.
    """
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        Transformer,
        Estimator,
        Model,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL
    def load(
        self, model_type: Type[Any]
    ) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
        """Reads and returns a Spark ML model.
        Args:
            model_type: The type of the model to read. Its own `load`
                method is used to read the model.
        Returns:
            A loaded spark model.
        """
        path = os.path.join(self.uri, DEFAULT_FILEPATH)
        return model_type.load(path)  # type: ignore[no-any-return]
    def save(
        self,
        model: Union[Transformer, Estimator, Model],  # type: ignore[type-arg]
    ) -> None:
        """Writes a spark model.
        Args:
            model: A spark model.
        """
        # Write the model to the artifact store
        path = os.path.join(self.uri, DEFAULT_FILEPATH)
        model.save(path)  # type: ignore[union-attr]
load(self, model_type)
    Reads and returns a Spark ML model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model_type | Type[Any] | The type of the model to read. | required | 
Returns:
| Type | Description | 
|---|---|
| Union[pyspark.ml.Transformer, pyspark.ml.Estimator, pyspark.ml.Model] | A loaded spark model. | 
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
          def load(
    self, model_type: Type[Any]
) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
    """Reads and returns a Spark ML model.
    Args:
        model_type: The type of the model to read. Its own `load` method is
            used to read the model.
    Returns:
        A loaded spark model.
    """
    path = os.path.join(self.uri, DEFAULT_FILEPATH)
    return model_type.load(path)  # type: ignore[no-any-return]
save(self, model)
    Writes a spark model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model | Union[pyspark.ml.Transformer, pyspark.ml.Estimator, pyspark.ml.Model] | A spark model. | required | 
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
          def save(
    self,
    model: Union[Transformer, Estimator, Model],  # type: ignore[type-arg]
) -> None:
    """Writes a spark model.
    Args:
        model: A spark model.
    """
    # Write the model to the artifact store
    path = os.path.join(self.uri, DEFAULT_FILEPATH)
    model.save(path)  # type: ignore[union-attr]
        step_operators
  
      special
  
    Spark Step Operators.
        kubernetes_step_operator
    Implementation of the Kubernetes Spark Step Operator.
        
KubernetesSparkStepOperator            (SparkStepOperator)
        
    Step operator which runs Steps with Spark on Kubernetes.
Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
          class KubernetesSparkStepOperator(SparkStepOperator):
    """Step operator which runs Steps with Spark on Kubernetes."""
    @property
    def config(self) -> KubernetesSparkStepOperatorConfig:
        """Returns the `KubernetesSparkStepOperatorConfig` config.
        Returns:
            The configuration.
        """
        return cast(KubernetesSparkStepOperatorConfig, self._config)
    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.
        Returns:
            A validator that requires a container registry and an image
            builder in the stack, and checks that both the artifact store
            and the container registry are remote.
        """
        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            """Checks that the artifact store and container registry are remote.
            Args:
                stack: The stack to validate.
            Returns:
                A `(is_valid, error_message)` tuple; the message is empty
                when the stack is valid.
            """
            if stack.artifact_store.config.is_local:
                return False, (
                    "The Spark step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the Spark "
                    "step operator."
                )
            container_registry = stack.container_registry
            # The container registry is listed in `required_components`
            # below; presumably that check runs before this function, so the
            # assert only narrows the Optional type -- TODO confirm.
            assert container_registry is not None
            if container_registry.config.is_local:
                return False, (
                    "The Spark step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Spark step operator."
                )
            return True, ""
        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )
    @property
    def application_path(self) -> Any:
        """Provides the application path in the corresponding docker image.
        Returns:
            The path to the application entrypoint within the docker image,
            i.e. `ENTRYPOINT_NAME` inside `DOCKER_IMAGE_WORKDIR`, prefixed
            with the `local://` scheme.
        """
        return f"local://{DOCKER_IMAGE_WORKDIR}/{ENTRYPOINT_NAME}"
    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the component.
        One build is requested for every step whose configured step operator
        is this component.
        Args:
            deployment: The pipeline deployment for which to get the builds.
        Returns:
            The required Docker builds.
        """
        from zenml.config.build_configuration import BuildConfiguration
        builds = []
        # Every build ships the local entrypoint under `ENTRYPOINT_NAME`
        extra_files = {ENTRYPOINT_NAME: LOCAL_ENTRYPOINT}
        for step_name, step in deployment.step_configurations.items():
            # Only steps that run on this step operator need an image
            if step.config.step_operator == self.name:
                build = BuildConfiguration(
                    key=SPARK_DOCKER_IMAGE_KEY,
                    settings=step.config.docker_settings,
                    step_name=step_name,
                    extra_files=extra_files,
                )
                builds.append(build)
        return builds
    def _backend_configuration(
        self,
        spark_config: SparkConf,
        info: "StepRunInfo",
        environment: Dict[str, str],
    ) -> None:
        """Configures Spark to run on Kubernetes.
        This method looks up the docker image registered for the step under
        `SPARK_DOCKER_IMAGE_KEY` and adjusts the config accordingly: it sets
        the container image, the optional namespace and driver service
        account, and the executor environment variables.
        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            info: Information about the step run.
            environment: Environment variables to set in the executor
                environment.
        """
        image_name = info.get_image(key=SPARK_DOCKER_IMAGE_KEY)
        # Adjust the spark configuration
        spark_config.set("spark.kubernetes.container.image", image_name)
        # Namespace and service account are optional and only set if given
        if self.config.namespace:
            spark_config.set(
                "spark.kubernetes.namespace",
                self.config.namespace,
            )
        if self.config.service_account:
            spark_config.set(
                "spark.kubernetes.authenticate.driver.serviceAccountName",
                self.config.service_account,
            )
        # Forward each environment variable to the executors
        for key, value in environment.items():
            spark_config.set(f"spark.executorEnv.{key}", value)
application_path: Any
  
      property
      readonly
  
    Provides the application path in the corresponding docker image.
Returns:
| Type | Description | 
|---|---|
| Any | The path to the application entrypoint within the docker image | 
config: KubernetesSparkStepOperatorConfig
  
      property
      readonly
  
    Returns the KubernetesSparkStepOperatorConfig config.
Returns:
| Type | Description | 
|---|---|
| KubernetesSparkStepOperatorConfig | The configuration. | 
validator: Optional[zenml.stack.stack_validator.StackValidator]
  
      property
      readonly
  
    Validates the stack.
Returns:
| Type | Description | 
|---|---|
| Optional[zenml.stack.stack_validator.StackValidator] | A validator that checks that the stack contains a remote container registry and a remote artifact store. | 
get_docker_builds(self, deployment)
    Gets the Docker builds required for the component.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The pipeline deployment for which to get the builds. | required | 
Returns:
| Type | Description | 
|---|---|
| List[BuildConfiguration] | The required Docker builds. | 
Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
          def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.
    One build is requested for every step whose configured step operator is
    this component.
    Args:
        deployment: The pipeline deployment for which to get the builds.
    Returns:
        The required Docker builds.
    """
    from zenml.config.build_configuration import BuildConfiguration
    builds = []
    # Every build ships the local entrypoint under `ENTRYPOINT_NAME`
    extra_files = {ENTRYPOINT_NAME: LOCAL_ENTRYPOINT}
    for step_name, step in deployment.step_configurations.items():
        # Only steps that run on this step operator need an image
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=SPARK_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
                extra_files=extra_files,
            )
            builds.append(build)
    return builds
        spark_entrypoint_configuration
    Spark step operator entrypoint configuration.
        
SparkEntrypointConfiguration            (StepOperatorEntrypointConfiguration)
        
    Entrypoint configuration for the Spark step operator.
Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
          class SparkEntrypointConfiguration(StepOperatorEntrypointConfiguration):
    """Entrypoint configuration for the Spark step operator."""
    def run(self) -> None:
        """Runs the entrypoint configuration.
        This prepends the directory containing the source files
        (`DOCKER_IMAGE_WORKDIR`) to the python path so that spark can find
        them, and then delegates to the parent implementation.
        """
        with source_utils.prepend_python_path(DOCKER_IMAGE_WORKDIR):
            super().run()
run(self)
    Runs the entrypoint configuration.
This prepends the directory containing the source files to the python path so that spark can find them.
Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
          def run(self) -> None:
    """Runs the entrypoint configuration.
    This prepends the directory containing the source files
    (`DOCKER_IMAGE_WORKDIR`) to the python path so that spark can find them,
    and then delegates to the parent implementation.
    """
    with source_utils.prepend_python_path(DOCKER_IMAGE_WORKDIR):
        super().run()
        spark_step_operator
    Implementation of the Spark Step Operator.
        
SparkStepOperator            (BaseStepOperator)
        
    Base class for all Spark-related step operators.
Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
          class SparkStepOperator(BaseStepOperator):
    """Base class for all Spark-related step operators."""
    @property
    def config(self) -> SparkStepOperatorConfig:
        """Typed accessor for the configuration of this step operator.
        Returns:
            The stored configuration, viewed as a `SparkStepOperatorConfig`.
        """
        typed_config = cast(SparkStepOperatorConfig, self._config)
        return typed_config
    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Spark step operator.
        Returns:
            The settings class, `SparkStepOperatorSettings`.
        """
        return SparkStepOperatorSettings
    @property
    def application_path(self) -> Optional[str]:
        """Optional method for providing the application path.
        This is especially critical when using 'spark-submit' as it defines the
        path (to the application in the environment where Spark is running)
        which is used within the command.
        For more information on how to set this property please check:
        https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
        Returns:
            The path to the application entrypoint. The base implementation
            always returns `None`.
        """
        return None
    def _resource_configuration(
        self,
        spark_config: SparkConf,
        resource_settings: "ResourceSettings",
    ) -> None:
        """Translates the step's resource settings into Spark configuration.
        Acts as the layer between our ResourceSettings and Spark's own way of
        configuring resources. Only the executor cores and the executor
        memory are handled for now; settings that are not provided are left
        untouched.
        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            resource_settings: the resource settings for this step
        """
        cpu_count = resource_settings.cpu_count
        if cpu_count:
            # Spark receives the core count as an integer-valued string
            executor_cores = str(int(cpu_count))
            spark_config.set("spark.executor.cores", executor_cores)
        memory = resource_settings.memory
        if memory:
            # TODO[LOW]: Fix the conversion of the memory unit with a new
            #   type of resource configuration.
            executor_memory = memory.lower().strip("b")
            spark_config.set("spark.executor.memory", executor_memory)
    def _backend_configuration(
        self,
        spark_config: SparkConf,
        info: "StepRunInfo",
        environment: Dict[str, str],
    ) -> None:
        """Configures Spark to handle backends like YARN, Mesos or Kubernetes.
        The base implementation does nothing; backend-specific step operators
        such as `KubernetesSparkStepOperator` override it.
        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            info: Information about the step run.
            environment: Environment variables to set.
        """
    def _io_configuration(self, spark_config: SparkConf) -> None:
        """Configures Spark to handle different input/output sources.
        When you work with the Spark integration, you get materializers
        such as SparkDataFrameMaterializer, SparkModelMaterializer. However, in
        many cases, these materializers work only if the environment where
        Spark is running is configured according to the artifact store.
        Take S3 as an example. When you want to save a dataframe to an S3
        artifact store, you need to provide configuration parameters such as
        "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" to
        Spark. This method aims to provide these configuration parameters.
        For any other artifact store flavor only a warning is logged and the
        configuration is left untouched.
        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
        Raises:
            RuntimeError: when the step operator is being used with an S3
                artifact store and the artifact store does not have the
                required authentication
        """
        # Get active artifact store
        client = Client()
        artifact_store = client.active_stack.artifact_store
        from zenml.integrations.s3 import S3_ARTIFACT_STORE_FLAVOR
        # Flavors other than S3 are not preconfigured: warn and stop here
        if artifact_store.flavor != S3_ARTIFACT_STORE_FLAVOR:
            logger.warning(
                "In most cases, the Spark step operator requires additional "
                "configuration based on the artifact store flavor you are "
                "using. That also means, that when you use this step operator "
                "with certain artifact store flavor, ZenML can take care of "
                "the pre-configuration. However, the artifact store flavor "
                f"'{artifact_store.flavor}' featured in this stack is not "
                "known to this step operator and it might require additional "
                "configuration."
            )
            return
        # If S3, preconfigure the spark session with the store's credentials
        (
            key,
            secret,
            _,
        ) = artifact_store._get_credentials()  # type:ignore[attr-defined]
        if not (key and secret):
            raise RuntimeError(
                "When you use a Spark step operator with an S3 artifact "
                "store, please make sure that your artifact store has "
                "defined the required credentials namely the access key "
                "and the secret access key."
            )
        spark_config.setAll(
            [
                ("spark.hadoop.fs.s3a.fast.upload", "true"),
                (
                    "spark.hadoop.fs.s3.impl",
                    "org.apache.hadoop.fs.s3a.S3AFileSystem",
                ),
                (
                    "spark.hadoop.fs.AbstractFileSystem.s3.impl",
                    "org.apache.hadoop.fs.s3a.S3A",
                ),
                (
                    "spark.hadoop.fs.s3a.aws.credentials.provider",
                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
                ),
                ("spark.hadoop.fs.s3a.access.key", f"{key}"),
                ("spark.hadoop.fs.s3a.secret.key", f"{secret}"),
            ]
        )
    def _additional_configuration(
        self, spark_config: SparkConf, settings: SparkStepOperatorSettings
    ) -> None:
        """Copies the user-defined submit arguments into the Spark config.

        Every key/value pair found in `settings.submit_kwargs` is set on
        `spark_config`. Nothing is changed when no submit arguments are given.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            settings: Step operator settings for the current step run.
        """
        user_params = settings.submit_kwargs
        if not user_params:
            # No additional parameters were supplied for this step run.
            return

        for param_name, param_value in user_params.items():
            spark_config.set(param_name, param_value)
    def _launch_spark_job(
        self,
        spark_config: SparkConf,
        deploy_mode: str,
        entrypoint_command: List[str],
    ) -> None:
        """Builds a spark-submit command line and runs it in a subprocess.

        The command consists of the `spark-submit` call with the configured
        master and the given deploy mode, one `--conf key=value` entry per
        parameter in `spark_config`, the application path and the arguments
        of the Spark entrypoint configuration. It is executed through the
        shell and its standard output is printed once it has finished.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            deploy_mode: The spark deploy mode to use.
            entrypoint_command: The entrypoint command to run.

        Raises:
            RuntimeError: if the spark-submit fails
        """
        # Head of the command: the executable together with its master and
        # deploy mode options.
        submit_parts = [
            "spark-submit "
            f"--master {self.config.master} "
            f"--deploy-mode {deploy_mode}"
        ]

        # One `--conf` option for every collected configuration parameter.
        for conf_key, conf_value in spark_config.getAll():
            submit_parts.append(f"--conf {conf_key}={conf_value}")

        # The application that spark-submit is asked to run.
        submit_parts.append(self.application_path)  # type: ignore[arg-type]

        # Re-express the default step operator command as arguments of the
        # Spark entrypoint configuration.
        parsed_arguments = SparkEntrypointConfiguration._parse_arguments(
            entrypoint_command
        )
        submit_parts.extend(
            SparkEntrypointConfiguration.get_entrypoint_arguments(
                **parsed_arguments
            )
        )

        # NOTE(review): the parts are joined with plain spaces and handed to
        # the shell unquoted, so values containing whitespace or shell
        # metacharacters are presumably not passed through intact - confirm.
        shell_command = " ".join(submit_parts)

        spark_process = subprocess.Popen(
            shell_command,
            shell=True,  # nosec
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        captured_stdout, captured_stderr = spark_process.communicate()

        # A non-zero exit status means spark-submit failed; surface stderr.
        if spark_process.returncode != 0:
            raise RuntimeError(captured_stderr)

        print(captured_stdout)
    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Launches a step on Spark.

        A new `SparkConf` is filled in four stages (resources, backend, IO
        and user-defined parameters, in that order) and is then handed to
        `_launch_spark_job` together with the deploy mode from the settings.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        step_settings = cast(
            SparkStepOperatorSettings, self.get_settings(info)
        )

        # The configuration object that the stages below fill in turn.
        spark_config = SparkConf()

        # Stage 1: resource configuration such as cores, memory.
        self._resource_configuration(
            spark_config=spark_config,
            resource_settings=info.config.resource_settings,
        )

        # Stage 2: backend configuration such as namespace, docker image
        # names.
        self._backend_configuration(
            spark_config=spark_config,
            info=info,
            environment=environment,
        )

        # Stage 3: IO configuration for the inputs and the outputs.
        self._io_configuration(spark_config=spark_config)

        # Stage 4: any additional configuration given by the user.
        self._additional_configuration(
            spark_config=spark_config,
            settings=step_settings,
        )

        # Force the step run info to write its logs before submitting.
        info.force_write_logs()

        # Turn the collected configuration into a spark-submit call.
        self._launch_spark_job(
            spark_config=spark_config,
            deploy_mode=step_settings.deploy_mode,
            entrypoint_command=entrypoint_command,
        )
application_path: Optional[str]
  
      property
      readonly
  
    Optional property that provides the application path.
This is especially critical when using 'spark-submit', because it defines the path (to the application in the environment where Spark is running) that is used within the command.
For more information on how to set this property please check:
https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the application entrypoint | 
config: SparkStepOperatorConfig
  
      property
      readonly
  
    Returns the SparkStepOperatorConfig config.
Returns:
| Type | Description | 
|---|---|
| SparkStepOperatorConfig | The configuration. | 
settings_class: Optional[Type[BaseSettings]]
  
      property
      readonly
  
    Settings class for the Spark step operator.
Returns:
| Type | Description | 
|---|---|
| Optional[Type[BaseSettings]] | The settings class. | 
launch(self, info, entrypoint_command, environment)
    Launches a step on Spark.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| info | StepRunInfo | Information about the step run. | required | 
| entrypoint_command | List[str] | Command that executes the step. | required | 
| environment | Dict[str, str] | Environment variables to set in the step operator environment. | required | 
Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
          def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on Spark.
    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(SparkStepOperatorSettings, self.get_settings(info))
    # Start off with an empty configuration
    conf = SparkConf()
    # Add the resource configuration such as cores, memory.
    self._resource_configuration(
        spark_config=conf,
        resource_settings=info.config.resource_settings,
    )
    # Add the backend configuration such as namespace, docker images names.
    self._backend_configuration(
        spark_config=conf, info=info, environment=environment
    )
    # Add the IO configuration for the inputs and the outputs
    self._io_configuration(
        spark_config=conf,
    )
    # Add any additional configuration given by the user.
    self._additional_configuration(spark_config=conf, settings=settings)
    info.force_write_logs()
    # Generate a spark-submit command given the configuration
    self._launch_spark_job(
        spark_config=conf,
        deploy_mode=settings.deploy_mode,
        entrypoint_command=entrypoint_command,
    )