Skip to content

Client Lazy Loader

Lazy loading functionality for Client methods.

ClientLazyLoader

Bases: BaseModel

Lazy loader for Client methods.

Source code in src/zenml/client_lazy_loader.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class ClientLazyLoader(BaseModel):
    """Lazy loader for Client methods."""

    method_name: str
    call_chain: List[_CallStep] = []
    exclude_next_call: bool = False

    def __getattr__(self, name: str) -> "ClientLazyLoader":
        """Get attribute not defined in ClientLazyLoader.

        Args:
            name: Name of the attribute to get.

        Returns:
            self
        """
        self_ = ClientLazyLoader(
            method_name=self.method_name, call_chain=self.call_chain.copy()
        )
        # workaround to protect from infinitely looping over in deepcopy called in invocations
        if name != "__deepcopy__":
            self_.call_chain.append(_CallStep(attribute_name=name))
        else:
            self_.exclude_next_call = True
        return self_

    def __call__(self, *args: Any, **kwargs: Any) -> "ClientLazyLoader":
        """Call mocked attribute.

        Args:
            args: Positional arguments.
            kwargs: Keyword arguments.

        Returns:
            self
        """
        # workaround to protect from infinitely looping over in deepcopy called in invocations
        if not self.exclude_next_call:
            self.call_chain.append(
                _CallStep(is_call=True, call_args=args, call_kwargs=kwargs)
            )
        self.exclude_next_call = False
        return self

    def __getitem__(self, item: Any) -> "ClientLazyLoader":
        """Get item from mocked attribute.

        Args:
            item: Item to get.

        Returns:
            self
        """
        self.call_chain.append(_CallStep(selector=item))
        return self

    def evaluate(self) -> Any:
        """Evaluate lazy loaded Client method.

        Returns:
            Evaluated lazy loader chain of calls.
        """
        from zenml.client import Client

        def _iterate_over_lazy_chain(
            self: "ClientLazyLoader", self_: Any, call_chain_: List[_CallStep]
        ) -> Any:
            next_step = call_chain_.pop(0)
            try:
                if next_step.is_call:
                    self_ = self_(
                        *next_step.call_args, **next_step.call_kwargs
                    )
                elif next_step.selector:
                    self_ = self_[next_step.selector]
                elif next_step.attribute_name:
                    self_ = getattr(self_, next_step.attribute_name)
                else:
                    raise ValueError(
                        "Invalid call chain. Reach out to the ZenML team."
                    )
            except Exception as e:
                logger.debug(
                    f"Failed to evaluate lazy load chain `{self.method_name}` "
                    f"+ `{next_step}` + `{self.call_chain}`."
                )
                msg = f"`{self.method_name}("
                if next_step:
                    for arg in next_step.call_args:
                        msg += f"'{arg}',"
                    for k, v in next_step.call_kwargs.items():
                        msg += f"{k}='{v}',"
                    msg = msg[:-1]
                msg += f")` failed during lazy load with error: {e}"
                logger.error(msg)
                raise RuntimeError(msg)
            return self_

        self_ = getattr(Client(), self.method_name)
        call_chain_ = self.call_chain.copy()
        while call_chain_:
            self_ = _iterate_over_lazy_chain(self, self_, call_chain_)
        return self_

__call__(*args, **kwargs)

Call mocked attribute.

Parameters:

Name Type Description Default
args Any

Positional arguments.

()
kwargs Any

Keyword arguments.

{}

Returns:

Type Description
ClientLazyLoader

self

Source code in src/zenml/client_lazy_loader.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __call__(self, *args: Any, **kwargs: Any) -> "ClientLazyLoader":
    """Call mocked attribute.

    Args:
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        self
    """
    # workaround to protect from infinitely looping over in deepcopy called in invocations
    if not self.exclude_next_call:
        self.call_chain.append(
            _CallStep(is_call=True, call_args=args, call_kwargs=kwargs)
        )
    self.exclude_next_call = False
    return self

__getattr__(name)

Get attribute not defined in ClientLazyLoader.

Parameters:

Name Type Description Default
name str

Name of the attribute to get.

required

Returns:

Type Description
ClientLazyLoader

self

Source code in src/zenml/client_lazy_loader.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __getattr__(self, name: str) -> "ClientLazyLoader":
    """Get attribute not defined in ClientLazyLoader.

    Args:
        name: Name of the attribute to get.

    Returns:
        self
    """
    self_ = ClientLazyLoader(
        method_name=self.method_name, call_chain=self.call_chain.copy()
    )
    # workaround to protect from infinitely looping over in deepcopy called in invocations
    if name != "__deepcopy__":
        self_.call_chain.append(_CallStep(attribute_name=name))
    else:
        self_.exclude_next_call = True
    return self_

__getitem__(item)

Get item from mocked attribute.

Parameters:

Name Type Description Default
item Any

Item to get.

required

Returns:

Type Description
ClientLazyLoader

self

Source code in src/zenml/client_lazy_loader.py
81
82
83
84
85
86
87
88
89
90
91
def __getitem__(self, item: Any) -> "ClientLazyLoader":
    """Get item from mocked attribute.

    Args:
        item: Item to get.

    Returns:
        self
    """
    self.call_chain.append(_CallStep(selector=item))
    return self

