Skip to content

Github

zenml.integrations.github special

Initialization of the GitHub ZenML integration.

GitHubIntegration (Integration)

Definition of GitHub integration for ZenML.

Source code in zenml/integrations/github/__init__.py
class GitHubIntegration(Integration):
    """Definition of GitHub integration for ZenML."""

    NAME = GITHUB
    REQUIREMENTS: List[str] = ["pygithub"]


    @classmethod
    def plugin_flavors(cls) -> List[Type[BasePluginFlavor]]:
        """Declare the event flavors for the github integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.github.plugins import GithubWebhookEventSourceFlavor

        return [GithubWebhookEventSourceFlavor]

plugin_flavors() classmethod

Declare the event flavors for the github integration.

Returns:

Type Description
List[Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/github/__init__.py
@classmethod
def plugin_flavors(cls) -> List[Type[BasePluginFlavor]]:
    """Declare the event flavors for the github integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.github.plugins import GithubWebhookEventSourceFlavor

    return [GithubWebhookEventSourceFlavor]

code_repositories special

Initialization of the ZenML GitHub code repository.

github_code_repository

GitHub code repository.

GitHubCodeRepository (BaseCodeRepository)

GitHub code repository.

Source code in zenml/integrations/github/code_repositories/github_code_repository.py
class GitHubCodeRepository(BaseCodeRepository):
    """GitHub code repository."""

    @property
    def config(self) -> GitHubCodeRepositoryConfig:
        """Returns the `GitHubCodeRepositoryConfig` config.

        Returns:
            The configuration.
        """
        return GitHubCodeRepositoryConfig(**self._config)

    @property
    def github_repo(self) -> Repository:
        """The GitHub repository object from the GitHub API.

        Returns:
            The GitHub repository.

        Raises:
            RuntimeError: If the repository cannot be found.
        """
        try:
            github_repository = self._github_session.get_repo(
                f"{self.config.owner}/{self.config.repository}"
            )
        except GithubException as e:
            raise RuntimeError(
                f"An error occurred while getting the repository: {str(e)}"
            )
        return github_repository

    def check_github_repo_public(self, owner: str, repo: str) -> None:
        """Checks if a GitHub repository is public.

        Args:
            owner: The owner of the repository.
            repo: The name of the repository.

        Raises:
            RuntimeError: If the repository is not public.
        """
        url = f"https://api.github.com/repos/{owner}/{repo}"
        response = requests.get(url, timeout=7)

        try:
            if response.status_code == 200:
                pass
            else:
                raise RuntimeError(
                    "It is not possible to access this repository as it does not appear to be public."
                    "Access to private repositories is only possible when a token is provided. Please provide a token and try again"
                )
        except Exception as e:
            raise RuntimeError(
                f"An error occurred while checking if repository is public: {str(e)}"
            )

    def login(
        self,
    ) -> None:
        """Logs in to GitHub using the token provided in the config.

        Raises:
            RuntimeError: If the login fails.
        """
        try:
            self._github_session = Github(self.config.token)
            if self.config.token:
                user = self._github_session.get_user().login
                logger.debug(f"Logged in as {user}")
            else:
                self.check_github_repo_public(
                    self.config.owner, self.config.repository
                )
        except Exception as e:
            raise RuntimeError(f"An error occurred while logging in: {str(e)}")

    def download_files(
        self, commit: str, directory: str, repo_sub_directory: Optional[str]
    ) -> None:
        """Downloads files from a commit to a local directory.

        Args:
            commit: The commit to download.
            directory: The directory to download to.
            repo_sub_directory: The sub directory to download from.

        Raises:
            RuntimeError: If the repository sub directory is invalid.
        """
        contents = self.github_repo.get_contents(
            repo_sub_directory or "", ref=commit
        )
        if not isinstance(contents, List):
            raise RuntimeError("Invalid repository subdirectory.")

        os.makedirs(directory, exist_ok=True)

        for content in contents:
            local_path = os.path.join(directory, content.name)
            if content.type == "dir":
                self.download_files(
                    commit=commit,
                    directory=local_path,
                    repo_sub_directory=content.path,
                )
            else:
                try:
                    with open(local_path, "wb") as f:
                        f.write(content.decoded_content)
                except (GithubException, IOError) as e:
                    logger.error("Error processing %s: %s", content.path, e)

    def get_local_context(self, path: str) -> Optional[LocalRepositoryContext]:
        """Gets the local repository context.

        Args:
            path: The path to the local repository.

        Returns:
            The local repository context.
        """
        return LocalGitRepositoryContext.at(
            path=path,
            code_repository_id=self.id,
            remote_url_validation_callback=self.check_remote_url,
        )

    def check_remote_url(self, url: str) -> bool:
        """Checks whether the remote url matches the code repository.

        Args:
            url: The remote url.

        Returns:
            Whether the remote url is correct.
        """
        https_url = f"https://{self.config.host}/{self.config.owner}/{self.config.repository}.git"
        if url == https_url:
            return True

        ssh_regex = re.compile(
            f".*@{self.config.host}:{self.config.owner}/{self.config.repository}.git"
        )
        if ssh_regex.fullmatch(url):
            return True

        return False
