Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ansible / community / okd / plugins / module_utils / openshift_import_image.py
Size: Mime:
#!/usr/bin/env python

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import traceback
import copy

from ansible.module_utils._text import to_native
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import string_types

try:
    from ansible_collections.kubernetes.core.plugins.module_utils.common import (
        K8sAnsibleMixin,
        get_api_client,
    )
    HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
    HAS_KUBERNETES_COLLECTION = False
    k8s_collection_import_exception = e
    K8S_COLLECTION_ERROR = traceback.format_exc()

try:
    from kubernetes.dynamic.exceptions import DynamicApiError
except ImportError:
    pass

from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
    parse_docker_image_ref,
)

err_stream_not_found_ref = "NotFound reference"


def follow_imagestream_tag_reference(stream, tag):
    multiple = False

    def _imagestream_has_tag():
        for ref in stream["spec"].get("tags", []):
            if ref["name"] == tag:
                return ref
        return None

    def _imagestream_split_tag(name):
        parts = name.split(":")
        name = parts[0]
        tag = ""
        if len(parts) > 1:
            tag = parts[1]
        if len(tag) == 0:
            tag = "latest"
        return name, tag, len(parts) == 2

    content = []
    err_cross_stream_ref = "tag %s points to an imagestreamtag from another ImageStream" % tag
    while True:
        if tag in content:
            return tag, None, multiple, "tag %s on the image stream is a reference to same tag" % tag
        content.append(tag)
        tag_ref = _imagestream_has_tag()
        if not tag_ref:
            return None, None, multiple, err_stream_not_found_ref

        if not tag_ref.get("from") or tag_ref["from"]["kind"] != "ImageStreamTag":
            return tag, tag_ref, multiple, None

        if tag_ref["from"]["namespace"] != "" and tag_ref["from"]["namespace"] != stream["metadata"]["namespace"]:
            return tag, None, multiple, err_cross_stream_ref

        # The reference needs to be followed with two format patterns:
        # a) sameis:sometag and b) sometag
        if ":" in tag_ref["from"]["name"]:
            name, tagref, result = _imagestream_split_tag(tag_ref["from"]["name"])
            if not result:
                return tag, None, multiple, "tag %s points to an invalid imagestreamtag" % tag
            if name != stream["metadata"]["namespace"]:
                # anotheris:sometag - this should not happen.
                return tag, None, multiple, err_cross_stream_ref
            # sameis:sometag - follow the reference as sometag
            tag = tagref
        else:
            tag = tag_ref["from"]["name"]
        multiple = True


