Skip to content

Spark

zenml.integrations.spark special

The Spark integration module to enable distributed processing for steps.

SparkIntegration (Integration)

Definition of Spark integration for ZenML.

Source code in zenml/integrations/spark/__init__.py
class SparkIntegration(Integration):
    """Spark integration for ZenML.

    Declares the integration name, its package requirements, and the stack
    component flavors it contributes.
    """

    NAME = SPARK
    REQUIREMENTS = ["pyspark==3.2.1"]

    @classmethod
    def activate(cls) -> None:
        """Activate the integration by importing the Spark materializers."""
        # The imported name is never used (hence `noqa`); the import
        # statement itself is the entire activation step.
        from zenml.integrations.spark import materializers  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors of the Spark integration.

        Returns:
            A single-element list holding the Kubernetes Spark step operator
            flavor class.
        """
        # Imported at call time rather than at module import time.
        from zenml.integrations.spark.flavors import (
            KubernetesSparkStepOperatorFlavor,
        )

        declared: List[Type[Flavor]] = [KubernetesSparkStepOperatorFlavor]
        return declared

activate() classmethod

Activating the corresponding Spark materializers.

Source code in zenml/integrations/spark/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Spark integration by importing its materializers.

    The imported name is not used afterwards (hence the `noqa` marker); the
    import statement is the whole body of this method.
    """
    from zenml.integrations.spark import materializers  # noqa

flavors() classmethod

Declare the stack component flavors for the Spark integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

The flavor wrapper for the step operator flavor