config: GitHubCodeRepositoryConfig property readonly

Returns the GitHubCodeRepositoryConfig config.

Returns:

Type Description
GitHubCodeRepositoryConfig

The configuration.

github_repo: github.Repository.Repository property readonly

The GitHub repository object from the GitHub API.

Returns:

Type Description
github.Repository.Repository

The GitHub repository.

Exceptions:

Type Description
RuntimeError

If the repository cannot be found.

check_github_repo_public(self, owner, repo)

Checks if a GitHub repository is public.

Parameters:

Name Type Description Default
owner str

The owner of the repository.

required
repo str

The name of the repository.

required

Exceptions:

Type Description
RuntimeError

If the repository is not public.

Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def check_github_repo_public(self, owner: str, repo: str) -> None:
    """Checks if a GitHub repository is public.

    Args:
        owner: The owner of the repository.
        repo: The name of the repository.

    Raises:
        RuntimeError: If the repository is not public.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}"
    response = requests.get(url, timeout=7)

    try:
        if response.status_code == 200:
            pass
        else:
            raise RuntimeError(
                "It is not possible to access this repository as it does not appear to be public."
                "Access to private repositories is only possible when a token is provided. Please provide a token and try again"
            )
    except Exception as e:
        raise RuntimeError(
            f"An error occurred while checking if repository is public: {str(e)}"
        )
check_remote_url(self, url)

Checks whether the remote url matches the code repository.

Parameters:

Name Type Description Default
url str

The remote url.

required

Returns:

Type Description
bool

Whether the remote url is correct.

Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def check_remote_url(self, url: str) -> bool:
    """Checks whether the remote url matches the code repository.

    Args:
        url: The remote url.

    Returns:
        Whether the remote url is correct.
    """
    https_url = f"https://{self.config.host}/{self.config.owner}/{self.config.repository}.git"
    if url == https_url:
        return True

    ssh_regex = re.compile(
        f".*@{self.config.host}:{self.config.owner}/{self.config.repository}.git"
    )
    if ssh_regex.fullmatch(url):
        return True

    return False
download_files(self, commit, directory, repo_sub_directory)

Downloads files from a commit to a local directory.

Parameters:

Name Type Description Default
commit str

The commit to download.

required
directory str

The directory to download to.

required
repo_sub_directory Optional[str]

The sub directory to download from.

required

Exceptions:

Type Description
RuntimeError

If the repository sub directory is invalid.

Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def download_files(
    self, commit: str, directory: str, repo_sub_directory: Optional[str]
) -> None:
    """Downloads files from a commit to a local directory.

    Args:
        commit: The commit to download.
        directory: The directory to download to.
        repo_sub_directory: The sub directory to download from.

    Raises:
        RuntimeError: If the repository sub directory is invalid.
    """
    contents = self.github_repo.get_contents(
        repo_sub_directory or "", ref=commit
    )
    if not isinstance(contents, List):
        raise RuntimeError("Invalid repository subdirectory.")

    os.makedirs(directory, exist_ok=True)

    for content in contents:
        local_path = os.path.join(directory, content.name)
        if content.type == "dir":
            self.download_files(
                commit=commit,
                directory=local_path,
                repo_sub_directory=content.path,
            )
        else:
            try:
                with open(local_path, "wb") as f:
                    f.write(content.decoded_content)
            except (GithubException, IOError) as e:
                logger.error("Error processing %s: %s", content.path, e)
get_local_context(self, path)

Gets the local repository context.

Parameters:

Name Type Description Default
path str

The path to the local repository.

required

Returns:

Type Description
Optional[zenml.code_repositories.local_repository_context.LocalRepositoryContext]

The local repository context.

Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def get_local_context(self, path: str) -> Optional[LocalRepositoryContext]:
    """Gets the local repository context.

    Args:
        path: The path to the local repository.

    Returns:
        The local repository context.
    """
    return LocalGitRepositoryContext.at(
        path=path,
        code_repository_id=self.id,
        remote_url_validation_callback=self.check_remote_url,
    )
login(self)

Logs in to GitHub using the token provided in the config.

Exceptions:

Type Description
RuntimeError

If the login fails.

Source code in zenml/integrations/github/code_repositories/github_code_repository.py
def login(
    self,
) -> None:
    """Logs in to GitHub using the token provided in the config.

    Raises:
        RuntimeError: If the login fails.
    """
    try:
        self._github_session = Github(self.config.token)
        if self.config.token:
            user = self._github_session.get_user().login
            logger.debug(f"Logged in as {user}")
        else:
            self.check_github_repo_public(
                self.config.owner, self.config.repository
            )
    except Exception as e:
        raise RuntimeError(f"An error occurred while logging in: {str(e)}")
GitHubCodeRepositoryConfig (BaseCodeRepositoryConfig) pydantic-model

Config for GitHub code repositories.

Parameters:

Name Type Description Default
url

The URL of the GitHub instance.

required
owner

The owner of the repository.

required
repository

The name of the repository.

required
host

The host of the repository.

required
token

The token to access the repository.

required
Source code in zenml/integrations/github/code_repositories/github_code_repository.py
class GitHubCodeRepositoryConfig(BaseCodeRepositoryConfig):
    """Config for GitHub code repositories.

    Args:
        url: The URL of the GitHub instance.
        owner: The owner of the repository.
        repository: The name of the repository.
        host: The host of the repository.
        token: The token to access the repository.
    """

    url: Optional[str]
    owner: str
    repository: str
    host: Optional[str] = "github.com"
    token: Optional[str] = SecretField()

plugins special

Github event flavors.

event_sources special

github_webhook_event_source

Implementation of the github webhook event source.

Commit (BaseModel) pydantic-model

Github Event.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Commit(BaseModel):
    """Github Event."""

    id: str
    message: str
    url: str
    author: User
GithubEvent (BaseEvent) pydantic-model

Push Event from Github.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubEvent(BaseEvent):
    """Push Event from Github."""

    ref: str
    before: str
    after: str
    repository: Repository
    commits: List[Commit]
    head_commit: Optional[Commit]
    tags: Optional[List[Tag]]
    pull_requests: Optional[List[PullRequest]]

    class Config:
        """Pydantic configuration class."""

        extra = Extra.allow

    @property
    def branch(self) -> Optional[str]:
        """The branch the event happened on.

        Returns:
            The branch name.
        """
        if self.ref.startswith("refs/heads/"):
            return "/".join(self.ref.split("/")[2:])
        return None

    @property
    def event_type(self) -> Union[GithubEventType, str]:
        """The type of github event.

        Args:
            The type of the event based on github specific fields.

        Returns:
            The type of the event.
        """
        if self.ref.startswith("refs/heads/"):
            return GithubEventType.PUSH_EVENT
        elif self.ref.startswith("refs/tags/"):
            return GithubEventType.TAG_EVENT
        elif self.pull_requests and len(self.pull_requests) > 0:
            return GithubEventType.PR_EVENT
        else:
            return "unknown"
branch: Optional[str] property readonly

The branch the event happened on.

Returns:

Type Description
Optional[str]

The branch name.

event_type: Union[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubEventType, str] property readonly

The type of github event.

Returns:

Type Description
Union[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubEventType, str]

The type of the event.

Config

Pydantic configuration class.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Config:
    """Pydantic configuration class."""

    extra = Extra.allow
GithubEventType (StrEnum)

Collection of all possible Github Events.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubEventType(StrEnum):
    """Collection of all possible Github Events."""

    PUSH_EVENT = "push_event"
    TAG_EVENT = "tag_event"
    PR_EVENT = "pull_request_event"
GithubWebhookEventFilterConfiguration (WebhookEventFilterConfig) pydantic-model

Configuration for github event filters.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubWebhookEventFilterConfiguration(WebhookEventFilterConfig):
    """Configuration for github event filters."""

    repo: Optional[str]
    branch: Optional[str]
    event_type: Optional[GithubEventType]

    def event_matches_filter(self, event: BaseEvent) -> bool:
        """Checks the filter against the inbound event.

        Args:
            event: The incoming event

        Returns:
            Whether the event matches the filter
        """
        if not isinstance(event, GithubEvent):
            return False
        if self.event_type and event.event_type != self.event_type:
            # Mismatch for the action
            return False
        if self.repo and event.repository.full_name != self.repo:
            # Mismatch for the repository
            return False
        if self.branch and event.branch != self.branch:
            # Mismatch for the branch
            return False
        return True
event_matches_filter(self, event)

Checks the filter against the inbound event.

Parameters:

Name Type Description Default
event BaseEvent

The incoming event

required

Returns:

Type Description
bool

Whether the event matches the filter

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
def event_matches_filter(self, event: BaseEvent) -> bool:
    """Checks the filter against the inbound event.

    Args:
        event: The incoming event

    Returns:
        Whether the event matches the filter
    """
    if not isinstance(event, GithubEvent):
        return False
    if self.event_type and event.event_type != self.event_type:
        # Mismatch for the action
        return False
    if self.repo and event.repository.full_name != self.repo:
        # Mismatch for the repository
        return False
    if self.branch and event.branch != self.branch:
        # Mismatch for the branch
        return False
    return True
GithubWebhookEventSourceConfiguration (WebhookEventSourceConfig) pydantic-model

Configuration for github source filters.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubWebhookEventSourceConfiguration(WebhookEventSourceConfig):
    """Configuration for github source filters."""

    webhook_secret: Optional[str] = Field(
        default=None,
        title="The webhook secret for the event source.",
    )
    webhook_secret_id: Optional[UUID] = Field(
        default=None,
        description="The ID of the secret containing the webhook secret.",
    )
    rotate_secret: Optional[bool] = Field(
        default=None, description="Set to rotate the webhook secret."
    )
rotate_secret: bool pydantic-field

Set to rotate the webhook secret.

webhook_secret_id: UUID pydantic-field

The ID of the secret containing the webhook secret.

GithubWebhookEventSourceHandler (BaseWebhookEventSourceHandler)

Handler for all github events.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class GithubWebhookEventSourceHandler(BaseWebhookEventSourceHandler):
    """Handler for all github events."""

    @property
    def config_class(self) -> Type[GithubWebhookEventSourceConfiguration]:
        """Returns the webhook event source configuration class.

        Returns:
            The configuration.
        """
        return GithubWebhookEventSourceConfiguration

    @property
    def filter_class(self) -> Type[GithubWebhookEventFilterConfiguration]:
        """Returns the webhook event filter configuration class.

        Returns:
            The event filter configuration class.
        """
        return GithubWebhookEventFilterConfiguration

    @property
    def flavor_class(self) -> Type[BaseWebhookEventSourceFlavor]:
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """
        from zenml.integrations.github.plugins.github_webhook_event_source_flavor import (
            GithubWebhookEventSourceFlavor,
        )

        return GithubWebhookEventSourceFlavor

    def _interpret_event(self, event: Dict[str, Any]) -> GithubEvent:
        """Converts the generic event body into a event-source specific pydantic model.

        Args:
            event: The generic event body

        Returns:
            An instance of the event source specific pydantic model.

        Raises:
            ValueError: If the event body can not be parsed into the pydantic model.
        """
        try:
            github_event = GithubEvent(**event)
        except ValueError:
            raise ValueError("Event did not match the pydantic model.")
        else:
            return github_event

    def _load_payload(
        self, raw_body: bytes, headers: Dict[str, str]
    ) -> Dict[str, Any]:
        """Converts the raw body of the request into a python dictionary.

        For github webhooks users can optionally choose to urlencode the
        messages. The body will look something like this:
        b'payload=%7B%22...%7D%7D'. In this case the header will contain the
        following field {'content-type': 'application/x-www-form-urlencoded'}.

        Args:
            raw_body: The raw event body.
            headers: The request headers.

        Returns:
            An instance of the event source specific pydantic model.
        """
        content_type = headers.get("content-type", "")
        if content_type == "application/x-www-form-urlencoded":
            string_body = urllib.parse.unquote_plus(raw_body.decode())
            # Body looks like this: "payload={}", removing the prefix
            raw_body = string_body[8:].encode()

        return super()._load_payload(raw_body=raw_body, headers=headers)

    def _get_webhook_secret(
        self, event_source: EventSourceResponse
    ) -> Optional[str]:
        """Get the webhook secret for the event source.

        Args:
            event_source: The event source to retrieve the secret for.

        Returns:
            The webhook secret associated with the event source, or None if a
            secret is not applicable.

        Raises:
            AuthorizationException: If the secret value could not be retrieved.
        """
        # Temporary solution to get the secret value for the Event Source
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        webhook_secret_id = config.webhook_secret_id
        if webhook_secret_id is None:
            raise AuthorizationException(
                f"Webhook secret ID is missing from the event source "
                f"configuration for event source '{event_source.id}'."
            )
        try:
            return self.zen_store.get_secret(
                secret_id=webhook_secret_id
            ).secret_values["webhook_secret"]
        except KeyError:
            logger.exception(
                f"Could not retrieve secret value for webhook secret id "
                f"'{webhook_secret_id}'"
            )
            raise AuthorizationException(
                "Could not retrieve webhook signature."
            )

    def _validate_event_source_request(
        self, event_source: EventSourceRequest, config: EventSourceConfig
    ) -> None:
        """Validate an event source request before it is created in the database.

        The `webhook_secret`, `webhook_secret_id`, and `rotate_secret`
        fields are not allowed in the request.

        Args:
            event_source: Event source request.
            config: Event source configuration instantiated from the request.

        Raises:
            ValueError: If any of the disallowed fields are present in the
                request.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        for field in ["webhook_secret", "webhook_secret_id", "rotate_secret"]:
            if getattr(config, field) is not None:
                raise ValueError(
                    f"The `{field}` field is not allowed in the event source "
                    "request."
                )

    def _process_event_source_request(
        self, event_source: EventSourceResponse, config: EventSourceConfig
    ) -> None:
        """Process an event source request after it is created in the database.

        Generates a webhook secret and stores it in a secret in the database,
        then attaches the secret ID to the event source configuration.

        Args:
            event_source: Newly created event source
            config: Event source configuration instantiated from the response.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        assert (
            event_source.user is not None
        ), "User is not set for event source"

        secret_key_value = random_str(12)
        webhook_secret = SecretRequest(
            name=f"event_source-{str(event_source.id)}-{random_str(4)}".lower(),
            values={"webhook_secret": secret_key_value},
            workspace=event_source.workspace.id,
            user=event_source.user.id,
            scope=SecretScope.WORKSPACE,
        )
        secret = self.zen_store.create_secret(webhook_secret)

        # Store the secret ID in the event source configuration in the database
        event_source_update = EventSourceUpdate.from_response(event_source)
        assert event_source_update.configuration is not None
        event_source_update.configuration["webhook_secret_id"] = str(secret.id)

        self.zen_store.update_event_source(
            event_source_id=event_source.id,
            event_source_update=event_source_update,
        )

        # Set the webhook secret in the configuration returned to the user
        config.webhook_secret = secret_key_value
        # Remove hidden field from the response
        config.rotate_secret = None
        config.webhook_secret_id = None

    def _validate_event_source_update(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        event_source_update: EventSourceUpdate,
        config_update: EventSourceConfig,
    ) -> None:
        """Validate an event source update before it is reflected in the database.

        Ensure the webhook secret ID is preserved in the updated event source
        configuration.

        Args:
            event_source: Original event source before the update.
            config: Event source configuration instantiated from the original
                event source.
            event_source_update: Event source update request.
            config_update: Event source configuration instantiated from the
                updated event source.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        assert isinstance(config_update, GithubWebhookEventSourceConfiguration)

        config_update.webhook_secret_id = config.webhook_secret_id

    def _process_event_source_update(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        previous_event_source: EventSourceResponse,
        previous_config: EventSourceConfig,
    ) -> None:
        """Process an event source after it is updated in the database.

        If the `rotate_secret` field is set to `True`, the webhook secret is
        rotated and the new secret ID is attached to the event source
        configuration.

        Args:
            event_source: Event source after the update.
            config: Event source configuration instantiated from the updated
                event source.
            previous_event_source: Original event source before the update.
            previous_config: Event source configuration instantiated from the
                original event source.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        assert isinstance(
            previous_config, GithubWebhookEventSourceConfiguration
        )
        assert config.webhook_secret_id is not None

        if config.rotate_secret:
            # In case the secret is being rotated
            secret_key_value = random_str(12)
            webhook_secret = SecretUpdate(  # type: ignore[call-arg]
                values={"webhook_secret": secret_key_value}
            )
            self.zen_store.update_secret(
                secret_id=config.webhook_secret_id,
                secret_update=webhook_secret,
            )

            # Remove the `rotate_secret` field from the configuration stored
            # in the database
            event_source_update = EventSourceUpdate.from_response(event_source)
            assert event_source_update.configuration is not None
            event_source_update.configuration.pop("rotate_secret")
            self.zen_store.update_event_source(
                event_source_id=event_source.id,
                event_source_update=event_source_update,
            )

            # Set the new secret in the configuration returned to the user
            config.webhook_secret = secret_key_value

        # Remove hidden fields from the response
        config.rotate_secret = None
        config.webhook_secret_id = None

    def _process_event_source_delete(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        force: Optional[bool] = False,
    ) -> None:
        """Process an event source before it is deleted from the database.

        Deletes the associated secret from the database.

        Args:
            event_source: Event source before the deletion.
            config: Validated instantiated event source configuration before
                the deletion.
            force: Whether to force deprovision the event source.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        if config.webhook_secret_id is not None:
            try:
                self.zen_store.delete_secret(
                    secret_id=config.webhook_secret_id
                )
            except KeyError:
                pass

        # Remove hidden fields from the response
        config.rotate_secret = None
        config.webhook_secret_id = None

    def _process_event_source_response(
        self, event_source: EventSourceResponse, config: EventSourceConfig
    ) -> None:
        """Process an event source response before it is returned to the user.

        Removes hidden fields from the configuration.

        Args:
            event_source: Event source response.
            config: Event source configuration instantiated from the response.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        # Remove hidden fields from the response
        config.rotate_secret = None
        config.webhook_secret_id = None
        config.webhook_secret = None
