Utils
zenml.utils
special
The utils
module contains utility functions handling analytics, reading and
writing YAML data as well as other general purpose functions.
analytics_utils
Analytics code for ZenML
get_environment()
Returns a string representing the execution environment of the pipeline.
Currently, one of `docker`, `paperspace`, `colab`, or `native`.
Source code in zenml/utils/analytics_utils.py
def get_environment() -> str:
    """Identifies the environment in which the pipeline is executing.

    The checks run in a fixed order (Docker, then Google Colab, then
    Paperspace Gradient) and the first one that matches wins.

    Returns:
        One of `docker`, `colab`, `paperspace` or `native`.
    """
    if in_docker():
        return "docker"
    if in_google_colab():
        return "colab"
    if in_paperspace_gradient():
        return "paperspace"
    # None of the known environments matched.
    return "native"
get_segment_key()
Get key for authorizing to Segment backend.
Returns:
Type | Description |
---|---|
str |
Segment key as a string. |
Source code in zenml/utils/analytics_utils.py
def get_segment_key() -> str:
    """Selects the key used to authorize against the Segment backend.

    Returns:
        The development key when `IS_DEBUG_ENV` is truthy, otherwise the
        production key.
    """
    return SEGMENT_KEY_DEV if IS_DEBUG_ENV else SEGMENT_KEY_PROD
get_system_info()
Returns system info as a dict.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dict of system information. |
Source code in zenml/utils/analytics_utils.py
def get_system_info() -> Dict[str, Any]:
    """Collects basic facts about the host operating system.

    Returns:
        A dict that always contains an `os` key (`windows`, `mac`, `linux`
        or `unknown`) plus version details specific to that platform.
    """
    os_name = platform.system()

    if os_name == "Linux":
        return {
            "os": "linux",
            "linux_distro": distro.id(),
            "linux_distro_like": distro.like(),
            "linux_distro_version": distro.version(),
        }

    if os_name == "Darwin":
        mac_release = platform.mac_ver()[0]
        return {"os": "mac", "mac_version": mac_release}

    if os_name == "Windows":
        win_release, win_version, service_pack, os_type = platform.win32_ver()
        return {
            "os": "windows",
            "windows_version_release": win_release,
            "windows_version": win_version,
            "windows_version_service_pack": service_pack,
            "windows_version_os_type": os_type,
        }

    # No details are gathered for any other platform.
    return {"os": "unknown"}
in_docker()
Returns: True if running in a Docker container, else False
Source code in zenml/utils/analytics_utils.py
def in_docker() -> bool:
    """Best-effort check for running inside a container.

    Looks for the markers `docker` or `kubepod` in `/proc/1/cgroup`.

    Returns:
        True if one of the markers is found, False otherwise (including
        when the cgroup file cannot be read for any reason).
    """
    # TODO [ENG-167]: Make this more reliable and add test.
    try:
        with open("/proc/1/cgroup", "rt") as cgroup_file:
            cgroup_info = cgroup_file.read()
    except Exception:
        # `Exception` already covers `FileNotFoundError`; any failure to
        # read the file is treated as "not in a container".
        return False
    return any(marker in cgroup_info for marker in ("docker", "kubepod"))
in_google_colab()
Returns: True if running in a Google Colab env, else False
Source code in zenml/utils/analytics_utils.py
def in_google_colab() -> bool:
    """Checks for a Google Colab environment.

    Returns:
        True if the `COLAB_GPU` environment variable is set, else False.
    """
    return "COLAB_GPU" in os.environ
in_paperspace_gradient()
Returns: True if running in a Paperspace Gradient env, else False
Source code in zenml/utils/analytics_utils.py
def in_paperspace_gradient() -> bool:
    """Checks for a Paperspace Gradient environment.

    Returns:
        True if the `PAPERSPACE_NOTEBOOK_REPO_ID` environment variable is
        set, else False.
    """
    return "PAPERSPACE_NOTEBOOK_REPO_ID" in os.environ