Source code in zenml/integrations/spark/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Spark integration.

    The flavor class is imported inside the method, i.e. only when the
    flavors are actually requested.

    Returns:
        A single-element list holding the Kubernetes Spark step operator
        flavor class.
    """
    from zenml.integrations.spark.flavors import (
        KubernetesSparkStepOperatorFlavor,
    )

    return [KubernetesSparkStepOperatorFlavor]

flavors special

Spark integration flavors.

spark_on_kubernetes_step_operator_flavor

Spark on Kubernetes step operator flavor.

KubernetesSparkStepOperatorConfig (SparkStepOperatorConfig)

Config for the Kubernetes Spark step operator.

Attributes:

Name Type Description
namespace Optional[str]

the namespace under which the driver and executor pods will run.

service_account Optional[str]

the service account that will be used by various Spark components (to create and watch the pods).

Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
class KubernetesSparkStepOperatorConfig(SparkStepOperatorConfig):
    """Config for the Kubernetes Spark step operator.

    Extends `SparkStepOperatorConfig` with two Kubernetes-specific options.
    Both are optional and default to `None`.

    Attributes:
        namespace: The namespace under which the driver and executor pods
            will run.
        service_account: The service account that will be used by various
            Spark components (to create and watch the pods).
    """

    namespace: Optional[str] = None
    service_account: Optional[str] = None
KubernetesSparkStepOperatorFlavor (SparkStepOperatorFlavor)

Flavor for the Kubernetes Spark step operator.

Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
class KubernetesSparkStepOperatorFlavor(SparkStepOperatorFlavor):
    """Flavor describing the Spark step operator that runs on Kubernetes."""

    # -- identity ---------------------------------------------------------

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The flavor name.
        """
        return SPARK_KUBERNETES_STEP_OPERATOR

    # -- classes backing the flavor ----------------------------------------

    @property
    def config_class(self) -> Type[KubernetesSparkStepOperatorConfig]:
        """Config class used by this flavor.

        Returns:
            The `KubernetesSparkStepOperatorConfig` class.
        """
        return KubernetesSparkStepOperatorConfig

    @property
    def implementation_class(self) -> Type["KubernetesSparkStepOperator"]:
        """Step operator class implementing this flavor.

        Returns:
            The `KubernetesSparkStepOperator` class.
        """
        # Imported at access time rather than at module import time.
        from zenml.integrations.spark.step_operators import (
            KubernetesSparkStepOperator,
        )

        return KubernetesSparkStepOperator

    # -- links shown for the flavor ----------------------------------------

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The result of `generate_default_docs_url()`.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The result of `generate_default_sdk_docs_url()`.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """URL of the logo representing the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/"
            "step_operator/spark.png"
        )
config_class: Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig] property readonly

Returns KubernetesSparkStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesSparkStepOperator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesSparkStepOperator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

spark_step_operator_flavor

Spark step operator flavor.

SparkStepOperatorConfig (BaseStepOperatorConfig, SparkStepOperatorSettings)

Spark step operator config.

Attributes:

Name Type Description
master str

The master URL for the cluster. Spark supports different URL schemes for different cluster managers, such as Mesos, YARN, or Kubernetes. This implementation supports Kubernetes as a cluster manager.

Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorConfig(
    BaseStepOperatorConfig, SparkStepOperatorSettings
):
    """Spark step operator config.

    Combines the base step operator config with the Spark step operator
    settings and adds the required `master` field.

    Attributes:
        master: The master URL for the cluster. Spark supports different URL
            schemes for different cluster managers, such as Mesos, YARN, or
            Kubernetes. This implementation supports Kubernetes as a cluster
            manager.
    """

    master: str
SparkStepOperatorFlavor (BaseStepOperatorFlavor)

Spark step operator flavor.

Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor of the generic Spark step operator."""

    # -- identity ---------------------------------------------------------

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The flavor name.
        """
        return "spark"

    # -- classes backing the flavor ----------------------------------------

    @property
    def config_class(self) -> Type[SparkStepOperatorConfig]:
        """Config class used by this flavor.

        Returns:
            The `SparkStepOperatorConfig` class.
        """
        return SparkStepOperatorConfig

    @property
    def implementation_class(self) -> Type["SparkStepOperator"]:
        """Step operator class implementing this flavor.

        Returns:
            The `SparkStepOperator` class.
        """
        # Imported at access time rather than at module import time.
        from zenml.integrations.spark.step_operators.spark_step_operator import (
            SparkStepOperator,
        )

        return SparkStepOperator

    # -- links shown for the flavor ----------------------------------------

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The result of `generate_default_docs_url()`.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The result of `generate_default_sdk_docs_url()`.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url
config_class: Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig] property readonly

Returns SparkStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SparkStepOperator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SparkStepOperator]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

SparkStepOperatorSettings (BaseSettings)

Spark step operator settings.

Attributes:

Name Type Description
deploy_mode str

can either be 'cluster' (default) or 'client' and it decides where the driver node of the application will run.

submit_kwargs Optional[Dict[str, Any]]

A dict of additional Spark configuration parameters to apply if required (Spark has a large number of parameters, so not all of them are exposed as dedicated fields on the step operator).

Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorSettings(BaseSettings):
    """Spark step operator settings.

    Attributes:
        deploy_mode: Either 'cluster' (default) or 'client'. Decides where
            the driver node of the application will run.
        submit_kwargs: An optional dict of additional Spark configuration
            parameters (Spark has a large number of parameters, so not all
            of them are exposed as dedicated fields on the step operator).
    """

    deploy_mode: str = "cluster"
    submit_kwargs: Optional[Dict[str, Any]] = None

materializers special

Spark Materializers.

spark_dataframe_materializer

Implementation of the Spark Dataframe Materializer.

SparkDataFrameMaterializer (BaseMaterializer)

Materializer to read/write Spark dataframes.

Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
class SparkDataFrameMaterializer(BaseMaterializer):
    """Materializer that reads and writes Spark dataframes as parquet."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (DataFrame,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> DataFrame:
        """Read a Spark dataframe from the artifact location.

        Args:
            data_type: The type of the data to read.

        Returns:
            The dataframe read from parquet.
        """
        artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)

        # Reuse the current Spark session if there is one, else create it.
        session = SparkSession.builder.getOrCreate()
        return session.read.parquet(artifact_path)

    def save(self, df: DataFrame) -> None:
        """Write a Spark dataframe to the artifact location as parquet.

        Args:
            df: The Spark dataframe to write.
        """
        artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)
        df.write.parquet(artifact_path)

    def extract_metadata(self, df: DataFrame) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `DataFrame` object.

        Args:
            df: The `DataFrame` object to extract metadata from.

        Returns:
            A dictionary with a single `"shape"` entry holding the
            `(row count, column count)` tuple.
        """
        row_count = df.count()
        column_count = len(df.columns)
        return {"shape": (row_count, column_count)}
extract_metadata(self, df)

Extract metadata from the given DataFrame object.

Parameters:

Name Type Description Default
df pyspark.sql.DataFrame

The DataFrame object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def extract_metadata(self, df: DataFrame) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `DataFrame` object.

    Args:
        df: The `DataFrame` object to extract metadata from.

    Returns:
        A dictionary with a single `"shape"` entry holding the
        `(row count, column count)` tuple.
    """
    row_count = df.count()
    column_count = len(df.columns)
    return {"shape": (row_count, column_count)}
