Airflow

zenml.integrations.airflow special

Airflow integration for ZenML.

The Airflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Airflow orchestrator with the CLI tool and then bootstrapping it with the zenml orchestrator up command.
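
For orientation, the sketch below shows what a minimal pipeline file for such a setup could look like. It is not an official example: the CLI commands in the comments and the legacy zenml.pipelines / zenml.steps decorator imports are assumptions that may differ between ZenML versions, and the Airflow-specific DAG filepath option (see prepare_pipeline_deployment below) is omitted for brevity.

# Hedged sketch, not an official example. Assumes the legacy ZenML decorator API
# (zenml.steps / zenml.pipelines) shipped alongside this integration.
#
# Rough CLI setup (flags may differ between versions):
#   zenml integration install airflow
#   zenml orchestrator register airflow_orchestrator --flavor=airflow
#   zenml stack register airflow_stack -o airflow_orchestrator -a default --set
#   zenml stack up
from zenml.pipelines import pipeline
from zenml.steps import step


@step
def importer() -> int:
    """Hypothetical data-producing step."""
    return 42


@step
def trainer(data: int) -> None:
    """Hypothetical training step."""
    print(f"training on {data}")


@pipeline
def example_pipeline(importer, trainer):
    trainer(importer())


if __name__ == "__main__":
    # With the Airflow stack active, running the pipeline builds an Airflow DAG
    # instead of executing the steps locally.
    example_pipeline(importer=importer(), trainer=trainer()).run()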

AirflowIntegration (Integration)

Definition of Airflow Integration for ZenML.

Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
    """Definition of Airflow Integration for ZenML."""

    NAME = AIRFLOW
    REQUIREMENTS = ["apache-airflow==2.2.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Airflow integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

        return [AirflowOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Airflow integration.

Returns:

List[Type[zenml.stack.flavor.Flavor]]: List of stack component flavors for this integration.
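
A small usage sketch, not taken from the ZenML docs: iterating over the flavors contributed by this integration. Resolving implementation_class imports the orchestrator module, so apache-airflow must be installed; the printed name is expected to match the AIRFLOW_ORCHESTRATOR_FLAVOR constant (assumed to be "airflow", but check your install).

from zenml.integrations.airflow import AirflowIntegration

for flavor_class in AirflowIntegration.flavors():
    flavor = flavor_class()
    # `name` is the string used with `--flavor=...` when registering the orchestrator
    print(flavor.name, "->", flavor.implementation_class)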

Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Airflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

    return [AirflowOrchestratorFlavor]

flavors special

Airflow integration flavors.

airflow_orchestrator_flavor

Airflow orchestrator flavor.

AirflowOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Airflow orchestrator.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the Airflow orchestrator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AIRFLOW_ORCHESTRATOR_FLAVOR

    @property
    def implementation_class(self) -> Type["AirflowOrchestrator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.airflow.orchestrators import AirflowOrchestrator

        return AirflowOrchestrator

implementation_class: Type[AirflowOrchestrator] property readonly

Implementation class.

Returns:

Type[AirflowOrchestrator]: The implementation class.

name: str property readonly

Name of the flavor.

Returns:

str: The name of the flavor.

orchestrators special

The Airflow integration enables the use of Airflow as a pipeline orchestrator.

airflow_orchestrator

Implementation of Airflow orchestrator integration.

AirflowOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines using Airflow.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines using Airflow."""

    def __init__(self, **values: Any):
        """Sets environment variables to configure airflow.

        Args:
            **values: Values to set in the orchestrator.
        """
        super().__init__(**values)
        self.airflow_home = os.path.join(
            io_utils.get_global_config_directory(),
            AIRFLOW_ROOT_DIR,
            str(self.id),
        )
        self._set_env()

    @staticmethod
    def _translate_schedule(
        schedule: Optional[Schedule] = None,
    ) -> Dict[str, Any]:
        """Convert ZenML schedule into Airflow schedule.

        The Airflow schedule uses slightly different naming and needs some
        default entries for execution without a schedule.

        Args:
            schedule: Containing the interval, start and end date and
                a boolean flag that defines if past runs should be caught up
                on

        Returns:
            Airflow configuration dict.
        """
        if schedule:
            if schedule.cron_expression:
                start_time = schedule.start_time or (
                    datetime.datetime.now() - datetime.timedelta(1)
                )
                return {
                    "schedule_interval": schedule.cron_expression,
                    "start_date": start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }
            else:
                return {
                    "schedule_interval": schedule.interval_second,
                    "start_date": schedule.start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }

        return {
            "schedule_interval": "@once",
            # set a start time in the past and disable catchup so airflow runs the dag immediately
            "start_date": datetime.datetime.now() - datetime.timedelta(7),
            "catchup": False,
        }

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Creates an Airflow DAG as the intermediate representation for the pipeline.

        This DAG will be loaded by airflow in the target environment
        and used for orchestration of the pipeline.

        How it works:
        -------------
        A new airflow_dag is instantiated with the pipeline name and, among
        other things, the run schedule.

        For each step of the pipeline a callable is created. This callable
        uses the run_step() method to execute the step. The parameters of
        this callable are pre-filled and an airflow step_operator is created
        within the dag. The dependencies to upstream steps are then
        configured.

        Finally, the dag is fully complete and can be returned.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.

        Returns:
            The Airflow DAG.
        """
        import airflow
        from airflow.operators import python as airflow_python

        # Instantiate and configure airflow Dag with name and schedule
        airflow_dag = airflow.DAG(
            dag_id=deployment.pipeline.name,
            is_paused_upon_creation=False,
            **self._translate_schedule(deployment.schedule),
        )

        # Dictionary mapping step names to airflow_operators. This will be needed
        # to configure airflow operator dependencies
        step_name_to_airflow_operator = {}

        for step in deployment.steps.values():
            # Create callable that will be used by airflow to execute the step
            # within the orchestrated environment
            def _step_callable(step_instance: "Step", **kwargs):
                if self.requires_resources_in_orchestration_environment(step):
                    logger.warning(
                        "Specifying step resources is not yet supported for "
                        "the Airflow orchestrator, ignoring resource "
                        "configuration for step %s.",
                        step.name,
                    )
                # Extract run name for the kwargs that will be passed to the
                # callable
                run_name = kwargs["ti"].get_dagrun().run_id
                self._prepare_run(deployment=deployment)
                self.run_step(step=step_instance, run_name=run_name)
                self._cleanup_run()

            # Create airflow python operator that contains the step callable
            airflow_operator = airflow_python.PythonOperator(
                dag=airflow_dag,
                task_id=step.config.name,
                provide_context=True,
                python_callable=functools.partial(
                    _step_callable, step_instance=step
                ),
            )

            # Configure the current airflow operator to run after all upstream
            # operators finished executing
            step_name_to_airflow_operator[step.config.name] = airflow_operator
            for upstream_step_name in step.spec.upstream_steps:
                airflow_operator.set_upstream(
                    step_name_to_airflow_operator[upstream_step_name]
                )

        # Return the finished airflow dag
        return airflow_dag

    @property
    def dags_directory(self) -> str:
        """Returns path to the airflow dags directory.

        Returns:
            Path to the airflow dags directory.
        """
        return os.path.join(self.airflow_home, "dags")

    @property
    def pid_file(self) -> str:
        """Returns path to the daemon PID file.

        Returns:
            Path to the daemon PID file.
        """
        return os.path.join(self.airflow_home, "airflow_daemon.pid")

    @property
    def log_file(self) -> str:
        """Returns path to the airflow log file.

        Returns:
            str: Path to the airflow log file.
        """
        return os.path.join(self.airflow_home, "airflow_orchestrator.log")

    @property
    def password_file(self) -> str:
        """Returns path to the webserver password file.

        Returns:
            Path to the webserver password file.
        """
        return os.path.join(self.airflow_home, "standalone_admin_password.txt")

    def _set_env(self) -> None:
        """Sets environment variables to configure airflow."""
        os.environ["AIRFLOW_HOME"] = self.airflow_home
        os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = self.dags_directory
        os.environ["AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE"] = "false"
        os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
        # check the DAG folder every 10 seconds for new files
        os.environ["AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL"] = "10"

    def _copy_to_dag_directory_if_necessary(self, dag_filepath: str) -> None:
        """Copies DAG module to the Airflow DAGs directory if not already present.

        Args:
            dag_filepath: Path to the file in which the DAG is defined.
        """
        dags_directory = io_utils.resolve_relative_path(self.dags_directory)

        if dags_directory == os.path.dirname(dag_filepath):
            logger.debug("File is already in airflow DAGs directory.")
        else:
            logger.debug(
                "Copying dag file '%s' to DAGs directory.", dag_filepath
            )
            destination_path = os.path.join(
                dags_directory, os.path.basename(dag_filepath)
            )
            if fileio.exists(destination_path):
                logger.info(
                    "File '%s' already exists, overwriting with new DAG file",
                    destination_path,
                )
            fileio.copy(dag_filepath, destination_path, overwrite=True)

    def _log_webserver_credentials(self) -> None:
        """Logs URL and credentials to log in to the airflow webserver.

        Raises:
            FileNotFoundError: If the password file does not exist.
        """
        if fileio.exists(self.password_file):
            with open(self.password_file) as file:
                password = file.read().strip()
        else:
            raise FileNotFoundError(
                f"Can't find password file '{self.password_file}'"
            )
        logger.info(
            "To inspect your DAGs, login to http://localhost:8080 "
            "with username: admin password: %s",
            password,
        )

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Checks Airflow is running and copies DAG file to the DAGs directory.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.

        Raises:
            RuntimeError: If Airflow is not running or no DAG filepath runtime
                          option is provided.
        """
        if not self.is_running:
            raise RuntimeError(
                "Airflow orchestrator is currently not running. Run `zenml "
                "stack up` to provision resources for the active stack."
            )

        if Environment.in_notebook():
            raise RuntimeError(
                "Unable to run the Airflow orchestrator from within a "
                "notebook. Airflow requires a python file which contains a "
                "global Airflow DAG object and therefore does not work with "
                "notebooks. Please copy your ZenML pipeline code in a python "
                "file and try again."
            )

        try:
            dag_filepath = deployment.pipeline.extra[DAG_FILEPATH_OPTION_KEY]
        except KeyError:
            raise RuntimeError(
                f"No DAG filepath found in runtime configuration. Make sure "
                f"to add the filepath to your airflow DAG file as a runtime "
                f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
            )

        self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)

    @property
    def is_running(self) -> bool:
        """Returns whether the airflow daemon is currently running.

        Returns:
            True if the daemon is running, False otherwise.

        Raises:
            RuntimeError: If port 8080 is occupied.
        """
        from airflow.cli.commands.standalone_command import StandaloneCommand
        from airflow.jobs.triggerer_job import TriggererJob

        daemon_running = daemon.check_if_daemon_is_running(self.pid_file)

        command = StandaloneCommand()
        webserver_port_open = command.port_open(8080)

        if not daemon_running:
            if webserver_port_open:
                raise RuntimeError(
                    "The airflow daemon does not seem to be running but "
                    "local port 8080 is occupied. Make sure the port is "
                    "available and try again."
                )

            # exit early so we don't check non-existing airflow databases
            return False

        # we can't use StandaloneCommand().is_ready() here as the
        # Airflow SequentialExecutor apparently does not send a heartbeat
        # while running a task which would result in this returning `False`
        # even if Airflow is running.
        airflow_running = webserver_port_open and command.job_running(
            TriggererJob
        )
        return airflow_running

    @property
    def is_provisioned(self) -> bool:
        """Returns whether the airflow daemon is currently running.

        Returns:
            True if the airflow daemon is running, False otherwise.
        """
        return self.is_running

    def provision(self) -> None:
        """Ensures that Airflow is running."""
        if self.is_running:
            logger.info("Airflow is already running.")
            self._log_webserver_credentials()
            return

        if not fileio.exists(self.dags_directory):
            io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

        from airflow.cli.commands.standalone_command import StandaloneCommand

        try:
            command = StandaloneCommand()
            # Skip pipeline registration inside the airflow server process.
            # When searching for DAGs, airflow imports the runner file in a
            # randomly generated module. If we don't skip pipeline registration,
            # it would fail by trying to register a pipeline with an existing
            # name but different module sources for the steps.
            with set_environment_variable(
                key=ENV_ZENML_SKIP_PIPELINE_REGISTRATION, value="True"
            ):
                # Run the daemon with a working directory inside the current
                # zenml repo so the same repo will be used to run the DAGs
                daemon.run_as_daemon(
                    command.run,
                    pid_file=self.pid_file,
                    log_file=self.log_file,
                    working_directory=get_source_root_path(),
                )
            while not self.is_running:
                # Wait until the daemon started all the relevant airflow
                # processes
                time.sleep(0.1)
            self._log_webserver_credentials()
        except Exception as e:
            logger.error(e)
            logger.error(
                "An error occurred while starting the Airflow daemon. If you "
                "want to start it manually, use the commands described in the "
                "official Airflow quickstart guide for running Airflow locally."
            )
            self.deprovision()

    def deprovision(self) -> None:
        """Stops the airflow daemon if necessary and tears down resources."""
        if self.is_running:
            daemon.stop_daemon(self.pid_file)

        fileio.rmtree(self.airflow_home)
        logger.info("Airflow spun down.")
dags_directory: str property readonly

Returns path to the airflow dags directory.

Returns:

str: Path to the airflow dags directory.

is_provisioned: bool property readonly

Returns whether the airflow daemon is currently running.

Returns:

bool: True if the airflow daemon is running, False otherwise.

is_running: bool property readonly

Returns whether the airflow daemon is currently running.

Returns:

bool: True if the daemon is running, False otherwise.

Exceptions:

RuntimeError: If port 8080 is occupied.

log_file: str property readonly

Returns path to the airflow log file.

Returns:

str: Path to the airflow log file.

password_file: str property readonly

Returns path to the webserver password file.

Returns:

str: Path to the webserver password file.

pid_file: str property readonly

Returns path to the daemon PID file.

Returns:

str: Path to the daemon PID file.

__init__(self, **values) special

Sets environment variables to configure airflow.

Parameters:

**values (Any): Values to set in the orchestrator. Default: {}

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
    """Sets environment variables to configure airflow.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)
    self.airflow_home = os.path.join(
        io_utils.get_global_config_directory(),
        AIRFLOW_ROOT_DIR,
        str(self.id),
    )
    self._set_env()
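
A hedged sketch of the resulting local file layout, pieced together from the path properties above. The "airflow" directory name stands in for the AIRFLOW_ROOT_DIR constant, and <config> / <orchestrator-id> are placeholders for the ZenML global configuration directory and the orchestrator's UUID.

# Hedged sketch of the local file layout implied by the path properties above.
import os

airflow_home = os.path.join("<config>", "airflow", "<orchestrator-id>")
layout = {
    "dags_directory": os.path.join(airflow_home, "dags"),
    "pid_file": os.path.join(airflow_home, "airflow_daemon.pid"),
    "log_file": os.path.join(airflow_home, "airflow_orchestrator.log"),
    "password_file": os.path.join(airflow_home, "standalone_admin_password.txt"),
}
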
deprovision(self)

Stops the airflow daemon if necessary and tears down resources.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
    """Stops the airflow daemon if necessary and tears down resources."""
    if self.is_running:
        daemon.stop_daemon(self.pid_file)

    fileio.rmtree(self.airflow_home)
    logger.info("Airflow spun down.")

prepare_or_run_pipeline(self, deployment, stack)

Creates an Airflow DAG as the intermediate representation for the pipeline.

This DAG will be loaded by airflow in the target environment and used for orchestration of the pipeline.

How it works:

A new airflow_dag is instantiated with the pipeline name and, among other things, the run schedule.

For each step of the pipeline, a callable is created. This callable uses the run_step() method to execute the step. The parameters of this callable are pre-filled, and an airflow step_operator is created within the dag. The dependencies to upstream steps are then configured.

Finally, the dag is fully complete and can be returned.
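
As a concrete illustration of the wiring described above, the hedged, simplified sketch below builds the same kind of DAG for a hypothetical three-step pipeline (importer -> trainer -> evaluator); the actual implementation follows in the source code further down.

# Hedged, self-contained sketch: each step becomes one PythonOperator and the
# upstream links mirror the pipeline graph. Step names and the no-op callables
# are hypothetical stand-ins for the ZenML step callables.
import datetime

import airflow
from airflow.operators import python as airflow_python

dag = airflow.DAG(
    dag_id="example_pipeline",
    schedule_interval="@once",
    start_date=datetime.datetime.now() - datetime.timedelta(7),
    catchup=False,
)

tasks = {
    name: airflow_python.PythonOperator(
        dag=dag, task_id=name, python_callable=lambda: None
    )
    for name in ("importer", "trainer", "evaluator")
}
tasks["trainer"].set_upstream(tasks["importer"])    # trainer runs after importer
tasks["evaluator"].set_upstream(tasks["trainer"])   # evaluator runs after trainer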

Parameters:

deployment (PipelineDeployment): The pipeline deployment to prepare or run. Required.
stack (Stack): The stack the pipeline will run on. Required.

Returns:

Any: The Airflow DAG.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Creates an Airflow DAG as the intermediate representation for the pipeline.

    This DAG will be loaded by airflow in the target environment
    and used for orchestration of the pipeline.

    How it works:
    -------------
    A new airflow_dag is instantiated with the pipeline name and, among
    other things, the run schedule.

    For each step of the pipeline a callable is created. This callable
    uses the run_step() method to execute the step. The parameters of
    this callable are pre-filled and an airflow step_operator is created
    within the dag. The dependencies to upstream steps are then
    configured.

    Finally, the dag is fully complete and can be returned.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Returns:
        The Airflow DAG.
    """
    import airflow
    from airflow.operators import python as airflow_python

    # Instantiate and configure airflow Dag with name and schedule
    airflow_dag = airflow.DAG(
        dag_id=deployment.pipeline.name,
        is_paused_upon_creation=False,
        **self._translate_schedule(deployment.schedule),
    )

    # Dictionary mapping step names to airflow_operators. This will be needed
    # to configure airflow operator dependencies
    step_name_to_airflow_operator = {}

    for step in deployment.steps.values():
        # Create callable that will be used by airflow to execute the step
        # within the orchestrated environment
        def _step_callable(step_instance: "Step", **kwargs):
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not yet supported for "
                    "the Airflow orchestrator, ignoring resource "
                    "configuration for step %s.",
                    step.name,
                )
            # Extract run name for the kwargs that will be passed to the
            # callable
            run_name = kwargs["ti"].get_dagrun().run_id
            self._prepare_run(deployment=deployment)
            self.run_step(step=step_instance, run_name=run_name)
            self._cleanup_run()

        # Create airflow python operator that contains the step callable
        airflow_operator = airflow_python.PythonOperator(
            dag=airflow_dag,
            task_id=step.config.name,
            provide_context=True,
            python_callable=functools.partial(
                _step_callable, step_instance=step
            ),
        )

        # Configure the current airflow operator to run after all upstream
        # operators finished executing
        step_name_to_airflow_operator[step.config.name] = airflow_operator
        for upstream_step_name in step.spec.upstream_steps:
            airflow_operator.set_upstream(
                step_name_to_airflow_operator[upstream_step_name]
            )

    # Return the finished airflow dag
    return airflow_dag

prepare_pipeline_deployment(self, deployment, stack)

Checks Airflow is running and copies DAG file to the DAGs directory.

Parameters:

deployment (PipelineDeployment): The pipeline deployment configuration. Required.
stack (Stack): The stack on which the pipeline will be deployed. Required.

Exceptions:

RuntimeError: If Airflow is not running or no DAG filepath runtime option is provided.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Checks Airflow is running and copies DAG file to the DAGs directory.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        RuntimeError: If Airflow is not running or no DAG filepath runtime
                      option is provided.
    """
    if not self.is_running:
        raise RuntimeError(
            "Airflow orchestrator is currently not running. Run `zenml "
            "stack up` to provision resources for the active stack."
        )

    if Environment.in_notebook():
        raise RuntimeError(
            "Unable to run the Airflow orchestrator from within a "
            "notebook. Airflow requires a python file which contains a "
            "global Airflow DAG object and therefore does not work with "
            "notebooks. Please copy your ZenML pipeline code in a python "
            "file and try again."
        )

    try:
        dag_filepath = deployment.pipeline.extra[DAG_FILEPATH_OPTION_KEY]
    except KeyError:
        raise RuntimeError(
            f"No DAG filepath found in runtime configuration. Make sure "
            f"to add the filepath to your airflow DAG file as a runtime "
            f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
        )

    self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
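
Because the DAG file path must be provided as a pipeline extra, a hedged illustration is given below. Both the literal key value and the way the extra is attached to the pipeline are assumptions; in real code, import DAG_FILEPATH_OPTION_KEY from the integration and consult the pipeline configuration API of your ZenML version.

# Hypothetical illustration only: the orchestrator reads the DAG file path from
# deployment.pipeline.extra under DAG_FILEPATH_OPTION_KEY. The literal key below
# is an assumption; import the real constant instead of hard-coding it.
ASSUMED_DAG_FILEPATH_OPTION_KEY = "dag_filepath"

extra = {ASSUMED_DAG_FILEPATH_OPTION_KEY: __file__}
# ...pass `extra` through your pipeline's configuration (for example an `extra`
# argument on the pipeline decorator or run call, depending on the ZenML version)
# so that it ends up in deployment.pipeline.extra.
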
provision(self)

Ensures that Airflow is running.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
    """Ensures that Airflow is running."""
    if self.is_running:
        logger.info("Airflow is already running.")
        self._log_webserver_credentials()
        return

    if not fileio.exists(self.dags_directory):
        io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

    from airflow.cli.commands.standalone_command import StandaloneCommand

    try:
        command = StandaloneCommand()
        # Skip pipeline registration inside the airflow server process.
        # When searching for DAGs, airflow imports the runner file in a
        # randomly generated module. If we don't skip pipeline registration,
        # it would fail by trying to register a pipeline with an existing
        # name but different module sources for the steps.
        with set_environment_variable(
            key=ENV_ZENML_SKIP_PIPELINE_REGISTRATION, value="True"
        ):
            # Run the daemon with a working directory inside the current
            # zenml repo so the same repo will be used to run the DAGs
            daemon.run_as_daemon(
                command.run,
                pid_file=self.pid_file,
                log_file=self.log_file,
                working_directory=get_source_root_path(),
            )
        while not self.is_running:
            # Wait until the daemon started all the relevant airflow
            # processes
            time.sleep(0.1)
        self._log_webserver_credentials()
    except Exception as e:
        logger.error(e)
        logger.error(
            "An error occurred while starting the Airflow daemon. If you "
            "want to start it manually, use the commands described in the "
            "official Airflow quickstart guide for running Airflow locally."
        )
        self.deprovision()
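
A hedged lifecycle sketch tying provision, is_running and deprovision together. In practice these are usually triggered via zenml stack up / zenml stack down rather than called directly; the function below only illustrates the order of operations.

def start_and_stop(orchestrator: "AirflowOrchestrator") -> None:
    """Illustrative only: provision the local Airflow daemon, then tear it down."""
    orchestrator.provision()       # starts the standalone daemon and logs the admin password
    assert orchestrator.is_running
    # ...run pipelines, inspect DAGs at http://localhost:8080...
    orchestrator.deprovision()     # stops the daemon and removes airflow_home
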
set_environment_variable(key, value)

Temporarily sets an environment variable.

The value will only be set while this context manager is active and will be reset to the previous value afterward.

Parameters:

key (str): The environment variable key. Required.
value (str): The environment variable value. Required.

Yields:

Iterator[NoneType]: None.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
@contextmanager
def set_environment_variable(key: str, value: str) -> Iterator[None]:
    """Temporarily sets an environment variable.

    The value will only be set while this context manager is active and will
    be reset to the previous value afterward.

    Args:
        key: The environment variable key.
        value: The environment variable value.

    Yields:
        None.
    """
    old_value = os.environ.get(key, None)
    try:
        os.environ[key] = value
        yield
    finally:
        if old_value:
            os.environ[key] = old_value
        else:
            del os.environ[key]
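
A short usage sketch for this helper; the variable name is hypothetical and the import path simply mirrors the module shown above.

import os

from zenml.integrations.airflow.orchestrators.airflow_orchestrator import (
    set_environment_variable,
)

with set_environment_variable("ZENML_EXAMPLE_FLAG", "1"):
    assert os.environ["ZENML_EXAMPLE_FLAG"] == "1"

# the previous state is restored afterwards (here: the variable is unset again)
assert "ZENML_EXAMPLE_FLAG" not in os.environ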