parametrized(dec)
This is a meta-decorator, that is, a decorator for decorators. Because a decorator is itself a function, a decorator wrapped with `parametrized` can then be used like a regular decorator that takes arguments.
Source code in zenml/utils/analytics_utils.py
def parametrized(
    dec: Callable[..., Callable[..., Any]]
) -> Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]]:
    """Turns `dec` into a decorator that accepts arguments.

    `dec` must take the function to decorate as its first positional
    argument, followed by any extra arguments. After wrapping, the extra
    arguments are supplied first and the function second: applying
    `@dec(*args, **kwargs)` to `f` evaluates `dec(f, *args, **kwargs)`.

    Args:
        dec: Decorator implementation called as `dec(f, *args, **kwargs)`.

    Returns:
        A decorator factory that carries the name and docstring of `dec`.
    """
    # Imported locally so the block does not rely on module-level imports.
    import functools

    # Without `wraps`, every decorator built here reports the name `layer`
    # and the docstring "Internal layer" instead of its own metadata.
    @functools.wraps(dec)
    def layer(
        *args: Any, **kwargs: Any
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Internal layer"""

        def repl(f: Callable[..., Any]) -> Callable[..., Any]:
            """Internal repl"""
            return dec(f, *args, **kwargs)

        return repl

    return layer
track(*args, **kwargs)
Internal layer
Source code in zenml/utils/analytics_utils.py
def layer(
    *args: Any, **kwargs: Any
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Internal layer

    Captures the decorator arguments and returns the function that applies
    the decorator to its target.

    NOTE(review): `dec` is not defined inside this block; this listing
    appears to be the inner function of `parametrized`, where `dec` is the
    decorator passed to it - confirm.
    """
    def repl(f: Callable[..., Any]) -> Callable[..., Any]:
        """Internal repl"""
        # Hand the target function to `dec` along with the captured args.
        return dec(f, *args, **kwargs)
    return repl
track_event(event, metadata=None)
Track segment event if user opted-in.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
str |
Name of event to track in segment. |
required |
metadata |
Optional[Dict[str, Any]] |
Dict of metadata to track. |
None |
Returns:
Type | Description |
---|---|
bool |
True if event is sent successfully, False if not. |
Source code in zenml/utils/analytics_utils.py
def track_event(event: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
    """Track segment event if user opted-in.

    The opt-in and opt-out events themselves are sent regardless of the
    user's analytics setting. System information, the execution
    environment and the package version are added to the metadata before
    sending. Any failure is logged at debug level and reported as False.

    Args:
        event: Name of event to track in segment.
        metadata: Dict of metadata to track. The dict is copied before the
            extra fields are added, so the caller's object is not modified.

    Returns:
        True if event is sent successfully, False if not.
    """
    try:
        import analytics

        from zenml.config.global_config import GlobalConfig

        if analytics.write_key is None:
            analytics.write_key = get_segment_key()

        # Explicit check rather than `assert`, which is removed when Python
        # runs with `-O`. The error is caught and logged below.
        if analytics.write_key is None:
            raise RuntimeError(
                "Analytics key not set but trying to make telemetry call."
            )

        # Set this to 1 to avoid backoff loop
        analytics.max_retries = 1

        gc = GlobalConfig()

        logger.debug(
            f"Attempting analytics: User: {gc.user_id}, "
            f"Event: {event},"
            f"Metadata: {metadata}"
        )

        if not gc.analytics_opt_in and event not in [
            OPT_OUT_ANALYTICS,
            OPT_IN_ANALYTICS,
        ]:
            return False

        # Rebind to a fresh dict so the fields added below do not leak into
        # the dict owned by the caller.
        metadata = dict(metadata) if metadata is not None else {}

        # add basics
        metadata.update(get_system_info())
        metadata.update(
            {
                "environment": get_environment(),
                "version": __version__,
            }
        )

        analytics.track(str(gc.user_id), event, metadata)
        logger.debug(
            f"Analytics sent: User: {gc.user_id}, Event: {event}, Metadata: "
            f"{metadata}"
        )
        return True
    except Exception as e:
        # We should never fail main thread
        logger.debug(f"Analytics failed due to: {e}")
        return False
daemon
Utility functions to start/stop daemon processes.
This is only implemented for UNIX systems and therefore doesn't work on Windows. Based on https://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
check_if_daemon_is_running(pid_file)
Checks whether a daemon process indicated by the PID file is running.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pid_file |
str |
Path to file containing the PID of the daemon process to check. |
required |
Source code in zenml/utils/daemon.py
def check_if_daemon_is_running(pid_file: str) -> bool:
    """Checks whether a daemon process indicated by the PID file is running.

    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.

    Returns:
        True if the PID stored in the file belongs to an existing process.
        False if the file cannot be read, does not contain an integer, or
        no process with that PID exists.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        # OSError: the file is missing or unreadable.
        # ValueError: the file is empty or does not hold an integer.
        return False

    return psutil.pid_exists(pid)
run_as_daemon(daemon_function, pid_file, log_file=None, working_directory='/')
Runs a function as a daemon process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
daemon_function |
Callable[..., Any] |
The function to run as a daemon. |
required |
pid_file |
str |
Path to file in which to store the PID of the daemon process. |
required |
log_file |
Optional[str] |
Optional file to which the daemon's stdout/stderr will be redirected. |
None |
working_directory |
str |
Working directory for the daemon process, defaults to the root directory. |
'/' |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If the PID file already exists. |
Source code in zenml/utils/daemon.py
def run_as_daemon(
    daemon_function: Callable[..., Any],
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
) -> None:
    """Runs a function as a daemon process.

    Forks twice: the calling process returns right after the first fork,
    the intermediate process exits after the second fork, and the
    remaining process redirects its standard streams, writes its PID to
    `pid_file` and then calls `daemon_function`.

    Args:
        daemon_function: The function to run as a daemon.
        pid_file: Path to file in which to store the PID of the daemon process.
        log_file: Optional file to which the daemon's stdout/stderr will be
            redirected.
        working_directory: Working directory for the daemon process, defaults
            to the root directory.

    Raises:
        FileExistsError: If the PID file already exists.
    """
    # convert to absolute path as we will change working directory later
    pid_file = os.path.abspath(pid_file)
    if log_file:
        log_file = os.path.abspath(log_file)

    # check if PID file exists
    if os.path.exists(pid_file):
        raise FileExistsError(
            f"The PID file '{pid_file}' already exists, either the daemon "
            f"process is already running or something went wrong."
        )

    # first fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the process that called `run_as_daemon` so we
            # simply return so it can keep running
            return
    except OSError as e:
        logger.error("Unable to fork (error code: %d)", e.errno)
        sys.exit(1)

    # decouple from parent environment
    os.chdir(working_directory)
    os.setsid()
    os.umask(0)

    # second fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the parent of the future daemon process, kill it
            # so the daemon gets adopted by the init process
            sys.exit(0)
    except OSError as e:
        sys.stderr.write(f"Unable to fork (error code: {e.errno})")
        sys.exit(1)

    # redirect standard file descriptors to devnull (or the given logfile)
    devnull = "/dev/null"
    if hasattr(os, "devnull"):
        devnull = os.devnull

    devnull_fd = os.open(devnull, os.O_RDWR)
    log_fd = os.open(log_file, os.O_CREAT | os.O_RDWR) if log_file else None
    # Compare against None: descriptor 0 is valid but falsy, so `or` would
    # silently discard it in favour of devnull.
    out_fd = log_fd if log_fd is not None else devnull_fd

    os.dup2(devnull_fd, sys.stdin.fileno())
    os.dup2(out_fd, sys.stdout.fileno())
    os.dup2(out_fd, sys.stderr.fileno())

    # write the PID file
    with open(pid_file, "w+") as f:
        f.write(f"{os.getpid()}\n")

    def remove_pid_file() -> None:
        """Removes the PID file if it is still there."""
        # Both the signal handler and the exit hook call this; the second
        # call must not fail because the file is already gone.
        try:
            os.remove(pid_file)
        except FileNotFoundError:
            pass

    # register actions in case this process exits/gets killed
    def sigterm(signum: int, frame: Optional[types.FrameType]) -> None:
        """Removes the PID file."""
        # NOTE(review): this handler does not exit, so SIGTERM only removes
        # the PID file - confirm that is the intended behaviour.
        remove_pid_file()

    def cleanup() -> None:
        """Removes the PID file."""
        remove_pid_file()

    signal.signal(signal.SIGTERM, sigterm)
    atexit.register(cleanup)

    # finally run the actual daemon code
    # NOTE(review): when `daemon_function` returns, this function returns
    # as well and the daemon process carries on with the caller's code -
    # confirm whether the daemon should exit here instead.
    daemon_function()
stop_daemon(pid_file, kill_children=True)
Stops a daemon process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pid_file |
str |
Path to file containing the PID of the daemon process to kill. |
required |
kill_children |
bool |
If `True`, all child processes of the daemon process will be killed as well. |
True |
Source code in zenml/utils/daemon.py
def stop_daemon(pid_file: str, kill_children: bool = True) -> None:
    """Stops a daemon process.

    Reads the PID from `pid_file` and kills that process. A missing or
    invalid PID file, or a PID with no matching process, only produces a
    warning. The PID file itself is not removed by this function.

    Args:
        pid_file: Path to file containing the PID of the daemon process to kill.
        kill_children: If `True`, all child processes of the daemon process
            will be killed as well.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except OSError:
        logger.warning("Daemon PID file '%s' does not exist.", pid_file)
        return
    except ValueError:
        # The file exists but is empty or does not hold an integer.
        logger.warning(
            "Daemon PID file '%s' does not contain a valid PID.", pid_file
        )
        return

    if not psutil.pid_exists(pid):
        logger.warning("PID from '%s' does not exist.", pid_file)
        return

    try:
        process = psutil.Process(pid)
        children = process.children(recursive=True) if kill_children else []
    except psutil.NoSuchProcess:
        # The process exited between the existence check and the lookup.
        logger.warning("PID from '%s' does not exist.", pid_file)
        return

    # Children first, then the daemon itself.
    for target in children + [process]:
        try:
            target.kill()
        except psutil.NoSuchProcess:
            # Already gone, which is the outcome we want.
            pass
networking_utils
find_available_port()
Finds a local unoccupied port.
Source code in zenml/utils/networking_utils.py
def find_available_port() -> int:
    """Asks the operating system for a currently unused local port.

    Binds a TCP socket on `127.0.0.1` to port 0 and reports the port that
    was assigned. The socket is closed before the function returns.

    Returns:
        The port number that was assigned to the probe socket.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(("127.0.0.1", 0))
        assigned_port = probe.getsockname()[1]
    return cast(int, assigned_port)
port_available(port)
Checks if a local port is available.
Source code in zenml/utils/networking_utils.py
def port_available(port: int) -> bool:
    """Checks whether a TCP socket can be bound to a local port.

    Args:
        port: Port number to try on `127.0.0.1`.

    Returns:
        True if binding succeeded, False if it raised a socket error.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("127.0.0.1", port))
    except socket.error as bind_error:
        logger.debug("Port %d unavailable: %s", port, bind_error)
        return False
    else:
        return True
source_utils
These utils are predicated on the following definitions:
- class_source: This is a python-import type path to a class, e.g. some.mod.class
- module_source: This is a python-import type path to a module, e.g. some.mod
- file_path, relative_path, absolute_path: These are file system paths.
- source: This is a class_source or module_source. If it is a class_source, it can also be optionally pinned.
- pin: Whatever comes after the `@` symbol in a source, usually the git sha or the version of zenml as a string.
create_zenml_pin()
Creates a ZenML pin for source pinning from release version.
Source code in zenml/utils/source_utils.py
def create_zenml_pin() -> str:
    """Builds the pin used for standard ZenML sources.

    Returns:
        `APP_NAME` and `__version__` joined by an underscore.
    """
    release_pin = f"{APP_NAME}_{__version__}"
    return release_pin
get_absolute_path_from_module_source(module)
Get a directory path from module source.
E.g. `zenml.core.step` will return `full/path/to/zenml/core/step`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module |
str |
A module e.g. `zenml.core.step`. |
required |
Source code in zenml/utils/source_utils.py
def get_absolute_path_from_module_source(module: str) -> str:
    """Imports a module and returns the first directory on its search path.

    E.g. `zenml.core.step` will return `full/path/to/zenml/core/step`.

    Args:
        module: A module e.g. `zenml.core.step`.

    Returns:
        The first entry of the imported module's `__path__`.
    """
    imported = importlib.import_module(module)
    search_path = imported.__path__  # type: ignore[attr-defined]
    return search_path[0]  # type: ignore[no-any-return]
get_class_source_from_source(source)
Gets class source from source, i.e. for `module.path@version` it returns `module.path`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
source pointing to potentially pinned sha. |
required |
Source code in zenml/utils/source_utils.py
def get_class_source_from_source(source: str) -> str:
    """Strips the pin from a source.

    E.g. `module.path@version` gives `module.path`. A source without a pin
    is returned unchanged.

    Args:
        source: source pointing to potentially pinned sha.

    Returns:
        Everything in front of the first `@` in `source`.
    """
    class_source, _, _ = source.partition("@")
    return class_source
get_module_source_from_class(class_)
Takes class input and returns module_source. If class is already string then returns the same.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_ |
Union[Type[Any], str] |
object of type class. |
required |
Source code in zenml/utils/source_utils.py
def get_module_source_from_class(
    class_: Union[Type[Any], str]
) -> Optional[str]:
    """Builds the dotted `module.ClassName` source for a class.

    A string argument is taken to be a source already and is returned
    as is.

    Args:
        class_: object of type class, or a source string.

    Returns:
        The given string, or the class's module and name joined by a dot.

    Raises:
        AssertionError: If `class_` is neither a string nor a class.
    """
    if isinstance(class_, str):
        return class_

    if not inspect.isclass(class_):
        raise AssertionError("step_type is neither string nor class.")

    # Infer the source from the class itself.
    return ".".join((class_.__module__, class_.__name__))
get_module_source_from_file_path(file_path)
Gets module_source from a file_path. E.g. `/home/myrepo/step/trainer.py` returns `myrepo.step.trainer` if `myrepo` is the root of the repo.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Absolute file path to a file within the module. |
required |
Source code in zenml/utils/source_utils.py
def get_module_source_from_file_path(file_path: str) -> str:
    """Gets module_source from a file_path. E.g. `/home/myrepo/step/trainer.py`
    returns `myrepo.step.trainer` if `myrepo` is the root of the repo.

    Args:
        file_path: Absolute file path to a file within the module.

    Returns:
        The path relative to the repository root, without a trailing `.py`
        and with `/` replaced by `.`.
    """
    from zenml.core.repo import Repository

    repo_path = Repository().path

    # Strip the repository root only where it is a leading prefix; a plain
    # `str.replace` would also delete later occurrences of the same text.
    if file_path.startswith(repo_path):
        relative_file_path = file_path[len(repo_path):]
    else:
        relative_file_path = file_path
    # Drop the separator(s) left at the front. Unlike slicing off one
    # character, this also works when the repo path ends with `/`.
    relative_file_path = relative_file_path.lstrip("/")

    # Remove only a trailing `.py`, not every `.py` inside the path.
    if relative_file_path.endswith(".py"):
        relative_file_path = relative_file_path[: -len(".py")]

    # Replace `/` with `.` to get the module source
    return relative_file_path.replace("/", ".")
get_module_source_from_source(source)
Gets module source from source. E.g. `some.module.file.class@version` returns `some.module`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
source pointing to potentially pinned sha. |
required |
Source code in zenml/utils/source_utils.py
def get_module_source_from_source(source: str) -> str:
    """Drops the pin and the last two dotted components of a source.

    E.g. `some.module.file.class@version` returns `some.module`.

    Args:
        source: source pointing to potentially pinned sha.

    Returns:
        The remaining leading components joined by dots.
    """
    components = get_class_source_from_source(source).split(".")
    leading_components = components[:-2]
    return ".".join(leading_components)
get_relative_path_from_module_source(module_source)
Get a directory path from module, relative to root of repository.
E.g. zenml.core.step will return zenml/core/step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module_source |
str |
A module e.g. zenml.core.step |
required |
Source code in zenml/utils/source_utils.py
def get_relative_path_from_module_source(module_source: str) -> str:
    """Converts a dotted module source into a slash-separated path.

    E.g. zenml.core.step will return zenml/core/step.

    Args:
        module_source: A module e.g. zenml.core.step

    Returns:
        `module_source` with every `.` replaced by `/`.
    """
    path_components = module_source.split(".")
    return "/".join(path_components)
import_class_by_path(class_path)
Imports a class based on a given path
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_path |
str |
str, class_source e.g. this.module.Class |
required |
Returns: the given class
Source code in zenml/utils/source_utils.py
def import_class_by_path(class_path: str) -> Type[Any]:
    """Imports the object named by a dotted path.

    The path is split at its last dot: the left part is imported as a
    module and the right part is looked up as an attribute of it.

    Args:
        class_path: str, class_source e.g. this.module.Class

    Returns:
        The attribute found on the imported module.
    """
    module_name, _, class_name = class_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)  # type: ignore[no-any-return]