config_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration] property readonly

Returns the webhook event source configuration class.

Returns:

Type Description
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration]

The configuration.

filter_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration] property readonly

Returns the webhook event filter configuration class.

Returns:

Type Description
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration]

The event filter configuration class.

flavor_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor]

The flavor class of the plugin.

PullRequest (BaseModel) pydantic-model

Github Pull Request.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class PullRequest(BaseModel):
    """Github Pull Request."""

    id: str
    number: int
    title: str
    author: User
    merged: bool
    merge_commit: Commit
Repository (BaseModel) pydantic-model

Github Repository.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Repository(BaseModel):
    """Github Repository."""

    id: int
    name: str
    full_name: str
    html_url: str
Tag (BaseModel) pydantic-model

Github Tag.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class Tag(BaseModel):
    """Github Tag."""

    id: str
    name: str
    commit: Commit
User (BaseModel) pydantic-model

Github User.

Source code in zenml/integrations/github/plugins/event_sources/github_webhook_event_source.py
class User(BaseModel):
    """Github User."""

    name: str
    email: str
    username: str

github_webhook_event_source_flavor

Example file of what an event Plugin could look like.

GithubWebhookEventSourceFlavor (BaseWebhookEventSourceFlavor)

Enables users to configure github event sources.

Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventSourceFlavor(BaseWebhookEventSourceFlavor):
    """Enables users to configure github event sources."""

    FLAVOR: ClassVar[str] = GITHUB_EVENT_FLAVOR
    PLUGIN_CLASS: ClassVar[Type[GithubWebhookEventSourceHandler]] = (
        GithubWebhookEventSourceHandler
    )

    # EventPlugin specific
    EVENT_SOURCE_CONFIG_CLASS: ClassVar[
        Type[GithubWebhookEventSourceConfiguration]
    ] = GithubWebhookEventSourceConfiguration
    EVENT_FILTER_CONFIG_CLASS: ClassVar[
        Type[GithubWebhookEventFilterConfiguration]
    ] = GithubWebhookEventFilterConfiguration