class OpenShiftImportImage(K8sAnsibleMixin):
    def __init__(self, module):
        self.module = module
        self.fail_json = self.module.fail_json
        self.exit_json = self.module.exit_json

        if not HAS_KUBERNETES_COLLECTION:
            self.fail_json(
                msg="The kubernetes.core collection must be installed",
                exception=K8S_COLLECTION_ERROR,
                error=to_native(k8s_collection_import_exception),
            )

        super(OpenShiftImportImage, self).__init__(self.module)

        self.params = self.module.params
        self.check_mode = self.module.check_mode
        self.client = get_api_client(self.module)

        self._rest_client = None
        self.registryhost = self.params.get('registry_url')
        self.changed = False

        ref_policy = self.params.get("reference_policy")
        ref_policy_type = None
        if ref_policy == "source":
            ref_policy_type = "Source"
        elif ref_policy == "local":
            ref_policy_type = "Local"

        self.ref_policy = {
            "type": ref_policy_type
        }

        self.validate_certs = self.params.get("validate_registry_certs")
        self.cluster_resources = {}

    def create_image_stream_import(self, stream):
        isi = {
            "apiVersion": "image.openshift.io/v1",
            "kind": "ImageStreamImport",
            "metadata": {
                "name": stream["metadata"]["name"],
                "namespace": stream["metadata"]["namespace"],
                "resourceVersion": stream["metadata"].get("resourceVersion")
            },
            "spec": {
                "import": True
            }
        }

        annotations = stream.get("annotations", {})
        insecure = boolean(annotations.get("openshift.io/image.insecureRepository", True))
        if self.validate_certs is not None:
            insecure = not self.validate_certs
        return isi, insecure

    def create_image_stream_import_all(self, stream, source):
        isi, insecure = self.create_image_stream_import(stream)
        isi["spec"]["repository"] = {
            "from": {
                "kind": "DockerImage",
                "name": source,
            },
            "importPolicy": {
                "insecure": insecure,
                "scheduled": self.params.get("scheduled")
            },
            "referencePolicy": self.ref_policy,
        }
        return isi

    def create_image_stream_import_tags(self, stream, tags):
        isi, streamInsecure = self.create_image_stream_import(stream)
        for k in tags:
            insecure = streamInsecure
            scheduled = self.params.get("scheduled")

            old_tag = None
            for t in stream.get("spec", {}).get("tags", []):
                if t["name"] == k:
                    old_tag = t
                    break

            if old_tag:
                insecure = insecure or old_tag["importPolicy"].get("insecure")
                scheduled = scheduled or old_tag["importPolicy"].get("scheduled")

            images = isi["spec"].get("images", [])
            images.append({
                "from": {
                    "kind": "DockerImage",
                    "name": tags.get(k),
                },
                "to": {
                    "name": k
                },
                "importPolicy": {
                    "insecure": insecure,
                    "scheduled": scheduled
                },
                "referencePolicy": self.ref_policy,
            })
            isi["spec"]["images"] = images
        return isi

    def create_image_stream(self, ref):
        """
            Create new ImageStream and accompanying ImageStreamImport
        """
        source = self.params.get("source")
        if not source:
            source = ref["source"]

        stream = dict(
            apiVersion="image.openshift.io/v1",
            kind="ImageStream",
            metadata=dict(
                name=ref["name"],
                namespace=self.params.get("namespace"),
            ),
        )
        if self.params.get("all") and not ref["tag"]:
            spec = dict(
                dockerImageRepository=source
            )
            isi = self.create_image_stream_import_all(stream, source)
        else:
            spec = dict(
                tags=[
                    {
                        "from": {
                            "kind": "DockerImage",
                            "name": source
                        },
                        "referencePolicy": self.ref_policy
                    }
                ]
            )
            tags = {ref["tag"]: source}
            isi = self.create_image_stream_import_tags(stream, tags)
        stream.update(
            dict(spec=spec)
        )
        return stream, isi

    def import_all(self, istream):
        stream = copy.deepcopy(istream)
        # Update ImageStream appropriately
        source = self.params.get("source")
        docker_image_repo = stream["spec"].get("dockerImageRepository")
        if not source:
            if docker_image_repo:
                source = docker_image_repo
            else:
                tags = {}
                for t in stream["spec"].get("tags", []):
                    if t.get("from") and t["from"].get("kind") == "DockerImage":
                        tags[t.get("name")] = t["from"].get("name")
                if tags == {}:
                    msg = "image stream %s/%s does not have tags pointing to external container images" % (
                        stream["metadata"]["namespace"], stream["metadata"]["name"]
                    )
                    self.fail_json(msg=msg)
                isi = self.create_image_stream_import_tags(stream, tags)
                return stream, isi

        if source != docker_image_repo:
            stream["spec"]["dockerImageRepository"] = source
        isi = self.create_image_stream_import_all(stream, source)
        return stream, isi

    def import_tag(self, stream, tag):
        source = self.params.get("source")

        # Follow any referential tags to the destination
        final_tag, existing, multiple, err = follow_imagestream_tag_reference(stream, tag)
        if err:
            if err == err_stream_not_found_ref:
                # Create a new tag
                if not source and tag == "latest":
                    source = stream["spec"].get("dockerImageRepository")
                # if the from is still empty this means there's no such tag defined
                # nor we can't create any from .spec.dockerImageRepository
                if not source:
                    msg = "the tag %s does not exist on the image stream - choose an existing tag to import" % tag
                    self.fail_json(msg=msg)
                existing = {
                    "from": {
                        "kind": "DockerImage",
                        "name": source,
                    }
                }
            else:
                self.fail_json(msg=err)
        else:
            # Disallow re-importing anything other than DockerImage
            if existing.get("from", {}) and existing["from"].get("kind") != "DockerImage":
                msg = "tag {tag} points to existing {kind}/={name}, it cannot be re-imported.".format(
                    tag=tag, kind=existing["from"]["kind"], name=existing["from"]["name"]
                )
            # disallow changing an existing tag
            if not existing.get("from", {}):
                msg = "tag %s already exists - you cannot change the source using this module." % tag
                self.fail_json(msg=msg)
            if source and source != existing["from"]["name"]:
                if multiple:
                    msg = "the tag {0} points to the tag {1} which points to {2} you cannot change the source using this module".format(
                        tag, final_tag, existing["from"]["name"]
                    )
                else:
                    msg = "the tag %s points to %s you cannot change the source using this module." % (tag, final_tag)
                self.fail_json(msg=msg)

            # Set the target item to import
            source = existing["from"].get("name")
            if multiple:
                tag = final_tag

            # Clear the legacy annotation
            tag_to_delete = "openshift.io/image.dockerRepositoryCheck"
            if existing["annotations"] and tag_to_delete in existing["annotations"]:
                del existing["annotations"][tag_to_delete]

            # Reset the generation
            existing["generation"] = 0

        new_stream = copy.deepcopy(stream)
        new_stream["spec"]["tags"] = []
        for t in stream["spec"]["tags"]:
            if t["name"] == tag:
                new_stream["spec"]["tags"].append(existing)
            else:
                new_stream["spec"]["tags"].append(t)

        # Create accompanying ImageStreamImport
        tags = {tag: source}
        isi = self.create_image_stream_import_tags(new_stream, tags)
        return new_stream, isi

    def create_image_import(self, ref):
        kind = "ImageStream"
        api_version = "image.openshift.io/v1"

        # Find existing Image Stream
        params = dict(
            kind=kind,
            api_version=api_version,
            name=ref.get("name"),
            namespace=self.params.get("namespace")
        )
        result = self.kubernetes_facts(**params)
        if not result["api_found"]:
            msg = 'Failed to find API for resource with apiVersion "{0}" and kind "{1}"'.format(
                api_version, kind
            ),
            self.fail_json(msg=msg)
        imagestream = None
        if len(result["resources"]) > 0:
            imagestream = result["resources"][0]

        stream, isi = None, None
        if not imagestream:
            stream, isi = self.create_image_stream(ref)
        elif self.params.get("all") and not ref["tag"]:
            # importing the entire repository
            stream, isi = self.import_all(imagestream)
        else:
            # importing a single tag
            stream, isi = self.import_tag(imagestream, ref["tag"])
        return isi

    def parse_image_reference(self, image_ref):
        result, err = parse_docker_image_ref(image_ref, self.module)
        if result.get("digest"):
            self.fail_json(msg="Cannot import by ID, error with definition: %s" % image_ref)
        tag = result.get("tag") or None
        if not self.params.get("all") and not tag:
            tag = "latest"
        source = self.params.get("source")
        if not source:
            source = image_ref
        return dict(name=result.get("name"), tag=tag, source=image_ref)

    def execute_module(self):

        names = []
        name = self.params.get("name")
        if isinstance(name, string_types):
            names.append(name)
        elif isinstance(name, list):
            names = name
        else:
            self.fail_json(msg="Parameter name should be provided as list or string.")

        images_refs = [self.parse_image_reference(x) for x in names]
        images_imports = []
        for ref in images_refs:
            isi = self.create_image_import(ref)
            images_imports.append(isi)

        # Create image import
        kind = "ImageStreamImport"
        api_version = "image.openshift.io/v1"
        namespace = self.params.get("namespace")
        try:
            resource = self.find_resource(kind=kind, api_version=api_version, fail=True)
            result = []
            for isi in images_imports:
                if not self.check_mode:
                    isi = resource.create(isi, namespace=namespace).to_dict()
                result.append(isi)
            self.exit_json(changed=True, result=result)
        except DynamicApiError as exc:
            msg = "Failed to create object {kind}/{namespace}/{name} due to: {error}".format(
                kind=kind, namespace=namespace, name=isi["metadata"]["name"], error=exc
            )
            self.fail_json(
                msg=msg,
                error=exc.status,
                status=exc.status,
                reason=exc.reason,
            )
        except Exception as exc:
            msg = "Failed to create object {kind}/{namespace}/{name} due to: {error}".format(
                kind=kind, namespace=namespace, name=isi["metadata"]["name"], error=exc
            )
            self.fail_json(msg=msg)