import_python_file(file_path)
Imports a python file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to python file that should be imported. |
required |
Returns:
Type | Description |
---|---|
module |
The imported module. |
Source code in zenml/utils/source_utils.py
def import_python_file(file_path: str) -> types.ModuleType:
    """Imports a python file.

    The file's directory is appended to `sys.path` (once) and the file is
    then imported by its base name without the extension.

    Args:
        file_path: Path to python file that should be imported.

    Returns:
        The imported module.
    """
    file_path = os.path.abspath(file_path)
    module_dir = os.path.dirname(file_path)

    # Add the directory so the file can be imported, but only if it is not
    # listed yet; otherwise every call would add another duplicate entry.
    if module_dir not in sys.path:
        sys.path.append(module_dir)

    module_name = os.path.splitext(os.path.basename(file_path))[0]
    return importlib.import_module(module_name)
is_inside_repository(file_path)
Returns whether a file is inside a zenml repository.
Source code in zenml/utils/source_utils.py
def is_inside_repository(file_path: str) -> bool:
    """Checks whether a file lies below the root of the zenml repository.

    Both paths are resolved before they are compared.

    Args:
        file_path: Path of the file to check.

    Returns:
        True if the repository root is one of the file's parent
        directories, else False.
    """
    from zenml.core.repo import Repository

    repo_root = pathlib.Path(Repository().path).resolve()
    resolved_file = pathlib.Path(file_path).resolve()
    return any(parent == repo_root for parent in resolved_file.parents)
