Airflow
zenml.integrations.airflow
special
Airflow integration for ZenML.
The Airflow integration sub-module powers an alternative to the local
orchestrator. You can enable it by registering the Airflow orchestrator with
the CLI tool, then bootstrap it using the zenml stack up
command.
AirflowIntegration (Integration)
Definition of Airflow Integration for ZenML.
Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
    """Airflow integration definition for ZenML."""

    NAME = AIRFLOW
    REQUIREMENTS = ["apache-airflow==2.2.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Airflow integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # The flavor is imported at call time, inside the method.
        from zenml.integrations.airflow.flavors import (
            AirflowOrchestratorFlavor as orchestrator_flavor,
        )

        return [orchestrator_flavor]
flavors()
classmethod
Declare the stack component flavors for the Airflow integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Airflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    # The flavor is imported at call time, inside the method.
    from zenml.integrations.airflow.flavors import (
        AirflowOrchestratorFlavor as orchestrator_flavor,
    )

    return [orchestrator_flavor]
flavors
special
Airflow integration flavors.
airflow_orchestrator_flavor
Airflow orchestrator flavor.
AirflowOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Airflow orchestrator.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor backed by the Airflow orchestrator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name = AIRFLOW_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def implementation_class(self) -> Type["AirflowOrchestrator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        # The orchestrator class is imported at call time, inside the
        # property.
        from zenml.integrations.airflow.orchestrators import (
            AirflowOrchestrator as orchestrator_class,
        )

        return orchestrator_class
implementation_class: Type[AirflowOrchestrator]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AirflowOrchestrator] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
orchestrators
special
The Airflow integration enables the use of Airflow as a pipeline orchestrator.
airflow_orchestrator
Implementation of Airflow orchestrator integration.
AirflowOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines using Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Airflow."""
_orchestrator_run_id: Optional[str] = None
def __init__(self, **values: Any):
    """Initializes the orchestrator and configures the airflow environment.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)

    # The airflow home lives inside the global config directory, in a
    # sub-directory named after this orchestrator's id.
    config_root = io_utils.get_global_config_directory()
    self.airflow_home = os.path.join(
        config_root, AIRFLOW_ROOT_DIR, str(self.id)
    )

    self._set_env()