load(self, data_type)

Reads and returns a spark dataframe.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
pyspark.sql.DataFrame

A loaded spark dataframe.

Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def load(self, data_type: Type[Any]) -> DataFrame:
    """Read a Spark dataframe from the artifact location.

    Args:
        data_type: The type of the data to read.

    Returns:
        The dataframe read from parquet.
    """
    artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)

    # Reuse the current Spark session if there is one, else create it.
    session = SparkSession.builder.getOrCreate()
    return session.read.parquet(artifact_path)
save(self, df)

Writes a spark dataframe.

Parameters:

Name Type Description Default
df pyspark.sql.DataFrame

A spark dataframe object.

required
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def save(self, df: DataFrame) -> None:
    """Write a Spark dataframe to the artifact location as parquet.

    Args:
        df: The Spark dataframe to write.
    """
    artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)
    df.write.parquet(artifact_path)

spark_model_materializer

Implementation of the Spark Model Materializer.

SparkModelMaterializer (BaseMaterializer)

Materializer to read/write Spark models.

Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
class SparkModelMaterializer(BaseMaterializer):
    """Materializer that reads and writes Spark ML models."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        Transformer,
        Estimator,
        Model,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(
        self, model_type: Type[Any]
    ) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
        """Read a Spark ML model from the artifact location.

        Loading is delegated to the `load` method of `model_type`.

        Args:
            model_type: The type of the model to read.

        Returns:
            The loaded Spark model.
        """
        artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)
        loaded_model = model_type.load(artifact_path)
        return loaded_model  # type: ignore[no-any-return]

    def save(
        self,
        model: Union[Transformer, Estimator, Model],  # type: ignore[type-arg]
    ) -> None:
        """Write a Spark model to the artifact location.

        Args:
            model: The Spark model to write.
        """
        # Persist the model under the artifact URI.
        artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)
        model.save(artifact_path)  # type: ignore[union-attr]
load(self, model_type)

Reads and returns a Spark ML model.

Parameters:

Name Type Description Default
model_type Type[Any]

The type of the model to read.

required

Returns:

Type Description
Union[pyspark.ml.Transformer, pyspark.ml.Estimator, pyspark.ml.Model]

A loaded spark model.

Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
def load(
    self, model_type: Type[Any]
) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
    """Read a Spark ML model from the artifact location.

    Loading is delegated to the `load` method of `model_type`.

    Args:
        model_type: The type of the model to read.

    Returns:
        The loaded Spark model.
    """
    artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)
    loaded_model = model_type.load(artifact_path)
    return loaded_model  # type: ignore[no-any-return]
save(self, model)

Writes a spark model.

Parameters:

Name Type Description Default
model Union[pyspark.ml.Transformer, pyspark.ml.Estimator, pyspark.ml.Model]

A spark model.

required
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
def save(
    self,
    model: Union[Transformer, Estimator, Model],  # type: ignore[type-arg]
) -> None:
    """Write a Spark model to the artifact location.

    Args:
        model: The Spark model to write.
    """
    # Persist the model under the artifact URI.
    artifact_path = os.path.join(self.uri, DEFAULT_FILEPATH)
    model.save(artifact_path)  # type: ignore[union-attr]

step_operators special

Spark Step Operators.

kubernetes_step_operator

Implementation of the Kubernetes Spark Step Operator.

KubernetesSparkStepOperator (SparkStepOperator)

Step operator which runs Steps with Spark on Kubernetes.

Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
class KubernetesSparkStepOperator(SparkStepOperator):
    """Step operator that runs steps with Spark on Kubernetes."""

    @property
    def config(self) -> KubernetesSparkStepOperatorConfig:
        """Configuration of this step operator.

        Returns:
            `self._config`, cast to `KubernetesSparkStepOperatorConfig`.
        """
        return cast(KubernetesSparkStepOperatorConfig, self._config)

    @property
    def validator(self) -> Optional[StackValidator]:
        """Build the validator for stacks using this step operator.

        Returns:
            A validator that requires a container registry and an image
            builder in the stack, and additionally checks that neither the
            artifact store nor the container registry is local.
        """

        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            # The artifact store must be remote.
            artifact_store = stack.artifact_store
            if artifact_store.config.is_local:
                artifact_store_message = (
                    "The Spark step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the Spark "
                    "step operator."
                )
                return False, artifact_store_message

            # The container registry must be remote as well.
            container_registry = stack.container_registry
            assert container_registry is not None

            if container_registry.config.is_local:
                registry_message = (
                    "The Spark step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Spark step operator."
                )
                return False, registry_message

            return True, ""

        mandatory_components = {
            StackComponentType.CONTAINER_REGISTRY,
            StackComponentType.IMAGE_BUILDER,
        }
        return StackValidator(
            required_components=mandatory_components,
            custom_validation_function=_validate_remote_components,
        )

    @property
    def application_path(self) -> Any:
        """Application path inside the corresponding Docker image.

        Returns:
            A `local://` URI built from the image working directory and the
            entrypoint file name.
        """
        return f"local://{DOCKER_IMAGE_WORKDIR}/{ENTRYPOINT_NAME}"

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Collect the Docker builds required for the component.

        One build is produced for every step of the deployment whose step
        operator is this component.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        from zenml.config.build_configuration import BuildConfiguration

        # The entrypoint file is shipped with every build.
        entrypoint_files = {ENTRYPOINT_NAME: LOCAL_ENTRYPOINT}

        return [
            BuildConfiguration(
                key=SPARK_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
                extra_files=entrypoint_files,
            )
            for step_name, step in deployment.step_configurations.items()
            if step.config.step_operator == self.name
        ]

    def _backend_configuration(
        self,
        spark_config: SparkConf,
        info: "StepRunInfo",
        environment: Dict[str, str],
    ) -> None:
        """Configure Spark to run on Kubernetes.

        Looks up the Docker image registered for this step run and writes the
        Kubernetes-related settings (image, optional namespace, optional
        service account) and the executor environment into `spark_config`.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            info: Information about the step run.
            environment: Environment variables to set in the executor
                environment.
        """
        image_name = info.get_image(key=SPARK_DOCKER_IMAGE_KEY)
        spark_config.set("spark.kubernetes.container.image", image_name)

        # Namespace and service account are only set when configured.
        namespace = self.config.namespace
        if namespace:
            spark_config.set("spark.kubernetes.namespace", namespace)

        service_account = self.config.service_account
        if service_account:
            spark_config.set(
                "spark.kubernetes.authenticate.driver.serviceAccountName",
                service_account,
            )

        # Forward every environment variable to the executors.
        for env_name, env_value in environment.items():
            spark_config.set(f"spark.executorEnv.{env_name}", env_value)
application_path: Any property readonly

Provides the application path in the corresponding docker image.

Returns:

Type Description
Any

The path to the application entrypoint within the docker image

config: KubernetesSparkStepOperatorConfig property readonly

Returns the KubernetesSparkStepOperatorConfig config.

Returns:

Type Description
KubernetesSparkStepOperatorConfig

The configuration.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

get_docker_builds(self, deployment)

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Collect the Docker builds required for the component.

    One build is produced for every step of the deployment whose step
    operator is this component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    from zenml.config.build_configuration import BuildConfiguration

    # The entrypoint file is shipped with every build.
    entrypoint_files = {ENTRYPOINT_NAME: LOCAL_ENTRYPOINT}

    return [
        BuildConfiguration(
            key=SPARK_DOCKER_IMAGE_KEY,
            settings=step.config.docker_settings,
            step_name=step_name,
            extra_files=entrypoint_files,
        )
        for step_name, step in deployment.step_configurations.items()
        if step.config.step_operator == self.name
    ]

spark_entrypoint_configuration

Spark step operator entrypoint configuration.

SparkEntrypointConfiguration (StepOperatorEntrypointConfiguration)

Entrypoint configuration for the Spark step operator.

Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
class SparkEntrypointConfiguration(StepOperatorEntrypointConfiguration):
    """Entrypoint configuration for the Spark step operator.

    Behaves like the parent entrypoint configuration, except that `run` is
    executed with the Docker image working directory prepended to the Python
    path.
    """

    def run(self) -> None:
        """Runs the entrypoint configuration.

        This prepends the directory containing the source files to the python
        path so that spark can find them.
        """
        # The parent `run` executes inside the context manager, i.e. while
        # `DOCKER_IMAGE_WORKDIR` is prepended to the Python path.
        with source_utils.prepend_python_path(DOCKER_IMAGE_WORKDIR):
            super().run()
run(self)

Runs the entrypoint configuration.

This prepends the directory containing the source files to the python path so that spark can find them.

Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
def run(self) -> None:
    """Runs the entrypoint configuration.

    This prepends the directory containing the source files to the python
    path so that spark can find them.
    """
    # The parent `run` executes inside the context manager, i.e. while
    # `DOCKER_IMAGE_WORKDIR` is prepended to the Python path.
    with source_utils.prepend_python_path(DOCKER_IMAGE_WORKDIR):
        super().run()

spark_step_operator

Implementation of the Spark Step Operator.

SparkStepOperator (BaseStepOperator)

Base class for all Spark-related step operators.

Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
class SparkStepOperator(BaseStepOperator):
    """Base class for all Spark-related step operators."""

    @property
    def config(self) -> SparkStepOperatorConfig:
        """Returns the `SparkStepOperatorConfig` config.

        The value is `self._config`, passed through `cast` so that it is
        typed as a `SparkStepOperatorConfig`.

        Returns:
            The configuration.
        """
        return cast(SparkStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Spark step operator.

        Returns:
            The `SparkStepOperatorSettings` class.
        """
        return SparkStepOperatorSettings

    @property
    def application_path(self) -> Optional[str]:
        """Optional method for providing the application path.

        This is especially critical when using 'spark-submit' as it defines the
        path (to the application in the environment where Spark is running)
        which is used within the command.

        For more information on how to set this property please check:

        https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

        This base implementation always returns `None`.

        Returns:
            The path to the application entrypoint
        """
        return None

    def _resource_configuration(
        self,
        spark_config: SparkConf,
        resource_settings: "ResourceSettings",
    ) -> None:
        """Translate the resource settings into Spark configuration.

        Acts as the layer between ZenML's ResourceSettings and Spark's own
        resource options. Only the CPU count and the memory are handled, and
        each one only when it is set to a truthy value.

        Note: This is still work-in-progress. In the future, we would like to
        enable much more than executor cores and memory with a dedicated
        ResourceSettings object.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            resource_settings: the resource settings for this step
        """
        cpu_count = resource_settings.cpu_count
        if cpu_count:
            # Spark receives the core count as an integer string.
            executor_cores = str(int(cpu_count))
            spark_config.set("spark.executor.cores", executor_cores)

        memory = resource_settings.memory
        if memory:
            # TODO[LOW]: Fix the conversion of the memory unit with a new
            #   type of resource configuration.
            # Lower-case the value and strip any "b" characters at its ends.
            executor_memory = memory.lower().strip("b")
            spark_config.set("spark.executor.memory", executor_memory)

    def _backend_configuration(
        self,
        spark_config: SparkConf,
        info: "StepRunInfo",
        environment: Dict[str, str],
    ) -> None:
        """Configures Spark to handle backends like YARN, Mesos or Kubernetes.

        This base implementation has no body besides this docstring, so it
        leaves `spark_config` untouched; `KubernetesSparkStepOperator`
        overrides it with the Kubernetes-specific configuration.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            info: Information about the step run.
            environment: Environment variables to set.
        """

    def _io_configuration(self, spark_config: SparkConf) -> None:
        """Configures Spark to handle different input/output sources.

        When you work with the Spark integration, you get materializers
        such as SparkDataFrameMaterializer, SparkModelMaterializer. However, in
        many cases, these materializers work only if the environment, where
        Spark is running, is configured according to the artifact store.

        Take S3 as an example. When you want to save a dataframe to an S3
        artifact store, you need to provide configuration parameters such as
        `spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem` to
        Spark. This method provides these parameters for the S3 artifact
        store flavor and logs a warning for any other flavor.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters

        Raises:
            RuntimeError: when the step operator is being used with an S3
                artifact store and the artifact store does not provide both
                an access key and a secret key
        """
        # Get active artifact store
        client = Client()
        artifact_store = client.active_stack.artifact_store

        from zenml.integrations.s3 import S3_ARTIFACT_STORE_FLAVOR

        # Any flavor other than S3 is not pre-configured: warn and stop.
        if artifact_store.flavor != S3_ARTIFACT_STORE_FLAVOR:
            logger.warning(
                "In most cases, the Spark step operator requires additional "
                "configuration based on the artifact store flavor you are "
                "using. That also means, that when you use this step operator "
                "with certain artifact store flavor, ZenML can take care of "
                "the pre-configuration. However, the artifact store flavor "
                f"'{artifact_store.flavor}' featured in this stack is not "
                "known to this step operator and it might require additional "
                "configuration."
            )
            return

        # S3: fetch the credentials (the third returned value is unused).
        (
            key,
            secret,
            _,
        ) = artifact_store._get_credentials()  # type:ignore[attr-defined]

        if not (key and secret):
            # Adjacent literals are concatenated without a separator, so
            # every fragment but the last must end with a space.
            raise RuntimeError(
                "When you use a Spark step operator with an S3 artifact "
                "store, please make sure that your artifact store has "
                "defined the required credentials namely the access key "
                "and the secret access key."
            )

        spark_config.setAll(
            [
                ("spark.hadoop.fs.s3a.fast.upload", "true"),
                (
                    "spark.hadoop.fs.s3.impl",
                    "org.apache.hadoop.fs.s3a.S3AFileSystem",
                ),
                (
                    "spark.hadoop.fs.AbstractFileSystem.s3.impl",
                    "org.apache.hadoop.fs.s3a.S3A",
                ),
                (
                    "spark.hadoop.fs.s3a.aws.credentials.provider",
                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
                ),
                ("spark.hadoop.fs.s3a.access.key", f"{key}"),
                ("spark.hadoop.fs.s3a.secret.key", f"{secret}"),
            ]
        )

    def _additional_configuration(
        self, spark_config: SparkConf, settings: SparkStepOperatorSettings
    ) -> None:
        """Append the user-defined configuration parameters.

        Every key/value pair of `settings.submit_kwargs` is set on
        `spark_config`; nothing happens when `submit_kwargs` is empty or
        `None`.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            settings: Step operator settings for the current step run.
        """
        extra_params = settings.submit_kwargs
        if not extra_params:
            return

        for param_name, param_value in extra_params.items():
            spark_config.set(param_name, param_value)

    def _launch_spark_job(
        self,
        spark_config: SparkConf,
        deploy_mode: str,
        entrypoint_command: List[str],
    ) -> None:
        """Generates and executes a spark-submit command.

        The command is assembled as an argument list and executed without a
        shell, so configuration values that contain whitespace or shell
        metacharacters (for example JVM options or credentials) reach
        spark-submit exactly as they were configured.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            deploy_mode: The spark deploy mode to use.
            entrypoint_command: The entrypoint command to run.

        Raises:
            RuntimeError: if no application path is available, if
                spark-submit cannot be started, or if the spark-submit fails
        """
        import shutil

        # Fail early with a clear message instead of a TypeError further down.
        application_path = self.application_path
        if application_path is None:
            raise RuntimeError(
                "The Spark step operator needs an application path to build "
                "the spark-submit command, but none is set."
            )

        # Base spark-submit command. The executable is resolved explicitly
        # because, without a shell, platform-specific wrapper scripts are not
        # looked up on the PATH automatically.
        command = [
            shutil.which("spark-submit") or "spark-submit",
            "--master",
            self.config.master,
            "--deploy-mode",
            deploy_mode,
        ]

        # Add the configuration parameters, one `--conf key=value` pair each
        for key, value in spark_config.getAll():
            command += ["--conf", f"{key}={value}"]

        # Add the application path
        command.append(application_path)

        # Update the default step operator command to use the spark entrypoint
        # configuration
        original_args = SparkEntrypointConfiguration._parse_arguments(
            entrypoint_command
        )
        command += SparkEntrypointConfiguration.get_entrypoint_arguments(
            **original_args
        )

        # Execute the spark-submit
        try:
            process = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=False,
            )
        except OSError as e:
            # Keep the documented error type when the executable is missing
            # or cannot be started.
            raise RuntimeError(f"Failed to execute spark-submit: {e}") from e

        if process.returncode != 0:
            raise RuntimeError(process.stderr)
        print(process.stdout)

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Runs a step by submitting it as a Spark job.

        The Spark configuration is assembled in stages (resources, backend,
        IO, user-defined parameters) and then handed over to spark-submit.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        step_settings = cast(
            SparkStepOperatorSettings, self.get_settings(info)
        )

        # The configuration starts out empty and is filled in stage by stage.
        spark_conf = SparkConf()

        # Stage 1: resources such as cores and memory.
        self._resource_configuration(
            spark_config=spark_conf,
            resource_settings=info.config.resource_settings,
        )

        # Stage 2: backend specifics such as namespace and docker image names.
        self._backend_configuration(
            spark_config=spark_conf,
            info=info,
            environment=environment,
        )

        # Stage 3: IO configuration for the inputs and the outputs.
        self._io_configuration(spark_config=spark_conf)

        # Stage 4: user-defined parameters, applied last.
        self._additional_configuration(
            spark_config=spark_conf,
            settings=step_settings,
        )

        info.force_write_logs()

        # Build the spark-submit command from the configuration and run it.
        self._launch_spark_job(
            spark_config=spark_conf,
            deploy_mode=step_settings.deploy_mode,
            entrypoint_command=entrypoint_command,
        )