is_standard_pin(pin)
Returns `True` if pin is valid ZenML pin, else False.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pin |
str |
potential ZenML pin like 'zenml_0.1.1' |
required |
Source code in zenml/utils/source_utils.py
def is_standard_pin(pin: str) -> bool:
    """Checks whether a pin has the ZenML pin prefix.

    Args:
        pin: potential ZenML pin like 'zenml_0.1.1'

    Returns:
        `True` if `pin` starts with `APP_NAME` followed by an underscore,
        else `False`.
    """
    return pin.startswith(f"{APP_NAME}_")
is_standard_source(source)
Returns `True` if source is a standard ZenML source.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
class_source e.g. this.module.Class[@pin]. |
required |
Source code in zenml/utils/source_utils.py
def is_standard_source(source: str) -> bool:
    """Checks whether a source belongs to the `zenml` package.

    Args:
        source: class_source e.g. this.module.Class[@pin].

    Returns:
        `True` if the part of `source` before the first dot is exactly
        `zenml`, else `False`.
    """
    top_level_name, _, _ = source.partition(".")
    return top_level_name == "zenml"
is_third_party_module(file_path)
Returns whether a file belongs to a third party package.
Source code in zenml/utils/source_utils.py
def is_third_party_module(file_path: str) -> bool:
    """Checks whether a file lies below a site-packages directory.

    The global and the per-user site-packages directories are considered.

    Args:
        file_path: Path of the file to check.

    Returns:
        True if any resolved site-packages directory is a parent of the
        resolved file path, else False.
    """
    file_parents = pathlib.Path(file_path).resolve().parents
    site_dirs = site.getsitepackages() + [site.getusersitepackages()]
    return any(
        pathlib.Path(site_dir).resolve() in file_parents
        for site_dir in site_dirs
    )