evaluate()

Evaluate lazy loaded Client method.

Returns:

Type Description
Any

Evaluated lazy loader chain of calls.

Source code in src/zenml/client_lazy_loader.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def evaluate(self) -> Any:
    """Evaluate lazy loaded Client method.

    Returns:
        Evaluated lazy loader chain of calls.
    """
    from zenml.client import Client

    def _iterate_over_lazy_chain(
        self: "ClientLazyLoader", self_: Any, call_chain_: List[_CallStep]
    ) -> Any:
        next_step = call_chain_.pop(0)
        try:
            if next_step.is_call:
                self_ = self_(
                    *next_step.call_args, **next_step.call_kwargs
                )
            elif next_step.selector:
                self_ = self_[next_step.selector]
            elif next_step.attribute_name:
                self_ = getattr(self_, next_step.attribute_name)
            else:
                raise ValueError(
                    "Invalid call chain. Reach out to the ZenML team."
                )
        except Exception as e:
            logger.debug(
                f"Failed to evaluate lazy load chain `{self.method_name}` "
                f"+ `{next_step}` + `{self.call_chain}`."
            )
            msg = f"`{self.method_name}("
            if next_step:
                for arg in next_step.call_args:
                    msg += f"'{arg}',"
                for k, v in next_step.call_kwargs.items():
                    msg += f"{k}='{v}',"
                msg = msg[:-1]
            msg += f")` failed during lazy load with error: {e}"
            logger.error(msg)
            raise RuntimeError(msg)
        return self_

    self_ = getattr(Client(), self.method_name)
    call_chain_ = self.call_chain.copy()
    while call_chain_:
        self_ = _iterate_over_lazy_chain(self, self_, call_chain_)
    return self_

client_lazy_loader(method_name, *args, **kwargs)

Lazy loader for Client methods helper.

Usage:

def get_something(self, arg1: Any)->SomeResponse:
    if cll:=client_lazy_loader("get_something", arg1):
        return cll # type: ignore[return-value]
    return SomeResponse()

Parameters:

Name Type Description Default
method_name str

The name of the method to be called.

required
*args Any

The arguments to be passed to the method.

()
**kwargs Any

The keyword arguments to be passed to the method.

{}

Returns:

Type Description
Optional[ClientLazyLoader]

The result of the method call.

Source code in src/zenml/client_lazy_loader.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def client_lazy_loader(
    method_name: str, *args: Any, **kwargs: Any
) -> Optional[ClientLazyLoader]:
    """Lazy loader for Client methods helper.

    Usage:
    ```
    def get_something(self, arg1: Any)->SomeResponse:
        if cll:=client_lazy_loader("get_something", arg1):
            return cll # type: ignore[return-value]
        return SomeResponse()
    ```

    Args:
        method_name: The name of the method to be called.
        *args: The arguments to be passed to the method.
        **kwargs: The keyword arguments to be passed to the method.

    Returns:
        The result of the method call.
    """
    from zenml import get_pipeline_context

    try:
        get_pipeline_context()
        cll = ClientLazyLoader(
            method_name=method_name,
        )
        return cll(*args, **kwargs)
    except RuntimeError:
        return None

evaluate_all_lazy_load_args_in_client_methods(cls)

Class wrapper to evaluate lazy loader arguments of all methods.

Parameters:

Name Type Description Default
cls Type[Client]

The class to wrap.

required

Returns:

Type Description
Type[Client]

Wrapped class.

Source code in src/zenml/client_lazy_loader.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def evaluate_all_lazy_load_args_in_client_methods(
    cls: Type["Client"],
) -> Type["Client"]:
    """Class wrapper to evaluate lazy loader arguments of all methods.

    Args:
        cls: The class to wrap.

    Returns:
        Wrapped class.
    """
    import inspect

    def _evaluate_args(
        func: Callable[..., Any], is_instance_method: bool
    ) -> Any:
        def _inner(*args: Any, **kwargs: Any) -> Any:
            args_ = list(args)
            if not is_instance_method:
                from zenml.client import Client

                if args and isinstance(args[0], Client):
                    args_ = list(args[1:])

            for i in range(len(args_)):
                if isinstance(args_[i], dict):
                    with contextlib.suppress(ValueError):
                        args_[i] = ClientLazyLoader(**args_[i]).evaluate()
                elif isinstance(args_[i], ClientLazyLoader):
                    args_[i] = args_[i].evaluate()

            for k, v in kwargs.items():
                if isinstance(v, dict):
                    with contextlib.suppress(ValueError):
                        kwargs[k] = ClientLazyLoader(**v).evaluate()

            return func(*args_, **kwargs)

        return _inner

    def _decorate() -> Type["Client"]:
        for name, fn in inspect.getmembers(cls, inspect.isfunction):
            setattr(
                cls,
                name,
                _evaluate_args(fn, "self" in inspect.getfullargspec(fn).args),
            )
        return cls

    return _decorate()