Github
zenml.integrations.github
special
Initialization of the GitHub ZenML integration.
GitHubIntegration (Integration)
Definition of GitHub integration for ZenML.
Source code in zenml/integrations/github/__init__.py
class GitHubIntegration(Integration):
"""Definition of GitHub integration for ZenML."""
NAME = GITHUB
REQUIREMENTS: List[str] = ["pygithub"]
@classmethod
def plugin_flavors(cls) -> List[Type[BasePluginFlavor]]:
"""Declare the event flavors for the github integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.github.plugins import GithubWebhookEventSourceFlavor
return [GithubWebhookEventSourceFlavor]
plugin_flavors()
classmethod
Declare the event flavors for the github integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/github/__init__.py
@classmethod
def plugin_flavors(cls) -> List[Type[BasePluginFlavor]]:
"""Declare the event flavors for the github integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.github.plugins import GithubWebhookEventSourceFlavor
return [GithubWebhookEventSourceFlavor]
code_repositories
special
Initialization of the ZenML GitHub code repository.
github_code_repository
GitHub code repository.
GitHubCodeRepository (BaseCodeRepository)
GitHub code repository.
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
class GitHubCodeRepository(BaseCodeRepository):
"""GitHub code repository."""
@property
def config(self) -> GitHubCodeRepositoryConfig:
"""Returns the `GitHubCodeRepositoryConfig` config.
Returns:
The configuration.
"""
return GitHubCodeRepositoryConfig(**self._config)
@property
def github_repo(self) -> Repository:
"""The GitHub repository object from the GitHub API.
Returns:
The GitHub repository.
Raises:
RuntimeError: If the repository cannot be found.
"""
try:
github_repository = self._github_session.get_repo(
f"{self.config.owner}/{self.config.repository}"
)
except GithubException as e:
raise RuntimeError(
f"An error occurred while getting the repository: {str(e)}"
)
return github_repository
def check_github_repo_public(self, owner: str, repo: str) -> None:
"""Checks if a GitHub repository is public.
Args:
owner: The owner of the repository.
repo: The name of the repository.
Raises:
RuntimeError: If the repository is not public.
"""
url = f"https://api.github.com/repos/{owner}/{repo}"
response = requests.get(url, timeout=7)
try:
if response.status_code == 200:
pass
else:
raise RuntimeError(
"It is not possible to access this repository as it does not appear to be public."
"Access to private repositories is only possible when a token is provided. Please provide a token and try again"
)
except Exception as e:
raise RuntimeError(
f"An error occurred while checking if repository is public: {str(e)}"
)
def login(
self,
) -> None:
"""Logs in to GitHub using the token provided in the config.
Raises:
RuntimeError: If the login fails.
"""
try:
self._github_session = Github(self.config.token)
if self.config.token:
user = self._github_session.get_user().login
logger.debug(f"Logged in as {user}")
else:
self.check_github_repo_public(
self.config.owner, self.config.repository
)
except Exception as e:
raise RuntimeError(f"An error occurred while logging in: {str(e)}")
def download_files(
self, commit: str, directory: str, repo_sub_directory: Optional[str]
) -> None:
"""Downloads files from a commit to a local directory.
Args:
commit: The commit to download.
directory: The directory to download to.
repo_sub_directory: The sub directory to download from.
Raises:
RuntimeError: If the repository sub directory is invalid.
"""
contents = self.github_repo.get_contents(
repo_sub_directory or "", ref=commit
)
if not isinstance(contents, List):
raise RuntimeError("Invalid repository subdirectory.")
os.makedirs(directory, exist_ok=True)
for content in contents:
local_path = os.path.join(directory, content.name)
if content.type == "dir":
self.download_files(
commit=commit,
directory=local_path,
repo_sub_directory=content.path,
)
else:
try:
with open(local_path, "wb") as f:
f.write(content.decoded_content)
except (GithubException, IOError, AssertionError) as e:
logger.error("Error processing %s: %s", content.path, e)
def get_local_context(self, path: str) -> Optional[LocalRepositoryContext]:
"""Gets the local repository context.
Args:
path: The path to the local repository.
Returns:
The local repository context.
"""
return LocalGitRepositoryContext.at(
path=path,
code_repository_id=self.id,
remote_url_validation_callback=self.check_remote_url,
)
def check_remote_url(self, url: str) -> bool:
"""Checks whether the remote url matches the code repository.
Args:
url: The remote url.
Returns:
Whether the remote url is correct.
"""
https_url = f"https://{self.config.host}/{self.config.owner}/{self.config.repository}.git"
if url == https_url:
return True
ssh_regex = re.compile(
f".*@{self.config.host}:{self.config.owner}/{self.config.repository}.git"
)
if ssh_regex.fullmatch(url):
return True
return False
config: GitHubCodeRepositoryConfig
property
readonly
Returns the GitHubCodeRepositoryConfig
config.
Returns:
Type | Description |
---|---|
GitHubCodeRepositoryConfig |
The configuration. |
github_repo: github.Repository.Repository
property
readonly
The GitHub repository object from the GitHub API.
Returns:
Type | Description |
---|---|
github.Repository.Repository |
The GitHub repository. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the repository cannot be found. |
check_github_repo_public(self, owner, repo)
Checks if a GitHub repository is public.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
owner |
str |
The owner of the repository. |
required |
repo |
str |
The name of the repository. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the repository is not public. |
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def check_github_repo_public(self, owner: str, repo: str) -> None:
"""Checks if a GitHub repository is public.
Args:
owner: The owner of the repository.
repo: The name of the repository.
Raises:
RuntimeError: If the repository is not public.
"""
url = f"https://api.github.com/repos/{owner}/{repo}"
response = requests.get(url, timeout=7)
try:
if response.status_code == 200:
pass
else:
raise RuntimeError(
"It is not possible to access this repository as it does not appear to be public."
"Access to private repositories is only possible when a token is provided. Please provide a token and try again"
)
except Exception as e:
raise RuntimeError(
f"An error occurred while checking if repository is public: {str(e)}"
)
check_remote_url(self, url)
Checks whether the remote url matches the code repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The remote url. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the remote url is correct. |
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def check_remote_url(self, url: str) -> bool:
"""Checks whether the remote url matches the code repository.
Args:
url: The remote url.
Returns:
Whether the remote url is correct.
"""
https_url = f"https://{self.config.host}/{self.config.owner}/{self.config.repository}.git"
if url == https_url:
return True
ssh_regex = re.compile(
f".*@{self.config.host}:{self.config.owner}/{self.config.repository}.git"
)
if ssh_regex.fullmatch(url):
return True
return False
download_files(self, commit, directory, repo_sub_directory)
Downloads files from a commit to a local directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
commit |
str |
The commit to download. |
required |
directory |
str |
The directory to download to. |
required |
repo_sub_directory |
Optional[str] |
The sub directory to download from. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the repository sub directory is invalid. |
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def download_files(
self, commit: str, directory: str, repo_sub_directory: Optional[str]
) -> None:
"""Downloads files from a commit to a local directory.
Args:
commit: The commit to download.
directory: The directory to download to.
repo_sub_directory: The sub directory to download from.
Raises:
RuntimeError: If the repository sub directory is invalid.
"""
contents = self.github_repo.get_contents(
repo_sub_directory or "", ref=commit
)
if not isinstance(contents, List):
raise RuntimeError("Invalid repository subdirectory.")
os.makedirs(directory, exist_ok=True)
for content in contents:
local_path = os.path.join(directory, content.name)
if content.type == "dir":
self.download_files(
commit=commit,
directory=local_path,
repo_sub_directory=content.path,
)
else:
try:
with open(local_path, "wb") as f:
f.write(content.decoded_content)
except (GithubException, IOError, AssertionError) as e:
logger.error("Error processing %s: %s", content.path, e)
get_local_context(self, path)
Gets the local repository context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the local repository. |
required |
Returns:
Type | Description |
---|---|
Optional[zenml.code_repositories.local_repository_context.LocalRepositoryContext] |
The local repository context. |
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def get_local_context(self, path: str) -> Optional[LocalRepositoryContext]:
"""Gets the local repository context.
Args:
path: The path to the local repository.
Returns:
The local repository context.
"""
return LocalGitRepositoryContext.at(
path=path,
code_repository_id=self.id,
remote_url_validation_callback=self.check_remote_url,
)
login(self)
Logs in to GitHub using the token provided in the config.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the login fails. |
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def login(
self,
) -> None:
"""Logs in to GitHub using the token provided in the config.
Raises:
RuntimeError: If the login fails.
"""
try:
self._github_session = Github(self.config.token)
if self.config.token:
user = self._github_session.get_user().login
logger.debug(f"Logged in as {user}")
else:
self.check_github_repo_public(
self.config.owner, self.config.repository
)
except Exception as e:
raise RuntimeError(f"An error occurred while logging in: {str(e)}")
GitHubCodeRepositoryConfig (BaseCodeRepositoryConfig)
Config for GitHub code repositories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
The URL of the GitHub instance. |
required | |
owner |
The owner of the repository. |
required | |
repository |
The name of the repository. |
required | |
host |
The host of the repository. |
required | |
token |
The token to access the repository. |
required |
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
class GitHubCodeRepositoryConfig(BaseCodeRepositoryConfig):
"""Config for GitHub code repositories.
Args:
url: The URL of the GitHub instance.
owner: The owner of the repository.
repository: The name of the repository.
host: The host of the repository.
token: The token to access the repository.
"""
url: Optional[str]
owner: str
repository: str
host: Optional[str] = "github.com"
token: Optional[str] = SecretField(default=None)
plugins
special
Github event flavors.
event_sources
special
github_webhook_event_source
Implementation of the github webhook event source.
Commit (BaseModel)
Github Event.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Commit(BaseModel):
"""Github Event."""
id: str
message: str
url: str
author: User
GithubEvent (BaseEvent)
Push Event from Github.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubEvent(BaseEvent):
"""Push Event from Github."""
ref: str
before: str
after: str
repository: Repository
commits: List[Commit]
head_commit: Optional[Commit] = None
tags: Optional[List[Tag]] = None
pull_requests: Optional[List[PullRequest]] = None
model_config = ConfigDict(extra="allow")
@property
def branch(self) -> Optional[str]:
"""The branch the event happened on.
Returns:
The branch name.
"""
if self.ref.startswith("refs/heads/"):
return "/".join(self.ref.split("/")[2:])
return None
@property
def event_type(self) -> Union[GithubEventType, str]:
"""The type of github event.
Args:
The type of the event based on github specific fields.
Returns:
The type of the event.
"""
if self.ref.startswith("refs/heads/"):
return GithubEventType.PUSH_EVENT
elif self.ref.startswith("refs/tags/"):
return GithubEventType.TAG_EVENT
elif self.pull_requests and len(self.pull_requests) > 0:
return GithubEventType.PR_EVENT
else:
return "unknown"
branch: Optional[str]
property
readonly
The branch the event happened on.
Returns:
Type | Description |
---|---|
Optional[str] |
The branch name. |
event_type: Union[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubEventType, str]
property
readonly
The type of github event.
Returns:
Type | Description |
---|---|
Union[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubEventType, str] |
The type of the event. |
GithubEventType (StrEnum)
Collection of all possible Github Events.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubEventType(StrEnum):
"""Collection of all possible Github Events."""
PUSH_EVENT = "push_event"
TAG_EVENT = "tag_event"
PR_EVENT = "pull_request_event"
GithubWebhookEventFilterConfiguration (WebhookEventFilterConfig)
Configuration for github event filters.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubWebhookEventFilterConfiguration(WebhookEventFilterConfig):
"""Configuration for github event filters."""
repo: Optional[str] = None
branch: Optional[str] = None
event_type: Optional[GithubEventType] = None
def event_matches_filter(self, event: BaseEvent) -> bool:
"""Checks the filter against the inbound event.
Args:
event: The incoming event
Returns:
Whether the event matches the filter
"""
if not isinstance(event, GithubEvent):
return False
if self.event_type and event.event_type != self.event_type:
# Mismatch for the action
return False
if self.repo and event.repository.full_name != self.repo:
# Mismatch for the repository
return False
if self.branch and event.branch != self.branch:
# Mismatch for the branch
return False
return True
event_matches_filter(self, event)
Checks the filter against the inbound event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The incoming event |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the event matches the filter |
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
def event_matches_filter(self, event: BaseEvent) -> bool:
"""Checks the filter against the inbound event.
Args:
event: The incoming event
Returns:
Whether the event matches the filter
"""
if not isinstance(event, GithubEvent):
return False
if self.event_type and event.event_type != self.event_type:
# Mismatch for the action
return False
if self.repo and event.repository.full_name != self.repo:
# Mismatch for the repository
return False
if self.branch and event.branch != self.branch:
# Mismatch for the branch
return False
return True
GithubWebhookEventSourceConfiguration (WebhookEventSourceConfig)
Configuration for github source filters.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubWebhookEventSourceConfiguration(WebhookEventSourceConfig):
"""Configuration for github source filters."""
webhook_secret: Optional[str] = Field(
default=None,
title="The webhook secret for the event source.",
)
webhook_secret_id: Optional[UUID] = Field(
default=None,
description="The ID of the secret containing the webhook secret.",
)
rotate_secret: Optional[bool] = Field(
default=None, description="Set to rotate the webhook secret."
)
GithubWebhookEventSourceHandler (BaseWebhookEventSourceHandler)
Handler for all github events.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubWebhookEventSourceHandler(BaseWebhookEventSourceHandler):
"""Handler for all github events."""
@property
def config_class(self) -> Type[GithubWebhookEventSourceConfiguration]:
"""Returns the webhook event source configuration class.
Returns:
The configuration.
"""
return GithubWebhookEventSourceConfiguration
@property
def filter_class(self) -> Type[GithubWebhookEventFilterConfiguration]:
"""Returns the webhook event filter configuration class.
Returns:
The event filter configuration class.
"""
return GithubWebhookEventFilterConfiguration
@property
def flavor_class(self) -> Type[BaseWebhookEventSourceFlavor]:
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
from zenml.integrations.github.plugins.github_webhook_event_source_flavor import (
GithubWebhookEventSourceFlavor,
)
return GithubWebhookEventSourceFlavor
def _interpret_event(self, event: Dict[str, Any]) -> GithubEvent:
"""Converts the generic event body into a event-source specific pydantic model.
Args:
event: The generic event body
Returns:
An instance of the event source specific pydantic model.
Raises:
ValueError: If the event body can not be parsed into the pydantic model.
"""
try:
github_event = GithubEvent(**event)
except ValueError:
raise ValueError("Event did not match the pydantic model.")
else:
return github_event
def _load_payload(
self, raw_body: bytes, headers: Dict[str, str]
) -> Dict[str, Any]:
"""Converts the raw body of the request into a python dictionary.
For github webhooks users can optionally choose to urlencode the
messages. The body will look something like this:
b'payload=%7B%22...%7D%7D'. In this case the header will contain the
following field {'content-type': 'application/x-www-form-urlencoded'}.
Args:
raw_body: The raw event body.
headers: The request headers.
Returns:
An instance of the event source specific pydantic model.
"""
content_type = headers.get("content-type", "")
if content_type == "application/x-www-form-urlencoded":
string_body = urllib.parse.unquote_plus(raw_body.decode())
# Body looks like this: "payload={}", removing the prefix
raw_body = string_body[8:].encode()
return super()._load_payload(raw_body=raw_body, headers=headers)
def _get_webhook_secret(
self, event_source: EventSourceResponse
) -> Optional[str]:
"""Get the webhook secret for the event source.
Args:
event_source: The event source to retrieve the secret for.
Returns:
The webhook secret associated with the event source, or None if a
secret is not applicable.
Raises:
AuthorizationException: If the secret value could not be retrieved.
"""
# Temporary solution to get the secret value for the Event Source
config = self.validate_event_source_configuration(
event_source.configuration
)
assert isinstance(config, GithubWebhookEventSourceConfiguration)
webhook_secret_id = config.webhook_secret_id
if webhook_secret_id is None:
raise AuthorizationException(
f"Webhook secret ID is missing from the event source "
f"configuration for event source '{event_source.id}'."
)
try:
return self.zen_store.get_secret(
secret_id=webhook_secret_id
).secret_values["webhook_secret"]
except KeyError:
logger.exception(
f"Could not retrieve secret value for webhook secret id "
f"'{webhook_secret_id}'"
)
raise AuthorizationException(
"Could not retrieve webhook signature."
)
def _validate_event_source_request(
self, event_source: EventSourceRequest, config: EventSourceConfig
) -> None:
"""Validate an event source request before it is created in the database.
The `webhook_secret`, `webhook_secret_id`, and `rotate_secret`
fields are not allowed in the request.
Args:
event_source: Event source request.
config: Event source configuration instantiated from the request.
Raises:
ValueError: If any of the disallowed fields are present in the
request.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
for field in ["webhook_secret", "webhook_secret_id", "rotate_secret"]:
if getattr(config, field) is not None:
raise ValueError(
f"The `{field}` field is not allowed in the event source "
"request."
)
def _process_event_source_request(
self, event_source: EventSourceResponse, config: EventSourceConfig
) -> None:
"""Process an event source request after it is created in the database.
Generates a webhook secret and stores it in a secret in the database,
then attaches the secret ID to the event source configuration.
Args:
event_source: Newly created event source
config: Event source configuration instantiated from the response.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
assert event_source.user is not None, (
"User is not set for event source"
)
secret_key_value = random_str(12)
webhook_secret = SecretRequest(
name=f"event_source-{str(event_source.id)}-{random_str(4)}".lower(),
values={"webhook_secret": secret_key_value},
workspace=event_source.workspace.id,
user=event_source.user.id,
scope=SecretScope.WORKSPACE,
)
secret = self.zen_store.create_secret(webhook_secret)
# Store the secret ID in the event source configuration in the database
event_source_update = EventSourceUpdate.from_response(event_source)
assert event_source_update.configuration is not None
event_source_update.configuration["webhook_secret_id"] = str(secret.id)
self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=event_source_update,
)
# Set the webhook secret in the configuration returned to the user
config.webhook_secret = secret_key_value
# Remove hidden field from the response
config.rotate_secret = None
config.webhook_secret_id = None
def _validate_event_source_update(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
event_source_update: EventSourceUpdate,
config_update: EventSourceConfig,
) -> None:
"""Validate an event source update before it is reflected in the database.
Ensure the webhook secret ID is preserved in the updated event source
configuration.
Args:
event_source: Original event source before the update.
config: Event source configuration instantiated from the original
event source.
event_source_update: Event source update request.
config_update: Event source configuration instantiated from the
updated event source.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
assert isinstance(config_update, GithubWebhookEventSourceConfiguration)
config_update.webhook_secret_id = config.webhook_secret_id
def _process_event_source_update(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
previous_event_source: EventSourceResponse,
previous_config: EventSourceConfig,
) -> None:
"""Process an event source after it is updated in the database.
If the `rotate_secret` field is set to `True`, the webhook secret is
rotated and the new secret ID is attached to the event source
configuration.
Args:
event_source: Event source after the update.
config: Event source configuration instantiated from the updated
event source.
previous_event_source: Original event source before the update.
previous_config: Event source configuration instantiated from the
original event source.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
assert isinstance(
previous_config, GithubWebhookEventSourceConfiguration
)
assert config.webhook_secret_id is not None
if config.rotate_secret:
# In case the secret is being rotated
secret_key_value = random_str(12)
webhook_secret = SecretUpdate(
values={"webhook_secret": secret_key_value}
)
self.zen_store.update_secret(
secret_id=config.webhook_secret_id,
secret_update=webhook_secret,
)
# Remove the `rotate_secret` field from the configuration stored
# in the database
event_source_update = EventSourceUpdate.from_response(event_source)
assert event_source_update.configuration is not None
event_source_update.configuration.pop("rotate_secret")
self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=event_source_update,
)
# Set the new secret in the configuration returned to the user
config.webhook_secret = secret_key_value
# Remove hidden fields from the response
config.rotate_secret = None
config.webhook_secret_id = None
def _process_event_source_delete(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
force: Optional[bool] = False,
) -> None:
"""Process an event source before it is deleted from the database.
Deletes the associated secret from the database.
Args:
event_source: Event source before the deletion.
config: Validated instantiated event source configuration before
the deletion.
force: Whether to force deprovision the event source.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
if config.webhook_secret_id is not None:
try:
self.zen_store.delete_secret(
secret_id=config.webhook_secret_id
)
except KeyError:
pass
# Remove hidden fields from the response
config.rotate_secret = None
config.webhook_secret_id = None
def _process_event_source_response(
self, event_source: EventSourceResponse, config: EventSourceConfig
) -> None:
"""Process an event source response before it is returned to the user.
Removes hidden fields from the configuration.
Args:
event_source: Event source response.
config: Event source configuration instantiated from the response.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
# Remove hidden fields from the response
config.rotate_secret = None
config.webhook_secret_id = None
config.webhook_secret = None
config_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration]
property
readonly
Returns the webhook event source configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration] |
The configuration. |
filter_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration]
property
readonly
Returns the webhook event filter configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration] |
The event filter configuration class. |
flavor_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor] |
The flavor class of the plugin. |
PullRequest (BaseModel)
Github Pull Request.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class PullRequest(BaseModel):
"""Github Pull Request."""
id: str
number: int
title: str
author: User
merged: bool
merge_commit: Commit
Repository (BaseModel)
Github Repository.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Repository(BaseModel):
"""Github Repository."""
id: int
name: str
full_name: str
html_url: str
Tag (BaseModel)
Github Tag.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Tag(BaseModel):
"""Github Tag."""
id: str
name: str
commit: Commit
User (BaseModel)
Github User.
Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class User(BaseModel):
"""Github User."""
name: str
email: str
username: str
github_webhook_event_source_flavor
Github webhook event source flavor.
GithubWebhookEventSourceFlavor (BaseWebhookEventSourceFlavor)
Enables users to configure github event sources.
Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventSourceFlavor(BaseWebhookEventSourceFlavor):
"""Enables users to configure github event sources."""
FLAVOR: ClassVar[str] = GITHUB_EVENT_FLAVOR
PLUGIN_CLASS: ClassVar[Type[GithubWebhookEventSourceHandler]] = (
GithubWebhookEventSourceHandler
)
# EventPlugin specific
EVENT_SOURCE_CONFIG_CLASS: ClassVar[
Type[GithubWebhookEventSourceConfiguration]
] = GithubWebhookEventSourceConfiguration
EVENT_FILTER_CONFIG_CLASS: ClassVar[
Type[GithubWebhookEventFilterConfiguration]
] = GithubWebhookEventFilterConfiguration
EVENT_FILTER_CONFIG_CLASS (WebhookEventFilterConfig)
Configuration for github event filters.
Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventFilterConfiguration(WebhookEventFilterConfig):
"""Configuration for github event filters."""
repo: Optional[str] = None
branch: Optional[str] = None
event_type: Optional[GithubEventType] = None
def event_matches_filter(self, event: BaseEvent) -> bool:
"""Checks the filter against the inbound event.
Args:
event: The incoming event
Returns:
Whether the event matches the filter
"""
if not isinstance(event, GithubEvent):
return False
if self.event_type and event.event_type != self.event_type:
# Mismatch for the action
return False
if self.repo and event.repository.full_name != self.repo:
# Mismatch for the repository
return False
if self.branch and event.branch != self.branch:
# Mismatch for the branch
return False
return True
event_matches_filter(self, event)
Checks the filter against the inbound event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The incoming event |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the event matches the filter |
Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
def event_matches_filter(self, event: BaseEvent) -> bool:
"""Checks the filter against the inbound event.
Args:
event: The incoming event
Returns:
Whether the event matches the filter
"""
if not isinstance(event, GithubEvent):
return False
if self.event_type and event.event_type != self.event_type:
# Mismatch for the action
return False
if self.repo and event.repository.full_name != self.repo:
# Mismatch for the repository
return False
if self.branch and event.branch != self.branch:
# Mismatch for the branch
return False
return True
EVENT_SOURCE_CONFIG_CLASS (WebhookEventSourceConfig)
Configuration for github source filters.
Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventSourceConfiguration(WebhookEventSourceConfig):
"""Configuration for github source filters."""
webhook_secret: Optional[str] = Field(
default=None,
title="The webhook secret for the event source.",
)
webhook_secret_id: Optional[UUID] = Field(
default=None,
description="The ID of the secret containing the webhook secret.",
)
rotate_secret: Optional[bool] = Field(
default=None, description="Set to rotate the webhook secret."
)
PLUGIN_CLASS (BaseWebhookEventSourceHandler)
Handler for all github events.
Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventSourceHandler(BaseWebhookEventSourceHandler):
"""Handler for all github events."""
@property
def config_class(self) -> Type[GithubWebhookEventSourceConfiguration]:
"""Returns the webhook event source configuration class.
Returns:
The configuration.
"""
return GithubWebhookEventSourceConfiguration
@property
def filter_class(self) -> Type[GithubWebhookEventFilterConfiguration]:
"""Returns the webhook event filter configuration class.
Returns:
The event filter configuration class.
"""
return GithubWebhookEventFilterConfiguration
@property
def flavor_class(self) -> Type[BaseWebhookEventSourceFlavor]:
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
from zenml.integrations.github.plugins.github_webhook_event_source_flavor import (
GithubWebhookEventSourceFlavor,
)
return GithubWebhookEventSourceFlavor
def _interpret_event(self, event: Dict[str, Any]) -> GithubEvent:
"""Converts the generic event body into a event-source specific pydantic model.
Args:
event: The generic event body
Returns:
An instance of the event source specific pydantic model.
Raises:
ValueError: If the event body can not be parsed into the pydantic model.
"""
try:
github_event = GithubEvent(**event)
except ValueError:
raise ValueError("Event did not match the pydantic model.")
else:
return github_event
def _load_payload(
self, raw_body: bytes, headers: Dict[str, str]
) -> Dict[str, Any]:
"""Converts the raw body of the request into a python dictionary.
For github webhooks users can optionally choose to urlencode the
messages. The body will look something like this:
b'payload=%7B%22...%7D%7D'. In this case the header will contain the
following field {'content-type': 'application/x-www-form-urlencoded'}.
Args:
raw_body: The raw event body.
headers: The request headers.
Returns:
An instance of the event source specific pydantic model.
"""
content_type = headers.get("content-type", "")
if content_type == "application/x-www-form-urlencoded":
string_body = urllib.parse.unquote_plus(raw_body.decode())
# Body looks like this: "payload={}", removing the prefix
raw_body = string_body[8:].encode()
return super()._load_payload(raw_body=raw_body, headers=headers)
def _get_webhook_secret(
self, event_source: EventSourceResponse
) -> Optional[str]:
"""Get the webhook secret for the event source.
Args:
event_source: The event source to retrieve the secret for.
Returns:
The webhook secret associated with the event source, or None if a
secret is not applicable.
Raises:
AuthorizationException: If the secret value could not be retrieved.
"""
# Temporary solution to get the secret value for the Event Source
config = self.validate_event_source_configuration(
event_source.configuration
)
assert isinstance(config, GithubWebhookEventSourceConfiguration)
webhook_secret_id = config.webhook_secret_id
if webhook_secret_id is None:
raise AuthorizationException(
f"Webhook secret ID is missing from the event source "
f"configuration for event source '{event_source.id}'."
)
try:
return self.zen_store.get_secret(
secret_id=webhook_secret_id
).secret_values["webhook_secret"]
except KeyError:
logger.exception(
f"Could not retrieve secret value for webhook secret id "
f"'{webhook_secret_id}'"
)
raise AuthorizationException(
"Could not retrieve webhook signature."
)
def _validate_event_source_request(
self, event_source: EventSourceRequest, config: EventSourceConfig
) -> None:
"""Validate an event source request before it is created in the database.
The `webhook_secret`, `webhook_secret_id`, and `rotate_secret`
fields are not allowed in the request.
Args:
event_source: Event source request.
config: Event source configuration instantiated from the request.
Raises:
ValueError: If any of the disallowed fields are present in the
request.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
for field in ["webhook_secret", "webhook_secret_id", "rotate_secret"]:
if getattr(config, field) is not None:
raise ValueError(
f"The `{field}` field is not allowed in the event source "
"request."
)
def _process_event_source_request(
self, event_source: EventSourceResponse, config: EventSourceConfig
) -> None:
"""Process an event source request after it is created in the database.
Generates a webhook secret and stores it in a secret in the database,
then attaches the secret ID to the event source configuration.
Args:
event_source: Newly created event source
config: Event source configuration instantiated from the response.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
assert event_source.user is not None, (
"User is not set for event source"
)
secret_key_value = random_str(12)
webhook_secret = SecretRequest(
name=f"event_source-{str(event_source.id)}-{random_str(4)}".lower(),
values={"webhook_secret": secret_key_value},
workspace=event_source.workspace.id,
user=event_source.user.id,
scope=SecretScope.WORKSPACE,
)
secret = self.zen_store.create_secret(webhook_secret)
# Store the secret ID in the event source configuration in the database
event_source_update = EventSourceUpdate.from_response(event_source)
assert event_source_update.configuration is not None
event_source_update.configuration["webhook_secret_id"] = str(secret.id)
self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=event_source_update,
)
# Set the webhook secret in the configuration returned to the user
config.webhook_secret = secret_key_value
# Remove hidden field from the response
config.rotate_secret = None
config.webhook_secret_id = None
def _validate_event_source_update(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
event_source_update: EventSourceUpdate,
config_update: EventSourceConfig,
) -> None:
"""Validate an event source update before it is reflected in the database.
Ensure the webhook secret ID is preserved in the updated event source
configuration.
Args:
event_source: Original event source before the update.
config: Event source configuration instantiated from the original
event source.
event_source_update: Event source update request.
config_update: Event source configuration instantiated from the
updated event source.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
assert isinstance(config_update, GithubWebhookEventSourceConfiguration)
config_update.webhook_secret_id = config.webhook_secret_id
def _process_event_source_update(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
previous_event_source: EventSourceResponse,
previous_config: EventSourceConfig,
) -> None:
"""Process an event source after it is updated in the database.
If the `rotate_secret` field is set to `True`, the webhook secret is
rotated and the new secret ID is attached to the event source
configuration.
Args:
event_source: Event source after the update.
config: Event source configuration instantiated from the updated
event source.
previous_event_source: Original event source before the update.
previous_config: Event source configuration instantiated from the
original event source.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
assert isinstance(
previous_config, GithubWebhookEventSourceConfiguration
)
assert config.webhook_secret_id is not None
if config.rotate_secret:
# In case the secret is being rotated
secret_key_value = random_str(12)
webhook_secret = SecretUpdate(
values={"webhook_secret": secret_key_value}
)
self.zen_store.update_secret(
secret_id=config.webhook_secret_id,
secret_update=webhook_secret,
)
# Remove the `rotate_secret` field from the configuration stored
# in the database
event_source_update = EventSourceUpdate.from_response(event_source)
assert event_source_update.configuration is not None
event_source_update.configuration.pop("rotate_secret")
self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=event_source_update,
)
# Set the new secret in the configuration returned to the user
config.webhook_secret = secret_key_value
# Remove hidden fields from the response
config.rotate_secret = None
config.webhook_secret_id = None
def _process_event_source_delete(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
force: Optional[bool] = False,
) -> None:
"""Process an event source before it is deleted from the database.
Deletes the associated secret from the database.
Args:
event_source: Event source before the deletion.
config: Validated instantiated event source configuration before
the deletion.
force: Whether to force deprovision the event source.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
if config.webhook_secret_id is not None:
try:
self.zen_store.delete_secret(
secret_id=config.webhook_secret_id
)
except KeyError:
pass
# Remove hidden fields from the response
config.rotate_secret = None
config.webhook_secret_id = None
def _process_event_source_response(
self, event_source: EventSourceResponse, config: EventSourceConfig
) -> None:
"""Process an event source response before it is returned to the user.
Removes hidden fields from the configuration.
Args:
event_source: Event source response.
config: Event source configuration instantiated from the response.
"""
assert isinstance(config, GithubWebhookEventSourceConfiguration)
# Remove hidden fields from the response
config.rotate_secret = None
config.webhook_secret_id = None
config.webhook_secret = None
config_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration]
property
readonly
Returns the webhook event source configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration] |
The configuration. |
filter_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration]
property
readonly
Returns the webhook event filter configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration] |
The event filter configuration class. |
flavor_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor] |
The flavor class of the plugin. |