load_source_path_class(source)
Loads a Python class from the source.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
class_source e.g. this.module.Class[@sha] |
required |
Source code in zenml/utils/source_utils.py
def load_source_path_class(source: str) -> Type[Any]:
    """Loads a Python class from the source.

    Any pin (the part from the first `@` onwards) is discarded before the
    class is imported via `import_class_by_path`.

    NOTE(review): indentation was lost in this listing. The debug message
    is laid out here as unconditional - confirm against the module whether
    it belongs inside the `if` instead.

    Args:
        source: class_source e.g. this.module.Class[@sha]

    Returns:
        The object that `import_class_by_path` returns for the unpinned
        source.
    """
    if "@" in source:
        # Keep only the class_source part in front of the pin.
        source = source.split("@")[0]
    logger.debug(
        "Unpinned step found with no git sha. Attempting to "
        "load class from current repository state."
    )
    class_ = import_class_by_path(source)
    return class_
resolve_class(class_)
Resolves a class into a serializable source string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_ |
Type[Any] |
A Python Class reference. |
required |
Returns: source_path e.g. this.module.Class.
Source code in zenml/utils/source_utils.py
def resolve_class(class_: Type[Any]) -> str:
    """Resolves a class into a serializable source string.

    Standard ZenML classes are returned with a pin. Classes without a
    source file, classes from `__main__`, classes outside the repository
    and third-party classes keep their plain `module.Class` source. Any
    other class gets a module path relative to the repository root.

    Args:
        class_: A Python Class reference.

    Returns:
        source_path e.g. this.module.Class.
    """
    class_name = class_.__name__
    default_source = ".".join((class_.__module__, class_name))

    if is_standard_source(default_source):
        return resolve_standard_source(default_source)

    try:
        file_path = inspect.getfile(class_)
    except TypeError:
        # Builtin classes have no source file.
        return default_source

    if default_source.startswith("__main__"):
        return default_source
    if not is_inside_repository(file_path):
        return default_source
    if is_third_party_module(file_path):
        return default_source

    # Regular user file inside the repository -> use the module path
    # relative to the repository root.
    # ENG-123: path sanitization for Windows is not applied here yet.
    repo_module_source = get_module_source_from_file_path(file_path)
    return ".".join((repo_module_source, class_name))
