TensorBoard

zenml.integrations.tensorboard special

Initialization for TensorBoard integration.

TensorBoardIntegration (Integration)

Definition of TensorBoard integration for ZenML.

Source code in zenml/integrations/tensorboard/__init__.py
class TensorBoardIntegration(Integration):
    """Definition of TensorBoard integration for ZenML."""

    NAME = TENSORBOARD
    REQUIREMENTS = []

    @classmethod
    def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
        """Defines platform specific requirements for the integration.

        Args:
            target_os: The target operating system.

        Returns:
            A list of requirements.
        """
        if sys.version_info > (3, 11):
            tf_version = "2.13"
        else:
            # Capping tensorflow to 2.11 for Python 3.10 and below because it
            # is not compatible with PyTorch
            # (see https://github.com/pytorch/pytorch/issues/99637).
            tf_version = "2.11"

        requirements = [
            f"tensorboard=={tf_version}",
            "protobuf>=3.6.0,<4.0.0",
        ]
        return requirements

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.tensorboard import services  # noqa

activate() classmethod

Activates the integration.

Source code in zenml/integrations/tensorboard/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.tensorboard import services  # noqa

get_requirements(target_os=None) classmethod

Defines platform specific requirements for the integration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| target_os | Optional[str] | The target operating system. | None |

Returns:

| Type | Description |
| --- | --- |
| List[str] | A list of requirements. |

Source code in zenml/integrations/tensorboard/__init__.py
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Defines platform specific requirements for the integration.

    Args:
        target_os: The target operating system.

    Returns:
        A list of requirements.
    """
    if sys.version_info > (3, 11):
        tf_version = "2.13"
    else:
        # Capping tensorflow to 2.11 for Python 3.10 and below because it
        # is not compatible with PyTorch
        # (see https://github.com/pytorch/pytorch/issues/99637).
        tf_version = "2.11"

    requirements = [
        f"tensorboard=={tf_version}",
        "protobuf>=3.6.0,<4.0.0",
    ]
    return requirements
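
As a quick sanity check of the pins above, the integration class can be queried directly. A minimal sketch; the exact pin depends on your Python version, and in practice the zenml integration install tensorboard command installs these requirements for you:

from zenml.integrations.tensorboard import TensorBoardIntegration

# On Python 3.11 and newer this resolves to tensorboard==2.13;
# on Python 3.10 and below, to tensorboard==2.11.
print(TensorBoardIntegration.get_requirements())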

services special

Initialization for TensorBoard services.

tensorboard_service

Implementation of the TensorBoard service.

TensorboardService (LocalDaemonService) pydantic-model

TensorBoard service.

This can be used to start a local TensorBoard server for one or more models.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| SERVICE_TYPE | ClassVar[zenml.services.service_type.ServiceType] | a service type descriptor with information describing the TensorBoard service class |
| config | TensorboardServiceConfig | service configuration |
| endpoint | LocalDaemonServiceEndpoint | optional service endpoint |

Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
class TensorboardService(LocalDaemonService):
    """TensorBoard service.

    This can be used to start a local TensorBoard server for one or more models.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the TensorBoard service class
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="tensorboard",
        type="visualization",
        flavor="tensorboard",
        description="TensorBoard visualization service",
    )

    config: TensorboardServiceConfig
    endpoint: LocalDaemonServiceEndpoint

    def __init__(
        self,
        config: Union[TensorboardServiceConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        """Initialization for TensorBoard service.

        Args:
            config: service configuration
            **attrs: additional attributes
        """
        # ensure that the endpoint is created before the service is initialized
        # TODO [ENG-697]: implement a service factory or builder for TensorBoard
        #   deployment services
        if (
            isinstance(config, TensorboardServiceConfig)
            and "endpoint" not in attrs
        ):
            endpoint = LocalDaemonServiceEndpoint(
                config=LocalDaemonServiceEndpointConfig(
                    protocol=ServiceEndpointProtocol.HTTP,
                ),
                monitor=HTTPEndpointHealthMonitor(
                    config=HTTPEndpointHealthMonitorConfig(
                        healthcheck_uri_path="",
                        use_head_request=True,
                    )
                ),
            )
            attrs["endpoint"] = endpoint
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        """Initialize and run the TensorBoard server."""
        logger.info(
            "Starting TensorBoard service as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()

        try:
            tensorboard = program.TensorBoard(
                plugins=default.get_plugins(),
                subcommands=[uploader_subcommand.UploaderSubcommand()],
            )
            tensorboard.configure(
                logdir=self.config.logdir,
                port=self.endpoint.status.port,
                host="localhost",
                max_reload_threads=self.config.max_reload_threads,
                reload_interval=self.config.reload_interval,
            )
            tensorboard.main()
        except KeyboardInterrupt:
            logger.info(
                "TensorBoard service stopped. Resuming normal execution."
            )
__init__(self, config, **attrs) special

Initialization for TensorBoard service.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | Union[TensorboardServiceConfig, Dict[str, Any]] | service configuration | required |
| **attrs | Any | additional attributes | {} |

Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
def __init__(
    self,
    config: Union[TensorboardServiceConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialization for TensorBoard service.

    Args:
        config: service configuration
        **attrs: additional attributes
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-697]: implement a service factory or builder for TensorBoard
    #   deployment services
    if (
        isinstance(config, TensorboardServiceConfig)
        and "endpoint" not in attrs
    ):
        endpoint = LocalDaemonServiceEndpoint(
            config=LocalDaemonServiceEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path="",
                    use_head_request=True,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
run(self)

Initialize and run the TensorBoard server.

Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
def run(self) -> None:
    """Initialize and run the TensorBoard server."""
    logger.info(
        "Starting TensorBoard service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()

    try:
        tensorboard = program.TensorBoard(
            plugins=default.get_plugins(),
            subcommands=[uploader_subcommand.UploaderSubcommand()],
        )
        tensorboard.configure(
            logdir=self.config.logdir,
            port=self.endpoint.status.port,
            host="localhost",
            max_reload_threads=self.config.max_reload_threads,
            reload_interval=self.config.reload_interval,
        )
        tensorboard.main()
    except KeyboardInterrupt:
        logger.info(
            "TensorBoard service stopped. Resuming normal execution."
        )
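
Putting the lifecycle together, a minimal sketch of starting and stopping the service locally; the logdir path is hypothetical, and this assumes a non-Windows machine (daemon services are not supported on Windows):

from zenml.integrations.tensorboard.services.tensorboard_service import (
    TensorboardService,
    TensorboardServiceConfig,
)

# "/tmp/model_logs" is a hypothetical location of TensorBoard event files.
service = TensorboardService(
    TensorboardServiceConfig(logdir="/tmp/model_logs")
)
service.start(timeout=60)  # daemonizes and waits for the endpoint to come up

if service.endpoint.status.port:
    print(f"TensorBoard listening on port {service.endpoint.status.port}")

service.stop(timeout=10)  # shut the daemon down again
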
TensorboardServiceConfig (LocalDaemonServiceConfig) pydantic-model

TensorBoard service configuration.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| logdir | str | location of TensorBoard log files. |
| max_reload_threads | int | the max number of threads that TensorBoard can use to reload runs. Each thread reloads one run at a time. |
| reload_interval | int | how often the backend should load more data, in seconds. Set to 0 to load just once at startup. |

Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
class TensorboardServiceConfig(LocalDaemonServiceConfig):
    """TensorBoard service configuration.

    Attributes:
        logdir: location of TensorBoard log files.
        max_reload_threads: the max number of threads that TensorBoard can use
            to reload runs. Each thread reloads one run at a time.
        reload_interval: how often the backend should load more data, in
            seconds. Set to 0 to load just once at startup.
    """

    logdir: str
    max_reload_threads: int = 1
    reload_interval: int = 5
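
As a sketch of how these fields might be tuned, a configuration that reads the event files once at startup instead of polling (field names as defined above; the logdir path is illustrative):

from zenml.integrations.tensorboard.services.tensorboard_service import (
    TensorboardServiceConfig,
)

config = TensorboardServiceConfig(
    logdir="/tmp/model_logs",  # illustrative path to your event files
    max_reload_threads=2,      # reload up to two runs in parallel
    reload_interval=0,         # load data once at startup, then stop polling
)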

visualizers special

Initialization for TensorBoard visualizer.

tensorboard_visualizer

Implementation of a TensorBoard visualizer step.

TensorboardVisualizer

The implementation of a TensorBoard Visualizer.

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
class TensorboardVisualizer:
    """The implementation of a TensorBoard Visualizer."""

    @classmethod
    def find_running_tensorboard_server(
        cls, logdir: str
    ) -> Optional[TensorBoardInfo]:
        """Find a local TensorBoard server instance.

        Checks whether a server is running for the supplied logdir location
        and returns its details, including the TCP port.

        Args:
            logdir: The logdir location where the TensorBoard server is running.

        Returns:
            The TensorBoardInfo describing the running TensorBoard server or
            None if no server is running for the supplied logdir location.
        """
        for server in get_all():
            if (
                server.logdir == logdir
                and server.pid
                and psutil.pid_exists(server.pid)
            ):
                return server
        return None

    def visualize(
        self,
        object: StepRunResponseModel,
        height: int = 800,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Start a TensorBoard server.

        Allows for the visualization of all models logged as artifacts by the
        indicated step. The server will monitor and display all the models
        logged by past and future step runs.

        Args:
            object: StepRunResponseModel fetched from get_step().
            height: Height of the generated visualization.
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        """
        for _, artifact_view in object.outputs.items():
            # filter out anything but model artifacts
            if artifact_view.type == ArtifactType.MODEL:
                logdir = os.path.dirname(artifact_view.uri)

                # first check if a TensorBoard server is already running for
                # the same logdir location and use that one
                running_server = self.find_running_tensorboard_server(logdir)
                if running_server:
                    self.visualize_tensorboard(running_server.port, height)
                    return

                if sys.platform == "win32":
                    # Daemon service functionality is currently not supported on Windows
                    print(
                        "You can run:\n"
                        f"[italic green]    tensorboard --logdir {logdir}"
                        "[/italic green]\n"
                        "...to visualize the TensorBoard logs for your trained model."
                    )
                else:
                    # start a new TensorBoard server
                    service = TensorboardService(
                        TensorboardServiceConfig(
                            logdir=logdir,
                        )
                    )
                    service.start(timeout=60)
                    if service.endpoint.status.port:
                        self.visualize_tensorboard(
                            service.endpoint.status.port, height
                        )
                return

    def visualize_tensorboard(
        self,
        port: int,
        height: int,
    ) -> None:
        """Generate a visualization of a TensorBoard.

        Args:
            port: the TCP port where the TensorBoard server is listening for
                requests.
            height: Height of the generated visualization.
        """
        if Environment.in_notebook():
            notebook.display(port, height=height)
            return

        print(
            "You can visit:\n"
            f"[italic green]    http://localhost:{port}/[/italic green]\n"
            "...to visualize the TensorBoard logs for your trained model."
        )

    def stop(
        self,
        object: StepRunResponseModel,
    ) -> None:
        """Stop the TensorBoard server previously started for a pipeline step.

        Args:
            object: StepRunResponseModel fetched from get_step().
        """
        for _, artifact_view in object.outputs.items():
            # filter out anything but model artifacts
            if artifact_view.type == ArtifactType.MODEL:
                logdir = os.path.dirname(artifact_view.uri)

                # first check if a TensorBoard server is already running for
                # the same logdir location and use that one
                running_server = self.find_running_tensorboard_server(logdir)
                if not running_server:
                    return

                logger.debug(
                    "Stopping tensorboard server with PID '%d' ...",
                    running_server.pid,
                )
                try:
                    p = psutil.Process(running_server.pid)
                except psutil.Error:
                    logger.error(
                        "Could not find process for PID '%d' ...",
                        running_server.pid,
                    )
                    continue
                p.kill()
                return
find_running_tensorboard_server(logdir) classmethod

Find a local TensorBoard server instance.

Checks whether a TensorBoard server is running for the supplied logdir location and, if so, returns its details, including the TCP port.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| logdir | str | The logdir location where the TensorBoard server is running. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[TensorBoardInfo] | The TensorBoardInfo describing the running TensorBoard server, or None if no server is running for the supplied logdir location. |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
@classmethod
def find_running_tensorboard_server(
    cls, logdir: str
) -> Optional[TensorBoardInfo]:
    """Find a local TensorBoard server instance.

    Checks whether a server is running for the supplied logdir location
    and returns its details, including the TCP port.

    Args:
        logdir: The logdir location where the TensorBoard server is running.

    Returns:
        The TensorBoardInfo describing the running TensorBoard server or
        None if no server is running for the supplied logdir location.
    """
    for server in get_all():
        if (
            server.logdir == logdir
            and server.pid
            and psutil.pid_exists(server.pid)
        ):
            return server
    return None
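
A small sketch of reusing an already-running server instead of starting a second one (the logdir path is illustrative):

from zenml.integrations.tensorboard.visualizers.tensorboard_visualizer import (
    TensorboardVisualizer,
)

info = TensorboardVisualizer.find_running_tensorboard_server("/tmp/model_logs")
if info:
    # TensorBoardInfo is the record returned by tensorboard.manager.get_all().
    print(f"Reusing TensorBoard on port {info.port} (PID {info.pid})")
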
stop(self, object)

Stop the TensorBoard server previously started for a pipeline step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| object | StepRunResponseModel | StepRunResponseModel fetched from get_step(). | required |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def stop(
    self,
    object: StepRunResponseModel,
) -> None:
    """Stop the TensorBoard server previously started for a pipeline step.

    Args:
        object: StepRunResponseModel fetched from get_step().
    """
    for _, artifact_view in object.outputs.items():
        # filter out anything but model artifacts
        if artifact_view.type == ArtifactType.MODEL:
            logdir = os.path.dirname(artifact_view.uri)

            # first check if a TensorBoard server is already running for
            # the same logdir location and use that one
            running_server = self.find_running_tensorboard_server(logdir)
            if not running_server:
                return

            logger.debug(
                "Stopping tensorboard server with PID '%d' ...",
                running_server.pid,
            )
            try:
                p = psutil.Process(running_server.pid)
            except psutil.Error:
                logger.error(
                    "Could not find process for PID '%d' ...",
                    running_server.pid,
                )
                continue
            p.kill()
            return
visualize(self, object, height=800, *args, **kwargs)

Start a TensorBoard server.

Allows for the visualization of all models logged as artifacts by the indicated step. The server will monitor and display all the models logged by past and future step runs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| object | StepRunResponseModel | StepRunResponseModel fetched from get_step(). | required |
| height | int | Height of the generated visualization. | 800 |
| *args | Any | Additional arguments. | () |
| **kwargs | Any | Additional keyword arguments. | {} |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def visualize(
    self,
    object: StepRunResponseModel,
    height: int = 800,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Start a TensorBoard server.

    Allows for the visualization of all models logged as artifacts by the
    indicated step. The server will monitor and display all the models
    logged by past and future step runs.

    Args:
        object: StepRunResponseModel fetched from get_step().
        height: Height of the generated visualization.
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    """
    for _, artifact_view in object.outputs.items():
        # filter out anything but model artifacts
        if artifact_view.type == ArtifactType.MODEL:
            logdir = os.path.dirname(artifact_view.uri)

            # first check if a TensorBoard server is already running for
            # the same logdir location and use that one
            running_server = self.find_running_tensorboard_server(logdir)
            if running_server:
                self.visualize_tensorboard(running_server.port, height)
                return

            if sys.platform == "win32":
                # Daemon service functionality is currently not supported on Windows
                print(
                    "You can run:\n"
                    f"[italic green]    tensorboard --logdir {logdir}"
                    "[/italic green]\n"
                    "...to visualize the TensorBoard logs for your trained model."
                )
            else:
                # start a new TensorBoard server
                service = TensorboardService(
                    TensorboardServiceConfig(
                        logdir=logdir,
                    )
                )
                service.start(timeout=60)
                if service.endpoint.status.port:
                    self.visualize_tensorboard(
                        service.endpoint.status.port, height
                    )
            return
visualize_tensorboard(self, port, height)

Generate a visualization of a TensorBoard.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| port | int | the TCP port where the TensorBoard server is listening for requests. | required |
| height | int | Height of the generated visualization. | required |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def visualize_tensorboard(
    self,
    port: int,
    height: int,
) -> None:
    """Generate a visualization of a TensorBoard.

    Args:
        port: the TCP port where the TensorBoard server is listening for
            requests.
        height: Height of the generated visualization.
    """
    if Environment.in_notebook():
        notebook.display(port, height=height)
        return

    print(
        "You can visit:\n"
        f"[italic green]    http://localhost:{port}/[/italic green]\n"
        "...to visualize the TensorBoard logs for your trained model."
    )
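
If a server is already listening, this method can also be driven directly; a sketch assuming port 6006:

from zenml.integrations.tensorboard.visualizers.tensorboard_visualizer import (
    TensorboardVisualizer,
)

# Embeds the TensorBoard UI when running in a notebook;
# otherwise prints the http://localhost:6006/ URL to visit.
TensorboardVisualizer().visualize_tensorboard(port=6006, height=800)
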
get_step(pipeline_name, step_name)

Get the StepRunResponseModel for the specified pipeline and step name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pipeline_name | str | The name of the pipeline. | required |
| step_name | str | The name of the step. | required |

Returns:

| Type | Description |
| --- | --- |
| StepRunResponseModel | The StepRunResponseModel for the specified pipeline and step name. |

Exceptions:

| Type | Description |
| --- | --- |
| RuntimeError | If the step is not found. |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def get_step(pipeline_name: str, step_name: str) -> StepRunResponseModel:
    """Get the StepRunResponseModel for the specified pipeline and step name.

    Args:
        pipeline_name: The name of the pipeline.
        step_name: The name of the step.

    Returns:
        The StepRunResponseModel for the specified pipeline and step name.

    Raises:
        RuntimeError: If the step is not found.
    """
    pipeline = Client().get_pipeline(pipeline_name)
    if pipeline is None:
        raise RuntimeError(
            f"No pipeline with name `{pipeline_name}` was found"
        )

    last_run = pipeline.runs[0]
    # Use .get() so a missing step yields None and triggers the RuntimeError
    # below instead of raising a bare KeyError.
    step = last_run.steps.get(step_name)
    if step is None:
        raise RuntimeError(
            f"No pipeline step with name `{step_name}` was found in "
            f"pipeline `{pipeline_name}`"
        )
    return step
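
A sketch combining get_step with the visualizer (pipeline and step names are illustrative):

from zenml.integrations.tensorboard.visualizers.tensorboard_visualizer import (
    TensorboardVisualizer,
    get_step,
)

step = get_step(pipeline_name="training_pipeline", step_name="trainer")
TensorboardVisualizer().visualize(step)
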
stop_tensorboard_server(pipeline_name, step_name)

Stop the TensorBoard server previously started for a pipeline step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pipeline_name | str | the name of the pipeline | required |
| step_name | str | pipeline step name | required |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def stop_tensorboard_server(pipeline_name: str, step_name: str) -> None:
    """Stop the TensorBoard server previously started for a pipeline step.

    Args:
        pipeline_name: the name of the pipeline
        step_name: pipeline step name
    """
    step = get_step(pipeline_name, step_name)
    TensorboardVisualizer().stop(step)
visualize_tensorboard(pipeline_name, step_name)

Start a TensorBoard server.

Allows for the visualization of all models logged as output by the named pipeline step. The server will monitor and display all the models logged by past and future step runs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pipeline_name | str | the name of the pipeline | required |
| step_name | str | pipeline step name | required |

Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def visualize_tensorboard(pipeline_name: str, step_name: str) -> None:
    """Start a TensorBoard server.

    Allows for the visualization of all models logged as output by the named
    pipeline step. The server will monitor and display all the models logged by
    past and future step runs.

    Args:
        pipeline_name: the name of the pipeline
        step_name: pipeline step name
    """
    step = get_step(pipeline_name, step_name)
    TensorboardVisualizer().visualize(step)
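
Putting the two module-level helpers together, a minimal sketch; the pipeline and step names are illustrative, and the import path assumes the visualizers package re-exports these helpers:

from zenml.integrations.tensorboard.visualizers import (
    stop_tensorboard_server,
    visualize_tensorboard,
)

# Start (or reuse) a TensorBoard server for the trainer step's model output.
visualize_tensorboard(pipeline_name="training_pipeline", step_name="trainer")

# ...inspect the logs, then shut the server down again:
stop_tensorboard_server(pipeline_name="training_pipeline", step_name="trainer")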