Airflow

zenml.integrations.airflow special

Airflow integration for ZenML.

The Airflow integration provides an alternative orchestrator that runs ZenML pipelines as Airflow DAGs.

AirflowIntegration (Integration)

Definition of Airflow Integration for ZenML.

Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
    """Definition of Airflow Integration for ZenML."""

    NAME = AIRFLOW
    REQUIREMENTS = []

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Airflow integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.airflow.flavors import (
            AirflowOrchestratorFlavor,
        )

        return [AirflowOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Airflow integration.

Returns:

List[Type[zenml.stack.flavor.Flavor]]: List of stack component flavors for this integration.

Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Airflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.airflow.flavors import (
        AirflowOrchestratorFlavor,
    )

    return [AirflowOrchestratorFlavor]
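
For orientation, a minimal sketch of how this classmethod is typically consumed (printing "airflow" assumes the value of the AIRFLOW_ORCHESTRATOR_FLAVOR constant):

from zenml.integrations.airflow import AirflowIntegration

# Iterate over the stack component flavors contributed by this integration.
for flavor_class in AirflowIntegration.flavors():
    flavor = flavor_class()
    print(flavor.name)  # expected to print "airflow"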

flavors special

Airflow integration flavors.

airflow_orchestrator_flavor

Airflow orchestrator flavor.

AirflowOrchestratorConfig (BaseOrchestratorConfig, AirflowOrchestratorSettings)

Configuration for the Airflow orchestrator.

Attributes:

local (bool): If the orchestrator is local or not. If this is True, will spin up a local Airflow server to run pipelines.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorConfig(
    BaseOrchestratorConfig, AirflowOrchestratorSettings
):
    """Configuration for the Airflow orchestrator.

    Attributes:
        local: If the orchestrator is local or not. If this is True, will spin
            up a local Airflow server to run pipelines.
    """

    local: bool = True
AirflowOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Airflow orchestrator.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the Airflow orchestrator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AIRFLOW_ORCHESTRATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/airflow.png"

    @property
    def config_class(self) -> Type[AirflowOrchestratorConfig]:
        """Returns `AirflowOrchestratorConfig` config class.

        Returns:
            The config class.
        """
        return AirflowOrchestratorConfig

    @property
    def implementation_class(self) -> Type["AirflowOrchestrator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.airflow.orchestrators import (
            AirflowOrchestrator,
        )

        return AirflowOrchestrator
config_class: Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig] property readonly

Returns AirflowOrchestratorConfig config class.

Returns:

Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig]: The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[AirflowOrchestrator] property readonly

Implementation class.

Returns:

Type[AirflowOrchestrator]: The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

str: The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

str: The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

AirflowOrchestratorSettings (BaseSettings)

Settings for the Airflow orchestrator.

Attributes:

dag_output_dir (Optional[str]): Output directory in which to write the Airflow DAG.

dag_id (Optional[str]): Optional ID of the Airflow DAG to create. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

dag_tags (List[str]): Tags to add to the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

dag_args (Dict[str, Any]): Arguments for initializing the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

operator (str): The operator to use for one or all steps. This can either be a zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType or a string representing the source of the operator class to use (e.g. airflow.providers.docker.operators.docker.DockerOperator).

operator_args (Dict[str, Any]): Arguments for initializing the Airflow operator.

custom_dag_generator (Optional[str]): Source string of a module to use for generating Airflow DAGs. This module must contain the same classes and constants as the zenml.integrations.airflow.orchestrators.dag_generator module. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorSettings(BaseSettings):
    """Settings for the Airflow orchestrator.

    Attributes:
        dag_output_dir: Output directory in which to write the Airflow DAG.
        dag_id: Optional ID of the Airflow DAG to create. This value is only
            applied if the settings are defined on a ZenML pipeline and
            ignored if defined on a step.
        dag_tags: Tags to add to the Airflow DAG. This value is only
            applied if the settings are defined on a ZenML pipeline and
            ignored if defined on a step.
        dag_args: Arguments for initializing the Airflow DAG. This
            value is only applied if the settings are defined on a ZenML
            pipeline and ignored if defined on a step.
        operator: The operator to use for one or all steps. This can either be
            a `zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType`
            or a string representing the source of the operator class to use
            (e.g. `airflow.providers.docker.operators.docker.DockerOperator`)
        operator_args: Arguments for initializing the Airflow
            operator.
        custom_dag_generator: Source string of a module to use for generating
            Airflow DAGs. This module must contain the same classes and
            constants as the
            `zenml.integrations.airflow.orchestrators.dag_generator` module.
            This value is only applied if the settings are defined on a ZenML
            pipeline and ignored if defined on a step.
    """

    dag_output_dir: Optional[str] = None

    dag_id: Optional[str] = None
    dag_tags: List[str] = []
    dag_args: Dict[str, Any] = {}

    operator: str = OperatorType.DOCKER.source
    operator_args: Dict[str, Any] = {}

    custom_dag_generator: Optional[str] = None

    @field_validator("operator", mode="before")
    @classmethod
    def _convert_operator(cls, value: Any) -> Any:
        """Converts operator types to source strings.

        Args:
            value: The operator type value.

        Returns:
            The operator source.
        """
        if isinstance(value, OperatorType):
            return value.source

        try:
            return OperatorType(value).source
        except ValueError:
            return value
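
As a usage sketch, these settings are typically attached to a ZenML pipeline. The "orchestrator.airflow" settings key, the pipeline name, and the operator argument below are assumptions for illustration, not taken from this module:

from zenml import pipeline
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
)

airflow_settings = AirflowOrchestratorSettings(
    operator="docker",  # shorthand, converted to the DockerOperator source string
    operator_args={"network_mode": "bridge"},  # hypothetical operator argument
    dag_tags=["zenml"],
)

@pipeline(settings={"orchestrator.airflow": airflow_settings})
def my_pipeline() -> None:
    ...
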
OperatorType (Enum)

Airflow operator types.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class OperatorType(Enum):
    """Airflow operator types."""

    DOCKER = "docker"
    KUBERNETES_POD = "kubernetes_pod"
    GKE_START_POD = "gke_start_pod"

    @property
    def source(self) -> str:
        """Operator source.

        Returns:
            The operator source.
        """
        return {
            OperatorType.DOCKER: "airflow.providers.docker.operators.docker.DockerOperator",
            OperatorType.KUBERNETES_POD: "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator",
            OperatorType.GKE_START_POD: "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator",
        }[self]
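
A short sketch of the shorthand-to-source conversion performed by the operator field validator on AirflowOrchestratorSettings:

from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
    OperatorType,
)

# Both the enum member and its shorthand string resolve to the full source path.
settings = AirflowOrchestratorSettings(operator=OperatorType.KUBERNETES_POD)
assert settings.operator == (
    "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator"
)
assert AirflowOrchestratorSettings(operator="kubernetes_pod").operator == settings.operator
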

orchestrators special

The Airflow integration enables the use of Airflow as a pipeline orchestrator.

airflow_orchestrator

Implementation of Airflow orchestrator integration.

AirflowOrchestrator (ContainerizedOrchestrator)

Orchestrator responsible for running pipelines using Airflow.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(ContainerizedOrchestrator):
    """Orchestrator responsible for running pipelines using Airflow."""

    def __init__(self, **values: Any):
        """Initialize the orchestrator.

        Args:
            **values: Values to set in the orchestrator.
        """
        super().__init__(**values)
        self.dags_directory = os.path.join(
            io_utils.get_global_config_directory(),
            "airflow",
            str(self.id),
            "dags",
        )

    @property
    def config(self) -> AirflowOrchestratorConfig:
        """Returns the orchestrator config.

        Returns:
            The configuration.
        """
        return cast(AirflowOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Kubeflow orchestrator.

        Returns:
            The settings class.
        """
        return AirflowOrchestratorSettings

    @property
    def validator(self) -> Optional["StackValidator"]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry
        and only remote components.

        Returns:
            A `StackValidator` instance.
        """
        if self.config.local:
            # No container registry required if just running locally.
            return None
        else:

            def _validate_remote_components(
                stack: "Stack",
            ) -> Tuple[bool, str]:
                for component in stack.components.values():
                    if not component.config.is_local:
                        continue

                    return False, (
                        f"The Airflow orchestrator is configured to run "
                        f"pipelines remotely, but the '{component.name}' "
                        f"{component.type.value} is a local stack component "
                        f"and will not be available in the Airflow "
                        f"task.\nPlease ensure that you always use non-local "
                        f"stack components with a remote Airflow orchestrator, "
                        f"otherwise you may run into pipeline execution "
                        f"problems."
                    )

                return True, ""

            return StackValidator(
                required_components={
                    StackComponentType.CONTAINER_REGISTRY,
                    StackComponentType.IMAGE_BUILDER,
                },
                custom_validation_function=_validate_remote_components,
            )

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
    ) -> None:
        """Builds a Docker image to run pipeline steps.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        if self.config.local:
            stack.check_local_paths()

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Creates and writes an Airflow DAG zip file.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment.

        """
        pipeline_settings = cast(
            AirflowOrchestratorSettings, self.get_settings(deployment)
        )

        dag_generator_values = get_dag_generator_values(
            custom_dag_generator_source=pipeline_settings.custom_dag_generator
        )

        command = StepEntrypointConfiguration.get_entrypoint_command()

        tasks = []
        for step_name, step in deployment.step_configurations.items():
            settings = cast(
                AirflowOrchestratorSettings, self.get_settings(step)
            )
            image = self.get_image(deployment=deployment, step_name=step_name)
            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            )
            operator_args = settings.operator_args.copy()
            if self.requires_resources_in_orchestration_environment(step=step):
                if settings.operator == OperatorType.KUBERNETES_POD.source:
                    self._apply_resource_settings(
                        resource_settings=step.config.resource_settings,
                        operator_args=operator_args,
                    )
                else:
                    logger.warning(
                        "Specifying step resources is only supported when "
                        "using KubernetesPodOperators, ignoring resource "
                        "configuration for step %s.",
                        step_name,
                    )

            task = dag_generator_values.task_configuration_class(
                id=step_name,
                zenml_step_name=step_name,
                upstream_steps=step.spec.upstream_steps,
                docker_image=image,
                command=command,
                arguments=arguments,
                environment=environment,
                operator_source=settings.operator,
                operator_args=operator_args,
            )
            tasks.append(task)

        local_stores_path = (
            os.path.expanduser(GlobalConfiguration().local_stores_path)
            if self.config.local
            else None
        )

        dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
            pipeline_name=deployment.pipeline_configuration.name
        )
        dag_config = dag_generator_values.dag_configuration_class(
            id=dag_id,
            local_stores_path=local_stores_path,
            tasks=tasks,
            tags=pipeline_settings.dag_tags,
            dag_args=pipeline_settings.dag_args,
            **self._translate_schedule(deployment.schedule),
        )

        self._write_dag(
            dag_config,
            dag_generator_values=dag_generator_values,
            output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
        )

    def _apply_resource_settings(
        self,
        resource_settings: "ResourceSettings",
        operator_args: Dict[str, Any],
    ) -> None:
        """Adds resource settings to the operator args.

        Args:
            resource_settings: The resource settings to add.
            operator_args: The operator args which will get modified in-place.
        """
        if "container_resources" in operator_args:
            logger.warning(
                "Received duplicate resources from ResourceSettings: `%s`"
                "and operator_args: `%s`. Ignoring the resources defined by "
                "the ResourceSettings.",
                resource_settings,
                operator_args["container_resources"],
            )
        else:
            limits = {}

            if resource_settings.cpu_count is not None:
                limits["cpu"] = str(resource_settings.cpu_count)

            if resource_settings.memory is not None:
                memory_limit = resource_settings.memory[:-1]
                limits["memory"] = memory_limit

            if resource_settings.gpu_count is not None:
                logger.warning(
                    "Specifying GPU resources is not supported for the Airflow "
                    "orchestrator."
                )

            operator_args["container_resources"] = {"limits": limits}

    def _write_dag(
        self,
        dag_config: "DagConfiguration",
        dag_generator_values: DagGeneratorValues,
        output_dir: str,
    ) -> None:
        """Writes an Airflow DAG to disk.

        Args:
            dag_config: Configuration of the DAG to write.
            dag_generator_values: Values of the DAG generator to use.
            output_dir: The directory in which to write the DAG.
        """
        io_utils.create_dir_recursive_if_not_exists(output_dir)

        if self.config.local and output_dir == self.dags_directory:
            logger.warning(
                "You're using a local Airflow orchestrator but have not "
                "specified a custom DAG output directory. Unless you've "
                "configured your Airflow server to look for DAGs in this "
                "directory (%s), this DAG will not be found automatically "
                "by your local Airflow server.",
                output_dir,
            )

        def _write_zip(path: str) -> None:
            with zipfile.ZipFile(path, mode="w") as z:
                z.write(dag_generator_values.file, arcname="dag.py")
                z.writestr(
                    dag_generator_values.config_file_name,
                    dag_config.model_dump_json(),
                )

            logger.info("Writing DAG definition to `%s`.", path)

        dag_filename = f"{dag_config.id}.zip"
        if io_utils.is_remote(output_dir):
            io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
            local_zip_path = os.path.join(self.dags_directory, dag_filename)
            remote_zip_path = os.path.join(output_dir, dag_filename)
            _write_zip(local_zip_path)
            try:
                fileio.copy(local_zip_path, remote_zip_path)
                logger.info("Copied DAG definition to `%s`.", remote_zip_path)
            except Exception as e:
                logger.exception(e)
                logger.error(
                    "Failed to upload DAG to remote path `%s`. To run the "
                    "pipeline in Airflow, please manually copy the file `%s` "
                    "to your Airflow DAG directory.",
                    remote_zip_path,
                    local_zip_path,
                )
        else:
            zip_path = os.path.join(output_dir, dag_filename)
            _write_zip(zip_path)

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        from zenml.integrations.airflow.orchestrators.dag_generator import (
            ENV_ZENML_AIRFLOW_RUN_ID,
        )

        try:
            return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_AIRFLOW_RUN_ID}."
            )

    @staticmethod
    def _translate_schedule(
        schedule: Optional["Schedule"] = None,
    ) -> Dict[str, Any]:
        """Convert ZenML schedule into Airflow schedule.

        The Airflow schedule uses slightly different naming and needs some
        default entries for execution without a schedule.

        Args:
            schedule: Containing the interval, start and end date and
                a boolean flag that defines if past runs should be caught up
                on

        Returns:
            Airflow configuration dict.
        """
        if schedule:
            if schedule.cron_expression:
                start_time = schedule.start_time or (
                    datetime.datetime.utcnow() - datetime.timedelta(7)
                )
                return {
                    "schedule": schedule.cron_expression,
                    "start_date": start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }
            else:
                return {
                    "schedule": schedule.interval_second,
                    "start_date": schedule.start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }

        return {
            "schedule": "@once",
            # set a start time in the past and disable catchup so airflow
            # runs the dag immediately
            "start_date": datetime.datetime.utcnow() - datetime.timedelta(7),
            "catchup": False,
        }
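
To illustrate the schedule translation above, a sketch (the zenml.config.schedule import path is an assumption, and the commented result is approximate):

from zenml.config.schedule import Schedule

schedule = Schedule(cron_expression="0 * * * *", catchup=False)
airflow_kwargs = AirflowOrchestrator._translate_schedule(schedule)
# Roughly:
# {
#     "schedule": "0 * * * *",
#     "start_date": <utcnow minus 7 days, since no start time was given>,
#     "end_date": None,
#     "catchup": False,
# }
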
config: AirflowOrchestratorConfig property readonly

Returns the orchestrator config.

Returns:

AirflowOrchestratorConfig: The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Airflow orchestrator.

Returns:

Optional[Type[BaseSettings]]: The settings class.

validator: Optional[StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry and only remote components.

Returns:

Optional[StackValidator]: A StackValidator instance.

__init__(self, **values) special

Initialize the orchestrator.

Parameters:

**values (Any): Values to set in the orchestrator. Default: {}
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
    """Initialize the orchestrator.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)
    self.dags_directory = os.path.join(
        io_utils.get_global_config_directory(),
        "airflow",
        str(self.id),
        "dags",
    )
get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

RuntimeError: If the environment variable specifying the run id is not set.

Returns:

str: The orchestrator run id.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    from zenml.integrations.airflow.orchestrators.dag_generator import (
        ENV_ZENML_AIRFLOW_RUN_ID,
    )

    try:
        return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AIRFLOW_RUN_ID}."
        )
prepare_or_run_pipeline(self, deployment, stack, environment)

Creates and writes an Airflow DAG zip file.

Parameters:

deployment (PipelineDeploymentResponse, required): The pipeline deployment to prepare or run.

stack (Stack, required): The stack the pipeline will run on.

environment (Dict[str, str], required): Environment variables to set in the orchestration environment.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Creates and writes an Airflow DAG zip file.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    """
    pipeline_settings = cast(
        AirflowOrchestratorSettings, self.get_settings(deployment)
    )

    dag_generator_values = get_dag_generator_values(
        custom_dag_generator_source=pipeline_settings.custom_dag_generator
    )

    command = StepEntrypointConfiguration.get_entrypoint_command()

    tasks = []
    for step_name, step in deployment.step_configurations.items():
        settings = cast(
            AirflowOrchestratorSettings, self.get_settings(step)
        )
        image = self.get_image(deployment=deployment, step_name=step_name)
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )
        operator_args = settings.operator_args.copy()
        if self.requires_resources_in_orchestration_environment(step=step):
            if settings.operator == OperatorType.KUBERNETES_POD.source:
                self._apply_resource_settings(
                    resource_settings=step.config.resource_settings,
                    operator_args=operator_args,
                )
            else:
                logger.warning(
                    "Specifying step resources is only supported when "
                    "using KubernetesPodOperators, ignoring resource "
                    "configuration for step %s.",
                    step_name,
                )

        task = dag_generator_values.task_configuration_class(
            id=step_name,
            zenml_step_name=step_name,
            upstream_steps=step.spec.upstream_steps,
            docker_image=image,
            command=command,
            arguments=arguments,
            environment=environment,
            operator_source=settings.operator,
            operator_args=operator_args,
        )
        tasks.append(task)

    local_stores_path = (
        os.path.expanduser(GlobalConfiguration().local_stores_path)
        if self.config.local
        else None
    )

    dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    dag_config = dag_generator_values.dag_configuration_class(
        id=dag_id,
        local_stores_path=local_stores_path,
        tasks=tasks,
        tags=pipeline_settings.dag_tags,
        dag_args=pipeline_settings.dag_args,
        **self._translate_schedule(deployment.schedule),
    )

    self._write_dag(
        dag_config,
        dag_generator_values=dag_generator_values,
        output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
    )
prepare_pipeline_deployment(self, deployment, stack)

Builds a Docker image to run pipeline steps.

Parameters:

deployment (PipelineDeploymentResponse, required): The pipeline deployment configuration.

stack (Stack, required): The stack on which the pipeline will be deployed.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
) -> None:
    """Builds a Docker image to run pipeline steps.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    if self.config.local:
        stack.check_local_paths()
DagGeneratorValues (tuple)

Values from the DAG generator module.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class DagGeneratorValues(NamedTuple):
    """Values from the DAG generator module."""

    file: str
    config_file_name: str
    run_id_env_variable_name: str
    dag_configuration_class: Type["DagConfiguration"]
    task_configuration_class: Type["TaskConfiguration"]
__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class) special staticmethod

Create new instance of DagGeneratorValues(file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class)

__repr__(self) special

Return a nicely formatted representation string

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
get_dag_generator_values(custom_dag_generator_source=None)

Gets values from the DAG generator module.

Parameters:

custom_dag_generator_source (Optional[str]): Source of a custom DAG generator module. Default: None

Returns:

DagGeneratorValues: DAG generator module values.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_dag_generator_values(
    custom_dag_generator_source: Optional[str] = None,
) -> DagGeneratorValues:
    """Gets values from the DAG generator module.

    Args:
        custom_dag_generator_source: Source of a custom DAG generator module.

    Returns:
        DAG generator module values.
    """
    if custom_dag_generator_source:
        module = importlib.import_module(custom_dag_generator_source)
    else:
        from zenml.integrations.airflow.orchestrators import dag_generator

        module = dag_generator

    assert module.__file__
    return DagGeneratorValues(
        file=module.__file__,
        config_file_name=module.CONFIG_FILENAME,
        run_id_env_variable_name=module.ENV_ZENML_AIRFLOW_RUN_ID,
        dag_configuration_class=module.DagConfiguration,
        task_configuration_class=module.TaskConfiguration,
    )
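
A quick usage sketch of the default path (no custom generator module configured):

values = get_dag_generator_values()
# values.file points at the bundled dag_generator module; it is zipped together
# with the serialized DagConfiguration when the orchestrator writes the DAG.
print(values.config_file_name, values.run_id_env_variable_name)
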

dag_generator

Module to generate an Airflow DAG from a config file.

DagConfiguration (BaseModel)

Airflow DAG configuration.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class DagConfiguration(BaseModel):
    """Airflow DAG configuration."""

    id: str
    tasks: List[TaskConfiguration]

    local_stores_path: Optional[str] = None

    schedule: Union[datetime.timedelta, str] = Field(
        union_mode="left_to_right"
    )
    start_date: datetime.datetime
    end_date: Optional[datetime.datetime] = None
    catchup: bool = False

    tags: List[str] = []
    dag_args: Dict[str, Any] = {}
TaskConfiguration (BaseModel)

Airflow task configuration.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class TaskConfiguration(BaseModel):
    """Airflow task configuration."""

    id: str
    zenml_step_name: str
    upstream_steps: List[str]

    docker_image: str
    command: List[str]
    arguments: List[str]

    environment: Dict[str, str] = {}

    operator_source: str
    operator_args: Dict[str, Any] = {}
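
For reference, a minimal hand-built task configuration (all values below are hypothetical placeholders):

task_config = TaskConfiguration(
    id="trainer",
    zenml_step_name="trainer",
    upstream_steps=["loader"],
    docker_image="my-registry/zenml-pipeline:latest",
    command=["python", "-m", "zenml.entrypoints.entrypoint"],
    arguments=["--deployment_id", "<deployment-uuid>"],
    operator_source="airflow.providers.docker.operators.docker.DockerOperator",
)
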
get_docker_operator_init_kwargs(dag_config, task_config)

Gets keyword arguments to pass to the DockerOperator.

Parameters:

dag_config (DagConfiguration, required): The configuration of the DAG.

task_config (TaskConfiguration, required): The configuration of the task.

Returns:

Dict[str, Any]: The init keyword arguments.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_docker_operator_init_kwargs(
    dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the DockerOperator.

    Args:
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    mounts = []
    extra_hosts = {}
    environment = task_config.environment
    environment[ENV_ZENML_AIRFLOW_RUN_ID] = "{{run_id}}"

    if dag_config.local_stores_path:
        from docker.types import Mount

        environment[ENV_ZENML_LOCAL_STORES_PATH] = dag_config.local_stores_path
        mounts = [
            Mount(
                target=dag_config.local_stores_path,
                source=dag_config.local_stores_path,
                type="bind",
            )
        ]
        extra_hosts = {"host.docker.internal": "host-gateway"}
    return {
        "image": task_config.docker_image,
        "command": task_config.command + task_config.arguments,
        "mounts": mounts,
        "environment": environment,
        "extra_hosts": extra_hosts,
    }
get_kubernetes_pod_operator_init_kwargs(dag_config, task_config)

Gets keyword arguments to pass to the KubernetesPodOperator.

Parameters:

dag_config (DagConfiguration, required): The configuration of the DAG.

task_config (TaskConfiguration, required): The configuration of the task.

Returns:

Dict[str, Any]: The init keyword arguments.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_kubernetes_pod_operator_init_kwargs(
    dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the KubernetesPodOperator.

    Args:
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    from kubernetes.client.models import V1EnvVar

    environment = task_config.environment
    environment[ENV_ZENML_AIRFLOW_RUN_ID] = "{{run_id}}"

    return {
        "name": f"{dag_config.id}_{task_config.id}",
        "namespace": "default",
        "image": task_config.docker_image,
        "cmds": task_config.command,
        "arguments": task_config.arguments,
        "env_vars": [
            V1EnvVar(name=key, value=value)
            for key, value in environment.items()
        ],
    }
get_operator_init_kwargs(operator_class, dag_config, task_config)

Gets keyword arguments to pass to the operator init method.

Parameters:

operator_class (Type[Any], required): The operator class for which to get the kwargs.

dag_config (DagConfiguration, required): The configuration of the DAG.

task_config (TaskConfiguration, required): The configuration of the task.

Returns:

Dict[str, Any]: The init keyword arguments.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_operator_init_kwargs(
    operator_class: Type[Any],
    dag_config: DagConfiguration,
    task_config: TaskConfiguration,
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the operator init method.

    Args:
        operator_class: The operator class for which to get the kwargs.
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    init_kwargs = {"task_id": task_config.id}

    try:
        from airflow.providers.docker.operators.docker import DockerOperator

        if issubclass(operator_class, DockerOperator):
            init_kwargs.update(
                get_docker_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    try:
        from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
            KubernetesPodOperator,
        )

        if issubclass(operator_class, KubernetesPodOperator):
            init_kwargs.update(
                get_kubernetes_pod_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    init_kwargs.update(task_config.operator_args)
    return init_kwargs
import_class_by_path(class_path)

Imports a class based on a given path.

Parameters:

class_path (str, required): Class source path, e.g. this.module.Class.

Returns:

Type[Any]: The given class.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def import_class_by_path(class_path: str) -> Type[Any]:
    """Imports a class based on a given path.

    Args:
        class_path: str, class_source e.g. this.module.Class

    Returns:
        the given class
    """
    module_name, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)  # type: ignore[no-any-return]
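
A quick usage sketch (assumes the corresponding Airflow provider package is installed):

docker_operator_cls = import_class_by_path(
    "airflow.providers.docker.operators.docker.DockerOperator"
)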