
Airflow

zenml.integrations.airflow

Airflow integration for ZenML.

The Airflow integration powers an alternative orchestrator that runs ZenML pipelines as Airflow DAGs.

Attributes

AIRFLOW = 'airflow' module-attribute

AIRFLOW_ORCHESTRATOR_FLAVOR = 'airflow' module-attribute

Classes

AirflowIntegration

Bases: Integration

Definition of Airflow Integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Airflow integration.

Returns:

List[Type[Flavor]]: List of stack component flavors for this integration.

Source code in src/zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Airflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.airflow.flavors import (
        AirflowOrchestratorFlavor,
    )

    return [AirflowOrchestratorFlavor]
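
For illustration, a minimal sketch (not part of the source above) of enumerating the flavors this integration contributes, assuming the zenml package is installed:

from zenml.integrations.airflow import AirflowIntegration

for flavor_class in AirflowIntegration.flavors():
    flavor = flavor_class()
    # e.g. "airflow", StackComponentType.ORCHESTRATOR
    print(flavor.name, flavor.type)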

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type[StackComponentConfig]: The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Dict[str, Any]: The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type[StackComponent]: The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Optional[str]: The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

str: The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Optional[ServiceConnectorRequirements]: Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

StackComponentType: The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

flavor_model (FlavorResponse): The model to load from. Required.

Raises:

CustomFlavorImportError: If the custom flavor can't be imported.
ImportError: If the flavor can't be imported.

Returns:

Flavor: The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
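
For illustration, a hedged sketch of loading a flavor through the client; the get_flavor_by_name_and_type call is an assumption about the client API, and it requires an initialized ZenML client with a registered "airflow" flavor:

from zenml.client import Client
from zenml.enums import StackComponentType
from zenml.stack.flavor import Flavor

# Assumed client call; fetches the FlavorResponse model for the flavor.
flavor_model = Client().get_flavor_by_name_and_type(
    name="airflow", component_type=StackComponentType.ORCHESTRATOR
)
flavor = Flavor.from_model(flavor_model)
print(flavor.implementation_class)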
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

str: The complete url to the zenml documentation.

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

str: The complete url to the zenml SDK docs.

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
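
For example, a flavor defined in zenml.integrations.airflow.flavors.airflow_orchestrator_flavor resolves to https://sdkdocs.zenml.io/<version>/integration_code_docs/integrations-airflow/#zenml.integrations.airflow.flavors.airflow_orchestrator_flavor, where <version> is the installed ZenML version.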
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

integration (Optional[str]): The integration to use for the model. Default: None.
is_custom (bool): Whether the flavor is a custom flavor. Default: True.

Returns:

FlavorRequest: The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
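
For illustration, a minimal sketch (not part of the source above) of converting the Airflow orchestrator flavor into a request model:

from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

model = AirflowOrchestratorFlavor().to_model(integration="airflow", is_custom=False)
print(model.name, model.source)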

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

bool: True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
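
For illustration, a minimal sketch of using this check before relying on the integration; the CLI hint mirrors the usual installation flow:

from zenml.integrations.airflow import AirflowIntegration

if AirflowIntegration.check_installation():
    print("Requirements satisfied:", AirflowIntegration.get_requirements())
else:
    print("Missing requirements, run: zenml integration install airflow")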
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

List[Type[Flavor]]: A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system to get the requirements for. Default: None.
python_version (Optional[str]): The Python version to use for the requirements. Default: None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system to get the requirements for. Default: None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

List[Type[BasePluginFlavor]]: A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

flavors

Airflow integration flavors.


Modules
airflow_orchestrator_flavor

Airflow orchestrator flavor.

Classes
AirflowOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, AirflowOrchestratorSettings

Configuration for the Airflow orchestrator.

Attributes:

local (bool): If the orchestrator is local or not. If this is True, will spin up a local Airflow server to run pipelines.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

bool: Whether the orchestrator is schedulable or not.

AirflowOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the Airflow orchestrator.

Attributes
config_class: Type[AirflowOrchestratorConfig] property

Returns AirflowOrchestratorConfig config class.

Returns:

Type[AirflowOrchestratorConfig]: The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[AirflowOrchestrator] property

Implementation class.

Returns:

Type[AirflowOrchestrator]: The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

str: The flavor logo.

name: str property

Name of the flavor.

Returns:

str: The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

AirflowOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Airflow orchestrator.

Attributes:

dag_output_dir (Optional[str]): Output directory in which to write the Airflow DAG.

dag_id (Optional[str]): Optional ID of the Airflow DAG to create. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

dag_tags (List[str]): Tags to add to the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

dag_args (Dict[str, Any]): Arguments for initializing the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

operator (str): The operator to use for one or all steps. This can either be a zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType or a string representing the source of the operator class to use (e.g. airflow.providers.docker.operators.docker.DockerOperator).

operator_args (Dict[str, Any]): Arguments for initializing the Airflow operator.

custom_dag_generator (Optional[str]): Source string of a module to use for generating Airflow DAGs. This module must contain the same classes and constants as the zenml.integrations.airflow.orchestrators.dag_generator module. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
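
For illustration, a hedged sketch of applying these settings to a pipeline; the settings key and tag are illustrative values, and the operator string is the DockerOperator source path mentioned in the attribute description above:

from zenml import pipeline
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
)

airflow_settings = AirflowOrchestratorSettings(
    operator="airflow.providers.docker.operators.docker.DockerOperator",
    dag_tags=["zenml"],
)

@pipeline(settings={"orchestrator": airflow_settings})
def my_pipeline() -> None:
    ...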
OperatorType

Bases: Enum

Airflow operator types.

Attributes
source: str property

Operator source.

Returns:

str: The operator source.
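
For illustration, a hedged sketch of resolving an operator type to its import source; the DOCKER member and the printed value are assumptions based on the attribute description above:

from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    OperatorType,
)

print(OperatorType.DOCKER.source)
# e.g. airflow.providers.docker.operators.docker.DockerOperator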

orchestrators

The Airflow integration enables the use of Airflow as a pipeline orchestrator.

Classes
AirflowOrchestrator(**values: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines using Airflow.

Initialize the orchestrator.

Parameters:

**values (Any): Values to set in the orchestrator. Default: {}.

Source code in src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
    """Initialize the orchestrator.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)
    self.dags_directory = os.path.join(
        io_utils.get_global_config_directory(),
        "airflow",
        str(self.id),
        "dags",
    )
Attributes
config: AirflowOrchestratorConfig property

Returns the orchestrator config.

Returns:

AirflowOrchestratorConfig: The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Airflow orchestrator.

Returns:

Optional[Type[BaseSettings]]: The settings class.

validator: Optional[StackValidator] property

Validates the stack.

In the remote case, checks that the stack contains a container registry and only remote components.

Returns:

Optional[StackValidator]: A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

RuntimeError: If the environment variable specifying the run id is not set.

Returns:

str: The orchestrator run id.

Source code in src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    from zenml.integrations.airflow.orchestrators.dag_generator import (
        ENV_ZENML_AIRFLOW_RUN_ID,
    )

    try:
        return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AIRFLOW_RUN_ID}."
        )
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Creates and writes an Airflow DAG zip file.

Parameters:

deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. Required.
stack (Stack): The stack the pipeline will run on. Required.
environment (Dict[str, str]): Environment variables to set in the orchestration environment. Required.
placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. Default: None.

Source code in src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Creates and writes an Airflow DAG zip file.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.
    """
    pipeline_settings = cast(
        AirflowOrchestratorSettings, self.get_settings(deployment)
    )

    dag_generator_values = get_dag_generator_values(
        custom_dag_generator_source=pipeline_settings.custom_dag_generator
    )

    command = StepEntrypointConfiguration.get_entrypoint_command()

    tasks = []
    for step_name, step in deployment.step_configurations.items():
        settings = cast(
            AirflowOrchestratorSettings, self.get_settings(step)
        )
        image = self.get_image(deployment=deployment, step_name=step_name)
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )
        operator_args = settings.operator_args.copy()
        if self.requires_resources_in_orchestration_environment(step=step):
            if settings.operator == OperatorType.KUBERNETES_POD.source:
                self._apply_resource_settings(
                    resource_settings=step.config.resource_settings,
                    operator_args=operator_args,
                )
            else:
                logger.warning(
                    "Specifying step resources is only supported when "
                    "using KubernetesPodOperators, ignoring resource "
                    "configuration for step %s.",
                    step_name,
                )

        task = dag_generator_values.task_configuration_class(
            id=step_name,
            zenml_step_name=step_name,
            upstream_steps=step.spec.upstream_steps,
            docker_image=image,
            command=command,
            arguments=arguments,
            environment=environment,
            operator_source=settings.operator,
            operator_args=operator_args,
        )
        tasks.append(task)

    local_stores_path = (
        os.path.expanduser(GlobalConfiguration().local_stores_path)
        if self.config.local
        else None
    )

    dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    dag_config = dag_generator_values.dag_configuration_class(
        id=dag_id,
        local_stores_path=local_stores_path,
        tasks=tasks,
        tags=pipeline_settings.dag_tags,
        dag_args=pipeline_settings.dag_args,
        **self._translate_schedule(deployment.schedule),
    )

    self._write_dag(
        dag_config,
        dag_generator_values=dag_generator_values,
        output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
    )
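
Note that this method only generates and writes the DAG archive; making the resulting file available to an Airflow deployment (for example by copying it into the deployment's DAGs folder) is a separate step.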
prepare_pipeline_deployment(deployment: PipelineDeploymentResponse, stack: Stack) -> None

Builds a Docker image to run pipeline steps.

Parameters:

deployment (PipelineDeploymentResponse): The pipeline deployment configuration. Required.
stack (Stack): The stack on which the pipeline will be deployed. Required.

Source code in src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
) -> None:
    """Builds a Docker image to run pipeline steps.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    if self.config.local:
        stack.check_local_paths()
Modules
airflow_orchestrator

Implementation of Airflow orchestrator integration.

Classes
DagGeneratorValues

Bases: NamedTuple

Values from the DAG generator module.

Functions
get_dag_generator_values(custom_dag_generator_source: Optional[str] = None) -> DagGeneratorValues

Gets values from the DAG generator module.

Parameters:

custom_dag_generator_source (Optional[str]): Source of a custom DAG generator module. Default: None.

Returns:

DagGeneratorValues: DAG generator module values.

Source code in src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_dag_generator_values(
    custom_dag_generator_source: Optional[str] = None,
) -> DagGeneratorValues:
    """Gets values from the DAG generator module.

    Args:
        custom_dag_generator_source: Source of a custom DAG generator module.

    Returns:
        DAG generator module values.
    """
    if custom_dag_generator_source:
        module = importlib.import_module(custom_dag_generator_source)
    else:
        from zenml.integrations.airflow.orchestrators import dag_generator

        module = dag_generator

    assert module.__file__
    return DagGeneratorValues(
        file=module.__file__,
        config_file_name=module.CONFIG_FILENAME,
        run_id_env_variable_name=module.ENV_ZENML_AIRFLOW_RUN_ID,
        dag_configuration_class=module.DagConfiguration,
        task_configuration_class=module.TaskConfiguration,
    )
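
For illustration, a minimal sketch of inspecting the default generator values; it assumes the Airflow integration requirements are installed so that the default dag_generator module can be imported:

from zenml.integrations.airflow.orchestrators.airflow_orchestrator import (
    get_dag_generator_values,
)

values = get_dag_generator_values()
print(values.file)              # path of the default dag_generator module
print(values.config_file_name)  # name of the DAG config file packed into the zip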
Modules
dag_generator

Module to generate an Airflow DAG from a config file.

Classes
DagConfiguration

Bases: BaseModel

Airflow DAG configuration.

TaskConfiguration

Bases: BaseModel

Airflow task configuration.

Functions
get_docker_operator_init_kwargs(dag_config: DagConfiguration, task_config: TaskConfiguration) -> Dict[str, Any]

Gets keyword arguments to pass to the DockerOperator.

Parameters:

dag_config (DagConfiguration): The configuration of the DAG. Required.
task_config (TaskConfiguration): The configuration of the task. Required.

Returns:

Dict[str, Any]: The init keyword arguments.

Source code in src/zenml/integrations/airflow/orchestrators/dag_generator.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_docker_operator_init_kwargs(
    dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the DockerOperator.

    Args:
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    mounts = []
    extra_hosts = {}
    environment = task_config.environment
    environment[ENV_ZENML_AIRFLOW_RUN_ID] = "{{run_id}}"

    if dag_config.local_stores_path:
        from docker.types import Mount

        environment[ENV_ZENML_LOCAL_STORES_PATH] = dag_config.local_stores_path
        mounts = [
            Mount(
                target=dag_config.local_stores_path,
                source=dag_config.local_stores_path,
                type="bind",
            )
        ]
        extra_hosts = {"host.docker.internal": "host-gateway"}
    return {
        "image": task_config.docker_image,
        "command": task_config.command + task_config.arguments,
        "mounts": mounts,
        "environment": environment,
        "extra_hosts": extra_hosts,
    }
get_kubernetes_pod_operator_init_kwargs(dag_config: DagConfiguration, task_config: TaskConfiguration) -> Dict[str, Any]

Gets keyword arguments to pass to the KubernetesPodOperator.

Parameters:

dag_config (DagConfiguration): The configuration of the DAG. Required.
task_config (TaskConfiguration): The configuration of the task. Required.

Returns:

Dict[str, Any]: The init keyword arguments.

Source code in src/zenml/integrations/airflow/orchestrators/dag_generator.py
def get_kubernetes_pod_operator_init_kwargs(
    dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the KubernetesPodOperator.

    Args:
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    from kubernetes.client.models import V1EnvVar

    environment = task_config.environment
    environment[ENV_ZENML_AIRFLOW_RUN_ID] = "{{run_id}}"

    return {
        "name": f"{dag_config.id}_{task_config.id}",
        "namespace": "default",
        "image": task_config.docker_image,
        "cmds": task_config.command,
        "arguments": task_config.arguments,
        "env_vars": [
            V1EnvVar(name=key, value=value)
            for key, value in environment.items()
        ],
    }
get_operator_init_kwargs(operator_class: Type[Any], dag_config: DagConfiguration, task_config: TaskConfiguration) -> Dict[str, Any]

Gets keyword arguments to pass to the operator init method.

Parameters:

operator_class (Type[Any]): The operator class for which to get the kwargs. Required.
dag_config (DagConfiguration): The configuration of the DAG. Required.
task_config (TaskConfiguration): The configuration of the task. Required.

Returns:

Dict[str, Any]: The init keyword arguments.

Source code in src/zenml/integrations/airflow/orchestrators/dag_generator.py
def get_operator_init_kwargs(
    operator_class: Type[Any],
    dag_config: DagConfiguration,
    task_config: TaskConfiguration,
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the operator init method.

    Args:
        operator_class: The operator class for which to get the kwargs.
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    init_kwargs = {"task_id": task_config.id}

    try:
        from airflow.providers.docker.operators.docker import DockerOperator

        if issubclass(operator_class, DockerOperator):
            init_kwargs.update(
                get_docker_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    try:
        from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
            KubernetesPodOperator,
        )

        if issubclass(operator_class, KubernetesPodOperator):
            init_kwargs.update(
                get_kubernetes_pod_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    try:
        # Support for apache-airflow-providers-cncf-kubernetes>=10.0.0 where
        # the import changed
        from airflow.providers.cncf.kubernetes.operators.pod import (
            KubernetesPodOperator,
        )

        if issubclass(operator_class, KubernetesPodOperator):
            init_kwargs.update(
                get_kubernetes_pod_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    init_kwargs.update(task_config.operator_args)
    return init_kwargs
import_class_by_path(class_path: str) -> Type[Any]

Imports a class based on a given path.

Parameters:

class_path (str): The class source path, e.g. this.module.Class. Required.

Returns:

Type[Any]: The given class.

Source code in src/zenml/integrations/airflow/orchestrators/dag_generator.py
def import_class_by_path(class_path: str) -> Type[Any]:
    """Imports a class based on a given path.

    Args:
        class_path: str, class_source e.g. this.module.Class

    Returns:
        the given class
    """
    module_name, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)  # type: ignore[no-any-return]
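
For illustration, the same mechanism in isolation (a hedged re-implementation, not the module's code), showing how a dotted path resolves to a class:

import importlib

def load_class(class_path: str):
    """Resolve a dotted path like "collections.OrderedDict" to a class."""
    module_name, class_name = class_path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), class_name)

operator_cls = load_class("collections.OrderedDict")
assert operator_cls.__name__ == "OrderedDict"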