@staticmethod
def _translate_schedule(
    schedule: Optional[Schedule] = None,
) -> Dict[str, Any]:
    """Convert ZenML schedule into Airflow schedule.

    The Airflow schedule uses slightly different naming and needs some
    default entries for execution without a schedule.

    Args:
        schedule: Containing the interval, start and end date and
            a boolean flag that defines if past runs should be caught up
            on

    Returns:
        Airflow configuration dict.
    """
    if not schedule:
        # No schedule: run once. Set a start time in the past and disable
        # catchup so airflow runs the dag immediately.
        one_week_ago = datetime.datetime.now() - datetime.timedelta(7)
        return {
            "schedule_interval": "@once",
            "start_date": one_week_ago,
            "catchup": False,
        }

    if schedule.cron_expression:
        interval = schedule.cron_expression
        # Cron schedules without an explicit start time start one day ago.
        start = schedule.start_time or (
            datetime.datetime.now() - datetime.timedelta(1)
        )
    else:
        interval = schedule.interval_second
        start = schedule.start_time

    return {
        "schedule_interval": interval,
        "start_date": start,
        "end_date": schedule.end_time,
        "catchup": schedule.catchup,
    }
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists. This happens when this method
            gets called while the orchestrator is not running a pipeline.

    Returns:
        The orchestrator run id.
    """
    active_run_id = self._orchestrator_run_id
    if active_run_id:
        return active_run_id

    raise RuntimeError("No run id set.")
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Creates an Airflow DAG as the intermediate representation for the pipeline.

    This DAG will be loaded by airflow in the target environment
    and used for orchestration of the pipeline.

    How it works:
    -------------
    A new airflow_dag is instantiated with the pipeline name and among
    other things the run schedule.

    For each step of the pipeline an airflow PythonOperator is created
    within the dag. Its callable has the step pre-filled and uses the
    run_step() method to execute it. The dependencies to upstream steps
    are then configured.

    Finally, the dag is fully complete and can be returned.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Returns:
        The Airflow DAG.
    """
    import airflow
    from airflow.operators import python as airflow_python

    def _step_callable(step_instance: "Step", **kwargs: Any) -> None:
        """Executes a single step from within an airflow task.

        Only `step_instance` is used to refer to the step. Referring to
        the loop variable of the enclosing method instead would be
        late-bound and resolve to the last step of the pipeline by the
        time airflow invokes this callable.

        Args:
            step_instance: The step to run.
            **kwargs: Context passed by airflow; the task instance under
                the "ti" key is used to look up the DAG run id.
        """
        if self.requires_resources_in_orchestration_environment(
            step_instance
        ):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Airflow orchestrator, ignoring resource "
                "configuration for step %s.",
                # Same name that is used as the airflow task id below.
                step_instance.config.name,
            )
        self._orchestrator_run_id = kwargs["ti"].get_dagrun().run_id
        self._prepare_run(deployment=deployment)
        self.run_step(step=step_instance)
        self._cleanup_run()
        self._orchestrator_run_id = None

    # Instantiate and configure airflow Dag with name and schedule
    airflow_dag = airflow.DAG(
        dag_id=deployment.pipeline.name,
        is_paused_upon_creation=False,
        **self._translate_schedule(deployment.schedule),
    )

    # Dictionary mapping step names to airflow_operators. This will be needed
    # to configure airflow operator dependencies
    step_name_to_airflow_operator = {}

    for step in deployment.steps.values():
        # Create airflow python operator whose callable has the current
        # step bound explicitly through functools.partial
        airflow_operator = airflow_python.PythonOperator(
            dag=airflow_dag,
            task_id=step.config.name,
            provide_context=True,
            python_callable=functools.partial(
                _step_callable, step_instance=step
            ),
        )

        # Configure the current airflow operator to run after all upstream
        # operators finished executing.
        # NOTE(review): this lookup raises KeyError unless every upstream
        # step was already visited, i.e. it assumes deployment.steps lists
        # upstream steps before their dependents -- confirm.
        step_name_to_airflow_operator[step.config.name] = airflow_operator
        for upstream_step_name in step.spec.upstream_steps:
            airflow_operator.set_upstream(
                step_name_to_airflow_operator[upstream_step_name]
            )

    # Return the finished airflow dag
    return airflow_dag
@property
def dags_directory(self) -> str:
    """Path of the `dags` directory inside the airflow home.

    Returns:
        Path to the airflow dags directory.
    """
    home = self.airflow_home
    return os.path.join(home, "dags")
@property
def pid_file(self) -> str:
    """Path of the daemon PID file inside the airflow home.

    Returns:
        Path to the daemon PID file.
    """
    home = self.airflow_home
    return os.path.join(home, "airflow_daemon.pid")
@property
def log_file(self) -> str:
    """Path of the airflow log file inside the airflow home.

    Returns:
        Path to the airflow log file.
    """
    home = self.airflow_home
    return os.path.join(home, "airflow_orchestrator.log")
@property
def password_file(self) -> str:
    """Path of the webserver password file inside the airflow home.

    Returns:
        Path to the webserver password file.
    """
    home = self.airflow_home
    return os.path.join(home, "standalone_admin_password.txt")
def _set_env(self) -> None:
    """Sets environment variables to configure airflow."""
    airflow_settings = {
        "AIRFLOW_HOME": self.airflow_home,
        "AIRFLOW__CORE__DAGS_FOLDER": self.dags_directory,
        "AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE": "false",
        "AIRFLOW__CORE__LOAD_EXAMPLES": "false",
        # check the DAG folder every 10 seconds for new files
        "AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL": "10",
    }
    for variable, setting in airflow_settings.items():
        os.environ[variable] = setting
