Airflow
zenml.integrations.airflow
special
Airflow integration for ZenML.
The Airflow integration provides an alternative orchestrator that runs ZenML pipelines as Airflow DAGs.
AirflowIntegration (Integration)
Definition of Airflow Integration for ZenML.
Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
"""Definition of Airflow Integration for ZenML."""
NAME = AIRFLOW
REQUIREMENTS = []
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Airflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.airflow.flavors import (
AirflowOrchestratorFlavor,
)
return [AirflowOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Airflow integration.
Returns:

Type | Description
---|---
`List[Type[zenml.stack.flavor.Flavor]]` | List of stack component flavors for this integration.
Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Airflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.airflow.flavors import (
AirflowOrchestratorFlavor,
)
return [AirflowOrchestratorFlavor]
flavors
special
Airflow integration flavors.
airflow_orchestrator_flavor
Airflow orchestrator flavor.
AirflowOrchestratorConfig (BaseOrchestratorConfig, AirflowOrchestratorSettings)
Configuration for the Airflow orchestrator.
Attributes:

Name | Type | Description
---|---|---
`local` | `bool` | If the orchestrator is local or not. If this is `True`, will spin up a local Airflow server to run pipelines.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorConfig(
BaseOrchestratorConfig, AirflowOrchestratorSettings
):
"""Configuration for the Airflow orchestrator.
Attributes:
local: If the orchestrator is local or not. If this is True, will spin
up a local Airflow server to run pipelines.
"""
local: bool = True
@property
def is_schedulable(self) -> bool:
"""Whether the orchestrator is schedulable or not.
Returns:
Whether the orchestrator is schedulable or not.
"""
return True
is_schedulable: bool
property
readonly
Whether the orchestrator is schedulable or not.
Returns:

Type | Description
---|---
`bool` | Whether the orchestrator is schedulable or not.
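For illustration, a minimal sketch of instantiating this config directly. In practice ZenML constructs it from the options given when registering an `airflow` orchestrator; building it standalone assumes the base orchestrator config has no other required fields.

```python
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorConfig,
)

# Hypothetical direct instantiation; normally ZenML builds this config from
# the options passed when registering the orchestrator.
config = AirflowOrchestratorConfig(local=False)
assert config.is_schedulable  # Airflow pipelines can run on a schedule.
```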
AirflowOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Airflow orchestrator.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorFlavor(BaseOrchestratorFlavor):
"""Flavor for the Airflow orchestrator."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return AIRFLOW_ORCHESTRATOR_FLAVOR
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at SDK docs explaining this flavor.
Returns:
A flavor SDK docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/airflow.png"
@property
def config_class(self) -> Type[AirflowOrchestratorConfig]:
"""Returns `AirflowOrchestratorConfig` config class.
Returns:
The config class.
"""
return AirflowOrchestratorConfig
@property
def implementation_class(self) -> Type["AirflowOrchestrator"]:
"""Implementation class.
Returns:
The implementation class.
"""
from zenml.integrations.airflow.orchestrators import (
AirflowOrchestrator,
)
return AirflowOrchestrator
config_class: Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig]
property
readonly
Returns the `AirflowOrchestratorConfig` config class.
Returns:

Type | Description
---|---
`Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig]` | The config class.
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:

Type | Description
---|---
`Optional[str]` | A flavor docs url.
implementation_class: Type[AirflowOrchestrator]
property
readonly
Implementation class.
Returns:

Type | Description
---|---
`Type[AirflowOrchestrator]` | The implementation class.
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:

Type | Description
---|---
`str` | The flavor logo.
name: str
property
readonly
Name of the flavor.
Returns:

Type | Description
---|---
`str` | The name of the flavor.
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:

Type | Description
---|---
`Optional[str]` | A flavor SDK docs url.
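A minimal sketch of inspecting this flavor's metadata directly; the flavor name is expected to resolve to `"airflow"`, as suggested by the flavor constant in the source above.

```python
from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

flavor = AirflowOrchestratorFlavor()
print(flavor.name)      # expected to be "airflow"
print(flavor.logo_url)  # dashboard logo URL
print(flavor.docs_url)  # auto-generated docs URL for this flavor
```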
AirflowOrchestratorSettings (BaseSettings)
Settings for the Airflow orchestrator.
Attributes:

Name | Type | Description
---|---|---
`dag_output_dir` | `Optional[str]` | Output directory in which to write the Airflow DAG.
`dag_id` | `Optional[str]` | Optional ID of the Airflow DAG to create. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.
`dag_tags` | `List[str]` | Tags to add to the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.
`dag_args` | `Dict[str, Any]` | Arguments for initializing the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.
`operator` | `str` | The operator to use for one or all steps. This can either be a `zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType` or a string representing the source of the operator class to use (e.g. `airflow.providers.docker.operators.docker.DockerOperator`).
`operator_args` | `Dict[str, Any]` | Arguments for initializing the Airflow operator.
`custom_dag_generator` | `Optional[str]` | Source string of a module to use for generating Airflow DAGs. This module must contain the same classes and constants as the `zenml.integrations.airflow.orchestrators.dag_generator` module. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorSettings(BaseSettings):
"""Settings for the Airflow orchestrator.
Attributes:
dag_output_dir: Output directory in which to write the Airflow DAG.
dag_id: Optional ID of the Airflow DAG to create. This value is only
applied if the settings are defined on a ZenML pipeline and
ignored if defined on a step.
dag_tags: Tags to add to the Airflow DAG. This value is only
applied if the settings are defined on a ZenML pipeline and
ignored if defined on a step.
dag_args: Arguments for initializing the Airflow DAG. This
value is only applied if the settings are defined on a ZenML
pipeline and ignored if defined on a step.
operator: The operator to use for one or all steps. This can either be
a `zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType`
or a string representing the source of the operator class to use
(e.g. `airflow.providers.docker.operators.docker.DockerOperator`)
operator_args: Arguments for initializing the Airflow
operator.
custom_dag_generator: Source string of a module to use for generating
Airflow DAGs. This module must contain the same classes and
constants as the
`zenml.integrations.airflow.orchestrators.dag_generator` module.
This value is only applied if the settings are defined on a ZenML
pipeline and ignored if defined on a step.
"""
dag_output_dir: Optional[str] = None
dag_id: Optional[str] = None
dag_tags: List[str] = []
dag_args: Dict[str, Any] = {}
operator: str = OperatorType.DOCKER.source
operator_args: Dict[str, Any] = {}
custom_dag_generator: Optional[str] = None
@field_validator("operator", mode="before")
@classmethod
def _convert_operator(cls, value: Any) -> Any:
"""Converts operator types to source strings.
Args:
value: The operator type value.
Returns:
The operator source.
"""
if isinstance(value, OperatorType):
return value.source
try:
return OperatorType(value).source
except ValueError:
return value
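To make these options concrete, here is a hedged sketch of configuring the orchestrator at the pipeline level. The `"orchestrator.airflow"` settings key follows ZenML's usual `orchestrator.<flavor>` convention and the path and tags are illustrative assumptions.

```python
from zenml import pipeline
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
)

airflow_settings = AirflowOrchestratorSettings(
    dag_output_dir="/path/to/airflow/dags",  # write the DAG zip where Airflow looks for DAGs
    dag_tags=["zenml"],
    operator="airflow.providers.docker.operators.docker.DockerOperator",
)

# Hypothetical pipeline; settings defined here apply DAG-wide, while the same
# settings defined on a step only affect that step's operator.
@pipeline(settings={"orchestrator.airflow": airflow_settings})
def my_pipeline() -> None:
    ...
```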
OperatorType (Enum)
Airflow operator types.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class OperatorType(Enum):
"""Airflow operator types."""
DOCKER = "docker"
KUBERNETES_POD = "kubernetes_pod"
GKE_START_POD = "gke_start_pod"
@property
def source(self) -> str:
"""Operator source.
Returns:
The operator source.
"""
return {
OperatorType.DOCKER: "airflow.providers.docker.operators.docker.DockerOperator",
OperatorType.KUBERNETES_POD: "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator",
OperatorType.GKE_START_POD: "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator",
}[self]
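A small sketch of how the operator setting resolves: the enum members map to operator source strings, and the settings validator shown above accepts the enum, its string value, or a full source path.

```python
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
    OperatorType,
)

# Enum member -> operator class source string.
print(OperatorType.KUBERNETES_POD.source)
# "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator"

# All three forms end up as the same source string on the settings object.
s1 = AirflowOrchestratorSettings(operator=OperatorType.KUBERNETES_POD)
s2 = AirflowOrchestratorSettings(operator="kubernetes_pod")
s3 = AirflowOrchestratorSettings(operator=OperatorType.KUBERNETES_POD.source)
assert s1.operator == s2.operator == s3.operator
```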
orchestrators
special
The Airflow integration enables the use of Airflow as a pipeline orchestrator.
airflow_orchestrator
Implementation of Airflow orchestrator integration.
AirflowOrchestrator (ContainerizedOrchestrator)
Orchestrator responsible for running pipelines using Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(ContainerizedOrchestrator):
"""Orchestrator responsible for running pipelines using Airflow."""
def __init__(self, **values: Any):
"""Initialize the orchestrator.
Args:
**values: Values to set in the orchestrator.
"""
super().__init__(**values)
self.dags_directory = os.path.join(
io_utils.get_global_config_directory(),
"airflow",
str(self.id),
"dags",
)
@property
def config(self) -> AirflowOrchestratorConfig:
"""Returns the orchestrator config.
Returns:
The configuration.
"""
return cast(AirflowOrchestratorConfig, self._config)
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the Kubeflow orchestrator.
Returns:
The settings class.
"""
return AirflowOrchestratorSettings
@property
def validator(self) -> Optional["StackValidator"]:
"""Validates the stack.
In the remote case, checks that the stack contains a container registry
and only remote components.
Returns:
A `StackValidator` instance.
"""
if self.config.local:
# No container registry required if just running locally.
return None
else:
def _validate_remote_components(
stack: "Stack",
) -> Tuple[bool, str]:
for component in stack.components.values():
if not component.config.is_local:
continue
return False, (
f"The Airflow orchestrator is configured to run "
f"pipelines remotely, but the '{component.name}' "
f"{component.type.value} is a local stack component "
f"and will not be available in the Airflow "
f"task.\nPlease ensure that you always use non-local "
f"stack components with a remote Airflow orchestrator, "
f"otherwise you may run into pipeline execution "
f"problems."
)
return True, ""
return StackValidator(
required_components={
StackComponentType.CONTAINER_REGISTRY,
StackComponentType.IMAGE_BUILDER,
},
custom_validation_function=_validate_remote_components,
)
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
) -> None:
"""Builds a Docker image to run pipeline steps.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
if self.config.local:
stack.check_local_paths()
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Creates and writes an Airflow DAG zip file.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
"""
pipeline_settings = cast(
AirflowOrchestratorSettings, self.get_settings(deployment)
)
dag_generator_values = get_dag_generator_values(
custom_dag_generator_source=pipeline_settings.custom_dag_generator
)
command = StepEntrypointConfiguration.get_entrypoint_command()
tasks = []
for step_name, step in deployment.step_configurations.items():
settings = cast(
AirflowOrchestratorSettings, self.get_settings(step)
)
image = self.get_image(deployment=deployment, step_name=step_name)
arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name, deployment_id=deployment.id
)
operator_args = settings.operator_args.copy()
if self.requires_resources_in_orchestration_environment(step=step):
if settings.operator == OperatorType.KUBERNETES_POD.source:
self._apply_resource_settings(
resource_settings=step.config.resource_settings,
operator_args=operator_args,
)
else:
logger.warning(
"Specifying step resources is only supported when "
"using KubernetesPodOperators, ignoring resource "
"configuration for step %s.",
step_name,
)
task = dag_generator_values.task_configuration_class(
id=step_name,
zenml_step_name=step_name,
upstream_steps=step.spec.upstream_steps,
docker_image=image,
command=command,
arguments=arguments,
environment=environment,
operator_source=settings.operator,
operator_args=operator_args,
)
tasks.append(task)
local_stores_path = (
os.path.expanduser(GlobalConfiguration().local_stores_path)
if self.config.local
else None
)
dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
pipeline_name=deployment.pipeline_configuration.name
)
dag_config = dag_generator_values.dag_configuration_class(
id=dag_id,
local_stores_path=local_stores_path,
tasks=tasks,
tags=pipeline_settings.dag_tags,
dag_args=pipeline_settings.dag_args,
**self._translate_schedule(deployment.schedule),
)
self._write_dag(
dag_config,
dag_generator_values=dag_generator_values,
output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
)
def _apply_resource_settings(
self,
resource_settings: "ResourceSettings",
operator_args: Dict[str, Any],
) -> None:
"""Adds resource settings to the operator args.
Args:
resource_settings: The resource settings to add.
operator_args: The operator args which will get modified in-place.
"""
if "container_resources" in operator_args:
logger.warning(
"Received duplicate resources from ResourceSettings: `%s`"
"and operator_args: `%s`. Ignoring the resources defined by "
"the ResourceSettings.",
resource_settings,
operator_args["container_resources"],
)
else:
limits = {}
if resource_settings.cpu_count is not None:
limits["cpu"] = str(resource_settings.cpu_count)
if resource_settings.memory is not None:
memory_limit = resource_settings.memory[:-1]
limits["memory"] = memory_limit
if resource_settings.gpu_count is not None:
logger.warning(
"Specifying GPU resources is not supported for the Airflow "
"orchestrator."
)
operator_args["container_resources"] = {"limits": limits}
def _write_dag(
self,
dag_config: "DagConfiguration",
dag_generator_values: DagGeneratorValues,
output_dir: str,
) -> None:
"""Writes an Airflow DAG to disk.
Args:
dag_config: Configuration of the DAG to write.
dag_generator_values: Values of the DAG generator to use.
output_dir: The directory in which to write the DAG.
"""
io_utils.create_dir_recursive_if_not_exists(output_dir)
if self.config.local and output_dir == self.dags_directory:
logger.warning(
"You're using a local Airflow orchestrator but have not "
"specified a custom DAG output directory. Unless you've "
"configured your Airflow server to look for DAGs in this "
"directory (%s), this DAG will not be found automatically "
"by your local Airflow server.",
output_dir,
)
def _write_zip(path: str) -> None:
with zipfile.ZipFile(path, mode="w") as z:
z.write(dag_generator_values.file, arcname="dag.py")
z.writestr(
dag_generator_values.config_file_name,
dag_config.model_dump_json(),
)
logger.info("Writing DAG definition to `%s`.", path)
dag_filename = f"{dag_config.id}.zip"
if io_utils.is_remote(output_dir):
io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
local_zip_path = os.path.join(self.dags_directory, dag_filename)
remote_zip_path = os.path.join(output_dir, dag_filename)
_write_zip(local_zip_path)
try:
fileio.copy(local_zip_path, remote_zip_path)
logger.info("Copied DAG definition to `%s`.", remote_zip_path)
except Exception as e:
logger.exception(e)
logger.error(
"Failed to upload DAG to remote path `%s`. To run the "
"pipeline in Airflow, please manually copy the file `%s` "
"to your Airflow DAG directory.",
remote_zip_path,
local_zip_path,
)
else:
zip_path = os.path.join(output_dir, dag_filename)
_write_zip(zip_path)
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
from zenml.integrations.airflow.orchestrators.dag_generator import (
ENV_ZENML_AIRFLOW_RUN_ID,
)
try:
return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_AIRFLOW_RUN_ID}."
)
@staticmethod
def _translate_schedule(
schedule: Optional["ScheduleResponse"] = None,
) -> Dict[str, Any]:
"""Convert ZenML schedule into Airflow schedule.
The Airflow schedule uses slightly different naming and needs some
default entries for execution without a schedule.
Args:
schedule: Containing the interval, start and end date and
a boolean flag that defines if past runs should be caught up
on
Returns:
Airflow configuration dict.
"""
if schedule:
if schedule.cron_expression:
start_time = schedule.start_time or (
datetime.datetime.utcnow() - datetime.timedelta(7)
)
return {
"schedule": schedule.cron_expression,
"start_date": start_time,
"end_date": schedule.end_time,
"catchup": schedule.catchup,
}
else:
return {
"schedule": schedule.interval_second,
"start_date": schedule.start_time,
"end_date": schedule.end_time,
"catchup": schedule.catchup,
}
return {
"schedule": "@once",
# set a start time in the past and disable catchup so airflow
# runs the dag immediately
"start_date": datetime.datetime.utcnow() - datetime.timedelta(7),
"catchup": False,
}
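As an illustration of the schedule translation above, a hedged sketch of a cron schedule a user would attach to a pipeline and the Airflow keyword arguments `_translate_schedule` derives from it. The `Schedule` import path is the standard ZenML one; the orchestrator itself receives the stored schedule response rather than this object.

```python
import datetime

from zenml.config.schedule import Schedule

# A cron-based schedule attached to a pipeline run.
schedule = Schedule(
    cron_expression="0 2 * * *",
    start_time=datetime.datetime(2024, 1, 1),
    catchup=False,
)

# Based on the source above, this translates into roughly:
# {
#     "schedule": "0 2 * * *",
#     "start_date": datetime.datetime(2024, 1, 1),
#     "end_date": None,
#     "catchup": False,
# }
# Without any schedule, the DAG gets "@once", a start date one week in the
# past, and catchup disabled, so Airflow runs it immediately.
```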
config: AirflowOrchestratorConfig
property
readonly
Returns the orchestrator config.
Returns:

Type | Description
---|---
`AirflowOrchestratorConfig` | The configuration.
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Airflow orchestrator.
Returns:

Type | Description
---|---
`Optional[Type[BaseSettings]]` | The settings class.
validator: Optional[StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry and only remote components.
Returns:

Type | Description
---|---
`Optional[StackValidator]` | A `StackValidator` instance.
__init__(self, **values)
special
Initialize the orchestrator.
Parameters:

Name | Type | Description | Default
---|---|---|---
`**values` | `Any` | Values to set in the orchestrator. | `{}`
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
"""Initialize the orchestrator.
Args:
**values: Values to set in the orchestrator.
"""
super().__init__(**values)
self.dags_directory = os.path.join(
io_utils.get_global_config_directory(),
"airflow",
str(self.id),
"dags",
)
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:

Type | Description
---|---
`RuntimeError` | If the environment variable specifying the run id is not set.

Returns:

Type | Description
---|---
`str` | The orchestrator run id.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
from zenml.integrations.airflow.orchestrators.dag_generator import (
ENV_ZENML_AIRFLOW_RUN_ID,
)
try:
return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_AIRFLOW_RUN_ID}."
)
prepare_or_run_pipeline(self, deployment, stack, environment)
Creates and writes an Airflow DAG zip file.
Parameters:

Name | Type | Description | Default
---|---|---|---
`deployment` | `PipelineDeploymentResponse` | The pipeline deployment to prepare or run. | *required*
`stack` | `Stack` | The stack the pipeline will run on. | *required*
`environment` | `Dict[str, str]` | Environment variables to set in the orchestration environment. | *required*
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Creates and writes an Airflow DAG zip file.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
"""
pipeline_settings = cast(
AirflowOrchestratorSettings, self.get_settings(deployment)
)
dag_generator_values = get_dag_generator_values(
custom_dag_generator_source=pipeline_settings.custom_dag_generator
)
command = StepEntrypointConfiguration.get_entrypoint_command()
tasks = []
for step_name, step in deployment.step_configurations.items():
settings = cast(
AirflowOrchestratorSettings, self.get_settings(step)
)
image = self.get_image(deployment=deployment, step_name=step_name)
arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name, deployment_id=deployment.id
)
operator_args = settings.operator_args.copy()
if self.requires_resources_in_orchestration_environment(step=step):
if settings.operator == OperatorType.KUBERNETES_POD.source:
self._apply_resource_settings(
resource_settings=step.config.resource_settings,
operator_args=operator_args,
)
else:
logger.warning(
"Specifying step resources is only supported when "
"using KubernetesPodOperators, ignoring resource "
"configuration for step %s.",
step_name,
)
task = dag_generator_values.task_configuration_class(
id=step_name,
zenml_step_name=step_name,
upstream_steps=step.spec.upstream_steps,
docker_image=image,
command=command,
arguments=arguments,
environment=environment,
operator_source=settings.operator,
operator_args=operator_args,
)
tasks.append(task)
local_stores_path = (
os.path.expanduser(GlobalConfiguration().local_stores_path)
if self.config.local
else None
)
dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
pipeline_name=deployment.pipeline_configuration.name
)
dag_config = dag_generator_values.dag_configuration_class(
id=dag_id,
local_stores_path=local_stores_path,
tasks=tasks,
tags=pipeline_settings.dag_tags,
dag_args=pipeline_settings.dag_args,
**self._translate_schedule(deployment.schedule),
)
self._write_dag(
dag_config,
dag_generator_values=dag_generator_values,
output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
)
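A hedged sketch of how step-level settings interact with the resource handling above: resource settings are only translated into operator args when the step runs via the KubernetesPodOperator, otherwise they are ignored with a warning. The step name and resource values are illustrative.

```python
from zenml import step
from zenml.config import ResourceSettings
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
    OperatorType,
)

# Hypothetical training step: run it in a Kubernetes pod so that the CPU and
# memory limits below are applied as container resources.
@step(
    settings={
        "orchestrator.airflow": AirflowOrchestratorSettings(
            operator=OperatorType.KUBERNETES_POD,
        ),
        "resources": ResourceSettings(cpu_count=2, memory="4GB"),
    }
)
def train() -> None:
    ...
```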
prepare_pipeline_deployment(self, deployment, stack)
Builds a Docker image to run pipeline steps.
Parameters:

Name | Type | Description | Default
---|---|---|---
`deployment` | `PipelineDeploymentResponse` | The pipeline deployment configuration. | *required*
`stack` | `Stack` | The stack on which the pipeline will be deployed. | *required*
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
) -> None:
"""Builds a Docker image to run pipeline steps.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
if self.config.local:
stack.check_local_paths()
DagGeneratorValues (tuple)
Values from the DAG generator module.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class DagGeneratorValues(NamedTuple):
"""Values from the DAG generator module."""
file: str
config_file_name: str
run_id_env_variable_name: str
dag_configuration_class: Type["DagConfiguration"]
task_configuration_class: Type["TaskConfiguration"]
__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class)
special
staticmethod
Create new instance of DagGeneratorValues(file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class)
__repr__(self)
special
Return a nicely formatted representation string
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
get_dag_generator_values(custom_dag_generator_source=None)
Gets values from the DAG generator module.
Parameters:

Name | Type | Description | Default
---|---|---|---
`custom_dag_generator_source` | `Optional[str]` | Source of a custom DAG generator module. | `None`

Returns:

Type | Description
---|---
`DagGeneratorValues` | DAG generator module values.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_dag_generator_values(
custom_dag_generator_source: Optional[str] = None,
) -> DagGeneratorValues:
"""Gets values from the DAG generator module.
Args:
custom_dag_generator_source: Source of a custom DAG generator module.
Returns:
DAG generator module values.
"""
if custom_dag_generator_source:
module = importlib.import_module(custom_dag_generator_source)
else:
from zenml.integrations.airflow.orchestrators import dag_generator
module = dag_generator
assert module.__file__
return DagGeneratorValues(
file=module.__file__,
config_file_name=module.CONFIG_FILENAME,
run_id_env_variable_name=module.ENV_ZENML_AIRFLOW_RUN_ID,
dag_configuration_class=module.DagConfiguration,
task_configuration_class=module.TaskConfiguration,
)
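For orientation, a hedged sketch of a custom DAG generator module that satisfies the contract checked above: it must be importable and expose the same names as the default `dag_generator` module. The module path is made up.

```python
# my_project/custom_dag_generator.py -- hypothetical module, referenced via the
# `custom_dag_generator` setting as "my_project.custom_dag_generator".
#
# The simplest valid generator re-exports the default module's classes and
# constants; its file is what gets packaged as `dag.py` and imported by
# Airflow together with the JSON config written next to it.
from zenml.integrations.airflow.orchestrators.dag_generator import (
    CONFIG_FILENAME,
    ENV_ZENML_AIRFLOW_RUN_ID,
    DagConfiguration,
    TaskConfiguration,
)
```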
dag_generator
Module to generate an Airflow DAG from a config file.
DagConfiguration (BaseModel)
Airflow DAG configuration.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class DagConfiguration(BaseModel):
"""Airflow DAG configuration."""
id: str
tasks: List[TaskConfiguration]
local_stores_path: Optional[str] = None
schedule: Union[datetime.timedelta, str] = Field(
union_mode="left_to_right"
)
start_date: datetime.datetime
end_date: Optional[datetime.datetime] = None
catchup: bool = False
tags: List[str] = []
dag_args: Dict[str, Any] = {}
TaskConfiguration (BaseModel)
Airflow task configuration.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class TaskConfiguration(BaseModel):
"""Airflow task configuration."""
id: str
zenml_step_name: str
upstream_steps: List[str]
docker_image: str
command: List[str]
arguments: List[str]
environment: Dict[str, str] = {}
operator_source: str
operator_args: Dict[str, Any] = {}
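A hedged sketch of what these configuration objects look like when populated; all field values below are illustrative, not real images, commands, or IDs.

```python
import datetime

from zenml.integrations.airflow.orchestrators.dag_generator import (
    DagConfiguration,
    TaskConfiguration,
)

# Hypothetical task: one ZenML step run in a Docker container.
task = TaskConfiguration(
    id="trainer",
    zenml_step_name="trainer",
    upstream_steps=["importer"],
    docker_image="my-registry.io/zenml-pipeline:latest",
    command=["python", "-m", "some.entrypoint.module"],
    arguments=["--deployment_id", "example-id"],
    operator_source="airflow.providers.docker.operators.docker.DockerOperator",
)

# Hypothetical DAG wrapping that task, scheduled to run once.
dag = DagConfiguration(
    id="my_pipeline_run",
    tasks=[task],
    schedule="@once",
    start_date=datetime.datetime(2024, 1, 1),
)
```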
get_docker_operator_init_kwargs(dag_config, task_config)
Gets keyword arguments to pass to the DockerOperator.
Parameters:

Name | Type | Description | Default
---|---|---|---
`dag_config` | `DagConfiguration` | The configuration of the DAG. | *required*
`task_config` | `TaskConfiguration` | The configuration of the task. | *required*

Returns:

Type | Description
---|---
`Dict[str, Any]` | The init keyword arguments.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_docker_operator_init_kwargs(
dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
"""Gets keyword arguments to pass to the DockerOperator.
Args:
dag_config: The configuration of the DAG.
task_config: The configuration of the task.
Returns:
The init keyword arguments.
"""
mounts = []
extra_hosts = {}
environment = task_config.environment
environment[ENV_ZENML_AIRFLOW_RUN_ID] = "{{run_id}}"
if dag_config.local_stores_path:
from docker.types import Mount
environment[ENV_ZENML_LOCAL_STORES_PATH] = dag_config.local_stores_path
mounts = [
Mount(
target=dag_config.local_stores_path,
source=dag_config.local_stores_path,
type="bind",
)
]
extra_hosts = {"host.docker.internal": "host-gateway"}
return {
"image": task_config.docker_image,
"command": task_config.command + task_config.arguments,
"mounts": mounts,
"environment": environment,
"extra_hosts": extra_hosts,
}
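Continuing the hypothetical `dag`/`task` objects sketched earlier, calling this helper directly would look as follows; normally the generated `dag.py` does this when Airflow parses the DAG.

```python
from zenml.integrations.airflow.orchestrators.dag_generator import (
    get_docker_operator_init_kwargs,
)

# `dag` and `task` are the illustrative objects from the earlier sketch.
init_kwargs = get_docker_operator_init_kwargs(dag_config=dag, task_config=task)
# init_kwargs bundles the image, the combined command + arguments, the
# environment (including the templated Airflow run id) and, for a local
# orchestrator, a bind mount of the local stores path.
```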
get_kubernetes_pod_operator_init_kwargs(dag_config, task_config)
Gets keyword arguments to pass to the KubernetesPodOperator.
Parameters:

Name | Type | Description | Default
---|---|---|---
`dag_config` | `DagConfiguration` | The configuration of the DAG. | *required*
`task_config` | `TaskConfiguration` | The configuration of the task. | *required*

Returns:

Type | Description
---|---
`Dict[str, Any]` | The init keyword arguments.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_kubernetes_pod_operator_init_kwargs(
dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
"""Gets keyword arguments to pass to the KubernetesPodOperator.
Args:
dag_config: The configuration of the DAG.
task_config: The configuration of the task.
Returns:
The init keyword arguments.
"""
from kubernetes.client.models import V1EnvVar
environment = task_config.environment
environment[ENV_ZENML_AIRFLOW_RUN_ID] = "{{run_id}}"
return {
"name": f"{dag_config.id}_{task_config.id}",
"namespace": "default",
"image": task_config.docker_image,
"cmds": task_config.command,
"arguments": task_config.arguments,
"env_vars": [
V1EnvVar(name=key, value=value)
for key, value in environment.items()
],
}
get_operator_init_kwargs(operator_class, dag_config, task_config)
Gets keyword arguments to pass to the operator init method.
Parameters:

Name | Type | Description | Default
---|---|---|---
`operator_class` | `Type[Any]` | The operator class for which to get the kwargs. | *required*
`dag_config` | `DagConfiguration` | The configuration of the DAG. | *required*
`task_config` | `TaskConfiguration` | The configuration of the task. | *required*

Returns:

Type | Description
---|---
`Dict[str, Any]` | The init keyword arguments.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_operator_init_kwargs(
operator_class: Type[Any],
dag_config: DagConfiguration,
task_config: TaskConfiguration,
) -> Dict[str, Any]:
"""Gets keyword arguments to pass to the operator init method.
Args:
operator_class: The operator class for which to get the kwargs.
dag_config: The configuration of the DAG.
task_config: The configuration of the task.
Returns:
The init keyword arguments.
"""
init_kwargs = {"task_id": task_config.id}
try:
from airflow.providers.docker.operators.docker import DockerOperator
if issubclass(operator_class, DockerOperator):
init_kwargs.update(
get_docker_operator_init_kwargs(
dag_config=dag_config, task_config=task_config
)
)
except ImportError:
pass
try:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
if issubclass(operator_class, KubernetesPodOperator):
init_kwargs.update(
get_kubernetes_pod_operator_init_kwargs(
dag_config=dag_config, task_config=task_config
)
)
except ImportError:
pass
init_kwargs.update(task_config.operator_args)
return init_kwargs
import_class_by_path(class_path)
Imports a class based on a given path.
Parameters:

Name | Type | Description | Default
---|---|---|---
`class_path` | `str` | The class source path, e.g. `this.module.Class`. | *required*

Returns:

Type | Description
---|---
`Type[Any]` | The imported class.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def import_class_by_path(class_path: str) -> Type[Any]:
"""Imports a class based on a given path.
Args:
class_path: str, class_source e.g. this.module.Class
Returns:
the given class
"""
module_name, class_name = class_path.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name) # type: ignore[no-any-return]
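A quick usage sketch, assuming the relevant Airflow provider package is installed in the environment that parses the DAG.

```python
from zenml.integrations.airflow.orchestrators.dag_generator import (
    import_class_by_path,
)

# Resolve the default Docker operator class from its source string.
operator_cls = import_class_by_path(
    "airflow.providers.docker.operators.docker.DockerOperator"
)
print(operator_cls.__name__)  # "DockerOperator"
```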