Pipelines

zenml.pipelines special

A ZenML pipeline is a sequence of tasks that execute in a specific order and yield artifacts. The artifacts are stored in the artifact store and indexed via the metadata store. Each individual task within a pipeline is known as a step. The standard pipelines in ZenML expose simple interfaces for plugging in a predefined set of steps in a predefined order. Other kinds of pipelines can be built from scratch on top of the BasePipeline class.

Pipelines can be written as simple functions. They are created by applying the decorator appropriate to your use case. When a pipeline is run, it is compiled and passed directly to the orchestrator.
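As a rough mental model of the description above, the following toy sketch runs plain functions in order and records what each one produces. It does not use ZenML: `artifact_store`, `metadata_store`, and `run_steps` are hypothetical stand-ins introduced only for illustration, and a real orchestrator does considerably more.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-ins for the artifact store and the metadata store.
artifact_store: Dict[str, Any] = {}
metadata_store: List[Dict[str, str]] = []


def run_steps(steps: List[Callable[[Any], Any]], initial: Any = None) -> Any:
    """Run steps in order, feeding each step's output to the next one."""
    value = initial
    for index, step in enumerate(steps):
        value = step(value)
        key = f"{index}_{step.__name__}"
        artifact_store[key] = value  # keep the artifact itself
        metadata_store.append({"step": step.__name__, "artifact": key})
    return value


def load(_: Any) -> List[int]:
    """First step: produce some data."""
    return [1, 2, 3]


def double(values: List[int]) -> List[int]:
    """Second step: transform the data of the previous step."""
    return [v * 2 for v in values]


result = run_steps([load, double])
```

Here `result` holds the output of the last step, while the two dictionaries keep a record of every intermediate artifact and of the step that produced it.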

base_pipeline

BasePipeline

Abstract base class for all ZenML pipelines.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | | The name of this pipeline. |
| enable_cache | | A boolean indicating if caching is enabled for this pipeline. |
| requirements_file | | Optional path to a pip requirements file that contains all requirements to run the pipeline. |
| required_integrations | | Optional set of integrations that need to be installed for this pipeline to run. |

Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(metaclass=BasePipelineMeta):
    """Abstract base class for all ZenML pipelines.

    Attributes:
        name: The name of this pipeline.
        enable_cache: A boolean indicating if caching is enabled for this
            pipeline.
        requirements_file: Optional path to a pip requirements file that
            contains all requirements to run the pipeline.
        required_integrations: Optional set of integrations that need to be
            installed for this pipeline to run.
    """

    STEP_SPEC: ClassVar[Dict[str, Any]] = None  # type: ignore[assignment]

    INSTANCE_CONFIGURATION: Dict[Text, Any] = {}

    def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
        kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
        self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)
        self.required_integrations = kwargs.pop(PARAM_REQUIRED_INTEGRATIONS, ())
        self.requirements_file = kwargs.pop(PARAM_REQUIREMENTS_FILE, None)
        self.dockerignore_file = kwargs.pop(PARAM_DOCKERIGNORE_FILE, None)
        self.secrets = kwargs.pop(PARAM_SECRETS, None)

        self.name = self.__class__.__name__
        logger.info("Creating run for pipeline: `%s`", self.name)
        logger.info(
            f'Cache {"enabled" if self.enable_cache else "disabled"} for '
            f"pipeline `{self.name}`"
        )

        self.__steps: Dict[str, BaseStep] = {}
        self._verify_arguments(*args, **kwargs)

    def _verify_arguments(self, *steps: BaseStep, **kw_steps: BaseStep) -> None:
        """Verifies the initialization args and kwargs of this pipeline.

        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline. If
        all arguments are correct, saves the steps to `self.__steps`.

        Args:
            *steps: The args passed to the init method of this pipeline.
            **kw_steps: The kwargs passed to the init method of this pipeline.

        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        """
        input_step_keys = list(self.STEP_SPEC.keys())
        if len(steps) > len(input_step_keys):
            raise PipelineInterfaceError(
                f"Too many input steps for pipeline '{self.name}'. "
                f"This pipeline expects {len(input_step_keys)} step(s) "
                f"but got {len(steps) + len(kw_steps)}."
            )

        combined_steps = {}
        step_classes: Dict[Type[BaseStep], str] = {}

        for i, step in enumerate(steps):
            step_class = type(step)
            key = input_step_keys[i]

            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for positional "
                    f"argument {i} of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            if step_class in step_classes:
                previous_key = step_classes[step_class]
                raise PipelineInterfaceError(
                    f"Found multiple step objects of the same class "
                    f"(`{step_class}`) for arguments '{previous_key}' and "
                    f"'{key}' in pipeline '{self.name}'. Only one step object "
                    f"per class is allowed inside a ZenML pipeline."
                )

            step.pipeline_parameter_name = key
            combined_steps[key] = step
            step_classes[step_class] = key

        for key, step in kw_steps.items():
            step_class = type(step)

            if key in combined_steps:
                # a step for this key was already set by
                # the positional input steps
                raise PipelineInterfaceError(
                    f"Unexpected keyword argument '{key}' for pipeline "
                    f"'{self.name}'. A step for this key was "
                    f"already passed as a positional argument."
                )

            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            if step_class in step_classes:
                previous_key = step_classes[step_class]
                raise PipelineInterfaceError(
                    f"Found multiple step objects of the same class "
                    f"(`{step_class}`) for arguments '{previous_key}' and "
                    f"'{key}' in pipeline '{self.name}'. Only one step object "
                    f"per class is allowed inside a ZenML pipeline."
                )

            step.pipeline_parameter_name = key
            combined_steps[key] = step
            step_classes[step_class] = key

        # check if there are any missing or unexpected steps
        expected_steps = set(self.STEP_SPEC.keys())
        actual_steps = set(combined_steps.keys())
        missing_steps = expected_steps - actual_steps
        unexpected_steps = actual_steps - expected_steps

        if missing_steps:
            raise PipelineInterfaceError(
                f"Missing input step(s) for pipeline "
                f"'{self.name}': {missing_steps}."
            )

        if unexpected_steps:
            raise PipelineInterfaceError(
                f"Unexpected input step(s) for pipeline "
                f"'{self.name}': {unexpected_steps}. This pipeline "
                f"only requires the following steps: {expected_steps}."
            )

        self.__steps = combined_steps

    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Function that connects inputs and outputs of the pipeline steps."""
        raise NotImplementedError

    @property
    def requirements(self) -> Set[str]:
        """Set of python requirements of this pipeline.

        This property is a combination of the requirements of
        - required integrations for this pipeline
        - the `requirements_file` specified for this pipeline
        """
        requirements = set()

        for integration_name in self.required_integrations:
            try:
                integration_requirements = (
                    integration_registry.select_integration_requirements(
                        integration_name
                    )
                )
                requirements.update(integration_requirements)
            except KeyError as e:
                raise KeyError(
                    f"Unable to find requirements for integration "
                    f"'{integration_name}'."
                ) from e

        if self.requirements_file and fileio.file_exists(
            self.requirements_file
        ):
            with fileio.open(self.requirements_file, "r") as f:
                requirements.update(
                    {
                        requirement.strip()
                        for requirement in f.read().split("\n")
                    }
                )

        return requirements

    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns a dictionary of pipeline steps."""
        return self.__steps

    @steps.setter
    def steps(self, steps: Dict[str, BaseStep]) -> NoReturn:
        """Setting the steps property is not allowed. This method always
        raises a PipelineInterfaceError.
        """
        raise PipelineInterfaceError("Cannot set steps manually!")

    def validate_stack(self, stack: "Stack") -> None:
        """Validates if a stack is able to run this pipeline."""
        available_step_operators = (
            {stack.step_operator.name} if stack.step_operator else set()
        )

        for step in self.steps.values():
            if (
                step.custom_step_operator
                and step.custom_step_operator not in available_step_operators
            ):
                raise StackValidationError(
                    f"Step '{step.name}' requires custom step operator "
                    f"'{step.custom_step_operator}' which is not configured in "
                    f"the active stack. Available step operators: "
                    f"{available_step_operators}."
                )

    def _reset_step_flags(self) -> None:
        """Reset the _has_been_called flag at the beginning of a pipeline run,
        to make sure a pipeline instance can be called more than once."""
        for step in self.steps.values():
            step._has_been_called = False

    def run(
        self,
        *,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        **additional_parameters: Any,
    ) -> Any:
        """Runs the pipeline on the active stack of the current repository.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule of the pipeline.
        """
        if SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls inside docker containers which should only
            # run a single step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return

        # Activating the built-in integrations through lazy loading
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        # Path of the file where pipeline.run() was called. This is needed by
        # the airflow orchestrator so it knows which file to copy into the DAG
        # directory
        dag_filepath = fileio.resolve_relative_path(
            inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr] # noqa
        )
        runtime_configuration = RuntimeConfiguration(
            run_name=run_name,
            dag_filepath=dag_filepath,
            schedule=schedule,
            **additional_parameters,
        )
        stack = Repository().active_stack

        stack_metadata = {
            component_type.value: component.flavor.value
            for component_type, component in stack.components.items()
        }
        track_event(
            event=AnalyticsEvent.RUN_PIPELINE,
            metadata={
                "store_type": Repository().active_profile.store_type.value,
                **stack_metadata,
                "total_steps": len(self.steps),
                "schedule": bool(schedule),
            },
        )

        self._reset_step_flags()
        self.validate_stack(stack)

        return stack.deploy_pipeline(
            self, runtime_configuration=runtime_configuration
        )

    def with_config(
        self: T, config_file: str, overwrite_step_parameters: bool = False
    ) -> T:
        """Configures this pipeline using a yaml file.

        Args:
            config_file: Path to a yaml file which contains configuration
                options for running this pipeline. See
                https://docs.zenml.io/features/pipeline-configuration#setting-step-parameters-using-a-config-file
                for details regarding the specification of this file.
            overwrite_step_parameters: If set to `True`, values from the
                configuration file will overwrite configuration parameters
                passed in code.

        Returns:
            The pipeline object that this method was called on.
        """
        config_yaml = yaml_utils.read_yaml(config_file)

        if PipelineConfigurationKeys.STEPS in config_yaml:
            self._read_config_steps(
                config_yaml[PipelineConfigurationKeys.STEPS],
                overwrite=overwrite_step_parameters,
            )

        return self

    def _read_config_steps(
        self, steps: Dict[str, Dict[str, Any]], overwrite: bool = False
    ) -> None:
        """Reads and sets step parameters from a config file.

        Args:
            steps: Maps step names to dicts of parameter names and values.
            overwrite: If `True`, overwrite previously set step parameters.
        """
        for step_name, step_dict in steps.items():
            StepConfigurationKeys.key_check(step_dict)

            if step_name not in self.__steps:
                raise PipelineConfigurationError(
                    f"Found '{step_name}' step in configuration yaml but it "
                    f"doesn't exist in the pipeline steps "
                    f"{list(self.__steps.keys())}."
                )

            step = self.__steps[step_name]
            step_parameters = (
                step.CONFIG_CLASS.__fields__.keys() if step.CONFIG_CLASS else {}
            )
            parameters = step_dict.get(StepConfigurationKeys.PARAMETERS_, {})
            for parameter, value in parameters.items():
                if parameter not in step_parameters:
                    raise PipelineConfigurationError(
                        f"Found parameter '{parameter}' for '{step_name}' step "
                        f"in configuration yaml but it doesn't exist in the "
                        f"configuration class `{step.CONFIG_CLASS}`. Available "
                        f"parameters for this step: "
                        f"{list(step_parameters)}."
                    )

                previous_value = step.PARAM_SPEC.get(parameter, None)

                if overwrite:
                    step.PARAM_SPEC[parameter] = value
                else:
                    step.PARAM_SPEC.setdefault(parameter, value)

                if overwrite or not previous_value:
                    logger.debug(
                        "Setting parameter %s=%s for step '%s'.",
                        parameter,
                        value,
                        step_name,
                    )
                if previous_value and not overwrite:
                    logger.warning(
                        "Parameter '%s' from configuration yaml will NOT be "
                        "set as a configuration object was given when "
                        "creating the step. Set `overwrite_step_parameters="
                        "True` when setting the configuration yaml to always "
                        "use the options specified in the yaml file.",
                        parameter,
                    )
requirements: Set[str] property readonly

Set of python requirements of this pipeline.

This property is a combination of the requirements of:

- required integrations for this pipeline
- the requirements_file specified for this pipeline
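A minimal sketch of how the two sources might be merged into one set is shown below. `INTEGRATION_REQUIREMENTS` and `collect_requirements` are hypothetical names, the registry contents are made up, and the requirements file is passed in as text so the example stays self-contained. Unlike the source listing, this sketch skips blank lines instead of adding an empty string to the set.

```python
from typing import Dict, Iterable, List, Set

# Hypothetical registry contents; in ZenML the values come from the
# integration registry.
INTEGRATION_REQUIREMENTS: Dict[str, List[str]] = {
    "sklearn": ["scikit-learn"],
}


def collect_requirements(
    integrations: Iterable[str], requirements_text: str = ""
) -> Set[str]:
    """Combine integration requirements with the lines of a requirements file."""
    requirements: Set[str] = set()

    for name in integrations:
        if name not in INTEGRATION_REQUIREMENTS:
            raise KeyError(
                f"Unable to find requirements for integration '{name}'."
            )
        requirements.update(INTEGRATION_REQUIREMENTS[name])

    # One requirement per non-empty line, with surrounding whitespace removed.
    requirements.update(
        line.strip() for line in requirements_text.splitlines() if line.strip()
    )
    return requirements
```

Because the result is a set, a requirement that appears both in an integration and in the file is only listed once.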

steps: Dict[str, zenml.steps.base_step.BaseStep] property writable

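Returns a dictionary of pipeline steps. Although the property is listed as writable, its setter always raises a PipelineInterfaceError, so the steps cannot be set manually.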
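The pattern behind this property, a getter paired with a setter that refuses every assignment, can be sketched in isolation. `StepContainer` is a hypothetical class and `PipelineInterfaceError` is a local stand-in for the ZenML exception.

```python
from typing import Any, Dict


class PipelineInterfaceError(Exception):
    """Local stand-in for ZenML's exception of the same name."""


class StepContainer:
    """Hypothetical class that exposes its steps as a guarded property."""

    def __init__(self, steps: Dict[str, Any]) -> None:
        # The double underscore triggers name mangling, which keeps the
        # attribute out of easy reach from outside the class.
        self.__steps = dict(steps)

    @property
    def steps(self) -> Dict[str, Any]:
        """Return the stored steps."""
        return self.__steps

    @steps.setter
    def steps(self, value: Dict[str, Any]) -> None:
        """Reject any attempt to replace the steps."""
        raise PipelineInterfaceError("Cannot set steps manually!")
```

Defining the setter explicitly lets the class raise its own error with a clear message instead of the generic `AttributeError` that a getter-only property would produce.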

connect(self, *args, **kwargs)

Function that connects inputs and outputs of the pipeline steps.

Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Function that connects inputs and outputs of the pipeline steps."""
    raise NotImplementedError
run(self, *, run_name=None, schedule=None, **additional_parameters)

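Runs the pipeline on the active stack of the current repository. Any additional keyword arguments are forwarded to the RuntimeConfiguration, and the call returns whatever the stack's deploy_pipeline method returns. If the environment variable named by ENV_ZENML_PREVENT_PIPELINE_EXECUTION is set, the call only logs a message and returns without running anything.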

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| run_name | Optional[str] | Name of the pipeline run. | None |
| schedule | Optional[zenml.pipelines.schedule.Schedule] | Optional schedule of the pipeline. | None |
Source code in zenml/pipelines/base_pipeline.py
def run(
    self,
    *,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    **additional_parameters: Any,
) -> Any:
    """Runs the pipeline on the active stack of the current repository.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule of the pipeline.
    """
    if SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls inside docker containers which should only
        # run a single step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return

    # Activating the built-in integrations through lazy loading
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    # Path of the file where pipeline.run() was called. This is needed by
    # the airflow orchestrator so it knows which file to copy into the DAG
    # directory
    dag_filepath = fileio.resolve_relative_path(
        inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr] # noqa
    )
    runtime_configuration = RuntimeConfiguration(
        run_name=run_name,
        dag_filepath=dag_filepath,
        schedule=schedule,
        **additional_parameters,
    )
    stack = Repository().active_stack

    stack_metadata = {
        component_type.value: component.flavor.value
        for component_type, component in stack.components.items()
    }
    track_event(
        event=AnalyticsEvent.RUN_PIPELINE,
        metadata={
            "store_type": Repository().active_profile.store_type.value,
            **stack_metadata,
            "total_steps": len(self.steps),
            "schedule": bool(schedule),
        },
    )

    self._reset_step_flags()
    self.validate_stack(stack)

    return stack.deploy_pipeline(
        self, runtime_configuration=runtime_configuration
    )
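The `dag_filepath` lookup in the listing above relies on inspecting the caller's stack frame. A standalone sketch of that technique follows; `caller_filename` is a hypothetical helper, and `os.path.abspath` is used here as an assumed substitute for ZenML's `fileio.resolve_relative_path`.

```python
import inspect
import os


def caller_filename() -> str:
    """Return the absolute path of the file that called this function."""
    frame = inspect.currentframe()
    # currentframe() may return None on interpreters without frame support.
    if frame is None or frame.f_back is None:
        raise RuntimeError("Caller frame is not available.")
    # f_back is the frame of the caller; its code object knows the filename.
    return os.path.abspath(frame.f_back.f_code.co_filename)
```

The explicit `None` check replaces the `# type: ignore[union-attr]` comment used in the source and makes the failure mode visible on interpreters that do not expose stack frames.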
validate_stack(self, stack)

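Validates whether a stack is able to run this pipeline. A StackValidationError is raised if any step requires a custom step operator that is not the step operator configured in the stack.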

Source code in zenml/pipelines/base_pipeline.py
def validate_stack(self, stack: "Stack") -> None:
    """Validates if a stack is able to run this pipeline."""
    available_step_operators = (
        {stack.step_operator.name} if stack.step_operator else set()
    )

    for step in self.steps.values():
        if (
            step.custom_step_operator
            and step.custom_step_operator not in available_step_operators
        ):
            raise StackValidationError(
                f"Step '{step.name}' requires custom step operator "
                f"'{step.custom_step_operator}' which is not configured in "
                f"the active stack. Available step operators: "
                f"{available_step_operators}."
            )
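The same check can be tried out without a ZenML stack. In this sketch `FakeStep` and `validate_step_operators` are hypothetical, `StackValidationError` is a local stand-in, and the stack is reduced to the name of its single optional step operator.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set


class StackValidationError(Exception):
    """Local stand-in for ZenML's exception of the same name."""


@dataclass
class FakeStep:
    """Hypothetical step carrying only the fields the check needs."""

    name: str
    custom_step_operator: Optional[str] = None


def validate_step_operators(
    steps: Dict[str, FakeStep], stack_operator: Optional[str]
) -> None:
    """Raise if a step asks for an operator that the stack does not provide."""
    available: Set[str] = {stack_operator} if stack_operator else set()

    for step in steps.values():
        if step.custom_step_operator and step.custom_step_operator not in available:
            raise StackValidationError(
                f"Step '{step.name}' requires custom step operator "
                f"'{step.custom_step_operator}' which is not configured. "
                f"Available step operators: {available}."
            )
```

Steps that do not request a custom operator always pass, so a stack without any step operator is still valid for pipelines that do not need one.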
with_config(self, config_file, overwrite_step_parameters=False)

Configures this pipeline using a yaml file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config_file | str | Path to a yaml file which contains configuration options for running this pipeline. See https://docs.zenml.io/features/pipeline-configuration#setting-step-parameters-using-a-config-file for details regarding the specification of this file. | required |
| overwrite_step_parameters | bool | If set to True, values from the configuration file will overwrite configuration parameters passed in code. | False |

Returns:

| Type | Description |
| --- | --- |
| ~T | The pipeline object that this method was called on. |

Source code in zenml/pipelines/base_pipeline.py
def with_config(
    self: T, config_file: str, overwrite_step_parameters: bool = False
) -> T:
    """Configures this pipeline using a yaml file.

    Args:
        config_file: Path to a yaml file which contains configuration
            options for running this pipeline. See
            https://docs.zenml.io/features/pipeline-configuration#setting-step-parameters-using-a-config-file
            for details regarding the specification of this file.
        overwrite_step_parameters: If set to `True`, values from the
            configuration file will overwrite configuration parameters
            passed in code.

    Returns:
        The pipeline object that this method was called on.
    """
    config_yaml = yaml_utils.read_yaml(config_file)

    if PipelineConfigurationKeys.STEPS in config_yaml:
        self._read_config_steps(
            config_yaml[PipelineConfigurationKeys.STEPS],
            overwrite=overwrite_step_parameters,
        )

    return self
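The effect of `overwrite_step_parameters` comes down to the difference between plain assignment and `dict.setdefault`, as used in `_read_config_steps`. The sketch below isolates that behavior; `apply_config` is a hypothetical helper that returns a new dictionary, whereas the source updates the step's `PARAM_SPEC` in place.

```python
from typing import Any, Dict


def apply_config(
    param_spec: Dict[str, Any], config: Dict[str, Any], overwrite: bool = False
) -> Dict[str, Any]:
    """Merge parameters from a config file into parameters set in code."""
    merged = dict(param_spec)
    for parameter, value in config.items():
        if overwrite:
            # The config file wins over values passed in code.
            merged[parameter] = value
        else:
            # Values passed in code win; the config only fills the gaps.
            merged.setdefault(parameter, value)
    return merged


in_code = {"learning_rate": 0.1}
from_yaml = {"learning_rate": 0.01, "epochs": 5}

kept = apply_config(in_code, from_yaml)
replaced = apply_config(in_code, from_yaml, overwrite=True)
```

With the default setting, `kept` retains the learning rate given in code and only gains `epochs`; with `overwrite=True`, `replaced` takes both values from the config.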

BasePipelineMeta (type)

Pipeline Metaclass responsible for validating the pipeline definition.

Source code in zenml/pipelines/base_pipeline.py
class BasePipelineMeta(type):
    """Pipeline Metaclass responsible for validating the pipeline definition."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BasePipelineMeta":
        """Saves argument names for later verification purposes"""
        cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))

        cls.STEP_SPEC = {}

        connect_spec = inspect.getfullargspec(
            inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
        )
        connect_args = connect_spec.args

        if connect_args and connect_args[0] == "self":
            connect_args.pop(0)

        for arg in connect_args:
            arg_type = connect_spec.annotations.get(arg, None)
            cls.STEP_SPEC.update({arg: arg_type})
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Saves the argument names and type annotations of the pipeline's connect function in STEP_SPEC for later verification.

Source code in zenml/pipelines/base_pipeline.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
    """Saves argument names for later verification purposes"""
    cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))

    cls.STEP_SPEC = {}

    connect_spec = inspect.getfullargspec(
        inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
    )
    connect_args = connect_spec.args

    if connect_args and connect_args[0] == "self":
        connect_args.pop(0)

    for arg in connect_args:
        arg_type = connect_spec.annotations.get(arg, None)
        cls.STEP_SPEC.update({arg: arg_type})
    return cls
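To see what the metaclass records, the following self-contained sketch applies the same `inspect.getfullargspec` approach to two small classes. `StepSpecMeta`, `Base`, and `MyPipeline` are hypothetical names, and the method name `"connect"` is hard-coded here in place of the `PIPELINE_INNER_FUNC_NAME` constant.

```python
import inspect
from typing import Any, Dict, Tuple


class StepSpecMeta(type):
    """Metaclass that records the arguments of each class's connect method."""

    def __new__(
        mcs, name: str, bases: Tuple[type, ...], dct: Dict[str, Any]
    ) -> "StepSpecMeta":
        cls = super().__new__(mcs, name, bases, dct)

        spec = inspect.getfullargspec(inspect.unwrap(getattr(cls, "connect")))
        args = list(spec.args)
        if args and args[0] == "self":
            args.pop(0)

        # Map each argument name to its annotation, or None if unannotated.
        cls.STEP_SPEC = {arg: spec.annotations.get(arg) for arg in args}
        return cls


class Base(metaclass=StepSpecMeta):
    def connect(self) -> None:
        raise NotImplementedError


class MyPipeline(Base):
    def connect(self, importer: int, trainer) -> None:
        """The annotations are arbitrary; only their presence matters here."""
```

After the class statements have run, `MyPipeline.STEP_SPEC` maps `importer` to `int` and `trainer` to `None`, while `Base.STEP_SPEC` is empty. The work happens at class creation time, before any instance exists.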

builtin_pipelines special

training_pipeline

TrainingPipeline (BasePipeline)

Class for the classic training pipeline implementation

Source code in zenml/pipelines/builtin_pipelines/training_pipeline.py
class TrainingPipeline(BasePipeline):
    """Class for the classic training pipeline implementation"""

    def connect(  # type: ignore[override]
        self,
        datasource: step_interfaces.BaseDatasourceStep,
        splitter: step_interfaces.BaseSplitStep,
        analyzer: step_interfaces.BaseAnalyzerStep,
        preprocessor: step_interfaces.BasePreprocessorStep,
        trainer: step_interfaces.BaseTrainerStep,
        evaluator: step_interfaces.BaseEvaluatorStep,
    ) -> None:
        """Main connect method for the standard training pipelines

        Args:
            datasource: the step responsible for the data ingestion
            splitter: the step responsible for splitting the dataset into
                train, test, val
            analyzer: the step responsible for extracting the statistics and
                the schema
            preprocessor: the step responsible for preprocessing the data
            trainer: the step responsible for training a model
            evaluator: the step responsible for computing the evaluation of
                the trained model
        """
        # Ingesting the datasource
        dataset = datasource()

        # Splitting the data
        train, test, validation = splitter(dataset=dataset)  # type:ignore

        # Analyzing the train dataset
        statistics, schema = analyzer(dataset=train)  # type:ignore

        # Preprocessing the splits
        train_t, test_t, validation_t = preprocessor(  # type:ignore
            train_dataset=train,
            test_dataset=test,
            validation_dataset=validation,
            statistics=statistics,
            schema=schema,
        )

        # Training the model
        model = trainer(train_dataset=train_t, validation_dataset=validation_t)

        # Evaluating the trained model
        evaluator(model=model, dataset=test_t)  # type:ignore
connect(self, datasource, splitter, analyzer, preprocessor, trainer, evaluator)

Main connect method for the standard training pipelines

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| datasource | BaseDatasourceStep | the step responsible for the data ingestion | required |
| splitter | BaseSplitStep | the step responsible for splitting the dataset into train, test, val | required |
| analyzer | BaseAnalyzerStep | the step responsible for extracting the statistics and the schema | required |
| preprocessor | BasePreprocessorStep | the step responsible for preprocessing the data | required |
| trainer | BaseTrainerStep | the step responsible for training a model | required |
| evaluator | BaseEvaluatorStep | the step responsible for computing the evaluation of the trained model | required |
Source code in zenml/pipelines/builtin_pipelines/training_pipeline.py
def connect(  # type: ignore[override]
    self,
    datasource: step_interfaces.BaseDatasourceStep,
    splitter: step_interfaces.BaseSplitStep,
    analyzer: step_interfaces.BaseAnalyzerStep,
    preprocessor: step_interfaces.BasePreprocessorStep,
    trainer: step_interfaces.BaseTrainerStep,
    evaluator: step_interfaces.BaseEvaluatorStep,
) -> None:
    """Main connect method for the standard training pipelines

    Args:
        datasource: the step responsible for the data ingestion
        splitter: the step responsible for splitting the dataset into
            train, test, val
        analyzer: the step responsible for extracting the statistics and
            the schema
        preprocessor: the step responsible for preprocessing the data
        trainer: the step responsible for training a model
        evaluator: the step responsible for computing the evaluation of
            the trained model
    """
    # Ingesting the datasource
    dataset = datasource()

    # Splitting the data
    train, test, validation = splitter(dataset=dataset)  # type:ignore

    # Analyzing the train dataset
    statistics, schema = analyzer(dataset=train)  # type:ignore

    # Preprocessing the splits
    train_t, test_t, validation_t = preprocessor(  # type:ignore
        train_dataset=train,
        test_dataset=test,
        validation_dataset=validation,
        statistics=statistics,
        schema=schema,
    )

    # Training the model
    model = trainer(train_dataset=train_t, validation_dataset=validation_t)

    # Evaluating the trained model
    evaluator(model=model, dataset=test_t)  # type:ignore
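The data flow wired up by this connect method can be traced with plain Python functions standing in for the six steps. Everything in this sketch is a toy assumption: the step functions, the 60/20/20 split, and the constant "model" are invented for illustration, and unlike the real method this `connect` returns the evaluator's output so the result can be inspected.

```python
from statistics import mean
from typing import Dict, List, Tuple

Dataset = List[float]


def datasource() -> Dataset:
    return [float(i) for i in range(1, 11)]


def splitter(dataset: Dataset) -> Tuple[Dataset, Dataset, Dataset]:
    # Toy 60/20/20 split into train, test and validation.
    return dataset[:6], dataset[6:8], dataset[8:]


def analyzer(dataset: Dataset) -> Tuple[Dict[str, float], Dict[str, str]]:
    return {"mean": mean(dataset)}, {"dtype": "float"}


def preprocessor(train_dataset, test_dataset, validation_dataset, statistics, schema):
    # Center every split on the mean of the training data.
    def center(data: Dataset) -> Dataset:
        return [x - statistics["mean"] for x in data]

    return center(train_dataset), center(test_dataset), center(validation_dataset)


def trainer(train_dataset: Dataset, validation_dataset: Dataset) -> float:
    # The "model" is a constant predictor: the mean of the training data.
    return mean(train_dataset)


def evaluator(model: float, dataset: Dataset) -> float:
    # Mean absolute error of the constant predictor.
    return mean([abs(x - model) for x in dataset])


def connect(datasource, splitter, analyzer, preprocessor, trainer, evaluator):
    dataset = datasource()
    train, test, validation = splitter(dataset=dataset)
    statistics, schema = analyzer(dataset=train)
    train_t, test_t, validation_t = preprocessor(
        train_dataset=train,
        test_dataset=test,
        validation_dataset=validation,
        statistics=statistics,
        schema=schema,
    )
    model = trainer(train_dataset=train_t, validation_dataset=validation_t)
    return evaluator(model=model, dataset=test_t)


score = connect(datasource, splitter, analyzer, preprocessor, trainer, evaluator)
```

Note that only the training split is analyzed, and the statistics derived from it are then applied to all three splits.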

pipeline_decorator

pipeline(_func=None, *, name=None, enable_cache=True, required_integrations=(), requirements_file=None, dockerignore_file=None, secrets=None)

Outer decorator function for the creation of a ZenML pipeline

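To accept parameters such as "name", it uses a nested decorator structure: when called with keyword arguments it returns the inner decorator, and when applied directly to a function it returns the generated pipeline class.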

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| _func | Optional[~F] | The decorated function. | None |
| name | Optional[str] | The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. | None |
| enable_cache | bool | Whether to use caching or not. | True |
| required_integrations | Sequence[str] | Optional list of ZenML integrations that are required to run this pipeline. Run `zenml integration list` for a full list of available integrations. | () |
| requirements_file | Optional[str] | Optional path to a pip requirements file that contains requirements to run the pipeline. | None |
| dockerignore_file | Optional[str] | Optional path to a dockerignore file to use when building docker images for running this pipeline. Note: If you pass a file, make sure it does not include the `.zen` directory as it is needed to run ZenML inside the container. | None |

Returns:

| Type | Description |
| --- | --- |
| Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]] | the inner decorator which creates the pipeline class based on the ZenML BasePipeline |

Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: bool = True,
    required_integrations: Sequence[str] = (),
    requirements_file: Optional[str] = None,
    dockerignore_file: Optional[str] = None,
    secrets: Optional[List[str]] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline

    In order to be able to work with parameters such as "name", it features a
    nested decorator structure.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        required_integrations: Optional list of ZenML integrations that are
            required to run this pipeline. Run `zenml integration list` for
            a full list of available integrations.
        requirements_file: Optional path to a pip requirements file that
            contains requirements to run the pipeline.
        dockerignore_file: Optional path to a dockerignore file to use when
            building docker images for running this pipeline.
            **Note**: If you pass a file, make sure it does not include the
            `.zen` directory as it is needed to run ZenML inside the container.

    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """

    def inner_decorator(func: F) -> Type[BasePipeline]:
        """Inner decorator function for the creation of a ZenML Pipeline

        Args:
          func: types.FunctionType, this function will be used as the
            "connect" method of the generated Pipeline

        Returns:
            the class of a newly generated ZenML Pipeline

        """
        return type(  # noqa
            name if name else func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type] # noqa
                INSTANCE_CONFIGURATION: {
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_REQUIRED_INTEGRATIONS: required_integrations,
                    PARAM_REQUIREMENTS_FILE: requirements_file,
                    PARAM_DOCKERIGNORE_FILE: dockerignore_file,
                    PARAM_SECRETS: secrets,
                },
            },
        )

    if _func is None:
        return inner_decorator
    else:
        return inner_decorator(_func)
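The nested structure is a general Python pattern for decorators that work both with and without arguments. The sketch below reproduces it outside ZenML: `pipeline_like` and `Base` are hypothetical, and only two of the configuration options are carried over.

```python
from typing import Any, Callable, Optional


class Base:
    """Hypothetical base class for the generated classes."""

    enable_cache: bool = True


def pipeline_like(
    _func: Optional[Callable[..., Any]] = None,
    *,
    name: Optional[str] = None,
    enable_cache: bool = True,
):
    """Decorator usable as @pipeline_like or @pipeline_like(name=...)."""

    def inner_decorator(func: Callable[..., Any]) -> type:
        # Build a new class whose connect method is the decorated function.
        return type(
            name if name else func.__name__,
            (Base,),
            {"connect": staticmethod(func), "enable_cache": enable_cache},
        )

    if _func is None:
        # Called with arguments: return the decorator to be applied next.
        return inner_decorator
    # Applied directly to a function: decorate it right away.
    return inner_decorator(_func)


@pipeline_like
def first(importer, trainer):
    ...


@pipeline_like(name="renamed", enable_cache=False)
def second(importer):
    ...
```

In both cases the decorated name is bound to a class rather than a function: `first` becomes a class named `first`, and `second` becomes a class named `renamed` with caching disabled.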

schedule

Schedule (BaseModel) pydantic-model

Class for defining a pipeline schedule.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| start_time | datetime | Datetime object to indicate when to start the schedule. |
| end_time | Optional[datetime.datetime] | Datetime object to indicate when to end the schedule. |
| interval_second | int | Integer indicating the seconds between two recurring runs for a periodic schedule. |
| catchup | bool | Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill. |
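The catchup rule described for this attribute can be expressed as a small selection function. This is only an illustration of the wording: `intervals_to_schedule` is a hypothetical helper, and the actual behavior is decided by the scheduler of the orchestrator in use.

```python
from typing import List


def intervals_to_schedule(ready_intervals: List[int], catchup: bool) -> List[int]:
    """Pick which of the intervals that are ready should actually run."""
    if catchup:
        # Backfill: run every interval that was missed.
        return list(ready_intervals)
    # No backfill: run only the most recent interval, if there is one.
    return ready_intervals[-1:]
```

For three missed intervals, `catchup=True` selects all three, while `catchup=False` selects only the last one.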

Source code in zenml/pipelines/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    Attributes:
        start_time: Datetime object to indicate when to start the schedule.
        end_time: Datetime object to indicate when to end the schedule.
        interval_second: Integer indicating the seconds between two recurring
            runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterwards. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
    """

    start_time: datetime.datetime
    end_time: Optional[datetime.datetime] = None
    interval_second: int
    catchup: bool = False

    @property
    def utc_start_time(self) -> str:
        """ISO-formatted string of the UTC start time."""
        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time."""
        if not self.end_time:
            return None

        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
utc_end_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC end time.

utc_start_time: str property readonly

ISO-formatted string of the UTC start time.
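The two UTC properties can be tried out with the standard library alone. In this sketch `ScheduleSketch` is a hypothetical dataclass standing in for the pydantic model, and the fixed `+01:00` offset is chosen only for the example.

```python
import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScheduleSketch:
    """Hypothetical stand-in for the Schedule model."""

    start_time: datetime.datetime
    interval_second: int
    end_time: Optional[datetime.datetime] = None
    catchup: bool = False

    @property
    def utc_start_time(self) -> str:
        """ISO-formatted string of the UTC start time."""
        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time."""
        if not self.end_time:
            return None
        return self.end_time.astimezone(datetime.timezone.utc).isoformat()


# A start time at noon in a zone one hour ahead of UTC.
plus_one = datetime.timezone(datetime.timedelta(hours=1))
schedule = ScheduleSketch(
    start_time=datetime.datetime(2022, 1, 1, 12, 0, tzinfo=plus_one),
    interval_second=3600,
)
```

Here `schedule.utc_start_time` is `"2022-01-01T11:00:00+00:00"` and `schedule.utc_end_time` is `None`. Note that `astimezone` treats a naive datetime (one without `tzinfo`) as local time, so passing timezone-aware datetimes avoids results that depend on the machine's settings.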