def _copy_to_dag_directory_if_necessary(self, dag_filepath: str) -> None:
    """Copies DAG module to the Airflow DAGs directory if not already present.

    Args:
        dag_filepath: Path to the file in which the DAG is defined.
    """
    target_directory = io_utils.resolve_relative_path(self.dags_directory)

    # Nothing to do when the file already lives in the DAGs directory.
    if os.path.dirname(dag_filepath) == target_directory:
        logger.debug("File is already in airflow DAGs directory.")
        return

    logger.debug("Copying dag file '%s' to DAGs directory.", dag_filepath)
    file_name = os.path.basename(dag_filepath)
    destination_path = os.path.join(target_directory, file_name)

    # An existing file of the same name is replaced by the copy below.
    if fileio.exists(destination_path):
        logger.info(
            "File '%s' already exists, overwriting with new DAG file",
            destination_path,
        )

    fileio.copy(dag_filepath, destination_path, overwrite=True)
def _log_webserver_credentials(self) -> None:
    """Logs URL and credentials to log in to the airflow webserver.

    Raises:
        FileNotFoundError: If the password file does not exist.
    """
    credentials_path = self.password_file

    if not fileio.exists(credentials_path):
        raise FileNotFoundError(
            f"Can't find password file '{credentials_path}'"
        )

    with open(credentials_path) as credentials:
        password = credentials.read().strip()

    logger.info(
        "To inspect your DAGs, login to http://localhost:8080 "
        "with username: admin password: %s",
        password,
    )
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Checks Airflow is running and copies DAG file to the DAGs directory.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        RuntimeError: If Airflow is not running, if called from within a
            notebook, or if no DAG filepath runtime option is provided.
    """
    airflow_is_down = not self.is_running
    if airflow_is_down:
        raise RuntimeError(
            "Airflow orchestrator is currently not running. Run `zenml "
            "stack up` to provision resources for the active stack."
        )

    running_in_notebook = Environment.in_notebook()
    if running_in_notebook:
        raise RuntimeError(
            "Unable to run the Airflow orchestrator from within a "
            "notebook. Airflow requires a python file which contains a "
            "global Airflow DAG object and therefore does not work with "
            "notebooks. Please copy your ZenML pipeline code in a python "
            "file and try again."
        )

    # The path of the DAG file is read from the pipeline's extra options.
    try:
        dag_filepath = deployment.pipeline.extra[DAG_FILEPATH_OPTION_KEY]
    except KeyError:
        raise RuntimeError(
            "No DAG filepath found in runtime configuration. Make sure "
            "to add the filepath to your airflow DAG file as a runtime "
            f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
        )

    self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
@property
def is_running(self) -> bool:
    """Returns whether the airflow daemon is currently running.

    Returns:
        True if the daemon is running, False otherwise.

    Raises:
        RuntimeError: If port 8080 is occupied.
    """
    from airflow.cli.commands.standalone_command import StandaloneCommand
    from airflow.jobs.triggerer_job import TriggererJob

    daemon_alive = daemon.check_if_daemon_is_running(self.pid_file)
    standalone = StandaloneCommand()
    port_in_use = standalone.port_open(8080)

    if daemon_alive:
        # we can't use StandaloneCommand().is_ready() here as the
        # Airflow SequentialExecutor apparently does not send a heartbeat
        # while running a task which would result in this returning `False`
        # even if Airflow is running.
        return port_in_use and standalone.job_running(TriggererJob)

    if port_in_use:
        raise RuntimeError(
            "The airflow daemon does not seem to be running but "
            "local port 8080 is occupied. Make sure the port is "
            "available and try again."
        )

    # exit early so we don't check non-existing airflow databases
    return False
@property
def is_provisioned(self) -> bool:
    """Returns whether the airflow daemon is currently running.

    Returns:
        True if the airflow daemon is running, False otherwise.
    """
    # Provisioned is defined as "running" for this orchestrator.
    provisioned = self.is_running
    return provisioned
def provision(self) -> None:
    """Ensures that Airflow is running."""
    if self.is_running:
        logger.info("Airflow is already running.")
        self._log_webserver_credentials()
        return

    dags_directory_missing = not fileio.exists(self.dags_directory)
    if dags_directory_missing:
        io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

    from airflow.cli.commands.standalone_command import StandaloneCommand

    try:
        standalone = StandaloneCommand()

        # Skip pipeline registration inside the airflow server process.
        # When searching for DAGs, airflow imports the runner file in a
        # randomly generated module. If we don't skip pipeline registration,
        # it would fail by trying to register a pipeline with an existing
        # name but different module sources for the steps.
        with set_environment_variable(
            key=ENV_ZENML_SKIP_PIPELINE_REGISTRATION, value="True"
        ):
            # Run the daemon with a working directory inside the current
            # zenml repo so the same repo will be used to run the DAGs
            daemon.run_as_daemon(
                standalone.run,
                pid_file=self.pid_file,
                log_file=self.log_file,
                working_directory=get_source_root_path(),
            )

        # Poll until the daemon started all the relevant airflow processes
        while not self.is_running:
            time.sleep(0.1)

        self._log_webserver_credentials()
    except Exception as error:
        logger.error(error)
        logger.error(
            "An error occurred while starting the Airflow daemon. If you "
            "want to start it manually, use the commands described in the "
            "official Airflow quickstart guide for running Airflow locally."
        )
        # Tear down whatever was set up before the failure.
        self.deprovision()
def deprovision(self) -> None:
    """Stops the airflow daemon if necessary and tears down resources."""
    daemon_is_up = self.is_running
    if daemon_is_up:
        daemon.stop_daemon(self.pid_file)

    # Remove the whole airflow home directory of this orchestrator.
    fileio.rmtree(self.airflow_home)
    logger.info("Airflow spun down.")
dags_directory: str
property
readonly
Returns path to the airflow dags directory.
Returns:
Type | Description |
---|---|
str |
Path to the airflow dags directory. |
is_provisioned: bool
property
readonly
Returns whether the airflow daemon is currently running.
Returns:
Type | Description |
---|---|
bool |
True if the airflow daemon is running, False otherwise. |
is_running: bool
property
readonly
Returns whether the airflow daemon is currently running.
Returns:
Type | Description |
---|---|
bool |
True if the daemon is running, False otherwise. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If port 8080 is occupied. |
log_file: str
property
readonly
Returns path to the airflow log file.
Returns:
Type | Description |
---|---|
str |
Path to the airflow log file. |
password_file: str
property
readonly
Returns path to the webserver password file.
Returns:
Type | Description |
---|---|
str |
Path to the webserver password file. |
pid_file: str
property
readonly
Returns path to the daemon PID file.
Returns:
Type | Description |
---|---|
str |
Path to the daemon PID file. |
__init__(self, **values)
special
Sets environment variables to configure airflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**values |
Any |
Values to set in the orchestrator. |
{} |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
    """Initializes the orchestrator and configures the airflow environment.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)

    # The airflow home lives inside the global config directory, in a
    # sub-directory named after this orchestrator's id.
    config_root = io_utils.get_global_config_directory()
    self.airflow_home = os.path.join(
        config_root, AIRFLOW_ROOT_DIR, str(self.id)
    )

    self._set_env()