resolve_standard_source(source)
Pins a standard ZenML source by appending `@` and the ZenML release pin to it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
class_source e.g. this.module.Class. |
required |
Source code in zenml/utils/source_utils.py
def resolve_standard_source(source: str) -> str:
    """Appends the ZenML release pin to an unpinned source.

    Args:
        source: class_source e.g. this.module.Class.

    Returns:
        `source`, an `@` and the pin from `create_zenml_pin`.

    Raises:
        AssertionError: If `source` already contains an `@`.
    """
    already_pinned = "@" in source
    if already_pinned:
        raise AssertionError(f"source {source} is already pinned.")
    return f"{source}@{create_zenml_pin()}"
string_utils
get_human_readable_filesize(bytes_)
Convert a file size in bytes into a human-readable string.
Source code in zenml/utils/string_utils.py
def get_human_readable_filesize(bytes_: int) -> str:
    """Formats a byte count using binary units up to GiB.

    The sign of the input is ignored. Values of 1024 GiB or more are still
    reported in GiB.

    Args:
        bytes_: File size in bytes.

    Returns:
        The size with two decimals followed by `B`, `KiB`, `MiB` or `GiB`.
    """
    remaining = abs(float(bytes_))
    for unit_name in ("B", "KiB", "MiB"):
        if remaining < 1024.0:
            return f"{remaining:.2f} {unit_name}"
        remaining /= 1024.0
    # GiB is the largest unit, so whatever is left is reported in it.
    return f"{remaining:.2f} GiB"