EVENT_FILTER_CONFIG_CLASS (WebhookEventFilterConfig) pydantic-model

Configuration for github event filters.

Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventFilterConfiguration(WebhookEventFilterConfig):
    """Configuration for github event filters."""

    repo: Optional[str]
    branch: Optional[str]
    event_type: Optional[GithubEventType]

    def event_matches_filter(self, event: BaseEvent) -> bool:
        """Checks the filter against the inbound event.

        Args:
            event: The incoming event

        Returns:
            Whether the event matches the filter
        """
        if not isinstance(event, GithubEvent):
            return False
        if self.event_type and event.event_type != self.event_type:
            # Mismatch for the action
            return False
        if self.repo and event.repository.full_name != self.repo:
            # Mismatch for the repository
            return False
        if self.branch and event.branch != self.branch:
            # Mismatch for the branch
            return False
        return True
event_matches_filter(self, event)

Checks the filter against the inbound event.

Parameters:

Name Type Description Default
event BaseEvent

The incoming event

required

Returns:

Type Description
bool

Whether the event matches the filter

Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
def event_matches_filter(self, event: BaseEvent) -> bool:
    """Checks the filter against the inbound event.

    Args:
        event: The incoming event

    Returns:
        Whether the event matches the filter
    """
    if not isinstance(event, GithubEvent):
        return False
    if self.event_type and event.event_type != self.event_type:
        # Mismatch for the action
        return False
    if self.repo and event.repository.full_name != self.repo:
        # Mismatch for the repository
        return False
    if self.branch and event.branch != self.branch:
        # Mismatch for the branch
        return False
    return True