deprovision(self)
Stops the airflow daemon if necessary and tears down resources.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
    """Stops the airflow daemon if necessary and tears down resources."""
    daemon_is_up = self.is_running
    if daemon_is_up:
        daemon.stop_daemon(self.pid_file)

    # Remove the whole airflow home directory of this orchestrator.
    fileio.rmtree(self.airflow_home)
    logger.info("Airflow spun down.")
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists. This happens when this method
            gets called while the orchestrator is not running a pipeline.

    Returns:
        The orchestrator run id.
    """
    active_run_id = self._orchestrator_run_id
    if active_run_id:
        return active_run_id

    raise RuntimeError("No run id set.")
prepare_or_run_pipeline(self, deployment, stack)
Creates an Airflow DAG as the intermediate representation for the pipeline.
This DAG will be loaded by airflow in the target environment and used for orchestration of the pipeline.
How it works:
A new airflow_dag is instantiated with the pipeline name and among other things the run schedule.
For each step of the pipeline a callable is created. This callable uses the run_step() method to execute the step. The parameters of this callable are pre-filled and an airflow step_operator is created within the dag. The dependencies to upstream steps are then configured.
Finally, the dag is fully complete and can be returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
Returns:
Type | Description |
---|---|
Any |
The Airflow DAG. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Creates an Airflow DAG as the intermediate representation for the pipeline.

    This DAG will be loaded by airflow in the target environment
    and used for orchestration of the pipeline.

    How it works:
    -------------
    A new airflow_dag is instantiated with the pipeline name and among
    other things the run schedule.

    For each step of the pipeline an airflow PythonOperator is created
    within the dag. Its callable has the step pre-filled and uses the
    run_step() method to execute it. The dependencies to upstream steps
    are then configured.

    Finally, the dag is fully complete and can be returned.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Returns:
        The Airflow DAG.
    """
    import airflow
    from airflow.operators import python as airflow_python

    def _step_callable(step_instance: "Step", **kwargs: Any) -> None:
        """Executes a single step from within an airflow task.

        Only `step_instance` is used to refer to the step. Referring to
        the loop variable of the enclosing method instead would be
        late-bound and resolve to the last step of the pipeline by the
        time airflow invokes this callable.

        Args:
            step_instance: The step to run.
            **kwargs: Context passed by airflow; the task instance under
                the "ti" key is used to look up the DAG run id.
        """
        if self.requires_resources_in_orchestration_environment(
            step_instance
        ):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Airflow orchestrator, ignoring resource "
                "configuration for step %s.",
                # Same name that is used as the airflow task id below.
                step_instance.config.name,
            )
        self._orchestrator_run_id = kwargs["ti"].get_dagrun().run_id
        self._prepare_run(deployment=deployment)
        self.run_step(step=step_instance)
        self._cleanup_run()
        self._orchestrator_run_id = None

    # Instantiate and configure airflow Dag with name and schedule
    airflow_dag = airflow.DAG(
        dag_id=deployment.pipeline.name,
        is_paused_upon_creation=False,
        **self._translate_schedule(deployment.schedule),
    )

    # Dictionary mapping step names to airflow_operators. This will be needed
    # to configure airflow operator dependencies
    step_name_to_airflow_operator = {}

    for step in deployment.steps.values():
        # Create airflow python operator whose callable has the current
        # step bound explicitly through functools.partial
        airflow_operator = airflow_python.PythonOperator(
            dag=airflow_dag,
            task_id=step.config.name,
            provide_context=True,
            python_callable=functools.partial(
                _step_callable, step_instance=step
            ),
        )

        # Configure the current airflow operator to run after all upstream
        # operators finished executing.
        # NOTE(review): this lookup raises KeyError unless every upstream
        # step was already visited, i.e. it assumes deployment.steps lists
        # upstream steps before their dependents -- confirm.
        step_name_to_airflow_operator[step.config.name] = airflow_operator
        for upstream_step_name in step.spec.upstream_steps:
            airflow_operator.set_upstream(
                step_name_to_airflow_operator[upstream_step_name]
            )

    # Return the finished airflow dag
    return airflow_dag