get_human_readable_time(seconds)
Convert seconds into a human-readable string.
Source code in zenml/utils/string_utils.py
def get_human_readable_time(seconds: float) -> str:
    """Formats a duration in seconds as days, hours, minutes and seconds.

    Durations below one minute are shown with millisecond precision.
    Longer ones are truncated to whole seconds and shown starting from
    their largest non-zero unit. Negative durations get a leading `-`.

    Args:
        seconds: Duration in seconds.

    Returns:
        A string such as `1d0h0m5s`, `2m30s` or `0.250s`.
    """
    sign = "-" if seconds < 0 else ""
    magnitude = abs(seconds)

    total_minutes, whole_seconds = divmod(int(magnitude), 60)
    total_hours, minutes = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)

    if days > 0:
        formatted = f"{days}d{hours}h{minutes}m{whole_seconds}s"
    elif hours > 0:
        formatted = f"{hours}h{minutes}m{whole_seconds}s"
    elif minutes > 0:
        formatted = f"{minutes}m{whole_seconds}s"
    else:
        # Below one minute the fractional part is kept.
        formatted = f"{magnitude:.3f}s"

    return sign + formatted
yaml_utils
is_yaml(file_path)
Returns True if file_path is YAML, else False
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to YAML file. |
required |
Returns:
Type | Description |
---|---|
bool |
True if is yaml, else False. |
Source code in zenml/utils/yaml_utils.py
def is_yaml(file_path: str) -> bool:
    """Checks whether a path ends like a YAML file.

    Args:
        file_path: Path to YAML file.

    Returns:
        True if `file_path` ends with `yaml` or `yml`, else False.
    """
    return file_path.endswith(("yaml", "yml"))