EVENT_SOURCE_CONFIG_CLASS (WebhookEventSourceConfig) pydantic-model

Configuration for github source filters.

Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventSourceConfiguration(WebhookEventSourceConfig):
    """Configuration for github source filters."""

    webhook_secret: Optional[str] = Field(
        default=None,
        title="The webhook secret for the event source.",
    )
    webhook_secret_id: Optional[UUID] = Field(
        default=None,
        description="The ID of the secret containing the webhook secret.",
    )
    rotate_secret: Optional[bool] = Field(
        default=None, description="Set to rotate the webhook secret."
    )
rotate_secret: bool pydantic-field

Set to rotate the webhook secret.

webhook_secret_id: UUID pydantic-field

The ID of the secret containing the webhook secret.

PLUGIN_CLASS (BaseWebhookEventSourceHandler)

Handler for all github events.

Source code in zenml/integrations/github/plugins/github_webhook_event_source_flavor.py
class GithubWebhookEventSourceHandler(BaseWebhookEventSourceHandler):
    """Handler for all github events."""

    @property
    def config_class(self) -> Type[GithubWebhookEventSourceConfiguration]:
        """Returns the webhook event source configuration class.

        Returns:
            The configuration.
        """
        return GithubWebhookEventSourceConfiguration

    @property
    def filter_class(self) -> Type[GithubWebhookEventFilterConfiguration]:
        """Returns the webhook event filter configuration class.

        Returns:
            The event filter configuration class.
        """
        return GithubWebhookEventFilterConfiguration

    @property
    def flavor_class(self) -> Type[BaseWebhookEventSourceFlavor]:
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """
        from zenml.integrations.github.plugins.github_webhook_event_source_flavor import (
            GithubWebhookEventSourceFlavor,
        )

        return GithubWebhookEventSourceFlavor

    def _interpret_event(self, event: Dict[str, Any]) -> GithubEvent:
        """Converts the generic event body into a event-source specific pydantic model.

        Args:
            event: The generic event body

        Returns:
            An instance of the event source specific pydantic model.

        Raises:
            ValueError: If the event body can not be parsed into the pydantic model.
        """
        try:
            github_event = GithubEvent(**event)
        except ValueError:
            raise ValueError("Event did not match the pydantic model.")
        else:
            return github_event

    def _load_payload(
        self, raw_body: bytes, headers: Dict[str, str]
    ) -> Dict[str, Any]:
        """Converts the raw body of the request into a python dictionary.

        For github webhooks users can optionally choose to urlencode the
        messages. The body will look something like this:
        b'payload=%7B%22...%7D%7D'. In this case the header will contain the
        following field {'content-type': 'application/x-www-form-urlencoded'}.

        Args:
            raw_body: The raw event body.
            headers: The request headers.

        Returns:
            An instance of the event source specific pydantic model.
        """
        content_type = headers.get("content-type", "")
        if content_type == "application/x-www-form-urlencoded":
            string_body = urllib.parse.unquote_plus(raw_body.decode())
            # Body looks like this: "payload={}", removing the prefix
            raw_body = string_body[8:].encode()

        return super()._load_payload(raw_body=raw_body, headers=headers)

    def _get_webhook_secret(
        self, event_source: EventSourceResponse
    ) -> Optional[str]:
        """Get the webhook secret for the event source.

        Args:
            event_source: The event source to retrieve the secret for.

        Returns:
            The webhook secret associated with the event source, or None if a
            secret is not applicable.

        Raises:
            AuthorizationException: If the secret value could not be retrieved.
        """
        # Temporary solution to get the secret value for the Event Source
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        webhook_secret_id = config.webhook_secret_id
        if webhook_secret_id is None:
            raise AuthorizationException(
                f"Webhook secret ID is missing from the event source "
                f"configuration for event source '{event_source.id}'."
            )
        try:
            return self.zen_store.get_secret(
                secret_id=webhook_secret_id
            ).secret_values["webhook_secret"]
        except KeyError:
            logger.exception(
                f"Could not retrieve secret value for webhook secret id "
                f"'{webhook_secret_id}'"
            )
            raise AuthorizationException(
                "Could not retrieve webhook signature."
            )

    def _validate_event_source_request(
        self, event_source: EventSourceRequest, config: EventSourceConfig
    ) -> None:
        """Validate an event source request before it is created in the database.

        The `webhook_secret`, `webhook_secret_id`, and `rotate_secret`
        fields are not allowed in the request.

        Args:
            event_source: Event source request.
            config: Event source configuration instantiated from the request.

        Raises:
            ValueError: If any of the disallowed fields are present in the
                request.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        for field in ["webhook_secret", "webhook_secret_id", "rotate_secret"]:
            if getattr(config, field) is not None:
                raise ValueError(
                    f"The `{field}` field is not allowed in the event source "
                    "request."
                )

    def _process_event_source_request(
        self, event_source: EventSourceResponse, config: EventSourceConfig
    ) -> None:
        """Process an event source request after it is created in the database.

        Generates a webhook secret and stores it in a secret in the database,
        then attaches the secret ID to the event source configuration.

        Args:
            event_source: Newly created event source
            config: Event source configuration instantiated from the response.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        assert (
            event_source.user is not None
        ), "User is not set for event source"

        secret_key_value = random_str(12)
        webhook_secret = SecretRequest(
            name=f"event_source-{str(event_source.id)}-{random_str(4)}".lower(),
            values={"webhook_secret": secret_key_value},
            workspace=event_source.workspace.id,
            user=event_source.user.id,
            scope=SecretScope.WORKSPACE,
        )
        secret = self.zen_store.create_secret(webhook_secret)

        # Store the secret ID in the event source configuration in the database
        event_source_update = EventSourceUpdate.from_response(event_source)
        assert event_source_update.configuration is not None
        event_source_update.configuration["webhook_secret_id"] = str(secret.id)

        self.zen_store.update_event_source(
            event_source_id=event_source.id,
            event_source_update=event_source_update,
        )

        # Set the webhook secret in the configuration returned to the user
        config.webhook_secret = secret_key_value
        # Remove hidden field from the response
        config.rotate_secret = None
        config.webhook_secret_id = None

    def _validate_event_source_update(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        event_source_update: EventSourceUpdate,
        config_update: EventSourceConfig,
    ) -> None:
        """Validate an event source update before it is reflected in the database.

        Ensure the webhook secret ID is preserved in the updated event source
        configuration.

        Args:
            event_source: Original event source before the update.
            config: Event source configuration instantiated from the original
                event source.
            event_source_update: Event source update request.
            config_update: Event source configuration instantiated from the
                updated event source.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        assert isinstance(config_update, GithubWebhookEventSourceConfiguration)

        config_update.webhook_secret_id = config.webhook_secret_id

    def _process_event_source_update(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        previous_event_source: EventSourceResponse,
        previous_config: EventSourceConfig,
    ) -> None:
        """Process an event source after it is updated in the database.

        If the `rotate_secret` field is set to `True`, the webhook secret is
        rotated and the new secret ID is attached to the event source
        configuration.

        Args:
            event_source: Event source after the update.
            config: Event source configuration instantiated from the updated
                event source.
            previous_event_source: Original event source before the update.
            previous_config: Event source configuration instantiated from the
                original event source.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        assert isinstance(
            previous_config, GithubWebhookEventSourceConfiguration
        )
        assert config.webhook_secret_id is not None

        if config.rotate_secret:
            # In case the secret is being rotated
            secret_key_value = random_str(12)
            webhook_secret = SecretUpdate(  # type: ignore[call-arg]
                values={"webhook_secret": secret_key_value}
            )
            self.zen_store.update_secret(
                secret_id=config.webhook_secret_id,
                secret_update=webhook_secret,
            )

            # Remove the `rotate_secret` field from the configuration stored
            # in the database
            event_source_update = EventSourceUpdate.from_response(event_source)
            assert event_source_update.configuration is not None
            event_source_update.configuration.pop("rotate_secret")
            self.zen_store.update_event_source(
                event_source_id=event_source.id,
                event_source_update=event_source_update,
            )

            # Set the new secret in the configuration returned to the user
            config.webhook_secret = secret_key_value

        # Remove hidden fields from the response
        config.rotate_secret = None
        config.webhook_secret_id = None

    def _process_event_source_delete(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        force: Optional[bool] = False,
    ) -> None:
        """Process an event source before it is deleted from the database.

        Deletes the associated secret from the database.

        Args:
            event_source: Event source before the deletion.
            config: Validated instantiated event source configuration before
                the deletion.
            force: Whether to force deprovision the event source.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        if config.webhook_secret_id is not None:
            try:
                self.zen_store.delete_secret(
                    secret_id=config.webhook_secret_id
                )
            except KeyError:
                pass

        # Remove hidden fields from the response
        config.rotate_secret = None
        config.webhook_secret_id = None

    def _process_event_source_response(
        self, event_source: EventSourceResponse, config: EventSourceConfig
    ) -> None:
        """Process an event source response before it is returned to the user.

        Removes hidden fields from the configuration.

        Args:
            event_source: Event source response.
            config: Event source configuration instantiated from the response.
        """
        assert isinstance(config, GithubWebhookEventSourceConfiguration)
        # Remove hidden fields from the response
        config.rotate_secret = None
        config.webhook_secret_id = None
        config.webhook_secret = None
config_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration] property readonly

Returns the webhook event source configuration class.

Returns:

Type Description
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventSourceConfiguration]

The configuration.

filter_class: Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration] property readonly

Returns the webhook event filter configuration class.

Returns:

Type Description
Type[zenml.integrations.github.plugins.event_sources.github_webhook_event_source.GithubWebhookEventFilterConfiguration]

The event filter configuration class.

flavor_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[zenml.event_sources.webhooks.base_webhook_event_source.BaseWebhookEventSourceFlavor]

The flavor class of the plugin.