application_path: Optional[str] property readonly

Optional method for providing the application path.

This is especially important when using 'spark-submit', because it defines the path to the application (in the environment where Spark is running) that is passed to the command.

For more information on how to set this property please check:

https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

Returns:

Type Description
Optional[str]

The path to the application entrypoint

config: SparkStepOperatorConfig property readonly

Returns the step operator's configuration as a SparkStepOperatorConfig.

Returns:

Type Description
SparkStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Spark step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

launch(self, info, entrypoint_command, environment)

Launches a step on Spark.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Runs a step by submitting it as a Spark job.

    The Spark configuration is assembled in stages (resources, backend,
    IO, user-defined parameters) and then handed over to spark-submit.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    step_settings = cast(SparkStepOperatorSettings, self.get_settings(info))

    # The configuration starts out empty and is filled in stage by stage.
    spark_conf = SparkConf()

    # Stage 1: resources such as cores and memory.
    self._resource_configuration(
        spark_config=spark_conf,
        resource_settings=info.config.resource_settings,
    )

    # Stage 2: backend specifics such as namespace and docker image names.
    self._backend_configuration(
        spark_config=spark_conf,
        info=info,
        environment=environment,
    )

    # Stage 3: IO configuration for the inputs and the outputs.
    self._io_configuration(spark_config=spark_conf)

    # Stage 4: user-defined parameters, applied last.
    self._additional_configuration(
        spark_config=spark_conf,
        settings=step_settings,
    )

    info.force_write_logs()

    # Build the spark-submit command from the configuration and run it.
    self._launch_spark_job(
        spark_config=spark_conf,
        deploy_mode=step_settings.deploy_mode,
        entrypoint_command=entrypoint_command,
    )