read_json(file_path)
Read JSON on file path and returns contents as dict.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to JSON file. |
required |
Source code in zenml/utils/yaml_utils.py
def read_json(file_path: str) -> Any:
    """Reads a JSON file and returns its parsed contents.

    Args:
        file_path: Path to JSON file.

    Returns:
        The value produced by parsing the file contents as JSON.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    if not fileio.file_exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist.")

    raw_contents = zenml.io.utils.read_file_contents_as_string(file_path)
    return json.loads(raw_contents)
read_yaml(file_path)
Read YAML on file path and returns contents as dict.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to YAML file. |
required |
Returns:
Type | Description |
---|---|
Any |
Contents of the file in a dict. |
Source code in zenml/utils/yaml_utils.py
def read_yaml(file_path: str) -> Any:
    """Reads a YAML file and returns its parsed contents.

    Args:
        file_path: Path to YAML file.

    Returns:
        The value produced by parsing the file contents as YAML.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    if not fileio.file_exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist.")

    raw_contents = zenml.io.utils.read_file_contents_as_string(file_path)
    # NOTE(review): `yaml.FullLoader` is used rather than `yaml.SafeLoader`
    # - confirm that the files read here are trusted.
    return yaml.load(raw_contents, Loader=yaml.FullLoader)
write_json(file_path, contents)
Write contents as JSON format to file_path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to JSON file. |
required |
contents |
Dict[str, Any] |
Contents of JSON file as dict. |
required |
Returns:
Type | Description |
---|---|
None |
This function does not return a value. |
Source code in zenml/utils/yaml_utils.py
def write_json(file_path: str, contents: Dict[str, Any]) -> None:
    """Serializes a dict as JSON and writes it to a file.

    Args:
        file_path: Path to JSON file.
        contents: Contents of JSON file as dict.

    Raises:
        FileNotFoundError: If `file_path` is a local path and its parent
            directory does not exist.
    """
    parent_dir = str(Path(file_path).parent)
    # The directory check only applies to local paths.
    if not fileio.is_remote(file_path) and not fileio.is_dir(parent_dir):
        raise FileNotFoundError(f"Directory {parent_dir} does not exist.")

    zenml.io.utils.write_file_contents_as_string(
        file_path, json.dumps(contents)
    )
write_yaml(file_path, contents)
Write contents as YAML format to file_path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to YAML file. |
required |
contents |
Dict[Any, Any] |
Contents of YAML file as dict. |
required |
Source code in zenml/utils/yaml_utils.py
def write_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
    """Serializes a dict as YAML and writes it to a file.

    Args:
        file_path: Path to YAML file.
        contents: Contents of YAML file as dict.

    Raises:
        FileNotFoundError: If `file_path` is a local path and its parent
            directory does not exist.
    """
    parent_dir = str(Path(file_path).parent)
    # The directory check only applies to local paths.
    if not fileio.is_remote(file_path) and not fileio.is_dir(parent_dir):
        raise FileNotFoundError(f"Directory {parent_dir} does not exist.")

    zenml.io.utils.write_file_contents_as_string(file_path, yaml.dump(contents))