prepare_pipeline_deployment(self, deployment, stack)
Checks Airflow is running and copies DAG file to the DAGs directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If Airflow is not running or no DAG filepath runtime option is provided. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Checks Airflow is running and copies DAG file to the DAGs directory.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        RuntimeError: If Airflow is not running, if called from within a
            notebook, or if no DAG filepath runtime option is provided.
    """
    airflow_is_down = not self.is_running
    if airflow_is_down:
        raise RuntimeError(
            "Airflow orchestrator is currently not running. Run `zenml "
            "stack up` to provision resources for the active stack."
        )

    running_in_notebook = Environment.in_notebook()
    if running_in_notebook:
        raise RuntimeError(
            "Unable to run the Airflow orchestrator from within a "
            "notebook. Airflow requires a python file which contains a "
            "global Airflow DAG object and therefore does not work with "
            "notebooks. Please copy your ZenML pipeline code in a python "
            "file and try again."
        )

    # The path of the DAG file is read from the pipeline's extra options.
    try:
        dag_filepath = deployment.pipeline.extra[DAG_FILEPATH_OPTION_KEY]
    except KeyError:
        raise RuntimeError(
            "No DAG filepath found in runtime configuration. Make sure "
            "to add the filepath to your airflow DAG file as a runtime "
            f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
        )

    self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
provision(self)
Ensures that Airflow is running.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
    """Ensures that Airflow is running."""
    if self.is_running:
        logger.info("Airflow is already running.")
        self._log_webserver_credentials()
        return

    dags_directory_missing = not fileio.exists(self.dags_directory)
    if dags_directory_missing:
        io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

    from airflow.cli.commands.standalone_command import StandaloneCommand

    try:
        standalone = StandaloneCommand()

        # Skip pipeline registration inside the airflow server process.
        # When searching for DAGs, airflow imports the runner file in a
        # randomly generated module. If we don't skip pipeline registration,
        # it would fail by trying to register a pipeline with an existing
        # name but different module sources for the steps.
        with set_environment_variable(
            key=ENV_ZENML_SKIP_PIPELINE_REGISTRATION, value="True"
        ):
            # Run the daemon with a working directory inside the current
            # zenml repo so the same repo will be used to run the DAGs
            daemon.run_as_daemon(
                standalone.run,
                pid_file=self.pid_file,
                log_file=self.log_file,
                working_directory=get_source_root_path(),
            )

        # Poll until the daemon started all the relevant airflow processes
        while not self.is_running:
            time.sleep(0.1)

        self._log_webserver_credentials()
    except Exception as error:
        logger.error(error)
        logger.error(
            "An error occurred while starting the Airflow daemon. If you "
            "want to start it manually, use the commands described in the "
            "official Airflow quickstart guide for running Airflow locally."
        )
        # Tear down whatever was set up before the failure.
        self.deprovision()
set_environment_variable(key, value)
Temporarily sets an environment variable.
The value will only be set while this context manager is active and will be reset to the previous value afterward.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The environment variable key. |
required |
value |
str |
The environment variable value. |
required |
Yields:
Type | Description |
---|---|
Iterator[NoneType] |
None. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
@contextmanager
def set_environment_variable(key: str, value: str) -> Iterator[None]:
    """Temporarily sets an environment variable.

    The value will only be set while this context manager is active and will
    be reset to the previous value afterward. If the variable was not set
    before, it is removed again on exit.

    Args:
        key: The environment variable key.
        value: The environment variable value.

    Yields:
        None.
    """
    old_value = os.environ.get(key)
    try:
        os.environ[key] = value
        yield
    finally:
        # Compare against None rather than truthiness so that a variable
        # that was previously set to an empty string is restored instead of
        # being deleted.
        if old_value is not None:
            os.environ[key] = old_value
        else:
            # pop() with a default does not fail if the managed block
            # already removed the variable itself.
            os.environ.pop(key, None)