Skip to content

Tekton

zenml.integrations.tekton special

Initialization of the Tekton integration for ZenML.

The Tekton integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Tekton orchestrator with the CLI tool.

TektonIntegration (Integration)

Definition of Tekton Integration for ZenML.

Source code in zenml/integrations/tekton/__init__.py
class TektonIntegration(Integration):
    """Tekton integration for ZenML.

    Declares the integration name, its `kfp-tekton` package requirement and
    the stack component flavors it contributes.
    """

    NAME = TEKTON
    REQUIREMENTS = ["kfp-tekton==1.4.1"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors shipped with this integration.

        Returns:
            A list containing the Tekton orchestrator flavor class.
        """
        # The flavor module is imported inside the method, not at module
        # level.
        from zenml.integrations.tekton.flavors import TektonOrchestratorFlavor

        tekton_flavors = [TektonOrchestratorFlavor]
        return tekton_flavors

flavors() classmethod

Declare the stack component flavors for the Tekton integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/tekton/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors shipped with the Tekton integration.

    Returns:
        A list containing the Tekton orchestrator flavor class.
    """
    # The flavor module is imported inside the function, not at module level.
    from zenml.integrations.tekton.flavors import TektonOrchestratorFlavor

    tekton_flavors = [TektonOrchestratorFlavor]
    return tekton_flavors

flavors special

Tekton integration flavors.

tekton_orchestrator_flavor

Tekton orchestrator flavor.

TektonOrchestratorConfig (BaseOrchestratorConfig, TektonOrchestratorSettings) pydantic-model

Configuration for the Tekton orchestrator.

Attributes:

Name Type Description
kubernetes_context str

Name of a kubernetes context to run pipelines in.

kubernetes_namespace str

Name of the kubernetes namespace in which the pods that run the pipeline steps should be running.

local bool

If True, the orchestrator will assume it is connected to a local kubernetes cluster and will perform additional validations and operations to allow using the orchestrator in combination with other local stack components that store data in the local filesystem (i.e. it will mount the local stores directory into the pipeline containers).

skip_local_validations bool

If True, the local validations will be skipped.

Source code in zenml/integrations/tekton/flavors/tekton_orchestrator_flavor.py
class TektonOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, TektonOrchestratorSettings
):
    """Configuration for the Tekton orchestrator.

    Attributes:
        kubernetes_context: Name of a kubernetes context to run
            pipelines in.
        kubernetes_namespace: Name of the kubernetes namespace in which the
            pods that run the pipeline steps should be running.
        local: If `True`, the orchestrator will assume it is connected to a
            local kubernetes cluster and will perform additional validations and
            operations to allow using the orchestrator in combination with other
            local stack components that store data in the local filesystem
            (i.e. it will mount the local stores directory into the pipeline
            containers).
        skip_local_validations: If `True`, the local validations will be
            skipped.
    """

    kubernetes_context: str  # TODO: Potential setting
    kubernetes_namespace: str = "zenml"
    local: bool = False
    skip_local_validations: bool = False

    @root_validator(pre=True)
    def _validate_deprecated_attrs(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Pydantic root_validator for deprecated attributes.

        This root validator is used for backwards compatibility purposes. E.g.
        it handles attributes that are no longer available or that have become
        mandatory in the meantime.

        Deprecated provisioning attributes found in `values` are reported
        with a warning and removed from the dictionary in place.

        Args:
            values: Values passed to the object constructor

        Returns:
            Values passed to the object constructor, without the deprecated
            attributes.
        """
        provisioning_attrs = [
            "tekton_ui_port",
            "skip_ui_daemon_provisioning",
        ]

        provisioning_attrs_used = [
            attr for attr in provisioning_attrs if attr in values
        ]

        # Nothing deprecated was passed: skip building the warning entirely.
        if not provisioning_attrs_used:
            return values

        logger.warning(
            "Automatically exposing the Tekton UI TCP port locally as part of "
            "the stack provisioning using `zenml stack up` has been "
            "removed in favor of methods better suited for this "
            "purpose, such as using an Ingress controller in the remote "
            "cluster. \n"
            "As a result, the following Tekton orchestrator configuration "
            "attributes have been deprecated: "
            f"{provisioning_attrs}.\n"
            "To get rid of this warning, you should remove the deprecated "
            "attributes from your orchestrator configuration (e.g. by "
            "using the `zenml orchestrator remove-attribute <attr-name>` "
            "CLI command)."
        )

        # remove deprecated attributes from values dict
        for attr in provisioning_attrs_used:
            del values[attr]

        return values

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return not self.local

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return self.local
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

TektonOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Tekton orchestrator.

Source code in zenml/integrations/tekton/flavors/tekton_orchestrator_flavor.py
class TektonOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor describing the Tekton orchestrator.

    Exposes the flavor name together with the config and implementation
    classes that belong to it.
    """

    @property
    def name(self) -> str:
        """Flavor name of the Tekton orchestrator.

        Returns:
            The `TEKTON_ORCHESTRATOR_FLAVOR` constant.
        """
        flavor_name = TEKTON_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[TektonOrchestratorConfig]:
        """Config class used by this flavor.

        Returns:
            The `TektonOrchestratorConfig` class.
        """
        flavor_config_class = TektonOrchestratorConfig
        return flavor_config_class

    @property
    def implementation_class(self) -> Type["TektonOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The `TektonOrchestrator` class.
        """
        # The orchestrator module is imported inside the property, not at
        # module level.
        from zenml.integrations.tekton.orchestrators import TektonOrchestrator

        flavor_implementation = TektonOrchestrator
        return flavor_implementation
config_class: Type[zenml.integrations.tekton.flavors.tekton_orchestrator_flavor.TektonOrchestratorConfig] property readonly

Returns TektonOrchestratorConfig config class.

Returns:

Type Description
Type[zenml.integrations.tekton.flavors.tekton_orchestrator_flavor.TektonOrchestratorConfig]

The config class.

implementation_class: Type[TektonOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[TektonOrchestrator]

Implementation class for this flavor.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

TektonOrchestratorSettings (BaseSettings) pydantic-model

Settings for the Tekton orchestrator.

Attributes:

Name Type Description
pod_settings Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings]

Pod settings to apply.

Source code in zenml/integrations/tekton/flavors/tekton_orchestrator_flavor.py
class TektonOrchestratorSettings(BaseSettings):
    """Settings for the Tekton orchestrator.

    Attributes:
        pod_settings: Optional Kubernetes pod settings to apply. Defaults to
            `None`.
    """

    # Stays `None` unless pod settings are explicitly provided.
    pod_settings: Optional[KubernetesPodSettings] = None

orchestrators special

Initialization of the Tekton ZenML orchestrator.

tekton_orchestrator

Implementation of the Tekton orchestrator.

TektonOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines using Tekton.

Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
class TektonOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines using Tekton."""

    @property
    def config(self) -> TektonOrchestratorConfig:
        """Typed accessor for the orchestrator configuration.

        Returns:
            The stored `_config` object, cast to `TektonOrchestratorConfig`.
        """
        tekton_config = cast(TektonOrchestratorConfig, self._config)
        return tekton_config

    @property
    def kubernetes_context(self) -> str:
        """Name of the kubernetes context this orchestrator is configured for.

        Returns:
            The `kubernetes_context` value of the orchestrator config.
        """
        orchestrator_config = self.config
        return orchestrator_config.kubernetes_context

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class used by the Tekton orchestrator.

        Returns:
            The `TektonOrchestratorSettings` class.
        """
        tekton_settings_class = TektonOrchestratorSettings
        return tekton_settings_class

    def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
        """List the names of the configured Kubernetes contexts.

        Returns:
            A tuple of the configured context names and the name of the
            active context. If listing the contexts raises a
            `ConfigException`, an empty list and `None` are returned.
        """
        try:
            kube_contexts, current = k8s_config.list_kube_config_contexts()
        except k8s_config.config_exception.ConfigException:
            # The contexts could not be listed: report none at all.
            return [], None

        names = []
        for kube_context in kube_contexts:
            names.append(kube_context["name"])

        return names, current["name"]

    @property
    def validator(self) -> Optional[StackValidator]:
        """Builds the stack validator for the Tekton orchestrator.

        The validator requires a container registry and an image builder in
        the stack. Its custom check verifies that the configured Kubernetes
        context exists and, unless the orchestrator is local or local
        validations are skipped, rejects stacks with local components or a
        local container registry.

        Returns:
            A `StackValidator` instance.
        """

        def _validate(stack: "Stack") -> Tuple[bool, str]:
            registry = stack.container_registry

            # The required-components check below is expected to guarantee
            # a registry; the assert only guards against the unexpected.
            assert registry is not None

            known_contexts, _ = self.get_kubernetes_contexts()

            if self.config.kubernetes_context not in known_contexts:
                unknown_context_msg = (
                    f"Could not find a Kubernetes context named "
                    f"'{self.config.kubernetes_context}' in the local "
                    f"Kubernetes configuration. Please make sure that the "
                    f"Kubernetes cluster is running and that the kubeconfig "
                    f"file is configured correctly. To list all configured "
                    f"contexts, run:\n\n"
                    f"  `kubectl config get-contexts`\n"
                )
                return False, unknown_context_msg

            # Hint appended to both local-component failure messages.
            silence_hint = (
                f"To silence this warning, set the "
                f"`skip_local_validations` attribute to True in the "
                f"orchestrator configuration by running:\n\n"
                f"  'zenml orchestrator update {self.name} "
                f"--skip_local_validations=True'\n"
            )

            # The remaining checks only apply to a remote orchestrator for
            # which local validations have not been disabled.
            if self.config.skip_local_validations or self.config.is_local:
                return True, ""

            # If the orchestrator is not running in a local cluster, the
            # stack cannot contain other local components, because their
            # local path cannot be mounted into the Tekton containers.
            # Pick the first component that advertises a local path.
            local_component = next(
                (
                    component
                    for component in stack.components.values()
                    if component.local_path
                ),
                None,
            )
            if local_component is not None:
                return False, (
                    f"The Tekton orchestrator is configured to run "
                    f"pipelines in a remote Kubernetes cluster designated "
                    f"by the '{self.kubernetes_context}' configuration "
                    f"context, but the '{local_component.name}' "
                    f"{local_component.type.value} is a local stack component "
                    f"and will not be available in the Tekton pipeline "
                    f"step.\nPlease ensure that you always use non-local "
                    f"stack components with a remote Tekton orchestrator, "
                    f"otherwise you may run into pipeline execution "
                    f"problems. You should use a flavor of "
                    f"{local_component.type.value} other than "
                    f"'{local_component.flavor}'.\n"
                    + silence_hint
                )

            # A remote orchestrator also needs a remote container registry.
            if registry.config.is_local:
                return False, (
                    f"The Tekton orchestrator is configured to run "
                    f"pipelines in a remote Kubernetes cluster designated "
                    f"by the '{self.kubernetes_context}' configuration "
                    f"context, but the '{registry.name}' "
                    f"container registry URI "
                    f"'{registry.config.uri}' "
                    f"points to a local container registry. Please ensure "
                    f"that you always use non-local stack components with "
                    f"a remote Tekton orchestrator, otherwise you will "
                    f"run into problems. You should use a flavor of "
                    f"container registry other than "
                    f"'{registry.flavor}'.\n"
                    + silence_hint
                )

            return True, ""

        required = {
            StackComponentType.CONTAINER_REGISTRY,
            StackComponentType.IMAGE_BUILDER,
        }
        return StackValidator(
            required_components=required,
            custom_validation_function=_validate,
        )

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Builds the pipeline Docker image and records it on the deployment.

        The value returned by the image builder is stored as a deployment
        extra under `ORCHESTRATOR_DOCKER_IMAGE_KEY`.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        image_digest = PipelineDockerImageBuilder().build_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, image_digest)

    def _configure_container_op(
        self,
        container_op: dsl.ContainerOp,
    ) -> None:
        """Configures the given container op in place.

        For a local orchestrator, the local ZenML stores directory is mounted
        as a host path volume, the container runs with the local UID/GID
        (except on Windows) and the stores path is exposed through an
        environment variable. The collected volumes (possibly none) are
        always added to the container op.

        Args:
            container_op: The Tekton container operation to configure.
        """
        active_stack = Client().active_stack
        mounted_volumes: Dict[str, k8s_client.V1Volume] = {}

        if self.config.is_local:
            active_stack.check_local_paths()

            stores_dir = GlobalConfiguration().local_stores_path

            mounted_volumes[stores_dir] = k8s_client.V1Volume(
                name="local-stores",
                host_path=k8s_client.V1HostPathVolumeSource(
                    path=stores_dir, type="Directory"
                ),
            )
            logger.debug(
                "Adding host path volume for the local ZenML stores (path: %s) "
                "in Tekton pipelines container.",
                stores_dir,
            )

            if sys.platform == "win32":
                # File permissions are not checked on Windows. This explicit
                # branch is kept so that mypy does not complain about unused
                # 'type: ignore' statements.
                pass
            else:
                # Use the local UID/GID inside the container so that the
                # artifact and metadata stores can be shared with local
                # pipeline runs.
                local_user_context = k8s_client.V1SecurityContext(
                    run_as_user=os.getuid(),
                    run_as_group=os.getgid(),
                )
                container_op.container.security_context = local_user_context
                logger.debug(
                    "Setting security context UID and GID to local user/group "
                    "in Tekton pipelines container."
                )

            stores_env_var = k8s_client.V1EnvVar(
                name=ENV_ZENML_LOCAL_STORES_PATH,
                value=stores_dir,
            )
            container_op.container.add_env_variable(stores_env_var)

        container_op.add_pvolumes(mounted_volumes)

    @staticmethod
    def _configure_container_resources(
        container_op: dsl.ContainerOp,
        resource_settings: "ResourceSettings",
    ) -> None:
        """Applies CPU, GPU and memory limits to the container op.

        Only the limits whose resource setting is not `None` are applied.

        Args:
            container_op: The container operation to configure.
            resource_settings: The resource settings to use for this
                container.
        """
        if resource_settings.cpu_count is not None:
            cpu_limit = str(resource_settings.cpu_count)
            container_op = container_op.set_cpu_limit(cpu_limit)

        if resource_settings.gpu_count is not None:
            gpu_limit = resource_settings.gpu_count
            container_op = container_op.set_gpu_limit(gpu_limit)

        if resource_settings.memory is not None:
            # The last character of the memory value is dropped before it is
            # used as the limit (presumably the trailing "B" of a unit such
            # as "GB" -- confirm against `ResourceSettings`).
            container_op = container_op.set_memory_limit(
                resource_settings.memory[:-1]
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Runs the pipeline on Tekton.

        This function first compiles the ZenML pipeline into a Tekton yaml
        and then applies this configuration to run the pipeline.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.

        Raises:
            RuntimeError: If you try to run the pipelines in a notebook
                environment, or if applying the compiled pipeline with
                `kubectl` fails.
        """
        # First check whether the code running in a notebook
        if Environment.in_notebook():
            raise RuntimeError(
                "The Tekton orchestrator cannot run pipelines in a notebook "
                "environment. The reason is that it is non-trivial to create "
                "a Docker image of a notebook. Please consider refactoring "
                "your notebook cells into separate scripts in a Python module "
                "and run the code outside of a notebook when using this "
                "orchestrator."
            )

        assert stack.container_registry
        # The image is read from the pipeline extras under
        # `ORCHESTRATOR_DOCKER_IMAGE_KEY`.
        image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]

        orchestrator_run_name = get_orchestrator_run_name(
            pipeline_name=deployment.pipeline.name
        )

        def _construct_kfp_pipeline() -> None:
            """Create a container_op for each step.

            This should contain the name of the docker image and configures the
            entrypoint of the docker image to run the step.

            Additionally, this gives each container_op information about its
            direct downstream steps.
            """
            # Dictionary of container_ops index by the associated step name
            step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}

            for step_name, step in deployment.steps.items():
                command = StepEntrypointConfiguration.get_entrypoint_command()
                arguments = (
                    StepEntrypointConfiguration.get_entrypoint_arguments(
                        step_name=step_name,
                    )
                )

                container_op = dsl.ContainerOp(
                    name=step.config.name,
                    image=image_name,
                    command=command,
                    arguments=arguments,
                )

                settings = cast(
                    TektonOrchestratorSettings, self.get_settings(step)
                )
                self._configure_container_op(
                    container_op=container_op,
                )

                if settings.pod_settings:
                    apply_pod_settings(
                        container_op=container_op,
                        settings=settings.pod_settings,
                    )

                # Expose the Tekton pipeline run name to the step; it is read
                # back in `get_orchestrator_run_id`.
                container_op.container.add_env_variable(
                    k8s_client.V1EnvVar(
                        name=ENV_ZENML_TEKTON_RUN_ID,
                        value="$(context.pipelineRun.name)",
                    )
                )

                if self.requires_resources_in_orchestration_environment(step):
                    self._configure_container_resources(
                        container_op=container_op,
                        resource_settings=step.config.resource_settings,
                    )

                # Find the upstream container ops of the current step and
                # configure the current container op to run after them.
                # The lookup requires every upstream step to have been
                # processed earlier in this loop.
                for upstream_step_name in step.spec.upstream_steps:
                    upstream_container_op = step_name_to_container_op[
                        upstream_step_name
                    ]
                    container_op.after(upstream_container_op)

                # Update dictionary of container ops with the current one
                step_name_to_container_op[step.config.name] = container_op

        # Get a filepath to use to save the finished yaml to
        fileio.makedirs(self.pipeline_directory)
        pipeline_file_path = os.path.join(
            self.pipeline_directory, f"{orchestrator_run_name}.yaml"
        )

        # Set the run name, which Tekton reads from this attribute of the
        # pipeline function
        setattr(
            _construct_kfp_pipeline,
            "_component_human_name",
            orchestrator_run_name,
        )
        pipeline_config = TektonPipelineConf()
        pipeline_config.add_pipeline_label(
            "pipelines.kubeflow.org/cache_enabled", "false"
        )
        TektonCompiler().compile(
            _construct_kfp_pipeline,
            pipeline_file_path,
            tekton_pipeline_conf=pipeline_config,
        )
        logger.info(
            "Writing Tekton workflow definition to `%s`.", pipeline_file_path
        )

        if deployment.schedule:
            logger.warning(
                "The Tekton Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        logger.info(
            "Running Tekton pipeline in kubernetes context '%s' and namespace "
            "'%s'.",
            self.config.kubernetes_context,
            self.config.kubernetes_namespace,
        )
        try:
            subprocess.check_call(
                [
                    "kubectl",
                    "--context",
                    self.config.kubernetes_context,
                    "--namespace",
                    self.config.kubernetes_namespace,
                    "apply",
                    "-f",
                    pipeline_file_path,
                ]
            )
        except subprocess.CalledProcessError as e:
            # Chain the original error so that the kubectl failure remains
            # the explicit cause in the traceback.
            raise RuntimeError(
                f"Failed to upload Tekton pipeline: {str(e)}. "
                f"Please make sure your kubernetes config is present and the "
                f"{self.config.kubernetes_context} kubernetes context is "
                f"configured correctly.",
            ) from e

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        The id is read from the environment variable named by
        `ENV_ZENML_TEKTON_RUN_ID`.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_TEKTON_RUN_ID]
        except KeyError as e:
            # Chain the lookup failure explicitly as the cause of the error.
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_TEKTON_RUN_ID}."
            ) from e

    @property
    def root_directory(self) -> str:
        """Root directory for all files concerning this orchestrator.

        Returns:
            The `tekton/<orchestrator id>` path below the global config
            directory.
        """
        config_dir = io_utils.get_global_config_directory()
        orchestrator_id = str(self.id)
        return os.path.join(config_dir, "tekton", orchestrator_id)

    @property
    def pipeline_directory(self) -> str:
        """Directory in which the Tekton pipeline files are stored.

        Returns:
            The `pipelines` path below the orchestrator root directory.
        """
        root = self.root_directory
        return os.path.join(root, "pipelines")

    @property
    def _pid_file_path(self) -> str:
        """Location of the daemon PID file.

        Returns:
            The `tekton_daemon.pid` path below the orchestrator root
            directory.
        """
        root = self.root_directory
        return os.path.join(root, "tekton_daemon.pid")

    @property
    def log_file(self) -> str:
        """Location of the daemon log file.

        Returns:
            The `tekton_daemon.log` path below the orchestrator root
            directory.
        """
        root = self.root_directory
        return os.path.join(root, "tekton_daemon.log")
config: TektonOrchestratorConfig property readonly

Returns the TektonOrchestratorConfig config.

Returns:

Type Description
TektonOrchestratorConfig

The configuration.

kubernetes_context: str property readonly

Gets the kubernetes context associated with the orchestrator.

Returns:

Type Description
str

The kubernetes context associated with the orchestrator.

log_file: str property readonly

Path of the daemon log file.

Returns:

Type Description
str

Path of the daemon log file.

pipeline_directory: str property readonly

Path to a directory in which the Tekton pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property readonly

Returns path to the root directory for all files concerning this orchestrator.

Returns:

Type Description
str

Path to the root directory.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Tekton orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

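Validates that the stack contains a container registry and an image builder and that the configured Kubernetes context exists. Unless the orchestrator is local or local validations are skipped, it also requires that all stack components, including the container registry, are remote.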

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

get_kubernetes_contexts(self)

Get the list of configured Kubernetes contexts and the active context.

Returns:

Type Description
Tuple[List[str], Optional[str]]

A tuple containing the list of configured Kubernetes contexts and the active context.

Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
    """List the names of the configured Kubernetes contexts.

    Returns:
        A tuple of the configured context names and the name of the
        active context. If listing the contexts raises a
        `ConfigException`, an empty list and `None` are returned.
    """
    try:
        kube_contexts, current = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException:
        # The contexts could not be listed: report none at all.
        return [], None

    names = []
    for kube_context in kube_contexts:
        names.append(kube_context["name"])

    return names, current["name"]
get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_TEKTON_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_TEKTON_RUN_ID]
    except KeyError as e:
        # Chain the lookup failure explicitly as the cause of the error.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_TEKTON_RUN_ID}."
        ) from e
prepare_or_run_pipeline(self, deployment, stack)

Runs the pipeline on Tekton.

This function first compiles the ZenML pipeline into a Tekton yaml and then applies this configuration to run the pipeline.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required

Exceptions:

Type Description
RuntimeError

If you try to run the pipelines in a notebook environment.

Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Runs the pipeline on Tekton.

    This function first compiles the ZenML pipeline into a Tekton yaml
    and then applies this configuration to run the pipeline. The yaml is
    written into `self.pipeline_directory` and applied with `kubectl`
    using the configured kubernetes context and namespace.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Raises:
        RuntimeError: If you try to run the pipelines in a notebook
            environment, or if applying the compiled pipeline with
            `kubectl` fails.
    """
    # First check whether the code is running in a notebook
    if Environment.in_notebook():
        raise RuntimeError(
            "The Tekton orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )

    assert stack.container_registry
    # The image to run is read from the deployment extras under
    # ORCHESTRATOR_DOCKER_IMAGE_KEY, so it must have been stored there
    # before this method is called.
    image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]

    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline.name
    )

    def _construct_kfp_pipeline() -> None:
        """Create a container_op for each step.

        This should contain the name of the docker image and configures the
        entrypoint of the docker image to run the step.

        Additionally, this gives each container_op information about its
        direct downstream steps.
        """
        # Dictionary of container_ops index by the associated step name
        step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}

        for step_name, step in deployment.steps.items():
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                )
            )

            container_op = dsl.ContainerOp(
                name=step.config.name,
                image=image_name,
                command=command,
                arguments=arguments,
            )

            settings = cast(
                TektonOrchestratorSettings, self.get_settings(step)
            )
            self._configure_container_op(
                container_op=container_op,
            )

            if settings.pod_settings:
                apply_pod_settings(
                    container_op=container_op,
                    settings=settings.pod_settings,
                )

            # Expose the run id to the step container through the
            # ENV_ZENML_TEKTON_RUN_ID environment variable. The value is a
            # placeholder string; presumably Tekton substitutes it with
            # the PipelineRun name at runtime -- TODO confirm.
            container_op.container.add_env_variable(
                k8s_client.V1EnvVar(
                    name=ENV_ZENML_TEKTON_RUN_ID,
                    value="$(context.pipelineRun.name)",
                )
            )

            if self.requires_resources_in_orchestration_environment(step):
                self._configure_container_resources(
                    container_op=container_op,
                    resource_settings=step.config.resource_settings,
                )

            # Find the upstream container ops of the current step and
            # configure the current container op to run after them
            # NOTE(review): ops are stored below under `step.config.name`
            # but looked up here by upstream step name; this assumes both
            # naming schemes coincide and that `deployment.steps` yields
            # upstream steps before their downstream steps -- confirm.
            for upstream_step_name in step.spec.upstream_steps:
                upstream_container_op = step_name_to_container_op[
                    upstream_step_name
                ]
                container_op.after(upstream_container_op)

            # Update dictionary of container ops with the current one
            step_name_to_container_op[step.config.name] = container_op

    # Get a filepath to use to save the finished yaml to
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory, f"{orchestrator_run_name}.yaml"
    )

    # Set the run name, which Tekton reads from this attribute of the
    # pipeline function
    setattr(
        _construct_kfp_pipeline,
        "_component_human_name",
        orchestrator_run_name,
    )
    pipeline_config = TektonPipelineConf()
    pipeline_config.add_pipeline_label(
        "pipelines.kubeflow.org/cache_enabled", "false"
    )
    TektonCompiler().compile(
        _construct_kfp_pipeline,
        pipeline_file_path,
        tekton_pipeline_conf=pipeline_config,
    )
    logger.info(
        "Writing Tekton workflow definition to `%s`.", pipeline_file_path
    )

    if deployment.schedule:
        logger.warning(
            "The Tekton Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    logger.info(
        "Running Tekton pipeline in kubernetes context '%s' and namespace "
        "'%s'.",
        self.config.kubernetes_context,
        self.config.kubernetes_namespace,
    )
    try:
        subprocess.check_call(
            [
                "kubectl",
                "--context",
                self.config.kubernetes_context,
                "--namespace",
                self.config.kubernetes_namespace,
                "apply",
                "-f",
                pipeline_file_path,
            ]
        )
    except subprocess.CalledProcessError as e:
        # Chain the kubectl failure explicitly so it is reported as the
        # direct cause of the RuntimeError.
        raise RuntimeError(
            f"Failed to upload Tekton pipeline: {str(e)}. "
            f"Please make sure your kubernetes config is present and the "
            f"{self.config.kubernetes_context} kubernetes context is "
            f"configured correctly.",
        ) from e
prepare_pipeline_deployment(self, deployment, stack)

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build the pipeline's Docker image and record it on the deployment.

    The image is built with a `PipelineDockerImageBuilder`, and the value
    returned by the build is stored in the deployment extras under
    `ORCHESTRATOR_DOCKER_IMAGE_KEY`.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    image_reference = PipelineDockerImageBuilder().build_docker_image(
        deployment=deployment,
        stack=stack,
    )
    deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, image_reference)