Zen Server
zenml.zen_server
special
Initialization for ZenServer.
The ZenServer is a simple webserver to let you collaborate on stacks via
the network. It can be spun up in a background daemon from the command line
using zenml server up
and managed from the same command line group.
Using the ZenServer's stacks in your project just requires setting up a
profile with rest
store-type pointed to the url of the server.
zen_server
Zen Server implementation.
ZenServer (LocalDaemonService)
pydantic-model
Service daemon that can be used to start a local ZenServer.
Attributes:
Name | Type | Description |
---|---|---|
config |
ZenServerConfig |
service configuration |
endpoint |
ZenServerEndpoint |
optional service endpoint |
Source code in zenml/zen_server/zen_server.py
class ZenServer(LocalDaemonService):
"""Service daemon that can be used to start a local ZenServer.
Attributes:
config: service configuration
endpoint: optional service endpoint
"""
SERVICE_TYPE = ServiceType(
name="zen_server",
type="zenml",
flavor="zenml",
description="ZenServer to manage stacks, users and pipelines",
)
config: ZenServerConfig
endpoint: ZenServerEndpoint
def __init__(
self,
config: Union[ZenServerConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the ZenServer.
Args:
config: service configuration.
attrs: additional attributes.
"""
# ensure that the endpoint is created before the service is initialized
if isinstance(config, ZenServerConfig) and "endpoint" not in attrs:
endpoint_uri_path = ZEN_SERVER_URL_PATH
healthcheck_uri_path = ZEN_SERVER_HEALTHCHECK_URL_PATH
use_head_request = True
endpoint = ZenServerEndpoint(
config=ZenServerEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
ip_address=config.ip_address,
port=config.port,
zen_server_uri_path=endpoint_uri_path,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=healthcheck_uri_path,
use_head_request=use_head_request,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
def run(self) -> None:
"""Run the ZenServer.
Raises:
ValueError: if unable to find the profile.
"""
profile = GlobalConfiguration().get_profile(self.config.profile_name)
if profile is None:
raise ValueError(
f"Could not find profile with name {self.config.profile_name}."
)
if profile.store_type == StoreType.REST:
raise ValueError(
"Service cannot be started with REST store type. Make sure you "
"specify a profile with a non-networked persistence backend "
"when trying to start the ZenServer. (use command line flag "
"`--profile=$PROFILE_NAME` or set the env variable "
f"{ENV_ZENML_PROFILE_NAME} to specify the use of a profile "
"other than the currently active one)"
)
logger.info(
"Starting ZenServer as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
# this is the only way to pass information into the FastAPI app??
os.environ["ZENML_PROFILE_NAME"] = self.config.profile_name
try:
uvicorn.run(
ZEN_SERVER_ENTRYPOINT,
host=self.config.ip_address,
port=self.endpoint.status.port,
log_level="info",
)
except KeyboardInterrupt:
logger.info("ZenServer stopped. Resuming normal execution.")
@property
def zen_server_uri(self) -> Optional[str]:
"""Get the URI where the service responsible for the ZenServer is running.
Returns:
The URI where the service can be contacted for requests,
or None, if the service isn't running.
"""
if not self.is_running:
return None
return self.endpoint.endpoint_uri
zen_server_uri: Optional[str]
property
readonly
Get the URI where the service responsible for the ZenServer is running.
Returns:
Type | Description |
---|---|
Optional[str] |
The URI where the service can be contacted for requests, or None, if the service isn't running. |
__init__(self, config, **attrs)
special
Initialize the ZenServer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Union[zenml.zen_server.zen_server.ZenServerConfig, Dict[str, Any]] |
service configuration. |
required |
attrs |
Any |
additional attributes. |
{} |
Source code in zenml/zen_server/zen_server.py
def __init__(
self,
config: Union[ZenServerConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the ZenServer.
Args:
config: service configuration.
attrs: additional attributes.
"""
# ensure that the endpoint is created before the service is initialized
if isinstance(config, ZenServerConfig) and "endpoint" not in attrs:
endpoint_uri_path = ZEN_SERVER_URL_PATH
healthcheck_uri_path = ZEN_SERVER_HEALTHCHECK_URL_PATH
use_head_request = True
endpoint = ZenServerEndpoint(
config=ZenServerEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
ip_address=config.ip_address,
port=config.port,
zen_server_uri_path=endpoint_uri_path,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=healthcheck_uri_path,
use_head_request=use_head_request,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
run(self)
Run the ZenServer.
Exceptions:
Type | Description |
---|---|
ValueError |
if unable to find the profile. |
Source code in zenml/zen_server/zen_server.py
def run(self) -> None:
"""Run the ZenServer.
Raises:
ValueError: if unable to find the profile.
"""
profile = GlobalConfiguration().get_profile(self.config.profile_name)
if profile is None:
raise ValueError(
f"Could not find profile with name {self.config.profile_name}."
)
if profile.store_type == StoreType.REST:
raise ValueError(
"Service cannot be started with REST store type. Make sure you "
"specify a profile with a non-networked persistence backend "
"when trying to start the ZenServer. (use command line flag "
"`--profile=$PROFILE_NAME` or set the env variable "
f"{ENV_ZENML_PROFILE_NAME} to specify the use of a profile "
"other than the currently active one)"
)
logger.info(
"Starting ZenServer as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
# this is the only way to pass information into the FastAPI app??
os.environ["ZENML_PROFILE_NAME"] = self.config.profile_name
try:
uvicorn.run(
ZEN_SERVER_ENTRYPOINT,
host=self.config.ip_address,
port=self.endpoint.status.port,
log_level="info",
)
except KeyboardInterrupt:
logger.info("ZenServer stopped. Resuming normal execution.")
ZenServerConfig (LocalDaemonServiceConfig)
pydantic-model
ZenServer deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
ip_address |
str |
The IP address where the ZenServer will listen for connections |
port |
int |
Port at which the the ZenServer is accepting connections |
profile_name |
str |
name of the Profile to use to store data. |
Source code in zenml/zen_server/zen_server.py
class ZenServerConfig(LocalDaemonServiceConfig):
"""ZenServer deployment configuration.
Attributes:
ip_address: The IP address where the ZenServer will listen for
connections
port: Port at which the the ZenServer is accepting connections
profile_name: name of the Profile to use to store data.
"""
ip_address: str = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
port: int = 8000
profile_name: str = Field(
default_factory=lambda: Repository().active_profile_name
)
ZenServerEndpoint (LocalDaemonServiceEndpoint)
pydantic-model
A service endpoint exposed by the ZenServer daemon.
Attributes:
Name | Type | Description |
---|---|---|
config |
ZenServerEndpointConfig |
service endpoint configuration |
monitor |
HTTPEndpointHealthMonitor |
optional service endpoint health monitor |
Source code in zenml/zen_server/zen_server.py
class ZenServerEndpoint(LocalDaemonServiceEndpoint):
"""A service endpoint exposed by the ZenServer daemon.
Attributes:
config: service endpoint configuration
monitor: optional service endpoint health monitor
"""
config: ZenServerEndpointConfig
monitor: HTTPEndpointHealthMonitor
@property
def endpoint_uri(self) -> Optional[str]:
"""Return the endpoint URI.
Returns:
The endpoint URI or None if the endpoint is not available.
"""
uri = self.status.uri
if not uri:
return None
return f"{uri}{self.config.zen_server_uri_path}"
endpoint_uri: Optional[str]
property
readonly
Return the endpoint URI.
Returns:
Type | Description |
---|---|
Optional[str] |
The endpoint URI or None if the endpoint is not available. |
ZenServerEndpointConfig (LocalDaemonServiceEndpointConfig)
pydantic-model
ZenServer endpoint configuration.
Attributes:
Name | Type | Description |
---|---|---|
zen_server_uri_path |
str |
URI path for the ZenServer |
Source code in zenml/zen_server/zen_server.py
class ZenServerEndpointConfig(LocalDaemonServiceEndpointConfig):
"""ZenServer endpoint configuration.
Attributes:
zen_server_uri_path: URI path for the ZenServer
"""
zen_server_uri_path: str
zen_server_api
Zen Server API.
ErrorModel (BaseModel)
pydantic-model
Base class for error responses.
Source code in zenml/zen_server/zen_server_api.py
class ErrorModel(BaseModel):
"""Base class for error responses."""
detail: Any
add_user_to_team(name, user)
async
Adds a user to a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the team. |
required |
user |
User |
User to add to the team. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(TEAMS + "/{name}/users", responses={404: error_response})
async def add_user_to_team(name: str, user: User) -> None:
"""Adds a user to a team.
Args:
name: Name of the team.
user: User to add to the team.
Raises:
not_found: when none are found
"""
try:
zen_store.add_user_to_team(team_name=name, user_name=user.name)
except KeyError as error:
raise not_found(error) from error
assign_role(data)
async
Assigns a role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Dict[str, Any] |
Data containing the role assignment. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
ROLE_ASSIGNMENTS,
responses={404: error_response},
)
async def assign_role(data: Dict[str, Any]) -> None:
"""Assigns a role.
Args:
data: Data containing the role assignment.
Raises:
not_found: when none are found
"""
role_name = data["role_name"]
entity_name = data["entity_name"]
project_name = data.get("project_name")
is_user = data.get("is_user", True)
try:
zen_store.assign_role(
role_name=role_name,
entity_name=entity_name,
project_name=project_name,
is_user=is_user,
)
except KeyError as error:
raise not_found(error) from error
authorize(credentials=Depends(HTTPBasic))
Authorizes any request to the ZenServer.
Right now this method only checks if the username provided as part of http basic auth credentials is registered in the ZenStore.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
credentials |
HTTPBasicCredentials |
HTTP basic auth credentials passed to the request. |
Depends(HTTPBasic) |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the username is not registered in the ZenStore. |
Source code in zenml/zen_server/zen_server_api.py
def authorize(credentials: HTTPBasicCredentials = Depends(security)) -> None:
"""Authorizes any request to the ZenServer.
Right now this method only checks if the username provided as part of http
basic auth credentials is registered in the ZenStore.
Args:
credentials: HTTP basic auth credentials passed to the request.
Raises:
HTTPException: If the username is not registered in the ZenStore.
"""
try:
zen_store.get_user(credentials.username)
except KeyError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username.",
)
conflict(error)
Convert an Exception to a HTTP 409 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 409. |
Source code in zenml/zen_server/zen_server_api.py
def conflict(error: Exception) -> HTTPException:
"""Convert an Exception to a HTTP 409 response.
Args:
error: Exception to convert.
Returns:
HTTPException with status code 409.
"""
return HTTPException(status_code=409, detail=error_detail(error))
create_flavor(flavor)
async
Creates a flavor.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor |
FlavorWrapper |
Flavor to create. |
required |
Returns:
Type | Description |
---|---|
FlavorWrapper |
The created flavor. |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
FLAVORS,
response_model=FlavorWrapper,
responses={409: error_response},
)
async def create_flavor(flavor: FlavorWrapper) -> FlavorWrapper:
"""Creates a flavor.
# noqa: DAR401
Args:
flavor: Flavor to create.
Returns:
The created flavor.
"""
try:
return zen_store.create_flavor(
name=flavor.name,
source=flavor.source,
stack_component_type=flavor.type,
)
except EntityExistsError as error:
raise conflict(error) from error
create_project(project)
async
Creates a project.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
Project |
Project to create. |
required |
Returns:
Type | Description |
---|---|
Project |
The created project. |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
PROJECTS,
response_model=Project,
responses={409: error_response},
)
async def create_project(project: Project) -> Project:
"""Creates a project.
# noqa: DAR401
Args:
project: Project to create.
Returns:
The created project.
"""
try:
return zen_store.create_project(
project_name=project.name, description=project.description
)
except EntityExistsError as error:
raise conflict(error) from error
create_role(role)
async
Creates a role.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
Role |
Role to create. |
required |
Returns:
Type | Description |
---|---|
Role |
The created role. |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
ROLES,
response_model=Role,
responses={409: error_response},
)
async def create_role(role: Role) -> Role:
"""Creates a role.
# noqa: DAR401
Args:
role: Role to create.
Returns:
The created role.
"""
try:
return zen_store.create_role(role.name)
except EntityExistsError as error:
raise conflict(error) from error
create_team(team)
async
Creates a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team |
Team |
Team to create. |
required |
Returns:
Type | Description |
---|---|
Team |
The created team. |
Exceptions:
Type | Description |
---|---|
conflict |
when the team already exists |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
TEAMS,
response_model=Team,
responses={409: error_response},
)
async def create_team(team: Team) -> Team:
"""Creates a team.
Args:
team: Team to create.
Returns:
The created team.
Raises:
conflict: when the team already exists
"""
try:
return zen_store.create_team(team.name)
except EntityExistsError as error:
raise conflict(error) from error
create_user(user)
async
Creates a user.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
User |
User to create. |
required |
Returns:
Type | Description |
---|---|
User |
The created user. |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
USERS,
response_model=User,
responses={409: error_response},
)
async def create_user(user: User) -> User:
"""Creates a user.
# noqa: DAR401
Args:
user: User to create.
Returns:
The created user.
"""
try:
return zen_store.create_user(user.name)
except EntityExistsError as error:
raise conflict(error) from error
delete_project(name)
async
Deletes a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the project. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(PROJECTS + "/{name}", responses={404: error_response})
async def delete_project(name: str) -> None:
"""Deletes a project.
Args:
name: Name of the project.
Raises:
not_found: when none are found
"""
try:
zen_store.delete_project(project_name=name)
except KeyError as error:
raise not_found(error) from error
delete_role(name)
async
Deletes a role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the role. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(ROLES + "/{name}", responses={404: error_response})
async def delete_role(name: str) -> None:
"""Deletes a role.
Args:
name: Name of the role.
Raises:
not_found: when none are found
"""
try:
zen_store.delete_role(role_name=name)
except KeyError as error:
raise not_found(error) from error
delete_store_association(artifact_store_uuid, metadata_store_uuid)
async
Deletes a store association.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_store_uuid |
UUID |
The UUID of the selected artifact store. |
required |
metadata_store_uuid |
UUID |
The UUID of the selected metadata store. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(
STORE_ASSOCIATIONS + "/{artifact_store_uuid}/{metadata_store_uuid}",
responses={404: error_response},
)
async def delete_store_association(
artifact_store_uuid: UUID,
metadata_store_uuid: UUID,
) -> None:
"""Deletes a store association.
Args:
artifact_store_uuid: The UUID of the selected artifact store.
metadata_store_uuid: The UUID of the selected metadata store.
Raises:
not_found: when none are found
"""
try:
zen_store.delete_store_association_for_artifact_and_metadata_store(
artifact_store_uuid=artifact_store_uuid,
metadata_store_uuid=metadata_store_uuid,
)
except KeyError as error:
raise not_found(error) from error
delete_team(name)
async
Deletes a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the team. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(TEAMS + "/{name}", responses={404: error_response})
async def delete_team(name: str) -> None:
"""Deletes a team.
Args:
name: Name of the team.
Raises:
not_found: when none are found
"""
try:
zen_store.delete_team(team_name=name)
except KeyError as error:
raise not_found(error) from error
delete_user(name)
async
Deletes a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the user. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(USERS + "/{name}", responses={404: error_response})
async def delete_user(name: str) -> None:
"""Deletes a user.
Args:
name: Name of the user.
Raises:
not_found: when none are found
"""
try:
zen_store.delete_user(user_name=name)
except KeyError as error:
raise not_found(error) from error
deregister_stack(name)
async
Deregisters a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the stack to deregister. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(STACKS + "/{name}", responses={404: error_response})
async def deregister_stack(name: str) -> None:
"""Deregisters a stack.
Args:
name: Name of the stack to deregister.
Raises:
not_found: when none are found
"""
try:
zen_store.deregister_stack(name)
except KeyError as error:
raise not_found(error) from error
deregister_stack_component(component_type, name)
async
Deregisters a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
Type of the stack component. |
required |
name |
str |
Name of the stack component. |
required |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
conflict |
when the stack component is still in use |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(
STACK_COMPONENTS + "/{component_type}/{name}",
responses={404: error_response, 409: error_response},
)
async def deregister_stack_component(
component_type: StackComponentType, name: str
) -> None:
"""Deregisters a stack component.
Args:
component_type: Type of the stack component.
name: Name of the stack component.
Returns:
None
Raises:
not_found: when none are found
conflict: when the stack component is still in use
"""
try:
return zen_store.deregister_stack_component(component_type, name=name)
except KeyError as error:
raise not_found(error) from error
except ValueError as error:
raise conflict(error) from error
error_detail(error)
Convert an Exception to API representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
List[str] |
List of strings representing the error. |
Source code in zenml/zen_server/zen_server_api.py
def error_detail(error: Exception) -> List[str]:
"""Convert an Exception to API representation.
Args:
error: Exception to convert.
Returns:
List of strings representing the error.
"""
return [type(error).__name__] + [str(a) for a in error.args]
flavors()
async
Get all flavors.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] |
All flavors. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(FLAVORS, response_model=List[FlavorWrapper])
async def flavors() -> List[FlavorWrapper]:
"""Get all flavors.
Returns:
All flavors.
"""
return zen_store.flavors
get_flavor_by_type(component_type)
async
Returns all flavors of a given type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
Type of the component. |
required |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] |
The requested flavors. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(FLAVORS + "/{component_type}", responses={404: error_response})
async def get_flavor_by_type(
component_type: StackComponentType,
) -> List[FlavorWrapper]:
"""Returns all flavors of a given type.
Args:
component_type: Type of the component.
Returns:
The requested flavors.
Raises:
not_found: when none are found.
"""
try:
return zen_store.get_flavors_by_type(component_type=component_type)
except KeyError as error:
raise not_found(error) from error
get_flavor_by_type_and_name(component_type, name)
async
Returns a flavor of a given type and name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
Type of the component |
required |
name |
str |
Name of the flavor. |
required |
Returns:
Type | Description |
---|---|
FlavorWrapper |
The requested flavor. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
FLAVORS + "/{component_type}/{name}", responses={404: error_response}
)
async def get_flavor_by_type_and_name(
component_type: StackComponentType, name: str
) -> FlavorWrapper:
"""Returns a flavor of a given type and name.
Args:
component_type: Type of the component
name: Name of the flavor.
Returns:
The requested flavor.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_flavor_by_name_and_type(
component_type=component_type, flavor_name=name
)
except KeyError as error:
raise not_found(error) from error
get_project(project_name)
async
Get a project for given name.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name |
str |
Name of the project. |
required |
Returns:
Type | Description |
---|---|
Project |
The requested project. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
PROJECTS + "/{project_name}",
response_model=Project,
responses={404: error_response},
)
async def get_project(project_name: str) -> Project:
"""Get a project for given name.
# noqa: DAR401
Args:
project_name: Name of the project.
Returns:
The requested project.
"""
try:
return zen_store.get_project(project_name)
except KeyError as error:
raise not_found(error) from error
get_role(name)
async
Gets a specific role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the role. |
required |
Returns:
Type | Description |
---|---|
Role |
The requested role. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(ROLES + "/{name}", responses={404: error_response})
async def get_role(name: str) -> Role:
"""Gets a specific role.
Args:
name: Name of the role.
Returns:
The requested role.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_role(role_name=name)
except KeyError as error:
raise not_found(error) from error
get_stack(name)
async
Returns the requested stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the stack. |
required |
Returns:
Type | Description |
---|---|
StackWrapper |
The requested stack. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
STACKS + "/{name}",
response_model=StackWrapper,
responses={404: error_response},
)
async def get_stack(name: str) -> StackWrapper:
"""Returns the requested stack.
Args:
name: Name of the stack.
Returns:
The requested stack.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_stack(name)
except KeyError as error:
raise not_found(error) from error
get_stack_component(component_type, name)
async
Returns the requested stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
Type of the stack component. |
required |
name |
str |
Name of the stack component. |
required |
Returns:
Type | Description |
---|---|
ComponentWrapper |
The requested stack component. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
STACK_COMPONENTS + "/{component_type}/{name}",
response_model=ComponentWrapper,
responses={404: error_response},
)
async def get_stack_component(
component_type: StackComponentType, name: str
) -> ComponentWrapper:
"""Returns the requested stack component.
Args:
component_type: Type of the stack component.
name: Name of the stack component.
Returns:
The requested stack component.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_stack_component(component_type, name=name)
except KeyError as error:
raise not_found(error) from error
get_stack_components(component_type)
async
Returns all stack components for the requested type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
Type of the stack components. |
required |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.component_wrapper.ComponentWrapper] |
All stack components for the requested type. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
STACK_COMPONENTS + "/{component_type}",
response_model=List[ComponentWrapper],
)
async def get_stack_components(
component_type: StackComponentType,
) -> List[ComponentWrapper]:
"""Returns all stack components for the requested type.
Args:
component_type: Type of the stack components.
Returns:
All stack components for the requested type.
"""
return zen_store.get_stack_components(component_type)
get_stack_configuration(name)
async
Returns the configuration for the requested stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the stack. |
required |
Returns:
Type | Description |
---|---|
Dict[zenml.enums.StackComponentType, str] |
Configuration for the requested stack. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
STACK_CONFIGURATIONS + "/{name}",
response_model=Dict[StackComponentType, str],
responses={404: error_response},
)
async def get_stack_configuration(name: str) -> Dict[StackComponentType, str]:
"""Returns the configuration for the requested stack.
Args:
name: Name of the stack.
Returns:
Configuration for the requested stack.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_stack_configuration(name)
except KeyError as error:
raise not_found(error) from error
get_team(name)
async
Gets a specific team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the team. |
required |
Returns:
Type | Description |
---|---|
Team |
The requested team. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(TEAMS + "/{name}", responses={404: error_response})
async def get_team(name: str) -> Team:
"""Gets a specific team.
Args:
name: Name of the team.
Returns:
The requested team.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_team(team_name=name)
except KeyError as error:
raise not_found(error) from error
get_user(name)
async
Gets a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the user. |
required |
Returns:
Type | Description |
---|---|
User |
The requested user. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(USERS + "/{name}", responses={404: error_response})
async def get_user(name: str) -> User:
"""Gets a specific user.
Args:
name: Name of the user.
Returns:
The requested user.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_user(user_name=name)
except KeyError as error:
raise not_found(error) from error
health()
async
Get health status of the server.
Returns:
Type | Description |
---|---|
str |
String representing the health status of the server. |
Source code in zenml/zen_server/zen_server_api.py
@app.head("/health")
@app.get("/health")
async def health() -> str:
"""Get health status of the server.
Returns:
String representing the health status of the server.
"""
return "OK"
not_found(error)
Convert an Exception to a HTTP 404 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 404. |
Source code in zenml/zen_server/zen_server_api.py
def not_found(error: Exception) -> HTTPException:
"""Convert an Exception to a HTTP 404 response.
Args:
error: Exception to convert.
Returns:
HTTPException with status code 404.
"""
return HTTPException(status_code=404, detail=error_detail(error))
pipeline_run(pipeline_name, run_name, project_name=None)
async
Returns a single pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
Name of the pipeline. |
required |
run_name |
str |
Name of the run. |
required |
project_name |
Optional[str] |
Name of the project. |
None |
Returns:
Type | Description |
---|---|
PipelineRunWrapper |
The requested pipeline run. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
PIPELINE_RUNS + "/{pipeline_name}/{run_name}",
response_model=PipelineRunWrapper,
responses={404: error_response},
)
async def pipeline_run(
pipeline_name: str, run_name: str, project_name: Optional[str] = None
) -> PipelineRunWrapper:
"""Returns a single pipeline run.
Args:
pipeline_name: Name of the pipeline.
run_name: Name of the run.
project_name: Name of the project.
Returns:
The requested pipeline run.
Raises:
not_found: when none are found.
"""
try:
return zen_store.get_pipeline_run(
pipeline_name=pipeline_name,
run_name=run_name,
project_name=project_name,
)
except KeyError as error:
raise not_found(error) from error
pipeline_runs(pipeline_name, project_name=None)
async
Returns all runs for a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
Name of the pipeline. |
required |
project_name |
Optional[str] |
Name of the project. |
None |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.pipeline_models.PipelineRunWrapper] |
All runs for a pipeline. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
PIPELINE_RUNS + "/{pipeline_name}", response_model=List[PipelineRunWrapper]
)
async def pipeline_runs(
pipeline_name: str, project_name: Optional[str] = None
) -> List[PipelineRunWrapper]:
"""Returns all runs for a pipeline.
Args:
pipeline_name: Name of the pipeline.
project_name: Name of the project.
Returns:
All runs for a pipeline.
"""
return zen_store.get_pipeline_runs(
pipeline_name=pipeline_name, project_name=project_name
)
projects()
async
Returns all projects.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.Project] |
All projects. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(PROJECTS, response_model=List[Project])
async def projects() -> List[Project]:
"""Returns all projects.
Returns:
All projects.
"""
return zen_store.projects
register_pipeline_run(pipeline_run)
async
Registers a pipeline run.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunWrapper |
Pipeline run to register. |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
PIPELINE_RUNS,
responses={409: error_response},
)
async def register_pipeline_run(pipeline_run: PipelineRunWrapper) -> None:
"""Registers a pipeline run.
# noqa: DAR401
Args:
pipeline_run: Pipeline run to register.
Returns:
None
"""
try:
return zen_store.register_pipeline_run(pipeline_run)
except EntityExistsError as error:
raise conflict(error) from error
register_stack(stack)
async
Registers a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
StackWrapper |
Stack to register. |
required |
Exceptions:
Type | Description |
---|---|
conflict |
when a stack with the same name is already registered |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(
STACKS,
responses={409: error_response},
)
async def register_stack(stack: StackWrapper) -> None:
"""Registers a stack.
Args:
stack: Stack to register.
Raises:
conflict: when a stack with the same name is already registered
"""
try:
zen_store.register_stack(stack)
except (StackExistsError, StackComponentExistsError) as error:
raise conflict(error) from error
register_stack_component(component)
async
Registers a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component |
ComponentWrapper |
Stack component to register. |
required |
Exceptions:
Type | Description |
---|---|
conflict |
when the component already exists. |
Source code in zenml/zen_server/zen_server_api.py
@authed.post(STACK_COMPONENTS, responses={409: error_response})
async def register_stack_component(
component: ComponentWrapper,
) -> None:
"""Registers a stack component.
Args:
component: Stack component to register.
Raises:
conflict: when the component already exists.
"""
try:
zen_store.register_stack_component(component)
except StackComponentExistsError as error:
raise conflict(error) from error
remove_user_from_team(team_name, user_name)
async
Removes a user from a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name |
str |
Name of the team. |
required |
user_name |
str |
Name of the user. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(
TEAMS + "/{team_name}/users/{user_name}", responses={404: error_response}
)
async def remove_user_from_team(team_name: str, user_name: str) -> None:
"""Removes a user from a team.
Args:
team_name: Name of the team.
user_name: Name of the user.
Raises:
not_found: when none are found
"""
try:
zen_store.remove_user_from_team(
team_name=team_name, user_name=user_name
)
except KeyError as error:
raise not_found(error) from error
revoke_role(data)
async
Revokes a role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Dict[str, Any] |
Data containing the role assignment. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.delete(ROLE_ASSIGNMENTS, responses={404: error_response})
async def revoke_role(data: Dict[str, Any]) -> None:
"""Revokes a role.
Args:
data: Data containing the role assignment.
Raises:
not_found: when none are found
"""
role_name = data["role_name"]
entity_name = data["entity_name"]
project_name = data.get("project_name")
is_user = data.get("is_user", True)
try:
zen_store.revoke_role(
role_name=role_name,
entity_name=entity_name,
project_name=project_name,
is_user=is_user,
)
except KeyError as error:
raise not_found(error) from error
role_assignments()
async
Returns all role assignments.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.RoleAssignment] |
All role assignments. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(ROLE_ASSIGNMENTS, response_model=List[RoleAssignment])
async def role_assignments() -> List[RoleAssignment]:
"""Returns all role assignments.
Returns:
All role assignments.
"""
return zen_store.role_assignments
role_assignments_for_team(name, project_name=None)
async
Gets all role assignments for a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the team. |
required |
project_name |
Optional[str] |
Name of the project. |
None |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.RoleAssignment] |
All role assignments for the team. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
TEAMS + "/{name}/role_assignments",
response_model=List[RoleAssignment],
responses={404: error_response},
)
async def role_assignments_for_team(
name: str, project_name: Optional[str] = None
) -> List[RoleAssignment]:
"""Gets all role assignments for a team.
Args:
name: Name of the team.
project_name: Name of the project.
Returns:
All role assignments for the team.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_role_assignments_for_team(
team_name=name, project_name=project_name
)
except KeyError as error:
raise not_found(error) from error
role_assignments_for_user(name, project_name=None)
async
Returns all role assignments for a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the user. |
required |
project_name |
Optional[str] |
Name of the project. |
None |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.RoleAssignment] |
All role assignments for the user. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
USERS + "/{name}/role_assignments",
response_model=List[RoleAssignment],
responses={404: error_response},
)
async def role_assignments_for_user(
name: str, project_name: Optional[str] = None
) -> List[RoleAssignment]:
"""Returns all role assignments for a user.
Args:
name: Name of the user.
project_name: Name of the project.
Returns:
All role assignments for the user.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_role_assignments_for_user(
user_name=name, project_name=project_name, include_team_roles=False
)
except KeyError as error:
raise not_found(error) from error
roles()
async
Returns all roles.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.Role] |
All roles. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(ROLES, response_model=List[Role])
async def roles() -> List[Role]:
"""Returns all roles.
Returns:
All roles.
"""
return zen_store.roles
service_info()
async
Returns the profile configuration for this service.
Returns:
Type | Description |
---|---|
ProfileConfiguration |
Profile configuration for this service. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get("/", response_model=ProfileConfiguration)
async def service_info() -> ProfileConfiguration:
"""Returns the profile configuration for this service.
Returns:
Profile configuration for this service.
"""
return profile
stack_configurations()
async
Returns configurations for all stacks.
Returns:
Type | Description |
---|---|
Dict[str, Dict[zenml.enums.StackComponentType, str]] |
Configurations for all stacks. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
STACK_CONFIGURATIONS,
response_model=Dict[str, Dict[StackComponentType, str]],
)
async def stack_configurations() -> Dict[str, Dict[StackComponentType, str]]:
"""Returns configurations for all stacks.
Returns:
Configurations for all stacks.
"""
return zen_store.stack_configurations
stacks()
async
Returns all stacks.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.stack_wrapper.StackWrapper] |
All stacks. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(STACKS, response_model=List[StackWrapper])
async def stacks() -> List[StackWrapper]:
"""Returns all stacks.
Returns:
All stacks.
"""
return zen_store.stacks
stacks_empty()
async
Returns whether stacks are registered or not.
Returns:
Type | Description |
---|---|
bool |
True if there are no stacks registered, False otherwise. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(STACKS_EMPTY, response_model=bool)
async def stacks_empty() -> bool:
"""Returns whether stacks are registered or not.
Returns:
True if there are no stacks registered, False otherwise.
"""
return zen_store.stacks_empty
store_associations()
async
Returns all store associations.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.stack_wrapper.StoreAssociation] |
All store associations. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(STORE_ASSOCIATIONS, response_model=List[StoreAssociation])
async def store_associations() -> List[StoreAssociation]:
"""Returns all store associations.
Returns:
All store associations.
"""
return zen_store.store_associations
teams()
async
Returns all teams.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.Team] |
All teams. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(TEAMS, response_model=List[Team])
async def teams() -> List[Team]:
"""Returns all teams.
Returns:
All teams.
"""
return zen_store.teams
teams_for_user(name)
async
Returns all teams for a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the user. |
required |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.Team] |
All teams for the user. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
USERS + "/{name}/teams",
response_model=List[Team],
responses={404: error_response},
)
async def teams_for_user(name: str) -> List[Team]:
"""Returns all teams for a user.
Args:
name: Name of the user.
Returns:
All teams for the user.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_teams_for_user(user_name=name)
except KeyError as error:
raise not_found(error) from error
update_stack(stack, name)
async
Updates a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
StackWrapper |
Stack to update. |
required |
name |
str |
Name of the stack. |
required |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.put(
STACKS + "/{name}",
responses={404: error_response},
)
async def update_stack(stack: StackWrapper, name: str) -> None:
"""Updates a stack.
Args:
stack: Stack to update.
name: Name of the stack.
Raises:
not_found: when none are found
"""
try:
zen_store.update_stack(name, stack)
except DoesNotExistException as error:
raise not_found(error) from error
update_stack_component(name, component_type, component)
async
Updates a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the stack component. |
required |
component_type |
StackComponentType |
Type of the stack component. |
required |
component |
ComponentWrapper |
Stack component to update. |
required |
Returns:
Type | Description |
---|---|
Dict[str, str] |
Updated stack component. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.put(
STACK_COMPONENTS + "/{component_type}/{name}",
response_model=Dict[str, str],
responses={404: error_response},
)
async def update_stack_component(
name: str,
component_type: StackComponentType,
component: ComponentWrapper,
) -> Dict[str, str]:
"""Updates a stack component.
Args:
name: Name of the stack component.
component_type: Type of the stack component.
component: Stack component to update.
Returns:
Updated stack component.
Raises:
not_found: when none are found
"""
try:
return zen_store.update_stack_component(name, component_type, component)
except KeyError as error:
raise not_found(error) from error
users()
async
Returns all users.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.User] |
All users. |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(USERS, response_model=List[User])
async def users() -> List[User]:
"""Returns all users.
Returns:
All users.
"""
return zen_store.users
users_for_team(name)
async
Returns all users for a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the team. |
required |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.user_management_models.User] |
All users for the team. |
Exceptions:
Type | Description |
---|---|
not_found |
when none are found |
Source code in zenml/zen_server/zen_server_api.py
@authed.get(
TEAMS + "/{name}/users",
response_model=List[User],
responses={404: error_response},
)
async def users_for_team(name: str) -> List[User]:
"""Returns all users for a team.
Args:
name: Name of the team.
Returns:
All users for the team.
Raises:
not_found: when none are found
"""
try:
return zen_store.get_users_for_team(team_name=name)
except KeyError as error:
raise not_found(error) from error