Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ansible / community / okd / plugins / module_utils / openshift_registry.py
Size: Mime:
#!/usr/bin/env python

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import traceback
from urllib.parse import urlparse

from ansible.module_utils._text import to_native

try:
    from ansible_collections.kubernetes.core.plugins.module_utils.common import (
        K8sAnsibleMixin,
        get_api_client,
    )
    HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
    HAS_KUBERNETES_COLLECTION = False
    k8s_collection_import_exception = e
    K8S_COLLECTION_ERROR = traceback.format_exc()

from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
    parse_docker_image_ref,
)

try:
    from requests import request
    from requests.auth import HTTPBasicAuth
    HAS_REQUESTS_MODULE = True
except ImportError as e:
    HAS_REQUESTS_MODULE = False
    requests_import_exception = e
    REQUESTS_MODULE_ERROR = traceback.format_exc()


class OpenShiftRegistry(K8sAnsibleMixin):
    def __init__(self, module):
        self.module = module
        self.fail_json = self.module.fail_json
        self.exit_json = self.module.exit_json

        if not HAS_KUBERNETES_COLLECTION:
            self.fail_json(
                msg="The kubernetes.core collection must be installed",
                exception=K8S_COLLECTION_ERROR,
                error=to_native(k8s_collection_import_exception),
            )

        super(OpenShiftRegistry, self).__init__(self.module)

        self.params = self.module.params
        self.check_mode = self.module.check_mode
        self.client = get_api_client(self.module)

        self.check = self.params.get("check")

    def list_image_streams(self, namespace=None):
        kind = "ImageStream"
        api_version = "image.openshift.io/v1"

        params = dict(
            kind=kind,
            api_version=api_version,
            namespace=namespace
        )
        result = self.kubernetes_facts(**params)
        imagestream = []
        if len(result["resources"]) > 0:
            imagestream = result["resources"]
        return imagestream

    def find_registry_info(self):

        def _determine_registry(image_stream):
            public, internal = None, None
            docker_repo = image_stream["status"].get("publicDockerImageRepository")
            if docker_repo:
                ref, err = parse_docker_image_ref(docker_repo, self.module)
                public = ref["hostname"]

            docker_repo = image_stream["status"].get("dockerImageRepository")
            if docker_repo:
                ref, err = parse_docker_image_ref(docker_repo, self.module)
                internal = ref["hostname"]
            return internal, public

        # Try to determine registry hosts from Image Stream from 'openshift' namespace
        for stream in self.list_image_streams(namespace="openshift"):
            internal, public = _determine_registry(stream)
            if not public and not internal:
                self.fail_json(msg="The integrated registry has not been configured")
            return internal, public

        # Unable to determine registry from 'openshift' namespace, trying with all namespace
        for stream in self.list_image_streams():
            internal, public = _determine_registry(stream)
            if not public and not internal:
                self.fail_json(msg="The integrated registry has not been configured")
            return internal, public

        self.fail_json(msg="No Image Streams could be located to retrieve registry info.")

    def info(self):
        result = {}
        result["internal_hostname"], result["public_hostname"] = self.find_registry_info()

        if self.check:
            public_registry = result["public_hostname"]
            if not public_registry:
                result["check"] = dict(
                    reached=False,
                    msg="Registry does not have a public hostname."
                )
            else:
                headers = {
                    'Content-Type': 'application/json'
                }
                params = {
                    'method': 'GET',
                    'verify': False
                }
                if self.client.configuration.api_key:
                    headers.update(self.client.configuration.api_key)
                elif self.client.configuration.username and self.client.configuration.password:
                    if not HAS_REQUESTS_MODULE:
                        result["check"] = dict(
                            reached=False,
                            msg="The requests python package is missing, try `pip install requests`",
                            error=requests_import_exception
                        )
                        self.exit_json(**result)
                    params.update(
                        dict(auth=HTTPBasicAuth(self.client.configuration.username, self.client.configuration.password))
                    )

                # verify ssl
                host = urlparse(public_registry)
                if len(host.scheme) == 0:
                    registry_url = "https://" + public_registry

                if registry_url.startswith("https://") and self.client.configuration.ssl_ca_cert:
                    params.update(
                        dict(verify=self.client.configuration.ssl_ca_cert)
                    )
                params.update(
                    dict(headers=headers)
                )
                last_bad_status, last_bad_reason = None, None
                for path in ("/", "/healthz"):
                    params.update(
                        dict(url=registry_url + path)
                    )
                    response = request(**params)
                    if response.status_code == 200:
                        result["check"] = dict(
                            reached=True,
                            msg="The local client can contact the integrated registry."
                        )
                        self.exit_json(**result)
                    last_bad_reason = response.reason
                    last_bad_status = response.status_code

                result["check"] = dict(
                    reached=False,
                    msg="Unable to contact the integrated registry using local client. Status=%d, Reason=%s" % (
                        last_bad_status, last_bad_reason
                    )
                )

        self.exit_json(**result)